Posted to commits@doris.apache.org by ji...@apache.org on 2022/10/31 01:49:26 UTC

[doris-spark-connector] branch master updated: spark-doris-connector supports stream load parameter (#52)

This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new d933102  spark-doris-connector supports stream load parameter (#52)
d933102 is described below

commit d9331029425ba6038845c407617e217bad708275
Author: caoliang-web <71...@users.noreply.github.com>
AuthorDate: Mon Oct 31 09:49:21 2022 +0800

    spark-doris-connector supports stream load parameter (#52)
    
    spark-doris-connector supports stream load parameter
---
 .../org/apache/doris/spark/DorisStreamLoad.java    | 32 ++++++++++++++++++++++
 .../doris/spark/cfg/ConfigurationOptions.java      |  2 ++
 .../scala/org/apache/doris/spark/sql/Utils.scala   |  8 +++++-
 3 files changed, 41 insertions(+), 1 deletion(-)
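
With this change, stream load parameters can be passed straight through the connector as write options carrying the new "doris.sink.properties." prefix; every such option is forwarded as a header on the stream load request, except format, strip_outer_array and read_json_by_line, which are reserved (the connector always submits JSON). A minimal usage sketch, assuming the connector's "doris" data source name and its usual DataFrame options (doris.fenodes, doris.table.identifier, user, password); hosts, table names and the chosen properties are placeholders for illustration:

    // Scala sketch: forwarding stream load parameters through the DataFrame writer.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("doris-sink-example").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "alice"), (2, "bob")).toDF("id", "name")

    df.write
      .format("doris")                                         // assumed data source short name
      .option("doris.fenodes", "fe_host:8030")                 // placeholder FE address
      .option("doris.table.identifier", "example_db.example_tbl")
      .option("user", "root")
      .option("password", "")
      // Options under the new prefix become stream load headers.
      .option("doris.sink.properties.strict_mode", "true")
      .option("doris.sink.properties.timezone", "Asia/Shanghai")
      .save()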

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 68e513c..f375c76 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -45,6 +45,7 @@ import java.util.Map;
 import java.util.Base64;
 import java.util.HashMap;
 import java.util.Calendar;
+import java.util.Properties;
 
 
 /**
@@ -69,6 +70,7 @@ public class DorisStreamLoad implements Serializable{
     private String columns;
     private String[] dfColumns;
     private String maxFilterRatio;
+    private Map<String,String> streamLoadProp;
 
     public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
         this.hostPort = hostPort;
@@ -93,6 +95,9 @@ public class DorisStreamLoad implements Serializable{
         this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
 
         this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
+        this.streamLoadProp=getStreamLoadProp(settings);
+
+
 
     }
 
@@ -112,6 +117,7 @@ public class DorisStreamLoad implements Serializable{
         this.dfColumns = dfColumns;
 
         this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
+        this.streamLoadProp=getStreamLoadProp(settings);
     }
 
     public String getLoadUrlStr() {
@@ -147,6 +153,20 @@ public class DorisStreamLoad implements Serializable{
 
         conn.setDoOutput(true);
         conn.setDoInput(true);
+        if(streamLoadProp != null ){
+            streamLoadProp.forEach((k,v) -> {
+                if(streamLoadProp.containsKey("format")){
+                    return;
+                }
+                if(streamLoadProp.containsKey("strip_outer_array")) {
+                    return;
+                }
+                if(streamLoadProp.containsKey("read_json_by_line")){
+                    return;
+                }
+                conn.addRequestProperty(k, v);
+            });
+        }
         conn.addRequestProperty("format", "json");
         conn.addRequestProperty("strip_outer_array", "true");
         return conn;
@@ -272,4 +292,16 @@ public class DorisStreamLoad implements Serializable{
             }
         }
     }
+
+    public Map<String,String> getStreamLoadProp(SparkSettings sparkSettings){
+        Map<String,String> streamLoadPropMap = new HashMap<>();
+        Properties properties = sparkSettings.asProperties();
+        for (String key : properties.stringPropertyNames()) {
+            if( key.contains(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX)){
+                String subKey = key.substring(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX.length());
+                streamLoadPropMap.put(subKey,properties.getProperty(key));
+            }
+        }
+        return streamLoadPropMap;
+    }
 }
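
The two new pieces above work together: getStreamLoadProp collects every setting that carries ConfigurationOptions.STREAM_LOAD_PROP_PREFIX and strips the prefix, and the getConnection hunk then turns the remaining entries into stream load request headers, guarding format, strip_outer_array and read_json_by_line since the connector pins the payload to JSON. A self-contained Scala sketch of that flow, assuming the prefix only appears at the start of a key and filtering the reserved names per entry:

    // Scala sketch (standalone illustration, not the connector's API).
    val prefix   = "doris.sink.properties."
    val reserved = Set("format", "strip_outer_array", "read_json_by_line")

    val settings = Map(
      "doris.fenodes"                     -> "fe_host:8030",
      "doris.sink.properties.strict_mode" -> "true",
      "doris.sink.properties.format"      -> "csv"             // reserved: connector forces JSON
    )

    // Keep prefixed keys and drop the prefix, as getStreamLoadProp does.
    val streamLoadProps = settings.collect {
      case (k, v) if k.startsWith(prefix) => k.stripPrefix(prefix) -> v
    }

    // Forward the rest as headers; reserved keys stay under the connector's control.
    val headers = streamLoadProps.filter { case (k, _) => !reserved.contains(k) } ++
      Map("format" -> "json", "strip_outer_array" -> "true")

    // headers: Map(strict_mode -> true, format -> json, strip_outer_array -> true)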
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index e3c55d6..8cc4477 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -73,4 +73,6 @@ public interface ConfigurationOptions {
     int SINK_MAX_RETRIES_DEFAULT = 1;
 
     String DORIS_MAX_FILTER_RATIO = "doris.max.filter.ratio";
+
+    String STREAM_LOAD_PROP_PREFIX = "doris.sink.properties.";
 }
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
index 6b66646..b0e2e15 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
@@ -103,7 +103,13 @@ private[sql] object Utils {
    */
   def params(parameters: Map[String, String], logger: Logger) = {
     // '.' seems to be problematic when specifying the options
-    val dottedParams = parameters.map { case (k, v) => (k.replace('_', '.'), v)}
+    val dottedParams = parameters.map { case (k, v) =>
+      if (k.startsWith("sink.properties.")){
+        (k,v)
+      }else {
+        (k.replace('_', '.'), v)
+      }
+    }
 
     val preferredTableIdentifier = dottedParams.get(ConfigurationOptions.DORIS_TABLE_IDENTIFIER)
       .orElse(dottedParams.get(ConfigurationOptions.TABLE_IDENTIFIER))
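
The Utils.params tweak keeps any option key that starts with "sink.properties." untouched, because stream load property names themselves can legitimately contain underscores (column_separator, line_delimiter, ...), while every other key still has '_' rewritten to '.' as before. A small Scala sketch of that normalization, using a hypothetical standalone helper:

    // Scala sketch of the key normalization in Utils.params (hypothetical helper).
    def normalizeKey(k: String): String =
      if (k.startsWith("sink.properties.")) k             // property names keep their underscores
      else k.replace('_', '.')                            // elsewhere '_' stands in for '.'

    normalizeKey("doris_fenodes")                         // "doris.fenodes"
    normalizeKey("sink.properties.column_separator")      // unchanged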


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org