You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this plain-text copy.
Posted to commits@inlong.apache.org by yu...@apache.org on 2022/10/31 09:20:03 UTC

[inlong] branch master updated: [INLONG-6322][Sort] Fix write data incorrect for doris connector with sink multiple scenario (#6323)

This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ca24d2ede [INLONG-6322][Sort] Fix write data incorrect for doris connector with sink multiple scenario (#6323)
ca24d2ede is described below

commit ca24d2edea046611f8c77d70b1465b550f146aaa
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Mon Oct 31 17:19:57 2022 +0800

    [INLONG-6322][Sort] Fix write data incorrect for doris connector with sink multiple scenario (#6323)
    
    * [INLONG-6322][Sort] Fix write data incorrect for doris connector with sink multiple scenario
    
    * [INLONG-6322][Sort] Fix flushing update error
    
    * [INLONG-6322][Sort] Optimize flush handle
    
    * [INLONG-6322][Sort] Fix comment error
---
 .../table/DorisDynamicSchemaOutputFormat.java      | 68 +++++++++++++++++-----
 1 file changed, 53 insertions(+), 15 deletions(-)

diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index c2db85789..9e8142ae7 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * DorisDynamicSchemaOutputFormat, copy from {@link org.apache.doris.flink.table.DorisDynamicOutputFormat}
@@ -76,11 +77,16 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
     private final String tablePattern;
     private final String dynamicSchemaFormat;
     private final boolean ignoreSingleTableErrors;
-    private final transient Map<String, Exception> flushExceptionMap = new HashMap<>();
+    private final Map<String, Exception> flushExceptionMap = new HashMap<>();
+    private final AtomicLong readInNum = new AtomicLong(0);
+    private final AtomicLong writeOutNum = new AtomicLong(0);
+    private final AtomicLong errorNum = new AtomicLong(0);
+    private final AtomicLong ddlNum = new AtomicLong(0);
     private long batchBytes = 0L;
     private int size;
     private DorisStreamLoad dorisStreamLoad;
     private transient volatile boolean closed = false;
+    private transient volatile boolean flushing = false;
     private transient ScheduledExecutorService scheduler;
     private transient ScheduledFuture<?> scheduledFuture;
     private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
@@ -131,10 +137,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
             this.scheduler = new ScheduledThreadPoolExecutor(1,
                     new ExecutorThreadFactory("doris-streamload-output-format"));
             this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
-                synchronized (DorisDynamicSchemaOutputFormat.this) {
-                    if (!closed) {
-                        flush();
-                    }
+                if (!closed && !flushing) {
+                    flush();
                 }
             }, executionOptions.getBatchIntervalMs(), executionOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
         }
@@ -158,7 +162,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         addBatch(row);
         boolean valid = (executionOptions.getBatchSize() > 0 && size >= executionOptions.getBatchSize())
                 || batchBytes >= executionOptions.getMaxBatchBytes();
-        if (valid) {
+        if (valid && !flushing) {
             flush();
         }
     }
@@ -168,8 +172,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         if (row instanceof RowData) {
             RowData rowData = (RowData) row;
             JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0));
+            readInNum.incrementAndGet();
             boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
             if (isDDL) {
+                ddlNum.incrementAndGet();
                 // Ignore ddl change for now
                 return;
             }
@@ -226,6 +232,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
                             }
                             break;
                         default:
+                            errorNum.incrementAndGet();
                             throw new RuntimeException("Unrecognized row kind:" + rowKind.toString());
                     }
                 }
@@ -239,12 +246,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
     public synchronized void close() throws IOException {
         if (!closed) {
             closed = true;
-
             if (this.scheduledFuture != null) {
                 scheduledFuture.cancel(false);
                 this.scheduler.shutdown();
             }
-
             try {
                 flush();
             } catch (Exception e) {
@@ -258,27 +263,61 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
 
     @SuppressWarnings({"rawtypes"})
     public synchronized void flush() {
-        if (batchMap.isEmpty()) {
+        flushing = true;
+        if (!hasRecords()) {
+            flushing = false;
             return;
         }
+        List<String> errorTables = new ArrayList<>();
         for (Entry<String, List> kvs : batchMap.entrySet()) {
-            if (checkFlushException(kvs.getKey())) {
+            if (checkFlushException(kvs.getKey()) || kvs.getValue().isEmpty()) {
                 continue;
             }
+            String loadValue = null;
             try {
-                load(kvs.getKey(), OBJECT_MAPPER.writeValueAsString(kvs.getValue()));
+                loadValue = OBJECT_MAPPER.writeValueAsString(kvs.getValue());
+                load(kvs.getKey(), loadValue);
+                LOG.info("load {} records to tableIdentifier: {}", kvs.getValue().size(), kvs.getKey());
+                writeOutNum.addAndGet(kvs.getValue().size());
+                // Clean the data that has been loaded.
+                kvs.getValue().clear();
             } catch (Exception e) {
                 flushExceptionMap.put(kvs.getKey(), e);
+                errorNum.getAndAdd(kvs.getValue().size());
                 if (!ignoreSingleTableErrors) {
                     throw new RuntimeException(
-                            String.format("Writing records to streamload of tableIdentifier:%s failed.", kvs.getKey()),
-                            e);
+                            String.format("Writing records to streamload of tableIdentifier:%s failed, the value: %s.",
+                                    kvs.getKey(), loadValue), e);
                 }
-                batchMap.remove(kvs.getKey());
+                errorTables.add(kvs.getKey());
+                LOG.warn("The tableIdentifier: {} load failed and the data will be throw away in the future"
+                        + " because the option 'sink.multiple.ignore-single-table-errors' is 'true'", kvs.getKey());
             }
         }
+        if (!errorTables.isEmpty()) {
+            // Clean the key that has errors
+            errorTables.forEach(batchMap::remove);
+        }
         batchBytes = 0;
         size = 0;
+        LOG.info("Doris sink statistics: readInNum: {}, writeOutNum: {}, errorNum: {}, ddlNum: {}",
+                readInNum.get(), writeOutNum.get(), errorNum.get(), ddlNum.get());
+        flushing = false;
+    }
+
+    @SuppressWarnings("rawtypes")
+    private boolean hasRecords() {
+        if (batchMap.isEmpty()) {
+            return false;
+        }
+        boolean hasRecords = false;
+        for (List value : batchMap.values()) {
+            if (!value.isEmpty()) {
+                hasRecords = true;
+                break;
+            }
+        }
+        return hasRecords;
     }
 
     private void load(String tableIdentifier, String result) throws IOException {
@@ -286,7 +325,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
             try {
                 dorisStreamLoad.load(tableWithDb[0], tableWithDb[1], result);
-                batchMap.remove(tableIdentifier);
                 break;
             } catch (StreamLoadException e) {
                 LOG.error("doris sink error, retry times = {}", i, e);