You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/27 14:25:41 UTC

[inlong] branch master updated: [INLONG-6307][Sort] Add whether to ignore single-table error policy processing for multiple sink (#6308)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 501d5e63c [INLONG-6307][Sort] Add whether to ignore single-table error policy processing for multiple sink (#6308)
501d5e63c is described below

commit 501d5e63c15e649da442248d6ef026bf65ee22f7
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Thu Oct 27 22:25:35 2022 +0800

    [INLONG-6307][Sort] Add whether to ignore single-table error policy processing for multiple sink (#6308)
---
 .../org/apache/inlong/sort/base/Constants.java     |  6 ++
 .../table/DorisDynamicSchemaOutputFormat.java      | 66 +++++++++++++++-------
 .../sort/doris/table/DorisDynamicTableFactory.java |  7 ++-
 .../sort/doris/table/DorisDynamicTableSink.java    | 12 ++--
 4 files changed, 64 insertions(+), 27 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index fb542ef6d..19c58e9c1 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -157,4 +157,10 @@ public final class Constants {
                     .enumType(SchemaUpdateExceptionPolicy.class)
                     .defaultValue(SchemaUpdateExceptionPolicy.TRY_IT_BEST)
                     .withDescription("The action to deal with schema update in multiple sink.");
+
+    public static final ConfigOption<Boolean> SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS =
+            ConfigOptions.key("sink.multiple.ignore-single-table-errors")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Whether ignore the single table erros when multiple sink writing scenario.");
 }
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 8782ab63e..c2db85789 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -75,13 +75,14 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
     private final String databasePattern;
     private final String tablePattern;
     private final String dynamicSchemaFormat;
+    private final boolean ignoreSingleTableErrors;
+    private final transient Map<String, Exception> flushExceptionMap = new HashMap<>();
     private long batchBytes = 0L;
     private int size;
     private DorisStreamLoad dorisStreamLoad;
     private transient volatile boolean closed = false;
     private transient ScheduledExecutorService scheduler;
     private transient ScheduledFuture<?> scheduledFuture;
-    private transient volatile Exception flushException;
     private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
 
     public DorisDynamicSchemaOutputFormat(DorisOptions option,
@@ -89,13 +90,15 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
             DorisExecutionOptions executionOptions,
             String dynamicSchemaFormat,
             String databasePattern,
-            String tablePattern) {
+            String tablePattern,
+            boolean ignoreSingleTableErrors) {
         this.options = option;
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
         this.dynamicSchemaFormat = dynamicSchemaFormat;
         this.databasePattern = databasePattern;
         this.tablePattern = tablePattern;
+        this.ignoreSingleTableErrors = ignoreSingleTableErrors;
     }
 
     /**
@@ -130,26 +133,28 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
             this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
                 synchronized (DorisDynamicSchemaOutputFormat.this) {
                     if (!closed) {
-                        try {
-                            flush();
-                        } catch (Exception e) {
-                            flushException = e;
-                        }
+                        flush();
                     }
                 }
             }, executionOptions.getBatchIntervalMs(), executionOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
         }
     }
 
-    private void checkFlushException() {
-        if (flushException != null) {
-            throw new RuntimeException("Writing records to streamload failed.", flushException);
+    private boolean checkFlushException(String tableIdentifier) {
+        Exception flushException = flushExceptionMap.get(tableIdentifier);
+        if (flushException == null) {
+            return false;
         }
+        if (!ignoreSingleTableErrors) {
+            throw new RuntimeException(
+                    String.format("Writing records to streamload of tableIdentifier:%s failed.", tableIdentifier),
+                    flushException);
+        }
+        return true;
     }
 
     @Override
     public synchronized void writeRecord(T row) throws IOException {
-        checkFlushException();
         addBatch(row);
         boolean valid = (executionOptions.getBatchSize() > 0 && size >= executionOptions.getBatchSize())
                 || batchBytes >= executionOptions.getMaxBatchBytes();
@@ -169,9 +174,11 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
                 return;
             }
             String tableIdentifier = StringUtils.join(
-                    jsonDynamicSchemaFormat.parse(rowData.getBinary(0), databasePattern),
-                    ".",
-                    jsonDynamicSchemaFormat.parse(rowData.getBinary(0), tablePattern));
+                    jsonDynamicSchemaFormat.parse(rootNode, databasePattern), ".",
+                    jsonDynamicSchemaFormat.parse(rootNode, tablePattern));
+            if (checkFlushException(tableIdentifier)) {
+                return;
+            }
             List<RowKind> rowKinds = jsonDynamicSchemaFormat
                     .opType2RowKind(jsonDynamicSchemaFormat.getOpType(rootNode));
             List<Map<String, String>> physicalDataList = jsonDynamicSchemaFormat.jsonNode2Map(
@@ -247,18 +254,31 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
                 this.dorisStreamLoad.close();
             }
         }
-        checkFlushException();
     }
 
     @SuppressWarnings({"rawtypes"})
-    public synchronized void flush() throws IOException {
-        checkFlushException();
+    public synchronized void flush() {
         if (batchMap.isEmpty()) {
             return;
         }
         for (Entry<String, List> kvs : batchMap.entrySet()) {
-            load(kvs.getKey(), OBJECT_MAPPER.writeValueAsString(kvs.getValue()));
+            if (checkFlushException(kvs.getKey())) {
+                continue;
+            }
+            try {
+                load(kvs.getKey(), OBJECT_MAPPER.writeValueAsString(kvs.getValue()));
+            } catch (Exception e) {
+                flushExceptionMap.put(kvs.getKey(), e);
+                if (!ignoreSingleTableErrors) {
+                    throw new RuntimeException(
+                            String.format("Writing records to streamload of tableIdentifier:%s failed.", kvs.getKey()),
+                            e);
+                }
+                batchMap.remove(kvs.getKey());
+            }
         }
+        batchBytes = 0;
+        size = 0;
     }
 
     private void load(String tableIdentifier, String result) throws IOException {
@@ -284,8 +304,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
                 }
             }
         }
-        batchBytes = 0;
-        size = 0;
     }
 
     private String getBackend() throws IOException {
@@ -309,6 +327,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         private String dynamicSchemaFormat;
         private String databasePattern;
         private String tablePattern;
+        private boolean ignoreSingleTableErrors;
 
         public Builder() {
             this.optionsBuilder = DorisOptions.builder().setTableIdentifier("");
@@ -355,11 +374,16 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
             return this;
         }
 
+        public DorisDynamicSchemaOutputFormat.Builder setIgnoreSingleTableErrors(boolean ignoreSingleTableErrors) {
+            this.ignoreSingleTableErrors = ignoreSingleTableErrors;
+            return this;
+        }
+
         @SuppressWarnings({"rawtypes"})
         public DorisDynamicSchemaOutputFormat build() {
             return new DorisDynamicSchemaOutputFormat(
                     optionsBuilder.build(), readOptions, executionOptions,
-                    dynamicSchemaFormat, databasePattern, tablePattern);
+                    dynamicSchemaFormat, databasePattern, tablePattern, ignoreSingleTableErrors);
         }
     }
 }
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
index 271cc15b8..98c08c753 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
@@ -57,6 +57,7 @@ import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
 
 /**
@@ -208,6 +209,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
         options.add(SINK_MULTIPLE_DATABASE_PATTERN);
         options.add(SINK_MULTIPLE_TABLE_PATTERN);
         options.add(SINK_MULTIPLE_ENABLE);
+        options.add(SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS);
         return options;
     }
 
@@ -290,6 +292,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
         String databasePattern = helper.getOptions().getOptional(SINK_MULTIPLE_DATABASE_PATTERN).orElse(null);
         String tablePattern = helper.getOptions().getOptional(SINK_MULTIPLE_TABLE_PATTERN).orElse(null);
         boolean multipleSink = helper.getOptions().get(SINK_MULTIPLE_ENABLE);
+        boolean ignoreSingleTableErrors = helper.getOptions().get(SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS);
         String sinkMultipleFormat = helper.getOptions().getOptional(SINK_MULTIPLE_FORMAT).orElse(null);
         validateSinkMultiple(physicalSchema.toPhysicalRowDataType(),
                 multipleSink, sinkMultipleFormat, databasePattern, tablePattern);
@@ -298,8 +301,8 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
                 getDorisOptions(helper.getOptions()),
                 getDorisReadOptions(helper.getOptions()),
                 getDorisExecutionOptions(helper.getOptions(), streamLoadProp),
-                physicalSchema, multipleSink, sinkMultipleFormat, databasePattern, tablePattern
-        );
+                physicalSchema, multipleSink, sinkMultipleFormat, databasePattern,
+                tablePattern, ignoreSingleTableErrors);
     }
 
     private void validateSinkMultiple(DataType physicalDataType, boolean multipleSink, String sinkMultipleFormat,
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
index b6499298c..bc847a398 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
@@ -41,6 +41,7 @@ public class DorisDynamicTableSink implements DynamicTableSink {
     private final String sinkMultipleFormat;
     private final String databasePattern;
     private final String tablePattern;
+    private final boolean ignoreSingleTableErrors;
 
     public DorisDynamicTableSink(DorisOptions options,
             DorisReadOptions readOptions,
@@ -49,7 +50,8 @@ public class DorisDynamicTableSink implements DynamicTableSink {
             boolean multipleSink,
             String sinkMultipleFormat,
             String databasePattern,
-            String tablePattern) {
+            String tablePattern,
+            boolean ignoreSingleTableErrors) {
         this.options = options;
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
@@ -58,6 +60,7 @@ public class DorisDynamicTableSink implements DynamicTableSink {
         this.sinkMultipleFormat = sinkMultipleFormat;
         this.databasePattern = databasePattern;
         this.tablePattern = tablePattern;
+        this.ignoreSingleTableErrors = ignoreSingleTableErrors;
     }
 
     @Override
@@ -92,14 +95,15 @@ public class DorisDynamicTableSink implements DynamicTableSink {
                 .setExecutionOptions(executionOptions)
                 .setDatabasePattern(databasePattern)
                 .setTablePattern(tablePattern)
-                .setDynamicSchemaFormat(sinkMultipleFormat);
+                .setDynamicSchemaFormat(sinkMultipleFormat)
+                .setIgnoreSingleTableErrors(ignoreSingleTableErrors);
         return OutputFormatProvider.of(builder.build());
     }
 
     @Override
     public DynamicTableSink copy() {
-        return new DorisDynamicTableSink(options, readOptions, executionOptions,
-                tableSchema, multipleSink, sinkMultipleFormat, databasePattern, tablePattern);
+        return new DorisDynamicTableSink(options, readOptions, executionOptions, tableSchema,
+                multipleSink, sinkMultipleFormat, databasePattern, tablePattern, ignoreSingleTableErrors);
     }
 
     @Override