Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/04 04:43:27 UTC

[inlong] branch branch-1.5 updated (44feb7517 -> 5c37f1efa)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from 44feb7517 [INLONG-7135][Manager] Support the connection test of data nodes and clusters (#7136)
     new 13459e233 [INLONG-7121][Sort] Fix the parsing error of Flink SQL for MongoDB (#7122)
     new 5c37f1efa [INLONG-7126][Sort] Support multiple dirty sink archive helper (#7127)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../protocol/node/extract/MongoExtractNode.java    | 39 +++++++++++++++++++-
 .../node/extract/MongoExtractNodeTest.java         |  4 ++
 .../inlong/sort/base/dirty/DirtySinkHelper.java    | 43 ++++++++++++++++++++++
 .../internal/JdbcMultiBatchingOutputFormat.java    |  3 +-
 .../starrocks/manager/StarRocksSinkManager.java    | 16 ++++----
 .../table/sink/StarRocksDynamicSinkFunction.java   |  2 +-
 .../inlong/sort/parser/impl/FlinkSqlParser.java    | 33 ++++++++++++++---
 7 files changed, 125 insertions(+), 15 deletions(-)


[inlong] 02/02: [INLONG-7126][Sort] Support multiple dirty sink archive helper (#7127)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 5c37f1efafecebb239ea23bf60cf9b1c45a713cf
Author: Yizhou Yang <32...@users.noreply.github.com>
AuthorDate: Wed Jan 4 10:16:19 2023 +0800

    [INLONG-7126][Sort] Support multiple dirty sink archive helper (#7127)
---
 .../inlong/sort/base/dirty/DirtySinkHelper.java    | 43 ++++++++++++++++++++++
 .../internal/JdbcMultiBatchingOutputFormat.java    |  3 +-
 .../starrocks/manager/StarRocksSinkManager.java    | 16 ++++----
 .../table/sink/StarRocksDynamicSinkFunction.java   |  2 +-
 4 files changed, 55 insertions(+), 9 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
index a962b974e..e815db856 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
@@ -18,8 +18,12 @@
 package org.apache.inlong.sort.base.dirty;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,6 +97,45 @@ public class DirtySinkHelper<T> implements Serializable {
         }
     }
 
+    public void invokeMultiple(T dirtyData, DirtyType dirtyType, Throwable e,
+            String sinkMultipleFormat) {
+        JsonDynamicSchemaFormat jsonDynamicSchemaFormat =
+                (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+        if (!dirtyOptions.ignoreDirty()) {
+            RuntimeException ex;
+            if (e instanceof RuntimeException) {
+                ex = (RuntimeException) e;
+            } else {
+                ex = new RuntimeException(e);
+            }
+            throw ex;
+        }
+        if (dirtySink != null) {
+            JsonNode rootNode;
+            DirtyData.Builder<T> builder = DirtyData.builder();
+            try {
+                rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+            } catch (Exception ex) {
+                invoke(dirtyData, DirtyType.DESERIALIZE_ERROR, e);
+                return;
+            }
+            try {
+                builder.setData(dirtyData)
+                        .setDirtyType(dirtyType)
+                        .setLabels(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLabels()))
+                        .setLogTag(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLogTag()))
+                        .setDirtyMessage(e.getMessage())
+                        .setIdentifier(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getIdentifier()));
+                dirtySink.invoke(builder.build());
+            } catch (Exception ex) {
+                if (!dirtyOptions.ignoreSideOutputErrors()) {
+                    throw new RuntimeException(ex);
+                }
+                LOGGER.warn("Dirty sink failed", ex);
+            }
+        }
+    }
+
     public void setDirtyOptions(DirtyOptions dirtyOptions) {
         this.dirtyOptions = dirtyOptions;
     }
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
index dd4be4d5c..67e9adac0 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
@@ -566,7 +566,8 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
                         outputMetrics(tableIdentifier, Long.valueOf(tableIdRecordList.size()),
                                 1L, true);
                         if (!schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.THROW_WITH_STOP)) {
-                            dirtySinkHelper.invoke(record, DirtyType.RETRY_LOAD_ERROR, tableException);
+                            dirtySinkHelper.invokeMultiple(record, DirtyType.RETRY_LOAD_ERROR, tableException,
+                                    sinkMultipleFormat);
                         }
                         tableExceptionMap.put(tableIdentifier, tableException);
                         if (stopWritingWhenTableException) {
diff --git a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
index d6a0f0f4e..19c762192 100644
--- a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
+++ b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
@@ -122,7 +122,8 @@ public class StarRocksSinkManager implements Serializable {
     private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
     private transient SinkTableMetricData metricData;
 
-    private final DirtySinkHelper<Object> dirtySinkHelper;;
+    private final DirtySinkHelper<Object> dirtySinkHelper;
+    private String sinkMultipleFormat;
 
     /**
      * If a table writing throws exception, ignore it when receiving data later again
@@ -149,7 +150,6 @@ public class StarRocksSinkManager implements Serializable {
         this.schemaUpdatePolicy = schemaUpdatePolicy;
 
         this.dirtySinkHelper = dirtySinkHelper;
-
         init(flinkSchema);
     }
 
@@ -159,7 +159,8 @@ public class StarRocksSinkManager implements Serializable {
             StarRocksQueryVisitor starrocksQueryVisitor,
             boolean multipleSink,
             SchemaUpdateExceptionPolicy schemaUpdatePolicy,
-            DirtySinkHelper<Object> dirtySinkHelper) {
+            DirtySinkHelper<Object> dirtySinkHelper,
+            String multipleformat) {
         this.sinkOptions = sinkOptions;
         this.jdbcConnProvider = jdbcConnProvider;
         this.starrocksQueryVisitor = starrocksQueryVisitor;
@@ -168,7 +169,7 @@ public class StarRocksSinkManager implements Serializable {
         this.schemaUpdatePolicy = schemaUpdatePolicy;
 
         this.dirtySinkHelper = dirtySinkHelper;
-
+        this.sinkMultipleFormat = multipleformat;
         init(flinkSchema);
     }
 
@@ -450,12 +451,13 @@ public class StarRocksSinkManager implements Serializable {
         // archive dirty data
         if (StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat())) {
             for (byte[] row : flushData.getBuffer()) {
-                dirtySinkHelper.invoke(new String(row, StandardCharsets.UTF_8), DirtyType.BATCH_LOAD_ERROR, e);
+                dirtySinkHelper.invokeMultiple(new String(row, StandardCharsets.UTF_8), DirtyType.BATCH_LOAD_ERROR, e,
+                        sinkMultipleFormat);
             }
         } else if (StarRocksSinkOptions.StreamLoadFormat.JSON.equals(sinkOptions.getStreamLoadFormat())) {
             for (byte[] row : flushData.getBuffer()) {
-                dirtySinkHelper.invoke(OBJECT_MAPPER.readTree(new String(row, StandardCharsets.UTF_8)),
-                        DirtyType.BATCH_LOAD_ERROR, e);
+                dirtySinkHelper.invokeMultiple(OBJECT_MAPPER.readTree(new String(row, StandardCharsets.UTF_8)),
+                        DirtyType.BATCH_LOAD_ERROR, e, sinkMultipleFormat);
             }
         }
 
diff --git a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
index 16b985595..089c86e84 100644
--- a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
+++ b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
@@ -124,7 +124,7 @@ public class StarRocksDynamicSinkFunction<T> extends RichSinkFunction<T> impleme
         StarRocksQueryVisitor starrocksQueryVisitor = new StarRocksQueryVisitor(jdbcConnProvider,
                 sinkOptions.getDatabaseName(), sinkOptions.getTableName());
         this.sinkManager = new StarRocksSinkManager(sinkOptions, schema, jdbcConnProvider, starrocksQueryVisitor,
-                multipleSink, schemaUpdatePolicy, dirtySinkHelper);
+                multipleSink, schemaUpdatePolicy, dirtySinkHelper, sinkMultipleFormat);
 
         rowTransformer.setStarRocksColumns(starrocksQueryVisitor.getFieldMapping());
         rowTransformer.setTableSchema(schema);

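For readers skimming the patch, a minimal usage sketch of the new
DirtySinkHelper#invokeMultiple(...) entry point follows. The ExampleMultiSink
class, its load(...) method, and the "canal-json" format value are
illustrative assumptions, not part of this commit; the helper call, its
signature, and DirtyType.RETRY_LOAD_ERROR are taken from the diff above.

    import org.apache.flink.table.data.RowData;
    import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
    import org.apache.inlong.sort.base.dirty.DirtyType;

    // Hypothetical sink showing how a connector archives a failed record
    // through the new multi-sink aware helper; not part of this commit.
    public class ExampleMultiSink {

        private final DirtySinkHelper<Object> dirtySinkHelper;
        // Serialization format of the multiple sink, e.g. "canal-json" (assumed value).
        private final String sinkMultipleFormat;

        public ExampleMultiSink(DirtySinkHelper<Object> dirtySinkHelper, String sinkMultipleFormat) {
            this.dirtySinkHelper = dirtySinkHelper;
            this.sinkMultipleFormat = sinkMultipleFormat;
        }

        public void writeOrArchive(RowData record) {
            try {
                load(record); // hypothetical write to the target table
            } catch (Exception e) {
                // Unlike invoke(...), invokeMultiple(...) also receives the format,
                // so the helper can deserialize the record and resolve the labels,
                // log tag and identifier from the payload itself.
                dirtySinkHelper.invokeMultiple(record, DirtyType.RETRY_LOAD_ERROR, e, sinkMultipleFormat);
            }
        }

        private void load(RowData record) throws Exception {
            // hypothetical: real connectors batch and flush records here
        }
    }

Note the contract implied by the diff: invokeMultiple(...) deserializes
((RowData) dirtyData).getBinary(0), so in multi-sink mode the record is
expected to carry the whole serialized payload in field 0.
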

[inlong] 01/02: [INLONG-7121][Sort] Fix the parsing error of Flink SQL for MongoDB (#7122)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 13459e233e87c17a9c7da5e00c03119c31f7322c
Author: kuansix <49...@qq.com>
AuthorDate: Wed Jan 4 10:15:38 2023 +0800

    [INLONG-7121][Sort] Fix the parsing error of Flink SQL for MongoDB (#7122)
---
 .../protocol/node/extract/MongoExtractNode.java    | 39 +++++++++++++++++++++-
 .../node/extract/MongoExtractNodeTest.java         |  4 +++
 .../inlong/sort/parser/impl/FlinkSqlParser.java    | 33 +++++++++++++++---
 3 files changed, 70 insertions(+), 6 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNode.java
index 881ab472d..38daa67ef 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNode.java
@@ -120,6 +120,40 @@ public class MongoExtractNode extends ExtractNode implements InlongMetric, Metad
         return options;
     }
 
+    @Override
+    public String getMetadataKey(MetaField metaField) {
+        String metadataKey;
+        switch (metaField) {
+            case TABLE_NAME:
+                metadataKey = "table_name";
+                break;
+            case COLLECTION_NAME:
+                metadataKey = "collection_name";
+                break;
+            case SCHEMA_NAME:
+                metadataKey = "schema_name";
+                break;
+            case DATABASE_NAME:
+                metadataKey = "database_name";
+                break;
+            case OP_TS:
+                metadataKey = "op_ts";
+                break;
+            case DATA_DEBEZIUM:
+            case DATA_BYTES_DEBEZIUM:
+                metadataKey = "meta.data_debezium";
+                break;
+            case DATA_CANAL:
+            case DATA_BYTES_CANAL:
+                metadataKey = "meta.data_canal";
+                break;
+            default:
+                throw new UnsupportedOperationException(String.format("Unsupported meta field for %s: %s",
+                        this.getClass().getSimpleName(), metaField));
+        }
+        return metadataKey;
+    }
+
     @Override
     public boolean isVirtual(MetaField metaField) {
         return true;
@@ -127,6 +161,9 @@ public class MongoExtractNode extends ExtractNode implements InlongMetric, Metad
 
     @Override
     public Set<MetaField> supportedMetaFields() {
-        return EnumSet.of(MetaField.PROCESS_TIME, MetaField.COLLECTION_NAME, MetaField.DATABASE_NAME, MetaField.OP_TS);
+        return EnumSet.of(MetaField.PROCESS_TIME, MetaField.COLLECTION_NAME,
+                MetaField.DATABASE_NAME, MetaField.OP_TS,
+                MetaField.DATA_DEBEZIUM, MetaField.DATA_BYTES_DEBEZIUM,
+                MetaField.DATA_CANAL, MetaField.DATA_BYTES_CANAL);
     }
 }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNodeTest.java
index ff4911e0b..db803122e 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNodeTest.java
@@ -51,6 +51,10 @@ public class MongoExtractNodeTest extends SerializeBaseTest<MongoExtractNode> {
         formatMap.put(MetaField.COLLECTION_NAME, "STRING METADATA FROM 'collection_name' VIRTUAL");
         formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 'database_name' VIRTUAL");
         formatMap.put(MetaField.OP_TS, "TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL");
+        formatMap.put(MetaField.DATA_BYTES_DEBEZIUM, "BYTES METADATA FROM 'meta.data_debezium' VIRTUAL");
+        formatMap.put(MetaField.DATA_DEBEZIUM, "STRING METADATA FROM 'meta.data_debezium' VIRTUAL");
+        formatMap.put(MetaField.DATA_CANAL, "STRING METADATA FROM 'meta.data_canal' VIRTUAL");
+        formatMap.put(MetaField.DATA_BYTES_CANAL, "BYTES METADATA FROM 'meta.data_canal' VIRTUAL");
         MongoExtractNode node = getTestObject();
         boolean formatEquals = true;
         for (MetaField metaField : node.supportedMetaFields()) {
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 44cc679eb..22f3b968f 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -40,6 +40,7 @@ import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.LoadNode;
 import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
 import org.apache.inlong.sort.protocol.node.transform.TransformNode;
@@ -75,6 +76,7 @@ public class FlinkSqlParser implements Parser {
 
     private static final Logger log = LoggerFactory.getLogger(FlinkSqlParser.class);
 
+    public static final String SOURCE_MULTIPLE_ENABLE_KEY = "source.multiple.enable";
     private final TableEnvironment tableEnv;
     private final GroupInfo groupInfo;
     private final Set<String> hasParsedSet = new HashSet<>();
@@ -742,8 +744,9 @@ public class FlinkSqlParser implements Parser {
         }
         StringBuilder sb = new StringBuilder("CREATE TABLE `");
         sb.append(node.genTableName()).append("`(\n");
-        sb.append(genPrimaryKey(node.getPrimaryKey()));
-        sb.append(parseFields(node.getFields(), node));
+        String filterPrimaryKey = getFilterPrimaryKey(node);
+        sb.append(genPrimaryKey(node.getPrimaryKey(), filterPrimaryKey));
+        sb.append(parseFields(node.getFields(), node, filterPrimaryKey));
         if (node instanceof ExtractNode) {
             ExtractNode extractNode = (ExtractNode) node;
             if (extractNode.getWatermarkField() != null) {
@@ -759,6 +762,19 @@ public class FlinkSqlParser implements Parser {
         return sb.toString();
     }
 
+    /**
+     * Get filter PrimaryKey for Mongo when multi-sink mode
+     */
+    private String getFilterPrimaryKey(Node node) {
+        if (node instanceof MongoExtractNode) {
+            if (null != node.getProperties().get(SOURCE_MULTIPLE_ENABLE_KEY)
+                    && node.getProperties().get(SOURCE_MULTIPLE_ENABLE_KEY).equals("true")) {
+                return node.getPrimaryKey();
+            }
+        }
+        return null;
+    }
+
     /**
      * Gen create table DDL for hbase load
      */
@@ -857,11 +873,15 @@ public class FlinkSqlParser implements Parser {
      *
      * @param fields The fields defined in node
      * @param node The abstract of extract, transform, load
+     * @param filterPrimaryKey filter PrimaryKey, use for mongo
      * @return Field formats in select sql
      */
-    private String parseFields(List<FieldInfo> fields, Node node) {
+    private String parseFields(List<FieldInfo> fields, Node node, String filterPrimaryKey) {
         StringBuilder sb = new StringBuilder();
         for (FieldInfo field : fields) {
+            if (StringUtils.isNotBlank(filterPrimaryKey) && field.getName().equals(filterPrimaryKey)) {
+                continue;
+            }
             sb.append("    `").append(field.getName()).append("` ");
             if (field instanceof MetaFieldInfo) {
                 if (!(node instanceof Metadata)) {
@@ -890,10 +910,13 @@ public class FlinkSqlParser implements Parser {
      * Generate primary key format in sql
      *
      * @param primaryKey The primary key of table
+     * @param filterPrimaryKey filter PrimaryKey, use for mongo
      * @return Primary key format in sql
      */
-    private String genPrimaryKey(String primaryKey) {
-        if (StringUtils.isNotBlank(primaryKey)) {
+    private String genPrimaryKey(String primaryKey, String filterPrimaryKey) {
+        boolean checkPrimaryKeyFlag = StringUtils.isNotBlank(primaryKey)
+                && (StringUtils.isBlank(filterPrimaryKey) || !primaryKey.equals(filterPrimaryKey));
+        if (checkPrimaryKeyFlag) {
             primaryKey = String.format("    PRIMARY KEY (%s) NOT ENFORCED,\n",
                     StringUtils.join(formatFields(primaryKey.split(",")), ","));
         } else {