You are viewing a plain text version of this content; the canonical link to the original message is available in the mailing-list archive.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/04 04:43:28 UTC
[inlong] 01/02: [INLONG-7121][Sort] Fix the parsing error of Flink SQL for MongoDB (#7122)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 13459e233e87c17a9c7da5e00c03119c31f7322c
Author: kuansix <49...@qq.com>
AuthorDate: Wed Jan 4 10:15:38 2023 +0800
[INLONG-7121][Sort] Fix the parsing error of Flink SQL for MongoDB (#7122)
---
.../protocol/node/extract/MongoExtractNode.java | 39 +++++++++++++++++++++-
.../node/extract/MongoExtractNodeTest.java | 4 +++
.../inlong/sort/parser/impl/FlinkSqlParser.java | 33 +++++++++++++++---
3 files changed, 70 insertions(+), 6 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNode.java
index 881ab472d..38daa67ef 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNode.java
@@ -120,6 +120,40 @@ public class MongoExtractNode extends ExtractNode implements InlongMetric, Metad
return options;
}
+ @Override
+ public String getMetadataKey(MetaField metaField) {
+ String metadataKey;
+ switch (metaField) {
+ case TABLE_NAME:
+ metadataKey = "table_name";
+ break;
+ case COLLECTION_NAME:
+ metadataKey = "collection_name";
+ break;
+ case SCHEMA_NAME:
+ metadataKey = "schema_name";
+ break;
+ case DATABASE_NAME:
+ metadataKey = "database_name";
+ break;
+ case OP_TS:
+ metadataKey = "op_ts";
+ break;
+ case DATA_DEBEZIUM:
+ case DATA_BYTES_DEBEZIUM:
+ metadataKey = "meta.data_debezium";
+ break;
+ case DATA_CANAL:
+ case DATA_BYTES_CANAL:
+ metadataKey = "meta.data_canal";
+ break;
+ default:
+ throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
+ this.getClass().getSimpleName(), metaField));
+ }
+ return metadataKey;
+ }
+
@Override
public boolean isVirtual(MetaField metaField) {
return true;
@@ -127,6 +161,9 @@ public class MongoExtractNode extends ExtractNode implements InlongMetric, Metad
@Override
public Set<MetaField> supportedMetaFields() {
- return EnumSet.of(MetaField.PROCESS_TIME, MetaField.COLLECTION_NAME, MetaField.DATABASE_NAME, MetaField.OP_TS);
+ return EnumSet.of(MetaField.PROCESS_TIME, MetaField.COLLECTION_NAME,
+ MetaField.DATABASE_NAME, MetaField.OP_TS,
+ MetaField.DATA_DEBEZIUM, MetaField.DATA_BYTES_DEBEZIUM,
+ MetaField.DATA_CANAL, MetaField.DATA_BYTES_CANAL);
}
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNodeTest.java
index ff4911e0b..db803122e 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNodeTest.java
@@ -51,6 +51,10 @@ public class MongoExtractNodeTest extends SerializeBaseTest<MongoExtractNode> {
formatMap.put(MetaField.COLLECTION_NAME, "STRING METADATA FROM 'collection_name' VIRTUAL");
formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 'database_name' VIRTUAL");
formatMap.put(MetaField.OP_TS, "TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL");
+ formatMap.put(MetaField.DATA_BYTES_DEBEZIUM, "BYTES METADATA FROM 'meta.data_debezium' VIRTUAL");
+ formatMap.put(MetaField.DATA_DEBEZIUM, "STRING METADATA FROM 'meta.data_debezium' VIRTUAL");
+ formatMap.put(MetaField.DATA_CANAL, "STRING METADATA FROM 'meta.data_canal' VIRTUAL");
+ formatMap.put(MetaField.DATA_BYTES_CANAL, "BYTES METADATA FROM 'meta.data_canal' VIRTUAL");
MongoExtractNode node = getTestObject();
boolean formatEquals = true;
for (MetaField metaField : node.supportedMetaFields()) {
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 44cc679eb..22f3b968f 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -40,6 +40,7 @@ import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.LoadNode;
import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
import org.apache.inlong.sort.protocol.node.transform.TransformNode;
@@ -75,6 +76,7 @@ public class FlinkSqlParser implements Parser {
private static final Logger log = LoggerFactory.getLogger(FlinkSqlParser.class);
+ public static final String SOURCE_MULTIPLE_ENABLE_KEY = "source.multiple.enable";
private final TableEnvironment tableEnv;
private final GroupInfo groupInfo;
private final Set<String> hasParsedSet = new HashSet<>();
@@ -742,8 +744,9 @@ public class FlinkSqlParser implements Parser {
}
StringBuilder sb = new StringBuilder("CREATE TABLE `");
sb.append(node.genTableName()).append("`(\n");
- sb.append(genPrimaryKey(node.getPrimaryKey()));
- sb.append(parseFields(node.getFields(), node));
+ String filterPrimaryKey = getFilterPrimaryKey(node);
+ sb.append(genPrimaryKey(node.getPrimaryKey(), filterPrimaryKey));
+ sb.append(parseFields(node.getFields(), node, filterPrimaryKey));
if (node instanceof ExtractNode) {
ExtractNode extractNode = (ExtractNode) node;
if (extractNode.getWatermarkField() != null) {
@@ -759,6 +762,19 @@ public class FlinkSqlParser implements Parser {
return sb.toString();
}
+ /**
+ * Get filter PrimaryKey for Mongo when multi-sink mode
+ */
+ private String getFilterPrimaryKey(Node node) {
+ if (node instanceof MongoExtractNode) {
+ if (null != node.getProperties().get(SOURCE_MULTIPLE_ENABLE_KEY)
+ && node.getProperties().get(SOURCE_MULTIPLE_ENABLE_KEY).equals("true")) {
+ return node.getPrimaryKey();
+ }
+ }
+ return null;
+ }
+
/**
* Gen create table DDL for hbase load
*/
@@ -857,11 +873,15 @@ public class FlinkSqlParser implements Parser {
*
* @param fields The fields defined in node
* @param node The abstract of extract, transform, load
+ * @param filterPrimaryKey filter PrimaryKey, use for mongo
* @return Field formats in select sql
*/
- private String parseFields(List<FieldInfo> fields, Node node) {
+ private String parseFields(List<FieldInfo> fields, Node node, String filterPrimaryKey) {
StringBuilder sb = new StringBuilder();
for (FieldInfo field : fields) {
+ if (StringUtils.isNotBlank(filterPrimaryKey) && field.getName().equals(filterPrimaryKey)) {
+ continue;
+ }
sb.append(" `").append(field.getName()).append("` ");
if (field instanceof MetaFieldInfo) {
if (!(node instanceof Metadata)) {
@@ -890,10 +910,13 @@ public class FlinkSqlParser implements Parser {
* Generate primary key format in sql
*
* @param primaryKey The primary key of table
+ * @param filterPrimaryKey filter PrimaryKey, use for mongo
* @return Primary key format in sql
*/
- private String genPrimaryKey(String primaryKey) {
- if (StringUtils.isNotBlank(primaryKey)) {
+ private String genPrimaryKey(String primaryKey, String filterPrimaryKey) {
+ boolean checkPrimaryKeyFlag = StringUtils.isNotBlank(primaryKey)
+ && (StringUtils.isBlank(filterPrimaryKey) || !primaryKey.equals(filterPrimaryKey));
+ if (checkPrimaryKeyFlag) {
primaryKey = String.format(" PRIMARY KEY (%s) NOT ENFORCED,\n",
StringUtils.join(formatFields(primaryKey.split(",")), ","));
} else {