Posted to commits@doris.apache.org by di...@apache.org on 2023/04/11 10:51:24 UTC

[doris-spark-connector] branch master updated: [feature] format timestamp type (#95)

This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f5e9dd  [feature] format timestamp type  (#95)
9f5e9dd is described below

commit 9f5e9dddf696fdaf27e1d3ac1d8514558eae566f
Author: gnehil <ad...@gmail.com>
AuthorDate: Tue Apr 11 18:51:17 2023 +0800

    [feature] format timestamp type  (#95)
    
    * [feature] Add a date format parameter to control the output of the timestamp type
    * use Timestamp.toString() instead of SimpleDateFormat
    * remove unused options
---
 .../doris/spark/CachedDorisStreamLoadClient.java   | 15 ++---
 .../org/apache/doris/spark/DorisStreamLoad.java    | 67 +++++++---------------
 .../org/apache/doris/spark/util/ListUtils.java     |  8 ++-
 3 files changed, 33 insertions(+), 57 deletions(-)
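
For context, a minimal standalone sketch (not part of the commit) of why the patch replaces SimpleDateFormat with Timestamp.toString(): SimpleDateFormat has no sub-millisecond field, so a "SSSSSS" pattern zero-pads the millisecond value to six digits and silently shifts the fraction, while Timestamp.toString() emits the JDBC escape format with the full nanosecond-backed fraction.

    import java.sql.Timestamp;
    import java.text.SimpleDateFormat;

    public class TimestampFormatDemo {
        public static void main(String[] args) {
            // Microsecond precision is stored in the Timestamp's nanos field.
            Timestamp ts = Timestamp.valueOf("2023-04-11 18:51:17.123456");

            // "SSSSSS" is still the millisecond field (123), zero-padded to
            // six digits: the fraction .123 comes out as .000123.
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
            System.out.println(sdf.format(ts)); // 2023-04-11 18:51:17.000123 (wrong)

            // Timestamp.toString() keeps the real fraction.
            System.out.println(ts);             // 2023-04-11 18:51:17.123456
        }
    }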

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
index 01cada4..1d89126 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
@@ -39,25 +39,20 @@ public class CachedDorisStreamLoadClient {
     static {
         dorisStreamLoadLoadingCache = CacheBuilder.newBuilder()
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.SECONDS)
-                .removalListener(new RemovalListener<Object, Object>() {
-                    @Override
-                    public void onRemoval(RemovalNotification<Object, Object> removalNotification) {
-                        //do nothing
-                    }
+                .removalListener(removalNotification -> {
+                    //do nothing
                 })
                 .build(
                         new CacheLoader<SparkSettings, DorisStreamLoad>() {
                             @Override
-                            public DorisStreamLoad load(SparkSettings sparkSettings) throws IOException, DorisException {
-                                DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(sparkSettings);
-                                return dorisStreamLoad;
+                            public DorisStreamLoad load(SparkSettings sparkSettings) {
+                                return new DorisStreamLoad(sparkSettings);
                             }
                         }
                 );
     }
 
     public static DorisStreamLoad getOrCreate(SparkSettings settings) throws ExecutionException {
-        DorisStreamLoad dorisStreamLoad = dorisStreamLoadLoadingCache.get(settings);
-        return dorisStreamLoad;
+        return dorisStreamLoadLoadingCache.get(settings);
     }
 }
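
The class above is a thin wrapper around a Guava LoadingCache. A minimal self-contained sketch of the same pattern, with String keys standing in for SparkSettings and String values standing in for DorisStreamLoad:

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;

    public class ClientCacheSketch {
        // Entries are created on first access and expire four minutes after
        // being written, mirroring cacheExpireTimeout above.
        private static final LoadingCache<String, String> CACHE = CacheBuilder.newBuilder()
                .expireAfterWrite(4 * 60, TimeUnit.SECONDS)
                .build(new CacheLoader<String, String>() {
                    @Override
                    public String load(String key) {
                        // Stand-in for new DorisStreamLoad(settings).
                        return "client-for-" + key;
                    }
                });

        public static String getOrCreate(String key) throws ExecutionException {
            return CACHE.get(key); // loads once, then serves the cached instance
        }
    }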
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 522e791..6738c09 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -25,13 +25,11 @@ import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.spark.cfg.ConfigurationOptions;
 import org.apache.doris.spark.cfg.SparkSettings;
-import org.apache.doris.spark.exception.DorisException;
 import org.apache.doris.spark.exception.StreamLoadException;
 import org.apache.doris.spark.rest.RestService;
 import org.apache.doris.spark.rest.models.BackendV2;
 import org.apache.doris.spark.rest.models.RespContent;
 import org.apache.doris.spark.util.ListUtils;
-import org.apache.http.HttpEntity;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
@@ -47,6 +45,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -59,7 +60,7 @@ import java.util.stream.Collectors;
 public class DorisStreamLoad implements Serializable {
     private String FIELD_DELIMITER;
     private String LINE_DELIMITER;
-    private String NULL_VALUE = "\\N";
+    private static final String NULL_VALUE = "\\N";
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
 
@@ -76,19 +77,11 @@ public class DorisStreamLoad implements Serializable {
     private String maxFilterRatio;
     private Map<String, String> streamLoadProp;
     private static final long cacheExpireTimeout = 4 * 60;
-    private LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
-    private String fileType;
+    private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
+    private final String fileType;
+    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
 
-    public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
-        this.db = db;
-        this.tbl = tbl;
-        this.user = user;
-        this.passwd = passwd;
-        this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
-        this.authEncoded = getAuthEncoded(user, passwd);
-    }
-
-    public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException {
+    public DorisStreamLoad(SparkSettings settings) {
         String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
         this.db = dbTable[0];
         this.tbl = dbTable[1];
@@ -96,41 +89,21 @@ public class DorisStreamLoad implements Serializable {
         this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
         this.authEncoded = getAuthEncoded(user, passwd);
         this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
-
         this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
         this.streamLoadProp = getStreamLoadProp(settings);
         cache = CacheBuilder.newBuilder()
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
                 .build(new BackendCacheLoader(settings));
-        fileType = this.streamLoadProp.get("format") == null ? "csv" : this.streamLoadProp.get("format");
-        if (fileType.equals("csv")){
-            FIELD_DELIMITER = this.streamLoadProp.get("column_separator") == null ? "\t" : this.streamLoadProp.get("column_separator");
-            LINE_DELIMITER = this.streamLoadProp.get("line_delimiter") == null ? "\n" : this.streamLoadProp.get("line_delimiter");
+        fileType = streamLoadProp.getOrDefault("format", "csv");
+        if ("csv".equals(fileType)){
+            FIELD_DELIMITER = streamLoadProp.getOrDefault("column_separator", "\t");
+            LINE_DELIMITER = streamLoadProp.getOrDefault("line_delimiter", "\n");
         }
     }
 
-    public DorisStreamLoad(SparkSettings settings, String[] dfColumns) throws IOException, DorisException {
-        String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
-        this.db = dbTable[0];
-        this.tbl = dbTable[1];
-        this.user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
-        this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
-
-
-        this.authEncoded = getAuthEncoded(user, passwd);
-        this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
+    public DorisStreamLoad(SparkSettings settings, String[] dfColumns) {
+        this(settings);
         this.dfColumns = dfColumns;
-
-        this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
-        this.streamLoadProp = getStreamLoadProp(settings);
-        cache = CacheBuilder.newBuilder()
-                .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
-                .build(new BackendCacheLoader(settings));
-        fileType = this.streamLoadProp.get("format") == null ? "csv" : this.streamLoadProp.get("format");
-        if ("csv".equals(fileType)) {
-            FIELD_DELIMITER = this.streamLoadProp.get("column_separator") == null ? "\t" : this.streamLoadProp.get("column_separator");
-            LINE_DELIMITER = this.streamLoadProp.get("line_delimiter") == null ? "\n" : this.streamLoadProp.get("line_delimiter");
-        }
     }
 
     public String getLoadUrlStr() {
@@ -189,9 +162,8 @@ public class DorisStreamLoad implements Serializable {
 
     public String listToString(List<List<Object>> rows) {
         return rows.stream().map(row ->
-                row.stream().map(field ->
-                        (field == null) ? NULL_VALUE : field.toString()
-                ).collect(Collectors.joining(FIELD_DELIMITER))
+                row.stream().map(field -> field == null ? NULL_VALUE : field.toString())
+                        .collect(Collectors.joining(FIELD_DELIMITER))
         ).collect(Collectors.joining(LINE_DELIMITER));
     }
 
@@ -206,7 +178,12 @@ public class DorisStreamLoad implements Serializable {
                     Map<Object, Object> dataMap = new HashMap<>();
                     if (dfColumns.length == row.size()) {
                         for (int i = 0; i < dfColumns.length; i++) {
-                            dataMap.put(dfColumns[i], row.get(i));
+                            Object col = row.get(i);
+                            if (col instanceof Timestamp) {
+                                dataMap.put(dfColumns[i], col.toString());
+                                continue;
+                            }
+                            dataMap.put(dfColumns[i], col);
                         }
                     }
                     dataList.add(dataMap);
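
The hunk above is the heart of the feature: timestamp columns are converted to strings before the row map is handed to Jackson, which by default serializes a java.sql.Timestamp as an epoch-millis number. A self-contained sketch of that conversion, with hypothetical column names:

    import java.sql.Timestamp;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RowToMapSketch {
        static Map<Object, Object> toMap(String[] dfColumns, List<Object> row) {
            Map<Object, Object> dataMap = new HashMap<>();
            if (dfColumns.length == row.size()) {
                for (int i = 0; i < dfColumns.length; i++) {
                    Object col = row.get(i);
                    // Stringify timestamps so the JSON payload carries the
                    // JDBC escape format rather than epoch milliseconds.
                    dataMap.put(dfColumns[i], col instanceof Timestamp ? col.toString() : col);
                }
            }
            return dataMap;
        }

        public static void main(String[] args) {
            System.out.println(toMap(new String[]{"id", "created_at"},
                    Arrays.asList(1, Timestamp.valueOf("2023-04-11 18:51:17.123456"))));
        }
    }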
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java
index 43f5b77..46a37ff 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java
@@ -20,9 +20,11 @@ package org.apache.doris.spark.util;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -30,6 +32,8 @@ import java.util.Map;
 public class ListUtils {
     private static final Logger LOG = LoggerFactory.getLogger(ListUtils.class);
 
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
     public static List<String> getSerializedList(List<Map<Object, Object>> batch) throws JsonProcessingException {
         List<String> result = new ArrayList<>();
         divideAndSerialize(batch, result);
@@ -43,7 +47,7 @@ public class ListUtils {
      * @throws JsonProcessingException
      */
     public static void divideAndSerialize(List<Map<Object, Object>> batch, List<String> result) throws JsonProcessingException {
-        String serializedResult = (new ObjectMapper()).writeValueAsString(batch);
+        String serializedResult = MAPPER.writeValueAsString(batch);
         // if an error occurred in the batch call to getBytes ,average divide the batch
         try {
             //the "Requested array size exceeds VM limit" exception occurs when the collection is large
@@ -51,7 +55,7 @@ public class ListUtils {
             result.add(serializedResult);
             return;
         } catch (Throwable error) {
-            LOG.error("getBytes error:{} ,average divide the collection", error);
+            LOG.error("getBytes error:{} ,average divide the collection", ExceptionUtils.getStackTrace(error));
         }
         for (List<Map<Object, Object>> avgSubCollection : getAvgSubCollections(batch)) {
             divideAndSerialize(avgSubCollection, result);
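
Two things change in ListUtils: a single static ObjectMapper replaces the per-call allocation (ObjectMapper is thread-safe and expensive to construct), and the caught Throwable is now logged with its full stack trace. A minimal sketch of the divide-and-serialize fallback, assuming a Lists.partition-style split since getAvgSubCollections is not shown in this diff:

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.common.collect.Lists;

    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;

    public class DivideAndSerializeSketch {
        // One shared, thread-safe mapper instead of one per call.
        private static final ObjectMapper MAPPER = new ObjectMapper();

        static void divideAndSerialize(List<Map<Object, Object>> batch, List<String> result)
                throws JsonProcessingException {
            String serialized = MAPPER.writeValueAsString(batch);
            try {
                // Very large batches can fail here with
                // "Requested array size exceeds VM limit".
                serialized.getBytes(StandardCharsets.UTF_8);
                result.add(serialized);
                return;
            } catch (Throwable t) {
                if (batch.size() <= 1) {
                    throw t; // a single row we cannot encode; nothing left to split
                }
            }
            // Halve the batch and retry each half recursively.
            for (List<Map<Object, Object>> half : Lists.partition(batch, (batch.size() + 1) / 2)) {
                divideAndSerialize(half, result);
            }
        }
    }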

