You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/27 14:25:41 UTC
[inlong] branch master updated: [INLONG-6307][Sort] Add whether to ignore single-table error policy processing for multiple sink (#6308)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 501d5e63c [INLONG-6307][Sort] Add whether to ignore single-table error policy processing for multiple sink (#6308)
501d5e63c is described below
commit 501d5e63c15e649da442248d6ef026bf65ee22f7
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Thu Oct 27 22:25:35 2022 +0800
[INLONG-6307][Sort] Add whether to ignore single-table error policy processing for multiple sink (#6308)
---
.../org/apache/inlong/sort/base/Constants.java | 6 ++
.../table/DorisDynamicSchemaOutputFormat.java | 66 +++++++++++++++-------
.../sort/doris/table/DorisDynamicTableFactory.java | 7 ++-
.../sort/doris/table/DorisDynamicTableSink.java | 12 ++--
4 files changed, 64 insertions(+), 27 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index fb542ef6d..19c58e9c1 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -157,4 +157,14 @@ public final class Constants {
.enumType(SchemaUpdateExceptionPolicy.class)
.defaultValue(SchemaUpdateExceptionPolicy.TRY_IT_BEST)
.withDescription("The action to deal with schema update in multiple sink.");
+
+ /**
+ * Whether errors raised while writing a single table in the multiple sink scenario are ignored
+ * instead of failing the whole sink. Defaults to {@code true}.
+ */
+ public static final ConfigOption<Boolean> SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS =
+ ConfigOptions.key("sink.multiple.ignore-single-table-errors")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Whether to ignore single-table errors in the multiple sink writing scenario.");
}
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 8782ab63e..c2db85789 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -75,13 +75,17 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
private final String databasePattern;
private final String tablePattern;
private final String dynamicSchemaFormat;
+ private final boolean ignoreSingleTableErrors;
+ /**
+ * Flush failures keyed by "database.table" identifier. Deliberately not transient: a transient field's
+ * initializer does not run on deserialization, so the map would be null once the format reaches a task.
+ */
+ private final Map<String, Exception> flushExceptionMap = new HashMap<>();
private long batchBytes = 0L;
private int size;
private DorisStreamLoad dorisStreamLoad;
private transient volatile boolean closed = false;
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
- private transient volatile Exception flushException;
private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
public DorisDynamicSchemaOutputFormat(DorisOptions option,
@@ -89,13 +90,15 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
DorisExecutionOptions executionOptions,
String dynamicSchemaFormat,
String databasePattern,
- String tablePattern) {
+ String tablePattern,
+ boolean ignoreSingleTableErrors) {
this.options = option;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
this.dynamicSchemaFormat = dynamicSchemaFormat;
this.databasePattern = databasePattern;
this.tablePattern = tablePattern;
+ this.ignoreSingleTableErrors = ignoreSingleTableErrors;
}
/**
@@ -130,26 +133,28 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
synchronized (DorisDynamicSchemaOutputFormat.this) {
if (!closed) {
- try {
- flush();
- } catch (Exception e) {
- flushException = e;
- }
+ // NOTE(review): flush() rethrows when single-table errors are not ignored; an exception escaping
+ // this task suppresses all later scheduled runs (ScheduledExecutorService contract).
+ flush();
}
}
}, executionOptions.getBatchIntervalMs(), executionOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
}
}
- private void checkFlushException() {
- if (flushException != null) {
- throw new RuntimeException("Writing records to streamload failed.", flushException);
+ /**
+ * Checks whether an earlier flush of the given table failed.
+ *
+ * @param tableIdentifier table identifier in the form "database.table"
+ * @return false if no failure is recorded for the table; true if one is recorded and
+ * single-table errors are ignored
+ * @throws RuntimeException wrapping the recorded failure when single-table errors are not ignored
+ */
+ private boolean checkFlushException(String tableIdentifier) {
+ Exception flushException = flushExceptionMap.get(tableIdentifier);
+ if (flushException == null) {
+ return false;
}
+ if (!ignoreSingleTableErrors) {
+ throw new RuntimeException(
+ String.format("Writing records to streamload of tableIdentifier:%s failed.", tableIdentifier),
+ flushException);
+ }
+ return true;
}
@Override
public synchronized void writeRecord(T row) throws IOException {
- checkFlushException();
addBatch(row);
boolean valid = (executionOptions.getBatchSize() > 0 && size >= executionOptions.getBatchSize())
|| batchBytes >= executionOptions.getMaxBatchBytes();
@@ -169,9 +174,11 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
return;
}
String tableIdentifier = StringUtils.join(
- jsonDynamicSchemaFormat.parse(rowData.getBinary(0), databasePattern),
- ".",
- jsonDynamicSchemaFormat.parse(rowData.getBinary(0), tablePattern));
+ jsonDynamicSchemaFormat.parse(rootNode, databasePattern), ".",
+ jsonDynamicSchemaFormat.parse(rootNode, tablePattern));
+ // Drop records of a table whose earlier flush failed (the check throws instead when errors are not ignored).
+ if (checkFlushException(tableIdentifier)) {
+ return;
+ }
List<RowKind> rowKinds = jsonDynamicSchemaFormat
.opType2RowKind(jsonDynamicSchemaFormat.getOpType(rootNode));
List<Map<String, String>> physicalDataList = jsonDynamicSchemaFormat.jsonNode2Map(
@@ -247,18 +254,40 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
this.dorisStreamLoad.close();
}
}
- checkFlushException();
}
+ /**
+ * Loads every buffered batch into Doris via stream load, one table at a time.
+ *
+ * <p>A failure of one table is recorded in {@code flushExceptionMap}. When single-table errors are
+ * ignored, that table's buffered batch is discarded and the remaining tables are still loaded;
+ * otherwise the failure is rethrown as a {@link RuntimeException}.
+ */
@SuppressWarnings({"rawtypes"})
- public synchronized void flush() throws IOException {
- checkFlushException();
+ public synchronized void flush() {
if (batchMap.isEmpty()) {
return;
}
for (Entry<String, List> kvs : batchMap.entrySet()) {
- load(kvs.getKey(), OBJECT_MAPPER.writeValueAsString(kvs.getValue()));
+ if (checkFlushException(kvs.getKey())) {
+ continue;
+ }
+ try {
+ load(kvs.getKey(), OBJECT_MAPPER.writeValueAsString(kvs.getValue()));
+ } catch (Exception e) {
+ flushExceptionMap.put(kvs.getKey(), e);
+ if (!ignoreSingleTableErrors) {
+ throw new RuntimeException(
+ String.format("Writing records to streamload of tableIdentifier:%s failed.", kvs.getKey()),
+ e);
+ }
+ }
}
+ // Discard the batches of failed tables only after the loop: removing entries from batchMap while
+ // iterating its entrySet() with for-each risks a ConcurrentModificationException.
+ batchMap.keySet().removeAll(flushExceptionMap.keySet());
+ batchBytes = 0;
+ size = 0;
}
private void load(String tableIdentifier, String result) throws IOException {
@@ -284,8 +304,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
}
}
}
- batchBytes = 0;
- size = 0;
}
private String getBackend() throws IOException {
@@ -309,6 +327,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
private String dynamicSchemaFormat;
private String databasePattern;
private String tablePattern;
+ private boolean ignoreSingleTableErrors;
public Builder() {
this.optionsBuilder = DorisOptions.builder().setTableIdentifier("");
@@ -355,11 +374,16 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
return this;
}
+ /**
+ * Sets whether write failures of a single table are ignored instead of failing the whole sink.
+ *
+ * @param ignoreSingleTableErrors true to skip a table whose flush failed and keep writing the others
+ * @return this builder
+ */
+ public DorisDynamicSchemaOutputFormat.Builder setIgnoreSingleTableErrors(boolean ignoreSingleTableErrors) {
+ this.ignoreSingleTableErrors = ignoreSingleTableErrors;
+ return this;
+ }
+
@SuppressWarnings({"rawtypes"})
public DorisDynamicSchemaOutputFormat build() {
return new DorisDynamicSchemaOutputFormat(
optionsBuilder.build(), readOptions, executionOptions,
- dynamicSchemaFormat, databasePattern, tablePattern);
+ dynamicSchemaFormat, databasePattern, tablePattern, ignoreSingleTableErrors);
}
}
}
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
index 271cc15b8..98c08c753 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
@@ -57,6 +57,7 @@ import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
/**
@@ -208,6 +209,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
options.add(SINK_MULTIPLE_DATABASE_PATTERN);
options.add(SINK_MULTIPLE_TABLE_PATTERN);
options.add(SINK_MULTIPLE_ENABLE);
+ options.add(SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS);
return options;
}
@@ -290,6 +292,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
String databasePattern = helper.getOptions().getOptional(SINK_MULTIPLE_DATABASE_PATTERN).orElse(null);
String tablePattern = helper.getOptions().getOptional(SINK_MULTIPLE_TABLE_PATTERN).orElse(null);
boolean multipleSink = helper.getOptions().get(SINK_MULTIPLE_ENABLE);
+ // Option defaults to true; the value is passed through the sink to the output format.
+ boolean ignoreSingleTableErrors = helper.getOptions().get(SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS);
String sinkMultipleFormat = helper.getOptions().getOptional(SINK_MULTIPLE_FORMAT).orElse(null);
validateSinkMultiple(physicalSchema.toPhysicalRowDataType(),
multipleSink, sinkMultipleFormat, databasePattern, tablePattern);
@@ -298,8 +301,8 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
getDorisExecutionOptions(helper.getOptions(), streamLoadProp),
- physicalSchema, multipleSink, sinkMultipleFormat, databasePattern, tablePattern
- );
+ physicalSchema, multipleSink, sinkMultipleFormat, databasePattern,
+ tablePattern, ignoreSingleTableErrors);
}
private void validateSinkMultiple(DataType physicalDataType, boolean multipleSink, String sinkMultipleFormat,
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
index b6499298c..bc847a398 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
@@ -41,6 +41,7 @@ public class DorisDynamicTableSink implements DynamicTableSink {
private final String sinkMultipleFormat;
private final String databasePattern;
private final String tablePattern;
+ /** Whether single-table write errors are ignored in multiple sink mode; forwarded to the output format builder. */
+ private final boolean ignoreSingleTableErrors;
public DorisDynamicTableSink(DorisOptions options,
DorisReadOptions readOptions,
@@ -49,7 +50,8 @@ public class DorisDynamicTableSink implements DynamicTableSink {
boolean multipleSink,
String sinkMultipleFormat,
String databasePattern,
- String tablePattern) {
+ String tablePattern,
+ boolean ignoreSingleTableErrors) {
this.options = options;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
@@ -58,6 +60,7 @@ public class DorisDynamicTableSink implements DynamicTableSink {
this.sinkMultipleFormat = sinkMultipleFormat;
this.databasePattern = databasePattern;
this.tablePattern = tablePattern;
+ this.ignoreSingleTableErrors = ignoreSingleTableErrors;
}
@Override
@@ -92,14 +95,15 @@ public class DorisDynamicTableSink implements DynamicTableSink {
.setExecutionOptions(executionOptions)
.setDatabasePattern(databasePattern)
.setTablePattern(tablePattern)
- .setDynamicSchemaFormat(sinkMultipleFormat);
+ .setDynamicSchemaFormat(sinkMultipleFormat)
+ .setIgnoreSingleTableErrors(ignoreSingleTableErrors);
return OutputFormatProvider.of(builder.build());
}
@Override
public DynamicTableSink copy() {
- return new DorisDynamicTableSink(options, readOptions, executionOptions,
- tableSchema, multipleSink, sinkMultipleFormat, databasePattern, tablePattern);
+ return new DorisDynamicTableSink(options, readOptions, executionOptions, tableSchema,
+ multipleSink, sinkMultipleFormat, databasePattern, tablePattern, ignoreSingleTableErrors);
}
@Override