You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/04 04:43:29 UTC

[inlong] 02/02: [INLONG-7126][Sort] Support multiple dirty sink archive helper (#7127)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 5c37f1efafecebb239ea23bf60cf9b1c45a713cf
Author: Yizhou Yang <32...@users.noreply.github.com>
AuthorDate: Wed Jan 4 10:16:19 2023 +0800

    [INLONG-7126][Sort] Support multiple dirty sink archive helper (#7127)
---
 .../inlong/sort/base/dirty/DirtySinkHelper.java    | 43 ++++++++++++++++++++++
 .../internal/JdbcMultiBatchingOutputFormat.java    |  3 +-
 .../starrocks/manager/StarRocksSinkManager.java    | 16 ++++----
 .../table/sink/StarRocksDynamicSinkFunction.java   |  2 +-
 4 files changed, 55 insertions(+), 9 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
index a962b974e..e815db856 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
@@ -18,8 +18,12 @@
 package org.apache.inlong.sort.base.dirty;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,6 +97,88 @@ public class DirtySinkHelper<T> implements Serializable {
         }
     }
 
+    /**
+     * Handles a dirty record produced by a multiple sink (one that writes to several tables).
+     *
+     * <p>If dirty data is not ignored, {@code e} is rethrown, wrapped in a {@link RuntimeException}
+     * when it is not already one. Otherwise, when a dirty sink is configured, the record is parsed
+     * with the {@link JsonDynamicSchemaFormat} named by {@code sinkMultipleFormat} so that the
+     * labels, log tag and identifier of the dirty options can be resolved against its content.
+     * A {@link RowData} is parsed from its binary field 0, a {@code byte[]} as is, and any other
+     * value from its {@code toString()} text (e.g. a JSON string or JSON node).
+     *
+     * <p>If the format is missing or not JSON based, or the record cannot be parsed, the record
+     * is passed to {@link #invoke} with the caller's dirty type and cause instead.
+     *
+     * @param dirtyData the dirty record
+     * @param dirtyType why the record is dirty
+     * @param e the error that made the record dirty
+     * @param sinkMultipleFormat name of the dynamic schema format used by the multiple sink
+     */
+    public void invokeMultiple(T dirtyData, DirtyType dirtyType, Throwable e,
+            String sinkMultipleFormat) {
+        if (!dirtyOptions.ignoreDirty()) {
+            if (e instanceof RuntimeException) {
+                throw (RuntimeException) e;
+            }
+            throw new RuntimeException(e);
+        }
+        if (dirtySink == null) {
+            return;
+        }
+        // Resolve the format only once we know the record will actually be archived.
+        JsonDynamicSchemaFormat jsonDynamicSchemaFormat = null;
+        if (sinkMultipleFormat != null) {
+            Object format = DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+            if (format instanceof JsonDynamicSchemaFormat) {
+                jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat) format;
+            }
+        }
+        if (jsonDynamicSchemaFormat == null) {
+            invoke(dirtyData, dirtyType, e);
+            return;
+        }
+        JsonNode rootNode;
+        try {
+            rootNode = jsonDynamicSchemaFormat.deserialize(toParsableBytes(dirtyData));
+        } catch (Exception parseException) {
+            // The record is dirty because of 'e', not because of this parse failure: keep the
+            // caller's dirty type and cause, and only skip record-level label resolution.
+            LOGGER.debug("Dirty data could not be parsed with format {}, archiving it without parsing",
+                    sinkMultipleFormat, parseException);
+            invoke(dirtyData, dirtyType, e);
+            return;
+        }
+        try {
+            DirtyData.Builder<T> builder = DirtyData.builder();
+            builder.setData(dirtyData)
+                    .setDirtyType(dirtyType)
+                    .setLabels(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLabels()))
+                    .setLogTag(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLogTag()))
+                    .setDirtyMessage(e.getMessage())
+                    .setIdentifier(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getIdentifier()));
+            dirtySink.invoke(builder.build());
+        } catch (Exception ex) {
+            if (!dirtyOptions.ignoreSideOutputErrors()) {
+                throw new RuntimeException(ex);
+            }
+            LOGGER.warn("Dirty sink failed", ex);
+        }
+    }
+
+    /**
+     * Returns the bytes of a dirty record that a JSON dynamic schema format should parse.
+     */
+    private static byte[] toParsableBytes(Object data) {
+        if (data instanceof RowData) {
+            return ((RowData) data).getBinary(0);
+        }
+        if (data instanceof byte[]) {
+            return (byte[]) data;
+        }
+        return String.valueOf(data).getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    }
+
     public void setDirtyOptions(DirtyOptions dirtyOptions) {
         this.dirtyOptions = dirtyOptions;
     }
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
index dd4be4d5c..67e9adac0 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
@@ -566,7 +566,8 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
                         outputMetrics(tableIdentifier, Long.valueOf(tableIdRecordList.size()),
                                 1L, true);
                         if (!schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.THROW_WITH_STOP)) {
-                            dirtySinkHelper.invoke(record, DirtyType.RETRY_LOAD_ERROR, tableException);
+                            dirtySinkHelper.invokeMultiple(record, DirtyType.RETRY_LOAD_ERROR, tableException,
+                                    sinkMultipleFormat);
                         }
                         tableExceptionMap.put(tableIdentifier, tableException);
                         if (stopWritingWhenTableException) {
diff --git a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
index d6a0f0f4e..19c762192 100644
--- a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
+++ b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
@@ -122,7 +122,8 @@ public class StarRocksSinkManager implements Serializable {
     private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
     private transient SinkTableMetricData metricData;
 
-    private final DirtySinkHelper<Object> dirtySinkHelper;;
+    private final DirtySinkHelper<Object> dirtySinkHelper;
+    private String sinkMultipleFormat;
 
     /**
      * If a table writing throws exception, ignore it when receiving data later again
@@ -149,7 +150,6 @@ public class StarRocksSinkManager implements Serializable {
         this.schemaUpdatePolicy = schemaUpdatePolicy;
 
         this.dirtySinkHelper = dirtySinkHelper;
-
         init(flinkSchema);
     }
 
@@ -159,7 +159,8 @@ public class StarRocksSinkManager implements Serializable {
             StarRocksQueryVisitor starrocksQueryVisitor,
             boolean multipleSink,
             SchemaUpdateExceptionPolicy schemaUpdatePolicy,
-            DirtySinkHelper<Object> dirtySinkHelper) {
+            DirtySinkHelper<Object> dirtySinkHelper,
+            String multipleformat) {
         this.sinkOptions = sinkOptions;
         this.jdbcConnProvider = jdbcConnProvider;
         this.starrocksQueryVisitor = starrocksQueryVisitor;
@@ -168,7 +169,7 @@ public class StarRocksSinkManager implements Serializable {
         this.schemaUpdatePolicy = schemaUpdatePolicy;
 
         this.dirtySinkHelper = dirtySinkHelper;
-
+        this.sinkMultipleFormat = multipleformat;
         init(flinkSchema);
     }
 
@@ -450,12 +451,13 @@ public class StarRocksSinkManager implements Serializable {
         // archive dirty data
         if (StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat())) {
             for (byte[] row : flushData.getBuffer()) {
-                dirtySinkHelper.invoke(new String(row, StandardCharsets.UTF_8), DirtyType.BATCH_LOAD_ERROR, e);
+                dirtySinkHelper.invokeMultiple(new String(row, StandardCharsets.UTF_8), DirtyType.BATCH_LOAD_ERROR, e,
+                        sinkMultipleFormat);
             }
         } else if (StarRocksSinkOptions.StreamLoadFormat.JSON.equals(sinkOptions.getStreamLoadFormat())) {
             for (byte[] row : flushData.getBuffer()) {
-                dirtySinkHelper.invoke(OBJECT_MAPPER.readTree(new String(row, StandardCharsets.UTF_8)),
-                        DirtyType.BATCH_LOAD_ERROR, e);
+                dirtySinkHelper.invokeMultiple(OBJECT_MAPPER.readTree(new String(row, StandardCharsets.UTF_8)),
+                        DirtyType.BATCH_LOAD_ERROR, e, sinkMultipleFormat);
             }
         }
 
diff --git a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
index 16b985595..089c86e84 100644
--- a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
+++ b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
@@ -124,7 +124,7 @@ public class StarRocksDynamicSinkFunction<T> extends RichSinkFunction<T> impleme
         StarRocksQueryVisitor starrocksQueryVisitor = new StarRocksQueryVisitor(jdbcConnProvider,
                 sinkOptions.getDatabaseName(), sinkOptions.getTableName());
         this.sinkManager = new StarRocksSinkManager(sinkOptions, schema, jdbcConnProvider, starrocksQueryVisitor,
-                multipleSink, schemaUpdatePolicy, dirtySinkHelper);
+                multipleSink, schemaUpdatePolicy, dirtySinkHelper, sinkMultipleFormat);
 
         rowTransformer.setStarRocksColumns(starrocksQueryVisitor.getFieldMapping());
         rowTransformer.setTableSchema(schema);