Posted to commits@doris.apache.org by di...@apache.org on 2023/01/28 03:16:46 UTC

[doris-spark-connector] branch master updated: [improvement] Adapt to the load format of csv (#65)

This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bb76ed  [improvement] Adapt to the load format of csv (#65)
0bb76ed is described below

commit 0bb76ed0480398583b2541596fece161ea402984
Author: Hong Liu <84...@qq.com>
AuthorDate: Sat Jan 28 11:16:40 2023 +0800

    [improvement] Adapt to the load format of csv (#65)
    
    Co-authored-by: smallhibiscus <844981280>
---
 .../org/apache/doris/spark/DorisStreamLoad.java    | 56 ++++++++++++++--------
 .../doris/spark/sql/DorisSourceProvider.scala      |  2 +-
 2 files changed, 36 insertions(+), 22 deletions(-)
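
For context, this patch makes CSV the default stream load format (previously every batch was serialized as JSON) and lets the column and line delimiters be configured through the stream load properties. A minimal usage sketch of what that enables follows; the doris.sink.properties.* keys reflect the connector's convention of forwarding sink properties to stream load, and the table, FE address, and credentials are hypothetical placeholders, not values taken from this diff.

    // A usage sketch, not part of this commit: write a DataFrame to Doris
    // using CSV stream load with a custom column separator. All option
    // values below are hypothetical.
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class DorisCsvWriteSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().appName("doris-csv-write").getOrCreate();
            Dataset<Row> df = spark.sql("SELECT 1 AS id, 'alice' AS name");
            df.write()
                    .format("doris")
                    .option("doris.table.identifier", "example_db.example_tbl") // hypothetical
                    .option("doris.fenodes", "fe_host:8030")                    // hypothetical
                    .option("user", "root")
                    .option("password", "")
                    // forwarded to stream load; "csv" is now also the default
                    .option("doris.sink.properties.format", "csv")
                    .option("doris.sink.properties.column_separator", ",")
                    .save();
            spark.stop();
        }
    }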

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 351ef23..3ada398 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -50,10 +50,10 @@ import java.util.concurrent.TimeUnit;
 /**
  * DorisStreamLoad
  **/
-public class DorisStreamLoad implements Serializable {
-    public static final String FIELD_DELIMITER = "\t";
-    public static final String LINE_DELIMITER = "\n";
-    public static final String NULL_VALUE = "\\N";
+public class DorisStreamLoad implements Serializable {
+    private String FIELD_DELIMITER;
+    private String LINE_DELIMITER;
+    private String NULL_VALUE = "\\N";
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
 
@@ -71,6 +71,7 @@ public class DorisStreamLoad implements Serializable {
     private Map<String, String> streamLoadProp;
     private static final long cacheExpireTimeout = 4 * 60;
     private LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
+    private String fileType;
 
     public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
         this.db = db;
@@ -114,6 +115,11 @@ public class DorisStreamLoad implements Serializable {
         cache = CacheBuilder.newBuilder()
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
                 .build(new BackendCacheLoader(settings));
+        fileType = this.streamLoadProp.get("format") == null ? "csv" : this.streamLoadProp.get("format");
+        if (fileType.equals("csv")) {
+            FIELD_DELIMITER = this.streamLoadProp.get("column_separator") == null ? "\t" : this.streamLoadProp.get("column_separator");
+            LINE_DELIMITER = this.streamLoadProp.get("line_delimiter") == null ? "\n" : this.streamLoadProp.get("line_delimiter");
+        }
     }
 
     public String getLoadUrlStr() {
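
The constructor hunk above resolves the load format from streamLoadProp with a csv default, and for CSV loads resolves the field and line delimiters with tab and newline fallbacks. The null-check ternaries behave like Map.getOrDefault; a standalone sketch of the same resolution logic, with a hypothetical property value:

    // Equivalent defaulting behavior, sketched outside the class (this is
    // not the committed code): format falls back to "csv"; for csv,
    // column_separator and line_delimiter fall back to "\t" and "\n".
    import java.util.HashMap;
    import java.util.Map;

    public class LoadFormatDefaultsSketch {
        public static void main(String[] args) {
            Map<String, String> streamLoadProp = new HashMap<>();
            streamLoadProp.put("column_separator", ","); // hypothetical user setting

            String fileType = streamLoadProp.getOrDefault("format", "csv");
            String fieldDelimiter = null;
            String lineDelimiter = null;
            if ("csv".equals(fileType)) {
                fieldDelimiter = streamLoadProp.getOrDefault("column_separator", "\t");
                lineDelimiter = streamLoadProp.getOrDefault("line_delimiter", "\n");
            }
            // prints: csv and "," (the line delimiter renders as a newline)
            System.out.println(fileType + " " + fieldDelimiter + " " + lineDelimiter);
        }
    }
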
@@ -156,8 +162,10 @@ public class DorisStreamLoad implements Serializable {
                 conn.addRequestProperty(k, v);
             });
         }
-        conn.addRequestProperty("format", "json");
-        conn.addRequestProperty("strip_outer_array", "true");
+        if (fileType.equals("json")) {
+            conn.addRequestProperty("format", "json");
+            conn.addRequestProperty("strip_outer_array", "true");
+        }
         return conn;
     }
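
With this hunk the connection only injects format=json and strip_outer_array for JSON loads. For CSV there is nothing to inject here: the loop over streamLoadProp a few lines earlier already forwards any user-supplied format, column_separator, and line_delimiter as stream load request headers. A sketch of what a CSV request ends up carrying, with a hypothetical FE endpoint:

    // Header sketch for a CSV stream load request (hypothetical endpoint);
    // openConnection() only builds the object, no network I/O happens here.
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class CsvLoadHeadersSketch {
        public static void main(String[] args) throws Exception {
            URL url = new URL("http://fe_host:8030/api/example_db/example_tbl/_stream_load");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("PUT");
            // forwarded verbatim from streamLoadProp for a csv load:
            conn.addRequestProperty("format", "csv");
            conn.addRequestProperty("column_separator", ",");
            // a raw "\n" is rejected in an HTTP header value, so the
            // two-character escape sequence is used here
            conn.addRequestProperty("line_delimiter", "\\n");
            // note: no format=json / strip_outer_array on this path
        }
    }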
 
@@ -200,24 +208,30 @@ public class DorisStreamLoad implements Serializable {
 
 
     public void loadV2(List<List<Object>> rows) throws StreamLoadException, JsonProcessingException {
-        List<Map<Object, Object>> dataList = new ArrayList<>();
-        try {
-            for (List<Object> row : rows) {
-                Map<Object, Object> dataMap = new HashMap<>();
-                if (dfColumns.length == row.size()) {
-                    for (int i = 0; i < dfColumns.length; i++) {
-                        dataMap.put(dfColumns[i], row.get(i));
+        if (fileType.equals("csv")) {
+            load(listToString(rows));
+        } else if (fileType.equals("json")) {
+            List<Map<Object, Object>> dataList = new ArrayList<>();
+            try {
+                for (List<Object> row : rows) {
+                    Map<Object, Object> dataMap = new HashMap<>();
+                    if (dfColumns.length == row.size()) {
+                        for (int i = 0; i < dfColumns.length; i++) {
+                            dataMap.put(dfColumns[i], row.get(i));
+                        }
                     }
+                    dataList.add(dataMap);
                 }
-                dataList.add(dataMap);
+            } catch (Exception e) {
+                throw new StreamLoadException("The number of configured columns does not match the number of data columns.");
             }
-        } catch (Exception e) {
-            throw new StreamLoadException("The number of configured columns does not match the number of data columns.");
-        }
-        // splits large collections to normal collection to avoid the "Requested array size exceeds VM limit" exception
-        List<String> serializedList = ListUtils.getSerializedList(dataList);
-        for (String serializedRows : serializedList) {
-            load(serializedRows);
+            // split large collections into smaller ones to avoid the "Requested array size exceeds VM limit" exception
+            List<String> serializedList = ListUtils.getSerializedList(dataList);
+            for (String serializedRows : serializedList) {
+                load(serializedRows);
+            }
+        } else {
+            throw new StreamLoadException("Unsupported file format in stream load.");
         }
     }
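
For CSV batches, loadV2 now delegates to listToString, which this diff does not show. A minimal sketch of what such a serializer could look like, assuming it joins fields with FIELD_DELIMITER, joins rows with LINE_DELIMITER, and renders null as the \N sentinel that Doris treats as NULL (the method body here is illustrative, not the committed implementation):

    // Sketch of a row-batch-to-CSV serializer in the spirit of listToString.
    import java.util.Arrays;
    import java.util.List;
    import java.util.StringJoiner;

    public class CsvSerializerSketch {
        private static final String FIELD_DELIMITER = "\t";
        private static final String LINE_DELIMITER = "\n";
        private static final String NULL_VALUE = "\\N";

        static String listToString(List<List<Object>> rows) {
            StringJoiner lines = new StringJoiner(LINE_DELIMITER);
            for (List<Object> row : rows) {
                StringJoiner fields = new StringJoiner(FIELD_DELIMITER);
                for (Object field : row) {
                    // Doris interprets \N as NULL in csv stream load
                    fields.add(field == null ? NULL_VALUE : field.toString());
                }
                lines.add(fields.toString());
            }
            return lines.toString();
        }

        public static void main(String[] args) {
            List<List<Object>> rows = Arrays.asList(
                    Arrays.<Object>asList(1, "alice", null),
                    Arrays.<Object>asList(2, "bob", 3.14));
            System.out.println(listToString(rows));
        }
    }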
 
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 2922d63..0399acf 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -83,7 +83,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
           line.add(field.asInstanceOf[AnyRef])
         }
         rowsBuffer.add(line)
-        if (rowsBuffer.size > maxRowCount) {
+        if (rowsBuffer.size > maxRowCount - 1) {
           flush
         }
       })
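
The Scala-side tweak is an off-by-one fix: rowsBuffer.size > maxRowCount - 1 is equivalent to rowsBuffer.size >= maxRowCount, so a batch is now flushed as soon as the buffer holds exactly maxRowCount rows instead of maxRowCount + 1. A small sketch of the boundary behavior, with a hypothetical buffer and limit (any trailing partial batch is presumably flushed separately at the end of the partition, as in the connector):

    // Flush-boundary sketch: with maxRowCount = 3, flushes happen at rows
    // 3 and 6; under the old "> maxRowCount" condition they happened at
    // rows 4 and 8, i.e. with 4 rows buffered.
    import java.util.ArrayList;
    import java.util.List;

    public class FlushBoundarySketch {
        public static void main(String[] args) {
            int maxRowCount = 3;
            List<String> rowsBuffer = new ArrayList<>();
            for (int i = 1; i <= 7; i++) {
                rowsBuffer.add("row-" + i);
                if (rowsBuffer.size() > maxRowCount - 1) {
                    System.out.println("flush " + rowsBuffer.size() + " rows at row " + i);
                    rowsBuffer.clear();
                }
            }
            System.out.println(rowsBuffer.size() + " rows left for a final flush");
        }
    }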


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org