You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/09/24 13:38:43 UTC
[incubator-doris] branch master updated: [Flink] Fix bug of flink doris connector (#6655)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 68529d2 [Flink] Fix bug of flink doris connector (#6655)
68529d2 is described below
commit 68529d20f3e6827ab424bc3d88fb3826e3a7d308
Author: xhmz <ya...@163.com>
AuthorDate: Fri Sep 24 21:38:35 2021 +0800
[Flink] Fix bug of flink doris connector (#6655)
Flink-Doris-Connector does not support Flink 1.13. Refactor the Doris sink format
so that it no longer uses GenericRowData and instead uses RowData.FieldGetter.
---
.../flink/table/DorisDynamicOutputFormat.java | 45 ++++--
.../flink/table/DorisDynamicTableFactory.java | 165 ++++++++++-----------
.../doris/flink/table/DorisDynamicTableSink.java | 28 ++--
3 files changed, 127 insertions(+), 111 deletions(-)
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 77b53ba..6ee8834 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -25,22 +25,24 @@ import org.apache.doris.flink.rest.RestService;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
-import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
-import java.util.Properties;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
/**
* DorisDynamicOutputFormat
@@ -69,12 +71,18 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
private transient ScheduledFuture<?> scheduledFuture;
private transient volatile Exception flushException;
- public DorisDynamicOutputFormat(DorisOptions option, DorisReadOptions readOptions, DorisExecutionOptions executionOptions) {
+ private final RowData.FieldGetter[] fieldGetters;
+
+ public DorisDynamicOutputFormat(DorisOptions option, DorisReadOptions readOptions, DorisExecutionOptions executionOptions, LogicalType[] logicalTypes) {
this.options = option;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT);
this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
+ this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
+ for (int i = 0; i < logicalTypes.length; i++) {
+ fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
+ }
}
@Override
@@ -84,12 +92,12 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
@Override
public void open(int taskNumber, int numTasks) throws IOException {
dorisStreamLoad = new DorisStreamLoad(
- getBackend(),
- options.getTableIdentifier().split("\\.")[0],
- options.getTableIdentifier().split("\\.")[1],
- options.getUsername(),
- options.getPassword(),
- executionOptions.getStreamLoadProp());
+ getBackend(),
+ options.getTableIdentifier().split("\\.")[0],
+ options.getTableIdentifier().split("\\.")[1],
+ options.getUsername(),
+ options.getPassword(),
+ executionOptions.getStreamLoadProp());
LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr());
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
@@ -126,9 +134,8 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
private void addBatch(RowData row) {
StringJoiner value = new StringJoiner(this.fieldDelimiter);
- GenericRowData rowData = (GenericRowData) row;
- for (int i = 0; i < row.getArity(); ++i) {
- Object field = rowData.getField(i);
+ for (int i = 0; i < row.getArity() && i < fieldGetters.length; ++i) {
+ Object field = fieldGetters[i].getFieldOrNull(row);
if (field != null) {
value.add(field.toString());
} else {
@@ -213,6 +220,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
private DorisOptions.Builder optionsBuilder;
private DorisReadOptions readOptions;
private DorisExecutionOptions executionOptions;
+ private DataType[] fieldDataTypes;
public Builder() {
this.optionsBuilder = DorisOptions.builder();
@@ -248,9 +256,18 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
return this;
}
+ public Builder setFieldDataTypes(DataType[] fieldDataTypes) {
+ this.fieldDataTypes = fieldDataTypes;
+ return this;
+ }
+
public DorisDynamicOutputFormat build() {
+ final LogicalType[] logicalTypes =
+ Arrays.stream(fieldDataTypes)
+ .map(DataType::getLogicalType)
+ .toArray(LogicalType[]::new);
return new DorisDynamicOutputFormat(
- optionsBuilder.build(), readOptions, executionOptions
+ optionsBuilder.build(), readOptions, executionOptions, logicalTypes
);
}
}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 92d69e6..c0d9934 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -38,15 +38,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
+import static org.apache.doris.flink.cfg.ConfigurationOptions.*;
/**
* The {@link DorisDynamicTableFactory} translates the catalog table to a table source.
@@ -63,92 +55,92 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
// doris options
private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
- .key("doris.read.field")
- .stringType()
- .noDefaultValue()
- .withDescription("List of column names in the Doris table, separated by commas");
+ .key("doris.read.field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("List of column names in the Doris table, separated by commas");
private static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions
- .key("doris.filter.query")
- .stringType()
- .noDefaultValue()
- .withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
+ .key("doris.filter.query")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
private static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions
- .key("doris.request.tablet.size")
- .intType()
- .defaultValue(DORIS_TABLET_SIZE_DEFAULT)
- .withDescription("");
+ .key("doris.request.tablet.size")
+ .intType()
+ .defaultValue(DORIS_TABLET_SIZE_DEFAULT)
+ .withDescription("");
private static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions
- .key("doris.request.connect.timeout.ms")
- .intType()
- .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
- .withDescription("");
+ .key("doris.request.connect.timeout.ms")
+ .intType()
+ .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
+ .withDescription("");
private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions
- .key("doris.request.read.timeout.ms")
- .intType()
- .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
- .withDescription("");
+ .key("doris.request.read.timeout.ms")
+ .intType()
+ .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
+ .withDescription("");
private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions
- .key("doris.request.query.timeout.s")
- .intType()
- .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
- .withDescription("");
+ .key("doris.request.query.timeout.s")
+ .intType()
+ .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
+ .withDescription("");
private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions
- .key("doris.request.retries")
- .intType()
- .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
- .withDescription("");
+ .key("doris.request.retries")
+ .intType()
+ .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
+ .withDescription("");
private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions
- .key("doris.deserialize.arrow.async")
- .booleanType()
- .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
- .withDescription("");
+ .key("doris.deserialize.arrow.async")
+ .booleanType()
+ .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
+ .withDescription("");
private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions
- .key("doris.request.retriesdoris.deserialize.queue.size")
- .intType()
- .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
- .withDescription("");
+ .key("doris.request.retriesdoris.deserialize.queue.size")
+ .intType()
+ .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
+ .withDescription("");
private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions
- .key("doris.batch.size")
- .intType()
- .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
- .withDescription("");
+ .key("doris.batch.size")
+ .intType()
+ .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
+ .withDescription("");
private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions
- .key("doris.exec.mem.limit")
- .longType()
- .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
- .withDescription("");
+ .key("doris.exec.mem.limit")
+ .longType()
+ .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
+ .withDescription("");
// flink write config options
private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions
- .key("sink.batch.size")
- .intType()
- .defaultValue(100)
- .withDescription("the flush max size (includes all append, upsert and delete records), over this number" +
- " of records, will flush data. The default value is 100.");
+ .key("sink.batch.size")
+ .intType()
+ .defaultValue(100)
+ .withDescription("the flush max size (includes all append, upsert and delete records), over this number" +
+ " of records, will flush data. The default value is 100.");
private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
- .key("sink.max-retries")
- .intType()
- .defaultValue(3)
- .withDescription("the max retry times if writing records to database failed.");
+ .key("sink.max-retries")
+ .intType()
+ .defaultValue(3)
+ .withDescription("the max retry times if writing records to database failed.");
private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
- .key("sink.batch.interval")
- .durationType()
- .defaultValue(Duration.ofSeconds(1))
- .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
- "default value is 1s.");
+ .key("sink.batch.interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(1))
+ .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
+ "default value is 1s.");
// Prefix for Doris StreamLoad specific properties.
@@ -207,16 +199,16 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
// create and return dynamic table source
return new DorisDynamicTableSource(
- getDorisOptions(helper.getOptions()),
- getDorisReadOptions(helper.getOptions()),
- physicalSchema);
+ getDorisOptions(helper.getOptions()),
+ getDorisReadOptions(helper.getOptions()),
+ physicalSchema);
}
private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
final String fenodes = readableConfig.get(FENODES);
final DorisOptions.Builder builder = DorisOptions.builder()
- .setFenodes(fenodes)
- .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
+ .setFenodes(fenodes)
+ .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
@@ -226,16 +218,16 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) {
final DorisReadOptions.Builder builder = DorisReadOptions.builder();
builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
- .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
- .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT))
- .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
- .setReadFields(readableConfig.get(DORIS_READ_FIELD))
- .setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S))
- .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
- .setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
- .setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
- .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
- .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE));
+ .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
+ .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT))
+ .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
+ .setReadFields(readableConfig.get(DORIS_READ_FIELD))
+ .setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S))
+ .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
+ .setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
+ .setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
+ .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
+ .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE));
return builder.build();
}
@@ -267,11 +259,14 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
helper.validateExcept(STREAM_LOAD_PROP_PREFIX);
Properties streamLoadProp = getStreamLoadProp(context.getCatalogTable().getOptions());
+ TableSchema physicalSchema =
+ TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
// create and return dynamic table source
return new DorisDynamicTableSink(
- getDorisOptions(helper.getOptions()),
- getDorisReadOptions(helper.getOptions()),
- getDorisExecutionOptions(helper.getOptions(), streamLoadProp)
+ getDorisOptions(helper.getOptions()),
+ getDorisReadOptions(helper.getOptions()),
+ getDorisExecutionOptions(helper.getOptions(), streamLoadProp),
+ physicalSchema
);
}
}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index dc710d7..0b69ea7 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.table;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.OutputFormatProvider;
@@ -32,38 +33,41 @@ public class DorisDynamicTableSink implements DynamicTableSink {
private final DorisOptions options;
private final DorisReadOptions readOptions;
private final DorisExecutionOptions executionOptions;
+ private final TableSchema tableSchema;
- public DorisDynamicTableSink(DorisOptions options, DorisReadOptions readOptions, DorisExecutionOptions executionOptions) {
+ public DorisDynamicTableSink(DorisOptions options, DorisReadOptions readOptions, DorisExecutionOptions executionOptions, TableSchema tableSchema) {
this.options = options;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
+ this.tableSchema = tableSchema;
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
return ChangelogMode.newBuilder()
- .addContainedKind(RowKind.INSERT)
- .addContainedKind(RowKind.DELETE)
- .addContainedKind(RowKind.UPDATE_AFTER)
- .build();
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.DELETE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .build();
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
DorisDynamicOutputFormat.Builder builder = DorisDynamicOutputFormat.builder()
- .setFenodes(options.getFenodes())
- .setUsername(options.getUsername())
- .setPassword(options.getPassword())
- .setTableIdentifier(options.getTableIdentifier())
- .setReadOptions(readOptions)
- .setExecutionOptions(executionOptions);
+ .setFenodes(options.getFenodes())
+ .setUsername(options.getUsername())
+ .setPassword(options.getPassword())
+ .setTableIdentifier(options.getTableIdentifier())
+ .setReadOptions(readOptions)
+ .setExecutionOptions(executionOptions)
+ .setFieldDataTypes(tableSchema.getFieldDataTypes());;
return OutputFormatProvider.of(builder.build());
}
@Override
public DynamicTableSink copy() {
- return new DorisDynamicTableSink(options, readOptions, executionOptions);
+ return new DorisDynamicTableSink(options, readOptions, executionOptions, tableSchema);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org