You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ca...@apache.org on 2022/07/09 10:07:24 UTC

[doris-spark-connector] branch master updated: [Enhancement] Add param max_filter_ratio for sink of spark connector (#41)

This is an automated email from the ASF dual-hosted git repository.

caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 41b612d  [Enhancement] Add param max_filter_ratio for sink of spark connector (#41)
41b612d is described below

commit 41b612d9d5fb359f6ef55c092b8f6479973fd1d3
Author: zhenhb <89...@qq.com>
AuthorDate: Sat Jul 9 18:07:20 2022 +0800

    [Enhancement] Add param max_filter_ratio for sink of spark connector (#41)
---
 .../main/java/org/apache/doris/spark/DorisStreamLoad.java   | 13 +++++++++++++
 .../org/apache/doris/spark/cfg/ConfigurationOptions.java    |  2 ++
 2 files changed, 15 insertions(+)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index db3ef7d..be3c938 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -68,6 +68,7 @@ public class DorisStreamLoad implements Serializable{
     private String authEncoding;
     private String columns;
     private String[] dfColumns;
+    private String maxFilterRatio;
 
     public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
         this.hostPort = hostPort;
@@ -90,6 +91,9 @@ public class DorisStreamLoad implements Serializable{
         this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
         this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
         this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
+
+        this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
+
     }
 
     public DorisStreamLoad(SparkSettings settings, String[] dfColumns) throws IOException, DorisException {
@@ -100,10 +104,14 @@ public class DorisStreamLoad implements Serializable{
         this.tbl = dbTable[1];
         this.user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
         this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
+
+
         this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
         this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
         this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
         this.dfColumns = dfColumns;
+
+        this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
     }
 
     public String getLoadUrlStr() {
@@ -132,6 +140,11 @@ public class DorisStreamLoad implements Serializable{
         if (columns != null && !columns.equals("")) {
             conn.addRequestProperty("columns", columns);
         }
+
+        if (maxFilterRatio != null && !maxFilterRatio.equals("")) {
+            conn.addRequestProperty("max_filter_ratio", maxFilterRatio);
+        }
+
         conn.setDoOutput(true);
         conn.setDoInput(true);
         conn.addRequestProperty("format", "json");
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 9a0cead..e3c55d6 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -71,4 +71,6 @@ public interface ConfigurationOptions {
 
     String DORIS_SINK_MAX_RETRIES = "doris.sink.max-retries";
     int SINK_MAX_RETRIES_DEFAULT = 1;
+
+    String DORIS_MAX_FILTER_RATIO = "doris.max.filter.ratio";
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org