You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by di...@apache.org on 2023/04/11 10:51:24 UTC
[doris-spark-connector] branch master updated: [feature] format timestamp type (#95)
This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 9f5e9dd [feature] format timestamp type (#95)
9f5e9dd is described below
commit 9f5e9dddf696fdaf27e1d3ac1d8514558eae566f
Author: gnehil <ad...@gmail.com>
AuthorDate: Tue Apr 11 18:51:17 2023 +0800
[feature] format timestamp type (#95)
* [feature] Add the date format parameter to control the output of timestamp type
* Use java.sql.Timestamp.toString() instead of SimpleDateFormat to format timestamp values
* remove unused options
---
.../doris/spark/CachedDorisStreamLoadClient.java | 15 ++---
.../org/apache/doris/spark/DorisStreamLoad.java | 67 +++++++---------------
.../org/apache/doris/spark/util/ListUtils.java | 8 ++-
3 files changed, 33 insertions(+), 57 deletions(-)
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
index 01cada4..1d89126 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
@@ -39,25 +39,20 @@ public class CachedDorisStreamLoadClient {
static {
dorisStreamLoadLoadingCache = CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpireTimeout, TimeUnit.SECONDS)
- .removalListener(new RemovalListener<Object, Object>() {
- @Override
- public void onRemoval(RemovalNotification<Object, Object> removalNotification) {
- //do nothing
- }
+ .removalListener(removalNotification -> {
+ //do nothing
})
.build(
new CacheLoader<SparkSettings, DorisStreamLoad>() {
@Override
- public DorisStreamLoad load(SparkSettings sparkSettings) throws IOException, DorisException {
- DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(sparkSettings);
- return dorisStreamLoad;
+ public DorisStreamLoad load(SparkSettings sparkSettings) {
+ return new DorisStreamLoad(sparkSettings);
}
}
);
}
public static DorisStreamLoad getOrCreate(SparkSettings settings) throws ExecutionException {
- DorisStreamLoad dorisStreamLoad = dorisStreamLoadLoadingCache.get(settings);
- return dorisStreamLoad;
+ return dorisStreamLoadLoadingCache.get(settings);
}
}
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 522e791..6738c09 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -25,13 +25,11 @@ import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.SparkSettings;
-import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.StreamLoadException;
import org.apache.doris.spark.rest.RestService;
import org.apache.doris.spark.rest.models.BackendV2;
import org.apache.doris.spark.rest.models.RespContent;
import org.apache.doris.spark.util.ListUtils;
-import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
@@ -47,6 +45,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -59,7 +60,7 @@ import java.util.stream.Collectors;
public class DorisStreamLoad implements Serializable {
private String FIELD_DELIMITER;
private String LINE_DELIMITER;
- private String NULL_VALUE = "\\N";
+ private static final String NULL_VALUE = "\\N";
private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
@@ -76,19 +77,11 @@ public class DorisStreamLoad implements Serializable {
private String maxFilterRatio;
private Map<String, String> streamLoadProp;
private static final long cacheExpireTimeout = 4 * 60;
- private LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
- private String fileType;
+ private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
+ private final String fileType;
+ private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
- public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
- this.db = db;
- this.tbl = tbl;
- this.user = user;
- this.passwd = passwd;
- this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
- this.authEncoded = getAuthEncoded(user, passwd);
- }
-
- public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException {
+ public DorisStreamLoad(SparkSettings settings) {
String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
this.db = dbTable[0];
this.tbl = dbTable[1];
@@ -96,41 +89,21 @@ public class DorisStreamLoad implements Serializable {
this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
this.authEncoded = getAuthEncoded(user, passwd);
this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
-
this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
this.streamLoadProp = getStreamLoadProp(settings);
cache = CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
.build(new BackendCacheLoader(settings));
- fileType = this.streamLoadProp.get("format") == null ? "csv" : this.streamLoadProp.get("format");
- if (fileType.equals("csv")){
- FIELD_DELIMITER = this.streamLoadProp.get("column_separator") == null ? "\t" : this.streamLoadProp.get("column_separator");
- LINE_DELIMITER = this.streamLoadProp.get("line_delimiter") == null ? "\n" : this.streamLoadProp.get("line_delimiter");
+ fileType = streamLoadProp.getOrDefault("format", "csv");
+ if ("csv".equals(fileType)){
+ FIELD_DELIMITER = streamLoadProp.getOrDefault("column_separator", "\t");
+ LINE_DELIMITER = streamLoadProp.getOrDefault("line_delimiter", "\n");
}
}
- public DorisStreamLoad(SparkSettings settings, String[] dfColumns) throws IOException, DorisException {
- String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
- this.db = dbTable[0];
- this.tbl = dbTable[1];
- this.user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
- this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
-
-
- this.authEncoded = getAuthEncoded(user, passwd);
- this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
+ public DorisStreamLoad(SparkSettings settings, String[] dfColumns) {
+ this(settings);
this.dfColumns = dfColumns;
-
- this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
- this.streamLoadProp = getStreamLoadProp(settings);
- cache = CacheBuilder.newBuilder()
- .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
- .build(new BackendCacheLoader(settings));
- fileType = this.streamLoadProp.get("format") == null ? "csv" : this.streamLoadProp.get("format");
- if ("csv".equals(fileType)) {
- FIELD_DELIMITER = this.streamLoadProp.get("column_separator") == null ? "\t" : this.streamLoadProp.get("column_separator");
- LINE_DELIMITER = this.streamLoadProp.get("line_delimiter") == null ? "\n" : this.streamLoadProp.get("line_delimiter");
- }
}
public String getLoadUrlStr() {
@@ -189,9 +162,8 @@ public class DorisStreamLoad implements Serializable {
public String listToString(List<List<Object>> rows) {
return rows.stream().map(row ->
- row.stream().map(field ->
- (field == null) ? NULL_VALUE : field.toString()
- ).collect(Collectors.joining(FIELD_DELIMITER))
+ row.stream().map(field -> field == null ? NULL_VALUE : field.toString())
+ .collect(Collectors.joining(FIELD_DELIMITER))
).collect(Collectors.joining(LINE_DELIMITER));
}
@@ -206,7 +178,12 @@ public class DorisStreamLoad implements Serializable {
Map<Object, Object> dataMap = new HashMap<>();
if (dfColumns.length == row.size()) {
for (int i = 0; i < dfColumns.length; i++) {
- dataMap.put(dfColumns[i], row.get(i));
+ Object col = row.get(i);
+ if (col instanceof Timestamp) {
+ dataMap.put(dfColumns[i], col.toString());
+ continue;
+ }
+ dataMap.put(dfColumns[i], col);
}
}
dataList.add(dataMap);
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java
index 43f5b77..46a37ff 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java
@@ -20,9 +20,11 @@ package org.apache.doris.spark.util;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -30,6 +32,8 @@ import java.util.Map;
public class ListUtils {
private static final Logger LOG = LoggerFactory.getLogger(ListUtils.class);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
public static List<String> getSerializedList(List<Map<Object, Object>> batch) throws JsonProcessingException {
List<String> result = new ArrayList<>();
divideAndSerialize(batch, result);
@@ -43,7 +47,7 @@ public class ListUtils {
* @throws JsonProcessingException
*/
public static void divideAndSerialize(List<Map<Object, Object>> batch, List<String> result) throws JsonProcessingException {
- String serializedResult = (new ObjectMapper()).writeValueAsString(batch);
+ String serializedResult = MAPPER.writeValueAsString(batch);
// if an error occurred in the batch call to getBytes ,average divide the batch
try {
//the "Requested array size exceeds VM limit" exception occurs when the collection is large
@@ -51,7 +55,7 @@ public class ListUtils {
result.add(serializedResult);
return;
} catch (Throwable error) {
- LOG.error("getBytes error:{} ,average divide the collection", error);
+ LOG.error("getBytes error:{} ,average divide the collection", ExceptionUtils.getStackTrace(error));
}
for (List<Map<Object, Object>> avgSubCollection : getAvgSubCollections(batch)) {
divideAndSerialize(avgSubCollection, result);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org