You are viewing a plain-text version of this content. (The original page links to the canonical version from the word "here"; that link was not preserved in this plain-text copy.)
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/04 04:43:28 UTC

[inlong] 01/02: [INLONG-7121][Sort] Fix the parsing error of Flink SQL for MongoDB (#7122)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 13459e233e87c17a9c7da5e00c03119c31f7322c
Author: kuansix <49...@qq.com>
AuthorDate: Wed Jan 4 10:15:38 2023 +0800

    [INLONG-7121][Sort] Fix the parsing error of Flink SQL for MongoDB (#7122)
---
 .../protocol/node/extract/MongoExtractNode.java    | 39 +++++++++++++++++++++-
 .../node/extract/MongoExtractNodeTest.java         |  4 +++
 .../inlong/sort/parser/impl/FlinkSqlParser.java    | 33 +++++++++++++++---
 3 files changed, 70 insertions(+), 6 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNode.java
index 881ab472d..38daa67ef 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNode.java
@@ -120,6 +120,40 @@ public class MongoExtractNode extends ExtractNode implements InlongMetric, Metad
         return options;
     }
 
+    @Override
+    public String getMetadataKey(MetaField metaField) {
+        String metadataKey;
+        switch (metaField) {
+            case TABLE_NAME:
+                metadataKey = "table_name";
+                break;
+            case COLLECTION_NAME:
+                metadataKey = "collection_name";
+                break;
+            case SCHEMA_NAME:
+                metadataKey = "schema_name";
+                break;
+            case DATABASE_NAME:
+                metadataKey = "database_name";
+                break;
+            case OP_TS:
+                metadataKey = "op_ts";
+                break;
+            case DATA_DEBEZIUM:
+            case DATA_BYTES_DEBEZIUM:
+                metadataKey = "meta.data_debezium";
+                break;
+            case DATA_CANAL:
+            case DATA_BYTES_CANAL:
+                metadataKey = "meta.data_canal";
+                break;
+            default:
+                throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
+                        this.getClass().getSimpleName(), metaField));
+        }
+        return metadataKey;
+    }
+
     @Override
     public boolean isVirtual(MetaField metaField) {
         return true;
@@ -127,6 +161,9 @@ public class MongoExtractNode extends ExtractNode implements InlongMetric, Metad
 
     @Override
     public Set<MetaField> supportedMetaFields() {
-        return EnumSet.of(MetaField.PROCESS_TIME, MetaField.COLLECTION_NAME, MetaField.DATABASE_NAME, MetaField.OP_TS);
+        return EnumSet.of(MetaField.PROCESS_TIME, MetaField.COLLECTION_NAME,
+                MetaField.DATABASE_NAME, MetaField.OP_TS,
+                MetaField.DATA_DEBEZIUM, MetaField.DATA_BYTES_DEBEZIUM,
+                MetaField.DATA_CANAL, MetaField.DATA_BYTES_CANAL);
     }
 }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNodeTest.java
index ff4911e0b..db803122e 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNodeTest.java
@@ -51,6 +51,10 @@ public class MongoExtractNodeTest extends SerializeBaseTest<MongoExtractNode> {
         formatMap.put(MetaField.COLLECTION_NAME, "STRING METADATA FROM 'collection_name' VIRTUAL");
         formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 'database_name' VIRTUAL");
         formatMap.put(MetaField.OP_TS, "TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL");
+        formatMap.put(MetaField.DATA_BYTES_DEBEZIUM, "BYTES METADATA FROM 'meta.data_debezium' VIRTUAL");
+        formatMap.put(MetaField.DATA_DEBEZIUM, "STRING METADATA FROM 'meta.data_debezium' VIRTUAL");
+        formatMap.put(MetaField.DATA_CANAL, "STRING METADATA FROM 'meta.data_canal' VIRTUAL");
+        formatMap.put(MetaField.DATA_BYTES_CANAL, "BYTES METADATA FROM 'meta.data_canal' VIRTUAL");
         MongoExtractNode node = getTestObject();
         boolean formatEquals = true;
         for (MetaField metaField : node.supportedMetaFields()) {
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 44cc679eb..22f3b968f 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -40,6 +40,7 @@ import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.LoadNode;
 import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
 import org.apache.inlong.sort.protocol.node.transform.TransformNode;
@@ -75,6 +76,7 @@ public class FlinkSqlParser implements Parser {
 
     private static final Logger log = LoggerFactory.getLogger(FlinkSqlParser.class);
 
+    public static final String SOURCE_MULTIPLE_ENABLE_KEY = "source.multiple.enable";
     private final TableEnvironment tableEnv;
     private final GroupInfo groupInfo;
     private final Set<String> hasParsedSet = new HashSet<>();
@@ -742,8 +744,9 @@ public class FlinkSqlParser implements Parser {
         }
         StringBuilder sb = new StringBuilder("CREATE TABLE `");
         sb.append(node.genTableName()).append("`(\n");
-        sb.append(genPrimaryKey(node.getPrimaryKey()));
-        sb.append(parseFields(node.getFields(), node));
+        String filterPrimaryKey = getFilterPrimaryKey(node);
+        sb.append(genPrimaryKey(node.getPrimaryKey(), filterPrimaryKey));
+        sb.append(parseFields(node.getFields(), node, filterPrimaryKey));
         if (node instanceof ExtractNode) {
             ExtractNode extractNode = (ExtractNode) node;
             if (extractNode.getWatermarkField() != null) {
@@ -759,6 +762,19 @@ public class FlinkSqlParser implements Parser {
         return sb.toString();
     }
 
+    /**
+     * Get filter PrimaryKey for Mongo when multi-sink mode
+     */
+    private String getFilterPrimaryKey(Node node) {
+        if (node instanceof MongoExtractNode) {
+            if (null != node.getProperties().get(SOURCE_MULTIPLE_ENABLE_KEY)
+                    && node.getProperties().get(SOURCE_MULTIPLE_ENABLE_KEY).equals("true")) {
+                return node.getPrimaryKey();
+            }
+        }
+        return null;
+    }
+
     /**
      * Gen create table DDL for hbase load
      */
@@ -857,11 +873,15 @@ public class FlinkSqlParser implements Parser {
      *
      * @param fields The fields defined in node
      * @param node The abstract of extract, transform, load
+     * @param filterPrimaryKey filter PrimaryKey, use for mongo
      * @return Field formats in select sql
      */
-    private String parseFields(List<FieldInfo> fields, Node node) {
+    private String parseFields(List<FieldInfo> fields, Node node, String filterPrimaryKey) {
         StringBuilder sb = new StringBuilder();
         for (FieldInfo field : fields) {
+            if (StringUtils.isNotBlank(filterPrimaryKey) && field.getName().equals(filterPrimaryKey)) {
+                continue;
+            }
             sb.append("    `").append(field.getName()).append("` ");
             if (field instanceof MetaFieldInfo) {
                 if (!(node instanceof Metadata)) {
@@ -890,10 +910,13 @@ public class FlinkSqlParser implements Parser {
      * Generate primary key format in sql
      *
      * @param primaryKey The primary key of table
+     * @param filterPrimaryKey filter PrimaryKey, use for mongo
      * @return Primary key format in sql
      */
-    private String genPrimaryKey(String primaryKey) {
-        if (StringUtils.isNotBlank(primaryKey)) {
+    private String genPrimaryKey(String primaryKey, String filterPrimaryKey) {
+        boolean checkPrimaryKeyFlag = StringUtils.isNotBlank(primaryKey)
+                && (StringUtils.isBlank(filterPrimaryKey) || !primaryKey.equals(filterPrimaryKey));
+        if (checkPrimaryKeyFlag) {
             primaryKey = String.format("    PRIMARY KEY (%s) NOT ENFORCED,\n",
                     StringUtils.join(formatFields(primaryKey.split(",")), ","));
         } else {