You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by di...@apache.org on 2022/10/17 02:11:00 UTC
[doris-flink-connector] branch master updated: [Feature]Support 'sink.parallelism' configuration (#72)
This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 7bba8aa [Feature]Support 'sink.parallelism' configuration (#72)
7bba8aa is described below
commit 7bba8aa4e69650ff61752c06a8aadaa26b0f12ad
Author: huyuanfeng2018 <73...@qq.com>
AuthorDate: Mon Oct 17 10:10:54 2022 +0800
[Feature]Support 'sink.parallelism' configuration (#72)
Co-authored-by: huyuanfeng <hu...@huya.com>
---
.../org/apache/doris/flink/catalog/DorisCatalogFactory.java | 2 ++
.../org/apache/doris/flink/table/DorisConfigOptions.java | 3 +++
.../apache/doris/flink/table/DorisDynamicTableFactory.java | 7 ++++++-
.../org/apache/doris/flink/table/DorisDynamicTableSink.java | 13 ++++++++-----
4 files changed, 19 insertions(+), 6 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
index f23f0dc..9a80370 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
@@ -48,6 +48,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM;
import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API;
import static org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX;
import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
@@ -102,6 +103,7 @@ public class DorisCatalogFactory implements CatalogFactory {
options.add(SINK_LABEL_PREFIX);
options.add(SINK_BUFFER_SIZE);
options.add(SINK_BUFFER_COUNT);
+ options.add(SINK_PARALLELISM);
options.add(SOURCE_USE_OLD_API);
return options;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 02a4d22..391df7d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.factories.FactoryUtil;
import java.time.Duration;
@@ -163,6 +164,8 @@ public class DorisConfigOptions {
.defaultValue(true)
.withDescription("whether to enable the delete function");
+ public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
+
// Prefix for Doris StreamLoad specific properties.
public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 5a11605..521724b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -60,6 +60,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM;
import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API;
import static org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX;
import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
@@ -117,6 +118,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
options.add(SINK_LABEL_PREFIX);
options.add(SINK_BUFFER_SIZE);
options.add(SINK_BUFFER_COUNT);
+ options.add(SINK_PARALLELISM);
options.add(SOURCE_USE_OLD_API);
return options;
@@ -213,6 +215,8 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
// validate all options
helper.validateExcept(STREAM_LOAD_PROP_PREFIX);
+ // sink parallelism
+ final Integer parallelism = context.getConfiguration().getOptional(SINK_PARALLELISM).orElse(null);
Properties streamLoadProp = getStreamLoadProp(context.getCatalogTable().getOptions());
TableSchema physicalSchema =
@@ -222,7 +226,8 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
getDorisExecutionOptions(helper.getOptions(), streamLoadProp),
- physicalSchema
+ physicalSchema,
+ parallelism
);
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index 572e09c..2bba98b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -14,6 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+
package org.apache.doris.flink.table;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -21,15 +22,14 @@ import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.DorisSink;
-
import org.apache.doris.flink.sink.writer.RowDataSerializer;
+
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
-
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,15 +54,18 @@ public class DorisDynamicTableSink implements DynamicTableSink {
private final DorisReadOptions readOptions;
private final DorisExecutionOptions executionOptions;
private final TableSchema tableSchema;
+ private final Integer sinkParallelism;
public DorisDynamicTableSink(DorisOptions options,
DorisReadOptions readOptions,
DorisExecutionOptions executionOptions,
- TableSchema tableSchema) {
+ TableSchema tableSchema,
+ Integer sinkParallelism) {
this.options = options;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
this.tableSchema = tableSchema;
+ this.sinkParallelism = sinkParallelism;
}
@Override
@@ -99,12 +102,12 @@ public class DorisDynamicTableSink implements DynamicTableSink {
.setDorisReadOptions(readOptions)
.setDorisExecutionOptions(executionOptions)
.setSerializer(serializerBuilder.build());
- return SinkProvider.of(dorisSinkBuilder.build());
+ return SinkProvider.of(dorisSinkBuilder.build(), sinkParallelism);
}
@Override
public DynamicTableSink copy() {
- return new DorisDynamicTableSink(options, readOptions, executionOptions, tableSchema);
+ return new DorisDynamicTableSink(options, readOptions, executionOptions, tableSchema, sinkParallelism);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org