You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by di...@apache.org on 2022/10/17 02:11:00 UTC

[doris-flink-connector] branch master updated: [Feature]Support 'sink.parallelism' configuration (#72)

This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bba8aa  [Feature]Support 'sink.parallelism' configuration (#72)
7bba8aa is described below

commit 7bba8aa4e69650ff61752c06a8aadaa26b0f12ad
Author: huyuanfeng2018 <73...@qq.com>
AuthorDate: Mon Oct 17 10:10:54 2022 +0800

    [Feature]Support 'sink.parallelism' configuration (#72)
    
    Co-authored-by: huyuanfeng <hu...@huya.com>
---
 .../org/apache/doris/flink/catalog/DorisCatalogFactory.java |  2 ++
 .../org/apache/doris/flink/table/DorisConfigOptions.java    |  3 +++
 .../apache/doris/flink/table/DorisDynamicTableFactory.java  |  7 ++++++-
 .../org/apache/doris/flink/table/DorisDynamicTableSink.java | 13 ++++++++-----
 4 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
index f23f0dc..9a80370 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
@@ -48,6 +48,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM;
 import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API;
 import static org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX;
 import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
@@ -102,6 +103,7 @@ public class DorisCatalogFactory implements CatalogFactory {
         options.add(SINK_LABEL_PREFIX);
         options.add(SINK_BUFFER_SIZE);
         options.add(SINK_BUFFER_COUNT);
+        options.add(SINK_PARALLELISM);
 
         options.add(SOURCE_USE_OLD_API);
         return options;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 02a4d22..391df7d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.table;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.factories.FactoryUtil;
 
 import java.time.Duration;
 
@@ -163,6 +164,8 @@ public class DorisConfigOptions {
             .defaultValue(true)
             .withDescription("whether to enable the delete function");
 
+    public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
+
     // Prefix for Doris StreamLoad specific properties.
     public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 5a11605..521724b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -60,6 +60,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM;
 import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API;
 import static org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX;
 import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
@@ -117,6 +118,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
         options.add(SINK_LABEL_PREFIX);
         options.add(SINK_BUFFER_SIZE);
         options.add(SINK_BUFFER_COUNT);
+        options.add(SINK_PARALLELISM);
 
         options.add(SOURCE_USE_OLD_API);
         return options;
@@ -213,6 +215,8 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
 
         // validate all options
         helper.validateExcept(STREAM_LOAD_PROP_PREFIX);
+        // sink parallelism
+        final Integer parallelism = context.getConfiguration().getOptional(SINK_PARALLELISM).orElse(null);
 
         Properties streamLoadProp = getStreamLoadProp(context.getCatalogTable().getOptions());
         TableSchema physicalSchema =
@@ -222,7 +226,8 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
                 getDorisOptions(helper.getOptions()),
                 getDorisReadOptions(helper.getOptions()),
                 getDorisExecutionOptions(helper.getOptions(), streamLoadProp),
-                physicalSchema
+                physicalSchema,
+                parallelism
         );
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index 572e09c..2bba98b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -14,6 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
 package org.apache.doris.flink.table;
 
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -21,15 +22,14 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.sink.DorisSink;
-
 import org.apache.doris.flink.sink.writer.RowDataSerializer;
+
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
-
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,15 +54,18 @@ public class DorisDynamicTableSink implements DynamicTableSink {
     private final DorisReadOptions readOptions;
     private final DorisExecutionOptions executionOptions;
     private final TableSchema tableSchema;
+    private final Integer sinkParallelism;
 
     public DorisDynamicTableSink(DorisOptions options,
                                  DorisReadOptions readOptions,
                                  DorisExecutionOptions executionOptions,
-                                 TableSchema tableSchema) {
+                                 TableSchema tableSchema,
+                                 Integer sinkParallelism) {
         this.options = options;
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
         this.tableSchema = tableSchema;
+        this.sinkParallelism = sinkParallelism;
     }
 
     @Override
@@ -99,12 +102,12 @@ public class DorisDynamicTableSink implements DynamicTableSink {
                 .setDorisReadOptions(readOptions)
                 .setDorisExecutionOptions(executionOptions)
                 .setSerializer(serializerBuilder.build());
-        return SinkProvider.of(dorisSinkBuilder.build());
+        return SinkProvider.of(dorisSinkBuilder.build(), sinkParallelism);
     }
 
     @Override
     public DynamicTableSink copy() {
-        return new DorisDynamicTableSink(options, readOptions, executionOptions, tableSchema);
+        return new DorisDynamicTableSink(options, readOptions, executionOptions, tableSchema, sinkParallelism);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org