You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/04/28 09:42:23 UTC
[inlong] branch master updated: [INLONG-7934][Manager] Optimize the serializationType to support debezium json (#7935)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b38bdc0f9 [INLONG-7934][Manager] Optimize the serializationType to support debezium json (#7935)
b38bdc0f9 is described below
commit b38bdc0f9de7ea0eef4fbdd9bc350fd74603e214
Author: haifxu <xh...@gmail.com>
AuthorDate: Fri Apr 28 17:42:17 2023 +0800
[INLONG-7934][Manager] Optimize the serializationType to support debezium json (#7935)
---
.../pojo/source/mongodb/MongoDBSourceRequest.java | 2 ++
.../manager/pojo/source/oracle/OracleSourceRequest.java | 2 ++
.../pojo/source/postgresql/PostgreSQLSourceRequest.java | 2 ++
.../pojo/source/sqlserver/SQLServerSourceRequest.java | 2 ++
.../manager/service/source/AbstractSourceOperator.java | 16 ++++++++++++++++
.../service/source/kafka/KafkaSourceOperator.java | 10 ++--------
.../service/source/pulsar/PulsarSourceOperator.java | 11 +++--------
.../service/source/tubemq/TubeMQSourceOperator.java | 9 ++-------
8 files changed, 31 insertions(+), 23 deletions(-)
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mongodb/MongoDBSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mongodb/MongoDBSourceRequest.java
index 95200c9d9..d1466ce44 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mongodb/MongoDBSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mongodb/MongoDBSourceRequest.java
@@ -23,6 +23,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataFormat;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.source.SourceRequest;
@@ -56,6 +57,7 @@ public class MongoDBSourceRequest extends SourceRequest {
public MongoDBSourceRequest() {
this.setSourceType(SourceType.MONGODB);
+ this.setSerializationType(DataFormat.DEBEZIUM_JSON.getName());
}
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oracle/OracleSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oracle/OracleSourceRequest.java
index 8cc3c1627..aacd0489b 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oracle/OracleSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oracle/OracleSourceRequest.java
@@ -23,6 +23,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataFormat;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.source.SourceRequest;
@@ -68,6 +69,7 @@ public class OracleSourceRequest extends SourceRequest {
public OracleSourceRequest() {
this.setSourceType(SourceType.ORACLE);
+ this.setSerializationType(DataFormat.DEBEZIUM_JSON.getName());
}
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/postgresql/PostgreSQLSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/postgresql/PostgreSQLSourceRequest.java
index 854a7ef45..15136662b 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/postgresql/PostgreSQLSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/postgresql/PostgreSQLSourceRequest.java
@@ -23,6 +23,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataFormat;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.source.SourceRequest;
@@ -73,6 +74,7 @@ public class PostgreSQLSourceRequest extends SourceRequest {
public PostgreSQLSourceRequest() {
this.setSourceType(SourceType.POSTGRESQL);
+ this.setSerializationType(DataFormat.DEBEZIUM_JSON.getName());
}
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sqlserver/SQLServerSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sqlserver/SQLServerSourceRequest.java
index 519531378..9dafc11e4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sqlserver/SQLServerSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sqlserver/SQLServerSourceRequest.java
@@ -23,6 +23,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataFormat;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.source.SourceRequest;
@@ -68,6 +69,7 @@ public class SQLServerSourceRequest extends SourceRequest {
public SQLServerSourceRequest() {
this.setSourceType(SourceType.SQLSERVER);
+ this.setSerializationType(DataFormat.DEBEZIUM_JSON.getName());
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 3c2584c03..a6a25b5e5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source;
import com.github.pagehelper.Page;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -306,4 +307,19 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
sourceFieldMapper.insertAll(entityList);
LOGGER.debug("success to save source fields");
}
+
+    /**
+     * Prefer the serialization type set on the source; otherwise derive it from the stream data type.
+     *
+     * @param streamSource stream source
+     * @param streamDataType stream data type, may be blank
+     * @return serialization type, or null when neither the source nor the stream defines one
+     */
+    protected String getSerializationType(StreamSource streamSource, String streamDataType) {
+        if (StringUtils.isNotBlank(streamSource.getSerializationType())) {
+            return streamSource.getSerializationType();
+        }
+        // keep the blank guard of the per-operator code this helper replaces, so callers can apply their default
+        return StringUtils.isBlank(streamDataType) ? null : DataTypeEnum.forType(streamDataType).getType();
+    }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index bb5e5e3d9..f2a15de59 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -113,10 +113,6 @@ public class KafkaSourceOperator extends AbstractSourceOperator {
kafkaSource.setSourceName(streamId);
kafkaSource.setBootstrapServers(bootstrapServers);
kafkaSource.setTopic(streamInfo.getMqResource());
- if (StringUtils.isNotBlank(streamInfo.getDataType())) {
- String serializationType = DataTypeEnum.forType(streamInfo.getDataType()).getType();
- kafkaSource.setSerializationType(serializationType);
- }
String topicName = streamInfo.getMqResource();
if (StringUtils.isBlank(topicName) || topicName.equals(streamId)) {
// the default mq resource (stream id) is not sufficient to discriminate different kafka topics
@@ -131,10 +127,8 @@ public class KafkaSourceOperator extends AbstractSourceOperator {
if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) {
continue;
}
- if (StringUtils.isEmpty(kafkaSource.getSerializationType()) && StringUtils.isNotEmpty(
- sourceInfo.getSerializationType())) {
- kafkaSource.setSerializationType(sourceInfo.getSerializationType());
- }
+
+ kafkaSource.setSerializationType(getSerializationType(sourceInfo, streamInfo.getDataType()));
}
// if the SerializationType is still null, set it to the CSV
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 96fcf1cd0..1cb579a09 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -131,10 +131,6 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
pulsarSource.setAdminUrl(adminUrl);
pulsarSource.setServiceUrl(serviceUrl);
pulsarSource.setInlongComponent(true);
- if (StringUtils.isNotBlank(streamInfo.getDataType())) {
- String serializationType = DataTypeEnum.forType(streamInfo.getDataType()).getType();
- pulsarSource.setSerializationType(serializationType);
- }
pulsarSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
pulsarSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
@@ -149,10 +145,9 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
if (!Objects.equal(streamId, sourceInfo.getInlongStreamId())) {
continue;
}
- if (StringUtils.isEmpty(pulsarSource.getSerializationType())
- && StringUtils.isNotEmpty(sourceInfo.getSerializationType())) {
- pulsarSource.setSerializationType(sourceInfo.getSerializationType());
- }
+
+ pulsarSource.setSerializationType(getSerializationType(sourceInfo, streamInfo.getDataType()));
+
// currently, only reuse the primary key from Kafka source
if (SourceType.KAFKA.equals(sourceInfo.getSourceType())) {
pulsarSource.setPrimaryKey(((KafkaSource) sourceInfo).getPrimaryKey());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
index 142977e9f..2dda1f507 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
@@ -20,8 +20,6 @@ package org.apache.inlong.manager.service.source.tubemq;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -110,17 +108,14 @@ public class TubeMQSourceOperator extends AbstractSourceOperator {
tubeMQSource.setTopic(streamInfo.getMqResource());
tubeMQSource.setGroupId(streamId);
tubeMQSource.setMasterRpc(masterRpc);
- if (StringUtils.isNotBlank(streamInfo.getDataType())) {
- String serializationType = DataTypeEnum.forType(streamInfo.getDataType()).getType();
- tubeMQSource.setSerializationType(serializationType);
- }
tubeMQSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
for (StreamSource sourceInfo : streamSources) {
if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) {
continue;
}
- tubeMQSource.setSerializationType(sourceInfo.getSerializationType());
+
+ tubeMQSource.setSerializationType(getSerializationType(sourceInfo, streamInfo.getDataType()));
}
tubeMQSource.setFieldList(streamInfo.getFieldList());
sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(tubeMQSource);