You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/09/24 13:38:43 UTC

[incubator-doris] branch master updated: [Flink] Fix bug of flink doris connector (#6655)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 68529d2  [Flink] Fix bug of flink doris connector (#6655)
68529d2 is described below

commit 68529d20f3e6827ab424bc3d88fb3826e3a7d308
Author: xhmz <ya...@163.com>
AuthorDate: Fri Sep 24 21:38:35 2021 +0800

    [Flink] Fix bug of flink doris connector (#6655)
    
    Flink-Doris-Connector does not support Flink 1.13. Refactor the Doris sink
    format to use RowData.FieldGetter instead of casting to GenericRowData.
---
 .../flink/table/DorisDynamicOutputFormat.java      |  45 ++++--
 .../flink/table/DorisDynamicTableFactory.java      | 165 ++++++++++-----------
 .../doris/flink/table/DorisDynamicTableSink.java   |  28 ++--
 3 files changed, 127 insertions(+), 111 deletions(-)

diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 77b53ba..6ee8834 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -25,22 +25,24 @@ import org.apache.doris.flink.rest.RestService;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
-import java.util.Properties;
 import java.util.StringJoiner;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
 
 /**
  * DorisDynamicOutputFormat
@@ -69,12 +71,18 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
     private transient ScheduledFuture<?> scheduledFuture;
     private transient volatile Exception flushException;
 
-    public DorisDynamicOutputFormat(DorisOptions option, DorisReadOptions readOptions, DorisExecutionOptions executionOptions) {
+    private final RowData.FieldGetter[] fieldGetters;
+
+    public DorisDynamicOutputFormat(DorisOptions option, DorisReadOptions readOptions, DorisExecutionOptions executionOptions, LogicalType[] logicalTypes) {
         this.options = option;
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
         this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT);
         this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
+        this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
+        for (int i = 0; i < logicalTypes.length; i++) {
+            fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
+        }
     }
 
     @Override
@@ -84,12 +92,12 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
         dorisStreamLoad = new DorisStreamLoad(
-                getBackend(),
-                options.getTableIdentifier().split("\\.")[0],
-                options.getTableIdentifier().split("\\.")[1],
-                options.getUsername(),
-                options.getPassword(),
-                executionOptions.getStreamLoadProp());
+            getBackend(),
+            options.getTableIdentifier().split("\\.")[0],
+            options.getTableIdentifier().split("\\.")[1],
+            options.getUsername(),
+            options.getPassword(),
+            executionOptions.getStreamLoadProp());
         LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr());
 
         if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
@@ -126,9 +134,8 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
 
     private void addBatch(RowData row) {
         StringJoiner value = new StringJoiner(this.fieldDelimiter);
-        GenericRowData rowData = (GenericRowData) row;
-        for (int i = 0; i < row.getArity(); ++i) {
-            Object field = rowData.getField(i);
+        for (int i = 0; i < row.getArity() && i < fieldGetters.length; ++i) {
+            Object field = fieldGetters[i].getFieldOrNull(row);
             if (field != null) {
                 value.add(field.toString());
             } else {
@@ -213,6 +220,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
         private DorisOptions.Builder optionsBuilder;
         private DorisReadOptions readOptions;
         private DorisExecutionOptions executionOptions;
+        private DataType[] fieldDataTypes;
 
         public Builder() {
             this.optionsBuilder = DorisOptions.builder();
@@ -248,9 +256,18 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
             return this;
         }
 
+        public Builder setFieldDataTypes(DataType[] fieldDataTypes) {
+            this.fieldDataTypes = fieldDataTypes;
+            return this;
+        }
+
         public DorisDynamicOutputFormat build() {
+            final LogicalType[] logicalTypes =
+                Arrays.stream(fieldDataTypes)
+                    .map(DataType::getLogicalType)
+                    .toArray(LogicalType[]::new);
             return new DorisDynamicOutputFormat(
-                    optionsBuilder.build(), readOptions, executionOptions
+                optionsBuilder.build(), readOptions, executionOptions, logicalTypes
             );
         }
     }
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 92d69e6..c0d9934 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -38,15 +38,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
+import static org.apache.doris.flink.cfg.ConfigurationOptions.*;
 
 /**
  * The {@link DorisDynamicTableFactory} translates the catalog table to a table source.
@@ -63,92 +55,92 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
 
     // doris options
     private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
-            .key("doris.read.field")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("List of column names in the Doris table, separated by commas");
+        .key("doris.read.field")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("List of column names in the Doris table, separated by commas");
 
     private static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions
-            .key("doris.filter.query")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
+        .key("doris.filter.query")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
 
     private static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions
-            .key("doris.request.tablet.size")
-            .intType()
-            .defaultValue(DORIS_TABLET_SIZE_DEFAULT)
-            .withDescription("");
+        .key("doris.request.tablet.size")
+        .intType()
+        .defaultValue(DORIS_TABLET_SIZE_DEFAULT)
+        .withDescription("");
 
     private static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions
-            .key("doris.request.connect.timeout.ms")
-            .intType()
-            .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
-            .withDescription("");
+        .key("doris.request.connect.timeout.ms")
+        .intType()
+        .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
+        .withDescription("");
 
     private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions
-            .key("doris.request.read.timeout.ms")
-            .intType()
-            .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
-            .withDescription("");
+        .key("doris.request.read.timeout.ms")
+        .intType()
+        .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
+        .withDescription("");
 
     private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions
-            .key("doris.request.query.timeout.s")
-            .intType()
-            .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
-            .withDescription("");
+        .key("doris.request.query.timeout.s")
+        .intType()
+        .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
+        .withDescription("");
 
     private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions
-            .key("doris.request.retries")
-            .intType()
-            .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
-            .withDescription("");
+        .key("doris.request.retries")
+        .intType()
+        .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
+        .withDescription("");
 
     private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions
-            .key("doris.deserialize.arrow.async")
-            .booleanType()
-            .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
-            .withDescription("");
+        .key("doris.deserialize.arrow.async")
+        .booleanType()
+        .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
+        .withDescription("");
 
     private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions
-            .key("doris.request.retriesdoris.deserialize.queue.size")
-            .intType()
-            .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
-            .withDescription("");
+        .key("doris.request.retriesdoris.deserialize.queue.size")
+        .intType()
+        .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
+        .withDescription("");
 
 
     private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions
-            .key("doris.batch.size")
-            .intType()
-            .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
-            .withDescription("");
+        .key("doris.batch.size")
+        .intType()
+        .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
+        .withDescription("");
 
     private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions
-            .key("doris.exec.mem.limit")
-            .longType()
-            .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
-            .withDescription("");
+        .key("doris.exec.mem.limit")
+        .longType()
+        .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
+        .withDescription("");
 
     // flink write config options
     private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions
-            .key("sink.batch.size")
-            .intType()
-            .defaultValue(100)
-            .withDescription("the flush max size (includes all append, upsert and delete records), over this number" +
-                    " of records, will flush data. The default value is 100.");
+        .key("sink.batch.size")
+        .intType()
+        .defaultValue(100)
+        .withDescription("the flush max size (includes all append, upsert and delete records), over this number" +
+            " of records, will flush data. The default value is 100.");
 
     private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
-            .key("sink.max-retries")
-            .intType()
-            .defaultValue(3)
-            .withDescription("the max retry times if writing records to database failed.");
+        .key("sink.max-retries")
+        .intType()
+        .defaultValue(3)
+        .withDescription("the max retry times if writing records to database failed.");
 
     private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
-            .key("sink.batch.interval")
-            .durationType()
-            .defaultValue(Duration.ofSeconds(1))
-            .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
-                    "default value is 1s.");
+        .key("sink.batch.interval")
+        .durationType()
+        .defaultValue(Duration.ofSeconds(1))
+        .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
+            "default value is 1s.");
 
 
     // Prefix for Doris StreamLoad specific properties.
@@ -207,16 +199,16 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
         TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
         // create and return dynamic table source
         return new DorisDynamicTableSource(
-                getDorisOptions(helper.getOptions()),
-                getDorisReadOptions(helper.getOptions()),
-                physicalSchema);
+            getDorisOptions(helper.getOptions()),
+            getDorisReadOptions(helper.getOptions()),
+            physicalSchema);
     }
 
     private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
         final String fenodes = readableConfig.get(FENODES);
         final DorisOptions.Builder builder = DorisOptions.builder()
-                .setFenodes(fenodes)
-                .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
+            .setFenodes(fenodes)
+            .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
 
         readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
         readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
@@ -226,16 +218,16 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
     private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) {
         final DorisReadOptions.Builder builder = DorisReadOptions.builder();
         builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
-                .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
-                .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT))
-                .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
-                .setReadFields(readableConfig.get(DORIS_READ_FIELD))
-                .setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S))
-                .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
-                .setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
-                .setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
-                .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
-                .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE));
+            .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
+            .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT))
+            .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
+            .setReadFields(readableConfig.get(DORIS_READ_FIELD))
+            .setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S))
+            .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
+            .setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
+            .setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
+            .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
+            .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE));
         return builder.build();
     }
 
@@ -267,11 +259,14 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
         helper.validateExcept(STREAM_LOAD_PROP_PREFIX);
 
         Properties streamLoadProp = getStreamLoadProp(context.getCatalogTable().getOptions());
+        TableSchema physicalSchema =
+            TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
         // create and return dynamic table source
         return new DorisDynamicTableSink(
-                getDorisOptions(helper.getOptions()),
-                getDorisReadOptions(helper.getOptions()),
-                getDorisExecutionOptions(helper.getOptions(), streamLoadProp)
+            getDorisOptions(helper.getOptions()),
+            getDorisReadOptions(helper.getOptions()),
+            getDorisExecutionOptions(helper.getOptions(), streamLoadProp),
+            physicalSchema
         );
     }
 }
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index dc710d7..0b69ea7 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.table;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.OutputFormatProvider;
@@ -32,38 +33,41 @@ public class DorisDynamicTableSink implements DynamicTableSink {
     private final DorisOptions options;
     private final DorisReadOptions readOptions;
     private final DorisExecutionOptions executionOptions;
+    private final TableSchema tableSchema;
 
-    public DorisDynamicTableSink(DorisOptions options, DorisReadOptions readOptions, DorisExecutionOptions executionOptions) {
+    public DorisDynamicTableSink(DorisOptions options, DorisReadOptions readOptions, DorisExecutionOptions executionOptions, TableSchema tableSchema) {
         this.options = options;
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
+        this.tableSchema = tableSchema;
     }
 
     @Override
     public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
         return ChangelogMode.newBuilder()
-                .addContainedKind(RowKind.INSERT)
-                .addContainedKind(RowKind.DELETE)
-                .addContainedKind(RowKind.UPDATE_AFTER)
-                .build();
+            .addContainedKind(RowKind.INSERT)
+            .addContainedKind(RowKind.DELETE)
+            .addContainedKind(RowKind.UPDATE_AFTER)
+            .build();
     }
 
     @Override
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         DorisDynamicOutputFormat.Builder builder = DorisDynamicOutputFormat.builder()
-                .setFenodes(options.getFenodes())
-                .setUsername(options.getUsername())
-                .setPassword(options.getPassword())
-                .setTableIdentifier(options.getTableIdentifier())
-                .setReadOptions(readOptions)
-                .setExecutionOptions(executionOptions);
+            .setFenodes(options.getFenodes())
+            .setUsername(options.getUsername())
+            .setPassword(options.getPassword())
+            .setTableIdentifier(options.getTableIdentifier())
+            .setReadOptions(readOptions)
+            .setExecutionOptions(executionOptions)
+            .setFieldDataTypes(tableSchema.getFieldDataTypes());;
 
         return OutputFormatProvider.of(builder.build());
     }
 
     @Override
     public DynamicTableSink copy() {
-        return new DorisDynamicTableSink(options, readOptions, executionOptions);
+        return new DorisDynamicTableSink(options, readOptions, executionOptions, tableSchema);
     }
 
     @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org