You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ji...@apache.org on 2022/10/31 01:49:26 UTC
[doris-spark-connector] branch master updated: spark-doris-connector supports stream load parameter (#52)
This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new d933102 spark-doris-connector supports stream load parameter (#52)
d933102 is described below
commit d9331029425ba6038845c407617e217bad708275
Author: caoliang-web <71...@users.noreply.github.com>
AuthorDate: Mon Oct 31 09:49:21 2022 +0800
spark-doris-connector supports stream load parameter (#52)
spark-doris-connector supports stream load parameter
---
.../org/apache/doris/spark/DorisStreamLoad.java | 32 ++++++++++++++++++++++
.../doris/spark/cfg/ConfigurationOptions.java | 2 ++
.../scala/org/apache/doris/spark/sql/Utils.scala | 8 +++++-
3 files changed, 41 insertions(+), 1 deletion(-)
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 68e513c..f375c76 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -45,6 +45,7 @@ import java.util.Map;
import java.util.Base64;
import java.util.HashMap;
import java.util.Calendar;
+import java.util.Properties;
/**
@@ -69,6 +70,7 @@ public class DorisStreamLoad implements Serializable{
private String columns;
private String[] dfColumns;
private String maxFilterRatio;
+ private Map<String,String> streamLoadProp;
public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
this.hostPort = hostPort;
@@ -93,6 +95,9 @@ public class DorisStreamLoad implements Serializable{
this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
+ this.streamLoadProp=getStreamLoadProp(settings);
+
+
}
@@ -112,6 +117,7 @@ public class DorisStreamLoad implements Serializable{
this.dfColumns = dfColumns;
this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
+ this.streamLoadProp=getStreamLoadProp(settings);
}
public String getLoadUrlStr() {
@@ -147,6 +153,20 @@ public class DorisStreamLoad implements Serializable{
conn.setDoOutput(true);
conn.setDoInput(true);
+ if (streamLoadProp != null) {
+ // Forward user-supplied stream load properties as HTTP headers, skipping the
+ // body-format keys: the body is always sent as a JSON array (set just below).
+ streamLoadProp.forEach((k, v) -> {
+ // Test this entry's own key (header names are case-insensitive); checking the
+ // whole map would drop every property whenever one reserved key was present.
+ if ("format".equalsIgnoreCase(k)
+ || "strip_outer_array".equalsIgnoreCase(k)
+ || "read_json_by_line".equalsIgnoreCase(k)) {
+ return;
+ }
+ conn.addRequestProperty(k, v);
+ });
+ }
conn.addRequestProperty("format", "json");
conn.addRequestProperty("strip_outer_array", "true");
return conn;
@@ -272,4 +292,16 @@ public class DorisStreamLoad implements Serializable{
}
}
}
+
+ // Returns every "doris.sink.properties.<name>" setting keyed by <name> (later sent as stream load headers).
+ public Map<String,String> getStreamLoadProp(SparkSettings sparkSettings){
+ Map<String,String> streamLoadPropMap = new HashMap<>();
+ Properties properties = sparkSettings.asProperties();
+ for (String key : properties.stringPropertyNames()) {
+ if (key.startsWith(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX)) { // prefix must sit at index 0 for the substring below
+ streamLoadPropMap.put(key.substring(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX.length()), properties.getProperty(key));
+ }
+ }
+ return streamLoadPropMap;
+ }
}
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index e3c55d6..8cc4477 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -73,4 +73,6 @@ public interface ConfigurationOptions {
int SINK_MAX_RETRIES_DEFAULT = 1;
String DORIS_MAX_FILTER_RATIO = "doris.max.filter.ratio";
+
+ String STREAM_LOAD_PROP_PREFIX = "doris.sink.properties.";
}
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
index 6b66646..b0e2e15 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
@@ -103,7 +103,13 @@ private[sql] object Utils {
*/
def params(parameters: Map[String, String], logger: Logger) = {
// '.' seems to be problematic when specifying the options
- val dottedParams = parameters.map { case (k, v) => (k.replace('_', '.'), v)}
+ val dottedParams = parameters.map { case (k, v) =>
+ if (k.startsWith("sink.properties.") || k.startsWith(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX)) {
+ (k, v) // stream load property names (e.g. strip_outer_array) legitimately contain '_'
+ } else {
+ (k.replace('_', '.'), v)
+ }
+ }
val preferredTableIdentifier = dottedParams.get(ConfigurationOptions.DORIS_TABLE_IDENTIFIER)
.orElse(dottedParams.get(ConfigurationOptions.TABLE_IDENTIFIER))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org