You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/04/28 09:42:23 UTC

[inlong] branch master updated: [INLONG-7934][Manager] Optimize the serializationType to support debezium json (#7935)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b38bdc0f9 [INLONG-7934][Manager] Optimize the serializationType to support debezium json (#7935)
b38bdc0f9 is described below

commit b38bdc0f9de7ea0eef4fbdd9bc350fd74603e214
Author: haifxu <xh...@gmail.com>
AuthorDate: Fri Apr 28 17:42:17 2023 +0800

    [INLONG-7934][Manager] Optimize the serializationType to support debezium json (#7935)
---
 .../pojo/source/mongodb/MongoDBSourceRequest.java        |  2 ++
 .../manager/pojo/source/oracle/OracleSourceRequest.java  |  2 ++
 .../pojo/source/postgresql/PostgreSQLSourceRequest.java  |  2 ++
 .../pojo/source/sqlserver/SQLServerSourceRequest.java    |  2 ++
 .../manager/service/source/AbstractSourceOperator.java   | 16 ++++++++++++++++
 .../service/source/kafka/KafkaSourceOperator.java        | 10 ++--------
 .../service/source/pulsar/PulsarSourceOperator.java      | 11 +++--------
 .../service/source/tubemq/TubeMQSourceOperator.java      |  9 ++-------
 8 files changed, 31 insertions(+), 23 deletions(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mongodb/MongoDBSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mongodb/MongoDBSourceRequest.java
index 95200c9d9..d1466ce44 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mongodb/MongoDBSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mongodb/MongoDBSourceRequest.java
@@ -23,6 +23,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataFormat;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 
@@ -56,6 +57,7 @@ public class MongoDBSourceRequest extends SourceRequest {
 
     public MongoDBSourceRequest() {
         this.setSourceType(SourceType.MONGODB);
+        this.setSerializationType(DataFormat.DEBEZIUM_JSON.getName());
     }
 
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oracle/OracleSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oracle/OracleSourceRequest.java
index 8cc3c1627..aacd0489b 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oracle/OracleSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oracle/OracleSourceRequest.java
@@ -23,6 +23,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataFormat;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 
@@ -68,6 +69,7 @@ public class OracleSourceRequest extends SourceRequest {
 
     public OracleSourceRequest() {
         this.setSourceType(SourceType.ORACLE);
+        this.setSerializationType(DataFormat.DEBEZIUM_JSON.getName());
     }
 
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/postgresql/PostgreSQLSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/postgresql/PostgreSQLSourceRequest.java
index 854a7ef45..15136662b 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/postgresql/PostgreSQLSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/postgresql/PostgreSQLSourceRequest.java
@@ -23,6 +23,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataFormat;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 
@@ -73,6 +74,7 @@ public class PostgreSQLSourceRequest extends SourceRequest {
 
     public PostgreSQLSourceRequest() {
         this.setSourceType(SourceType.POSTGRESQL);
+        this.setSerializationType(DataFormat.DEBEZIUM_JSON.getName());
     }
 
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sqlserver/SQLServerSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sqlserver/SQLServerSourceRequest.java
index 519531378..9dafc11e4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sqlserver/SQLServerSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sqlserver/SQLServerSourceRequest.java
@@ -23,6 +23,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataFormat;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 
@@ -68,6 +69,7 @@ public class SQLServerSourceRequest extends SourceRequest {
 
     public SQLServerSourceRequest() {
         this.setSourceType(SourceType.SQLSERVER);
+        this.setSerializationType(DataFormat.DEBEZIUM_JSON.getName());
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 3c2584c03..a6a25b5e5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source;
 import com.github.pagehelper.Page;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -306,4 +307,19 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
         sourceFieldMapper.insertAll(entityList);
         LOGGER.debug("success to save source fields");
     }
+
+    /**
+     * Returns the source's own serialization type if set; otherwise derives it from the stream data type.
+     *
+     * @param streamSource stream source
+     * @param streamDataType stream data type
+     * @return serialization type
+     */
+    protected String getSerializationType(StreamSource streamSource, String streamDataType) {
+        if (StringUtils.isNotBlank(streamSource.getSerializationType())) {
+            return streamSource.getSerializationType();
+        }
+
+        return DataTypeEnum.forType(streamDataType).getType();
+    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index bb5e5e3d9..f2a15de59 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -113,10 +113,6 @@ public class KafkaSourceOperator extends AbstractSourceOperator {
             kafkaSource.setSourceName(streamId);
             kafkaSource.setBootstrapServers(bootstrapServers);
             kafkaSource.setTopic(streamInfo.getMqResource());
-            if (StringUtils.isNotBlank(streamInfo.getDataType())) {
-                String serializationType = DataTypeEnum.forType(streamInfo.getDataType()).getType();
-                kafkaSource.setSerializationType(serializationType);
-            }
             String topicName = streamInfo.getMqResource();
             if (StringUtils.isBlank(topicName) || topicName.equals(streamId)) {
                 // the default mq resource (stream id) is not sufficient to discriminate different kafka topics
@@ -131,10 +127,8 @@ public class KafkaSourceOperator extends AbstractSourceOperator {
                 if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) {
                     continue;
                 }
-                if (StringUtils.isEmpty(kafkaSource.getSerializationType()) && StringUtils.isNotEmpty(
-                        sourceInfo.getSerializationType())) {
-                    kafkaSource.setSerializationType(sourceInfo.getSerializationType());
-                }
+
+                kafkaSource.setSerializationType(getSerializationType(sourceInfo, streamInfo.getDataType()));
             }
 
             // if the SerializationType is still null, set it to the CSV
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 96fcf1cd0..1cb579a09 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -131,10 +131,6 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
             pulsarSource.setAdminUrl(adminUrl);
             pulsarSource.setServiceUrl(serviceUrl);
             pulsarSource.setInlongComponent(true);
-            if (StringUtils.isNotBlank(streamInfo.getDataType())) {
-                String serializationType = DataTypeEnum.forType(streamInfo.getDataType()).getType();
-                pulsarSource.setSerializationType(serializationType);
-            }
             pulsarSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
             pulsarSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
 
@@ -149,10 +145,9 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
                 if (!Objects.equal(streamId, sourceInfo.getInlongStreamId())) {
                     continue;
                 }
-                if (StringUtils.isEmpty(pulsarSource.getSerializationType())
-                        && StringUtils.isNotEmpty(sourceInfo.getSerializationType())) {
-                    pulsarSource.setSerializationType(sourceInfo.getSerializationType());
-                }
+
+                pulsarSource.setSerializationType(getSerializationType(sourceInfo, streamInfo.getDataType()));
+
                 // currently, only reuse the primary key from Kafka source
                 if (SourceType.KAFKA.equals(sourceInfo.getSourceType())) {
                     pulsarSource.setPrimaryKey(((KafkaSource) sourceInfo).getPrimaryKey());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
index 142977e9f..2dda1f507 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
@@ -20,8 +20,6 @@ package org.apache.inlong.manager.service.source.tubemq;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -110,17 +108,14 @@ public class TubeMQSourceOperator extends AbstractSourceOperator {
             tubeMQSource.setTopic(streamInfo.getMqResource());
             tubeMQSource.setGroupId(streamId);
             tubeMQSource.setMasterRpc(masterRpc);
-            if (StringUtils.isNotBlank(streamInfo.getDataType())) {
-                String serializationType = DataTypeEnum.forType(streamInfo.getDataType()).getType();
-                tubeMQSource.setSerializationType(serializationType);
-            }
             tubeMQSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
 
             for (StreamSource sourceInfo : streamSources) {
                 if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) {
                     continue;
                 }
-                tubeMQSource.setSerializationType(sourceInfo.getSerializationType());
+
+                tubeMQSource.setSerializationType(getSerializationType(sourceInfo, streamInfo.getDataType()));
             }
             tubeMQSource.setFieldList(streamInfo.getFieldList());
             sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(tubeMQSource);