You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/23 03:36:54 UTC
[incubator-inlong] branch master updated: [INLONG-2271] move TDMsg to InLongMsg (#2273)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 912a30a [INLONG-2271] move TDMsg to InLongMsg (#2273)
912a30a is described below
commit 912a30ab7c85eab61d09849e8c5f4c9345e4d876
Author: dockerzhang <do...@apache.org>
AuthorDate: Sun Jan 23 11:36:47 2022 +0800
[INLONG-2271] move TDMsg to InLongMsg (#2273)
Co-authored-by: dockerzhang(张超) <do...@tencent.com>
---
LICENSE | 2 +-
docker/kubernetes/templates/NOTES.txt | 78 +--
inlong-audit/audit-store/pom.xml | 9 +-
inlong-audit/pom.xml | 8 +-
.../commons/msg/{TDMsg1.java => InLongMsg.java} | 48 +-
...gAttrBuilder.java => InLongMsgAttrBuilder.java} | 60 +--
.../dataproxy/http/SimpleMessageHandler.java | 12 +-
.../dataproxy/source/ServerMessageHandler.java | 22 +-
.../dataproxy/source/SimpleMessageHandler.java | 22 +-
.../thirdpart/sort/PushHiveConfigTaskListener.java | 6 +-
.../deserialization/DeserializationInfo.java | 10 +-
....java => InLongMsgCsv2DeserializationInfo.java} | 6 +-
...o.java => InLongMsgCsvDeserializationInfo.java} | 8 +-
...Info.java => InLongMsgDeserializationInfo.java} | 6 +-
...fo.java => InLongMsgKvDeserializationInfo.java} | 6 +-
...va => InLongMsgTlogCsvDeserializationInfo.java} | 6 +-
...ava => InLongMsgTlogKvDeserializationInfo.java} | 6 +-
.../sort/protocol/DeserializationInfoTest.java | 4 +-
.../sort/protocol/source/PulsarSourceInfoTest.java | 6 +-
inlong-sort/sort-core/pom.xml | 4 +-
...rd.java => InLongMsgMixedSerializedRecord.java} | 8 +-
.../deserialization/DeserializationSchema.java | 19 +-
...eserializer.java => InLongMsgDeserializer.java} | 8 +-
...alizer.java => InLongMsgMixedDeserializer.java} | 35 +-
.../deserialization/MultiTenancyDeserializer.java | 21 +-
...=> MultiTenancyInLongMsgMixedDeserializer.java} | 83 ++--
.../flink/tubemq/MultiTenancyTubeConsumer.java | 4 +-
.../flink/tubemq/TubeSubscriptionDescription.java | 8 +-
.../org/apache/inlong/sort/util/CommonUtils.java | 6 +-
...st.java => InLongMsgMixedDeserializerTest.java} | 38 +-
...ultiTenancyInLongMsgMixedDeserializerTest.java} | 36 +-
.../tubemq/MultiTopicTubeSourceFunctionTest.java | 4 +-
.../tubemq/TubeSubscriptionDescriptionTest.java | 8 +-
.../pom.xml | 4 +-
.../AbstractInLongMsgFormatDeserializer.java} | 38 +-
.../AbstractInLongMsgMixedFormatDeserializer.java} | 10 +-
.../sort/formats/inlongmsg/InLongMsgBody.java} | 16 +-
.../sort/formats/inlongmsg/InLongMsgHead.java} | 15 +-
.../inlongmsg/InLongMsgMixedFormatConverter.java} | 6 +-
.../inlongmsg/InLongMsgMixedFormatFactory.java} | 12 +-
.../inlongmsg/InLongMsgMixedValidator.java} | 6 +-
.../sort/formats/inlongmsg/InLongMsgUtils.java} | 38 +-
.../formats/inlongmsg/InLongMsgValidator.java} | 6 +-
.../pom.xml | 6 +-
.../sort/formats/inlongmsgcsv/InLongMsgCsv.java} | 36 +-
.../InLongMsgCsvFormatDeserializer.java} | 40 +-
.../inlongmsgcsv/InLongMsgCsvFormatFactory.java} | 56 +--
.../InLongMsgCsvMixedFormatConverter.java} | 28 +-
.../InLongMsgCsvMixedFormatDeserializer.java} | 36 +-
.../formats/inlongmsgcsv/InLongMsgCsvUtils.java} | 54 +--
.../org.apache.flink.table.factories.TableFactory | 2 +-
.../InLongMsgCsvFormatDeserializerTest.java} | 152 +++---
.../InLongMsgCsvFormatFactoryTest.java} | 24 +-
.../formats/inlongmsgcsv/InLongMsgCsvTest.java} | 18 +-
.../src/test/resources/log4j-test.properties | 0
inlong-sort/sort-formats/pom.xml | 4 +-
.../example/consumer/test_consumer.cc | 295 ++++++------
.../example/consumer/test_multi_thread_filter.cc | 533 +++++++++++----------
.../example/consumer/test_multithread_pull.cc | 355 +++++++-------
licenses/inlong-dataproxy/LICENSE-binary | 2 +-
.../LICENSE-google-protobuf-java-format.txt | 54 +--
licenses/inlong-tubemq/LICENSE-binary | 2 +-
.../LICENSE-google-protobuf-java-format.txt | 54 +--
63 files changed, 1256 insertions(+), 1253 deletions(-)
diff --git a/LICENSE b/LICENSE
index 0bcceda..93c347b 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,4 @@
-
+
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
diff --git a/docker/kubernetes/templates/NOTES.txt b/docker/kubernetes/templates/NOTES.txt
index 7633784..37b4f95 100644
--- a/docker/kubernetes/templates/NOTES.txt
+++ b/docker/kubernetes/templates/NOTES.txt
@@ -1,39 +1,39 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-1. Get the application URL by running these commands:
-{{/*{{- if .Values.ingress.enabled }}*/}}
-{{/*{{- range $host := .Values.ingress.hosts }}*/}}
-{{/* {{- range .paths }}*/}}
-{{/* http{{ if $.Values.ingress.tls }}s{{ end }}://{{ $host.host }}{{ .path }}*/}}
-{{/* {{- end }}*/}}
-{{/*{{- end }}*/}}
-{{/*{{- else if contains "NodePort" .Values.service.type }}*/}}
-{{/* export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "inlong.fullname" . }})*/}}
-{{/* export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}")*/}}
-{{/* echo http://$NODE_IP:$NODE_PORT*/}}
-{{/*{{- else if contains "LoadBalancer" .Values.service.type }}*/}}
-{{/* NOTE: It may take a few minutes for the LoadBalancer IP to be available.*/}}
-{{/* You can watch the status of by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "inlong.fullname" . }}'*/}}
-{{/* export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "inlong.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}")*/}}
-{{/* echo http://$SERVICE_IP:{{ .Values.service.port }}*/}}
-{{/*{{- else if contains "ClusterIP" .Values.service.type }}*/}}
-{{/* export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "inlong.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")*/}}
-{{/* export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")*/}}
-{{/* echo "Visit http://127.0.0.1:8080 to use your application"*/}}
-{{/* kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT*/}}
-{{/*{{- end }}*/}}
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+1. Get the application URL by running these commands:
+{{/*{{- if .Values.ingress.enabled }}*/}}
+{{/*{{- range $host := .Values.ingress.hosts }}*/}}
+{{/* {{- range .paths }}*/}}
+{{/* http{{ if $.Values.ingress.tls }}s{{ end }}://{{ $host.host }}{{ .path }}*/}}
+{{/* {{- end }}*/}}
+{{/*{{- end }}*/}}
+{{/*{{- else if contains "NodePort" .Values.service.type }}*/}}
+{{/* export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "inlong.fullname" . }})*/}}
+{{/* export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}")*/}}
+{{/* echo http://$NODE_IP:$NODE_PORT*/}}
+{{/*{{- else if contains "LoadBalancer" .Values.service.type }}*/}}
+{{/* NOTE: It may take a few minutes for the LoadBalancer IP to be available.*/}}
+{{/* You can watch the status of by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "inlong.fullname" . }}'*/}}
+{{/* export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "inlong.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}")*/}}
+{{/* echo http://$SERVICE_IP:{{ .Values.service.port }}*/}}
+{{/*{{- else if contains "ClusterIP" .Values.service.type }}*/}}
+{{/* export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "inlong.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")*/}}
+{{/* export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")*/}}
+{{/* echo "Visit http://127.0.0.1:8080 to use your application"*/}}
+{{/* kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT*/}}
+{{/*{{- end }}*/}}
diff --git a/inlong-audit/audit-store/pom.xml b/inlong-audit/audit-store/pom.xml
index 3ae6690..965a0c4 100644
--- a/inlong-audit/audit-store/pom.xml
+++ b/inlong-audit/audit-store/pom.xml
@@ -32,10 +32,6 @@
<dependencies>
<dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>audit-common</artifactId>
<version>${project.version}</version>
@@ -291,6 +287,7 @@
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
+ <version>${spring.plugin.version}</version>
<executions>
<execution>
<id>repackage</id>
@@ -324,7 +321,7 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
- <version>2.3.3.RELEASE</version>
+ <version>${spring.plugin.version}</version>
</dependency>
</dependencies>
<executions>
@@ -371,10 +368,10 @@
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
+ <version>${spring.plugin.version}</version>
<configuration>
<layout>ZIP</layout>
<includes>
- <!-- 打包时,本jar包不包含其他依赖包 , 否则打出的jar包还是很大 -->
<include>
<groupId>nothing</groupId>
<artifactId>nothing</artifactId>
diff --git a/inlong-audit/pom.xml b/inlong-audit/pom.xml
index 7c5e6e5..52bb50e 100644
--- a/inlong-audit/pom.xml
+++ b/inlong-audit/pom.xml
@@ -63,16 +63,12 @@
<gson.version>2.8.6</gson.version>
<jackson.version>2.12.3</jackson.version>
<junit.version>4.12</junit.version>
- <autoconfigure.version>2.4.3</autoconfigure.version>
+ <autoconfigure.version>2.4.3</autoconfigure.version>
+ <spring.plugin.version>2.3.3.RELEASE</spring.plugin.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client</artifactId>
- <version>${pulsar.version}</version>
- </dependency>
- <dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf-version}</version>
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java b/inlong-common/src/main/java/org/apache/inlong/commons/msg/InLongMsg.java
similarity index 96%
rename from inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java
rename to inlong-common/src/main/java/org/apache/inlong/commons/msg/InLongMsg.java
index be871ed..2dcb62b 100644
--- a/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/msg/InLongMsg.java
@@ -30,7 +30,7 @@ import java.util.Set;
import org.xerial.snappy.Snappy;
-public class TDMsg1 {
+public class InLongMsg {
private static final int DEFAULT_CAPACITY = 4096;
private final int capacity;
@@ -143,28 +143,28 @@ public class TDMsg1 {
*
* @return
*/
- public static TDMsg1 newTDMsg() {
- return newTDMsg(true);
+ public static InLongMsg newInLongMsg() {
+ return newInLongMsg(true);
}
/**
* capacity: 4096, version: 1
*
* @param compress if copress
- * @return TDMsg1
+ * @return InLongMsg
*/
- public static TDMsg1 newTDMsg(boolean compress) {
- return newTDMsg(DEFAULT_CAPACITY, compress);
+ public static InLongMsg newInLongMsg(boolean compress) {
+ return newInLongMsg(DEFAULT_CAPACITY, compress);
}
/**
* capacity: 4096, compress: true
*
* @param v version info
- * @return TDMsg1
+ * @return InLongMsg
*/
- public static TDMsg1 newTDMsg(int v) {
- return newTDMsg(DEFAULT_CAPACITY, true, v);
+ public static InLongMsg newInLongMsg(int v) {
+ return newInLongMsg(DEFAULT_CAPACITY, true, v);
}
/**
@@ -172,10 +172,10 @@ public class TDMsg1 {
*
* @param compress if compress
* @param v version
- * @return TDMsg1
+ * @return InLongMsg
*/
- public static TDMsg1 newTDMsg(boolean compress, int v) {
- return newTDMsg(DEFAULT_CAPACITY, compress, v);
+ public static InLongMsg newInLongMsg(boolean compress, int v) {
+ return newInLongMsg(DEFAULT_CAPACITY, compress, v);
}
/**
@@ -183,25 +183,25 @@ public class TDMsg1 {
*
* @param capacity data capacity
* @param compress if compress
- * @return TDMsg1
+ * @return InLongMsg
*/
- public static TDMsg1 newTDMsg(int capacity, boolean compress) {
- return new TDMsg1(capacity, compress, Version.v1);
+ public static InLongMsg newInLongMsg(int capacity, boolean compress) {
+ return new InLongMsg(capacity, compress, Version.v1);
}
/**
- * netTDmsg
+ * netInLongMsg
* @param capacity data capacity
* @param compress compress
* @param v version
- * @return TDMsg1
+ * @return InLongMsg
*/
- public static TDMsg1 newTDMsg(int capacity, boolean compress, int v) {
- return new TDMsg1(capacity, compress, Version.of(v));
+ public static InLongMsg newInLongMsg(int capacity, boolean compress, int v) {
+ return new InLongMsg(capacity, compress, Version.of(v));
}
// for create
- private TDMsg1(int capacity, boolean compress, Version v) {
+ private InLongMsg(int capacity, boolean compress, Version v) {
version = v;
addmode = true;
this.compress = compress;
@@ -616,7 +616,7 @@ public class TDMsg1 {
private ByteBuffer parsedBinInput;
// for parsed
- private TDMsg1(ByteBuffer buffer, Version magic) throws IOException {
+ private InLongMsg(ByteBuffer buffer, Version magic) throws IOException {
version = magic;
addmode = false;
capacity = 0;
@@ -914,18 +914,18 @@ public class TDMsg1 {
return Version.vn;
}
- public static TDMsg1 parseFrom(byte[] data) {
+ public static InLongMsg parseFrom(byte[] data) {
return parseFrom(ByteBuffer.wrap(data));
}
- public static TDMsg1 parseFrom(ByteBuffer buffer) {
+ public static InLongMsg parseFrom(ByteBuffer buffer) {
Version magic = getMagic(buffer);
if (magic == Version.vn) {
return null;
}
try {
- return new TDMsg1(buffer, magic);
+ return new InLongMsg(buffer, magic);
} catch (IOException e) {
return null;
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsgAttrBuilder.java b/inlong-common/src/main/java/org/apache/inlong/commons/msg/InLongMsgAttrBuilder.java
similarity index 88%
rename from inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsgAttrBuilder.java
rename to inlong-common/src/main/java/org/apache/inlong/commons/msg/InLongMsgAttrBuilder.java
index 69c20f1..30c0471 100644
--- a/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsgAttrBuilder.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/msg/InLongMsgAttrBuilder.java
@@ -22,7 +22,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
-public class TDMsgAttrBuilder {
+public class InLongMsgAttrBuilder {
public enum PartitionUnit {
DAY("d"), HOUR("h"), HALFHOUR("n"),
@@ -334,93 +334,93 @@ public class TDMsgAttrBuilder {
SimpleDateFormat f1 =
new SimpleDateFormat("yyyyMMddHH");
- System.out.println(TDMsgAttrBuilder.getProtocolM0()
+ System.out.println(InLongMsgAttrBuilder.getProtocolM0()
.setId("interfaceid").setTimeType(TimeType.S)
.setTime(String.valueOf(System.currentTimeMillis() / 1000))
.buildAttr());
- System.out.println(TDMsgAttrBuilder.getProtocolM0()
+ System.out.println(InLongMsgAttrBuilder.getProtocolM0()
.setId("interfaceid").setTime(System.currentTimeMillis())
.setTimeType(TimeType.MS).buildAttr());
- System.out.println(TDMsgAttrBuilder.getProtocolM0()
+ System.out.println(InLongMsgAttrBuilder.getProtocolM0()
.setId("interfaceid")
.setTime(f1.format(new Date(System.currentTimeMillis())))
.buildAttr());
- System.out.println(TDMsgAttrBuilder.getProtocolM0()
+ System.out.println(InLongMsgAttrBuilder.getProtocolM0()
.setId("interfaceid")
.setTime(f.format(new Date(System.currentTimeMillis())))
.setTimeType(TimeType.STANDARD).buildAttr());
- System.out.println(TDMsgAttrBuilder.getProtocolM0()
+ System.out.println(InLongMsgAttrBuilder.getProtocolM0()
.setId("interfaceid")
.setTime(f1.format(new Date(System.currentTimeMillis())))
.setTimeType(TimeType.NORMAL).buildAttr());
- System.out.println(TDMsgAttrBuilder.getProtocolM0()
+ System.out.println(InLongMsgAttrBuilder.getProtocolM0()
.setId("interfaceid").setTimeType(TimeType.S)
.setTime(String.valueOf(System.currentTimeMillis() / 1000))
.setPartitionUnit(PartitionUnit.DAY).buildAttr());
- System.out.println(TDMsgAttrBuilder.getProtocolM0()
+ System.out.println(InLongMsgAttrBuilder.getProtocolM0()
.setId("interfaceid").setTimeType(TimeType.S)
.setTime(String.valueOf(System.currentTimeMillis() / 1000))
.setPartitionUnit(PartitionUnit.HOUR).buildAttr());
- System.out.println(TDMsgAttrBuilder.getProtocolM0()
+ System.out.println(InLongMsgAttrBuilder.getProtocolM0()
.setId("interfaceid").setTimeType(TimeType.S)
.setTime(String.valueOf(System.currentTimeMillis() / 1000))
.setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
System.out.println();
- System.out.print(TDMsgAttrBuilder.getProtocolM100().buildAttr());
+ System.out.print(InLongMsgAttrBuilder.getProtocolM100().buildAttr());
System.out.println("\t\t\t\t\t\t// ---- all the param is "
+ "default : s=\\t, idp=0, tp=1, tt=#ms, p=h ");
- System.out.print(TDMsgAttrBuilder.getProtocolM100().setSpliter(",")
+ System.out.print(InLongMsgAttrBuilder.getProtocolM100().setSpliter(",")
.buildAttr());
System.out.println("\t\t\t\t\t// ---- : idp=0, tp=1, tt=#ms, p=h ");
- System.out.print(TDMsgAttrBuilder.getProtocolM100().setSpliter(",")
+ System.out.print(InLongMsgAttrBuilder.getProtocolM100().setSpliter(",")
.setIdPos(0).buildAttr());
System.out.println("\t\t\t\t\t// ---- : tp=1, tt=#ms, p=h ");
- System.out.print(TDMsgAttrBuilder.getProtocolM100().setSpliter(",")
+ System.out.print(InLongMsgAttrBuilder.getProtocolM100().setSpliter(",")
.setTimePos(1).buildAttr());
System.out.println("\t\t\t\t\t// ---- : idp=0, tt=#ms, p=h ");
- System.out.print(TDMsgAttrBuilder.getProtocolM100().setSpliter(",")
+ System.out.print(InLongMsgAttrBuilder.getProtocolM100().setSpliter(",")
.setIdPos(0).setTimePos(1).buildAttr());
System.out.println("\t\t\t\t// ---- : tt=#ms, p=h ");
- System.out.print(TDMsgAttrBuilder.getProtocolM100().setSpliter(",")
+ System.out.print(InLongMsgAttrBuilder.getProtocolM100().setSpliter(",")
.setIdPos(0).setTimePos(1).setTimeType(TimeType.S).buildAttr());
System.out.println("\t\t\t// ---- : p=h ");
- System.out.print(TDMsgAttrBuilder.getProtocolM100().setSpliter(",")
+ System.out.print(InLongMsgAttrBuilder.getProtocolM100().setSpliter(",")
.setIdPos(0).setTimePos(1)
.setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
System.out.println("\t\t\t// ---- : tt=#s ");
- System.out.print(TDMsgAttrBuilder.getProtocolM100().setSpliter(",")
+ System.out.print(InLongMsgAttrBuilder.getProtocolM100().setSpliter(",")
.setIdPos(0).setTimePos(1).setTimeType(TimeType.MS)
.setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
System.out.println("\t\t\t// ---- : all ");
- System.out.print(TDMsgAttrBuilder.getProtocolM100()
+ System.out.print(InLongMsgAttrBuilder.getProtocolM100()
.setId("interfaceid").setSpliter(",").setIdPos(0).setTimePos(1)
.setTimeType(TimeType.MS)
.setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
System.out.println("\t// ---- : id is set so idp is ignored ");
- System.out.print(TDMsgAttrBuilder.getProtocolM100().setSpliter(",")
+ System.out.print(InLongMsgAttrBuilder.getProtocolM100().setSpliter(",")
.setIdPos(0).setTimePos(1).setTime(System.currentTimeMillis())
.setTimeType(TimeType.MS)
.setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
System.out.println("\t\t\t// ---- : t is set so tp is ignored ");
- System.out.print(TDMsgAttrBuilder.getProtocolM100()
+ System.out.print(InLongMsgAttrBuilder.getProtocolM100()
.setId("interfaceid").setSpliter(",").setIdPos(0).setTimePos(1)
.setTime(System.currentTimeMillis()).setTimeType(TimeType.MS)
.setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
System.out
.println("\t\t// ---- : id and t are all set so idpos and tp are all ignored ");
- System.out.print(TDMsgAttrBuilder.getProtocolM100()
+ System.out.print(InLongMsgAttrBuilder.getProtocolM100()
.setId("interfaceid").setSpliter(",").setIdPos(0).setTimePos(1)
.setTime(f1.format(new Date(System.currentTimeMillis())))
.setTimeType(TimeType.NORMAL)
@@ -432,21 +432,21 @@ public class TDMsgAttrBuilder {
// long time = System.currentTimeMillis();
// byte[] data0 = ("id," + time + ",other,data").getBytes();
- // String attr = TDMsgProtocolFactory.getProtololM100().setSpliter(",")
+ // String attr = InLongMsgProtocolFactory.getProtololM100().setSpliter(",")
// .setIdPos(0).setTimePos(1).setTimeType(TimeType.MS)
// .setPartitionUnit(PartitionUnit.QUARTER).buildAttr();
- // TDMsg1 tdmsg = TDMsg1.newTDMsg();
- // tdmsg.addMsg(attr, data0);
- // byte[] result = tdmsg.buildArray();
+ // InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ // inLongMsg.addMsg(attr, data0);
+ // byte[] result = inLongMsg.buildArray();
// System.out.println(new String(result));
- // attr = TDMsgProtocolFactory.getProtololM0().setSpliter(",")
+ // attr = InLongMsgProtocolFactory.getProtololM0().setSpliter(",")
// .setTime(System.currentTimeMillis()).setTimeType(TimeType.MS)
// .setPartitionUnit(PartitionUnit.QUARTER).buildAttr();
- // tdmsg = TDMsg1.newTDMsg();
- // tdmsg.addMsg(attr, data0);
- // result = tdmsg.buildArray();
+ // inLongMsg = InLongMsg.newInLongMsg();
+ // inLongMsg.addMsg(attr, data0);
+ // result = inLongMsg.buildArray();
// System.out.println(new String(result));
- System.out.print(TDMsgAttrBuilder.getProtocolM100().setSpliter(",")
+ System.out.print(InLongMsgAttrBuilder.getProtocolM100().setSpliter(",")
.setIdPos(0).setTimePos(1)
.setTimeType(TimeType.STANDARD)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index ba00758..c57a205 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -37,7 +37,7 @@ import org.apache.inlong.commons.monitor.CounterGroupExt;
import org.apache.inlong.commons.monitor.MonitorIndex;
import org.apache.inlong.commons.monitor.MonitorIndexExt;
import org.apache.inlong.commons.monitor.StatConstants;
-import org.apache.inlong.commons.msg.TDMsg1;
+import org.apache.inlong.commons.msg.InLongMsg;
import org.apache.inlong.commons.util.NetworkUtils;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
@@ -128,14 +128,14 @@ public class SimpleMessageHandler
msgCount = "1";
}
- TDMsg1 tdMsg = TDMsg1.newTDMsg(true);
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg(true);
String charset = (String) context.get(HttpSourceConstants.CHARSET);
if (charset == null || "".equals(charset)) {
charset = "UTF-8";
}
String body = (String) context.get(HttpSourceConstants.BODY);
try {
- tdMsg.addMsg(newAttrBuffer.toString(), body.getBytes(charset));
+ inLongMsg.addMsg(newAttrBuffer.toString(), body.getBytes(charset));
} catch (UnsupportedEncodingException e) {
throw new MessageProcessException(e);
}
@@ -147,9 +147,9 @@ public class SimpleMessageHandler
headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP);
headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
headers.put(ConfigConstants.MSG_COUNTER_KEY, msgCount);
- byte[] data = tdMsg.buildArray();
+ byte[] data = inLongMsg.buildArray();
headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length));
- String pkgTime = dateFormator.get().format(tdMsg.getCreatetime());
+ String pkgTime = dateFormator.get().format(inLongMsg.getCreatetime());
headers.put(ConfigConstants.PKG_TIME_KEY, pkgTime);
Event event = EventBuilder.withBody(data, headers);
@@ -178,7 +178,7 @@ public class SimpleMessageHandler
monitorIndex
.addAndGet(new String(newbase), Integer.parseInt(msgCount), 1, data.length, 0);
}
- tdMsg.reset();
+ inLongMsg.reset();
long beginTime = System.currentTimeMillis();
try {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 44dab2c..362ee71 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -41,7 +41,7 @@ import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.apache.inlong.commons.monitor.MonitorIndex;
import org.apache.inlong.commons.monitor.MonitorIndexExt;
-import org.apache.inlong.commons.msg.TDMsg1;
+import org.apache.inlong.commons.msg.InLongMsg;
import org.apache.inlong.dataproxy.base.ProxyMessage;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
@@ -384,33 +384,33 @@ public class ServerMessageHandler extends SimpleChannelHandler {
Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
String strRemoteIP, MsgType msgType) throws MessageIDException {
- int tdMsgVer = 1;
+ int inLongMsgVer = 1;
if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
- tdMsgVer = 3;
+ inLongMsgVer = 3;
} else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
- tdMsgVer = 4;
+ inLongMsgVer = 4;
}
for (Map.Entry<String, HashMap<String, List<ProxyMessage>>> topicEntry : messageMap.entrySet()) {
for (Map.Entry<String, List<ProxyMessage>> streamIdEntry : topicEntry.getValue().entrySet()) {
- TDMsg1 tdMsg = TDMsg1.newTDMsg(this.isCompressed, tdMsgVer);
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg(this.isCompressed, inLongMsgVer);
Map<String, String> headers = new HashMap<String, String>();
for (ProxyMessage message : streamIdEntry.getValue()) {
if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) {
message.getAttributeMap().put(AttributeConstants.MESSAGE_COUNT, String.valueOf(1));
- tdMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
+ inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
} else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
- tdMsg.addMsg(message.getData());
+ inLongMsg.addMsg(message.getData());
} else {
- tdMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
+ inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
}
}
- long pkgTimeInMillis = tdMsg.getCreatetime();
+ long pkgTimeInMillis = inLongMsg.getCreatetime();
String pkgTimeStr = dateFormator.get().format(pkgTimeInMillis);
- if (tdMsgVer == 4) {
+ if (inLongMsgVer == 4) {
if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) {
pkgTimeStr = commonAttrMap.get(ConfigConstants.PKG_TIME_KEY);
} else {
@@ -432,7 +432,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
String proxyMetricMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
headers.put(ConfigConstants.MSG_COUNTER_KEY, proxyMetricMsgCnt);
- byte[] data = tdMsg.buildArray();
+ byte[] data = inLongMsg.buildArray();
headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length));
String sequenceId = commonAttrMap.get(AttributeConstants.SEQUENCE_ID);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index 2159b41..f00f19f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -40,7 +40,7 @@ import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
-import org.apache.inlong.commons.msg.TDMsg1;
+import org.apache.inlong.commons.msg.InLongMsg;
import org.apache.inlong.dataproxy.base.ProxyMessage;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
@@ -375,33 +375,33 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
String strRemoteIP, MsgType msgType) throws MessageIDException {
- int tdMsgVer = 1;
+ int inLongMsgVer = 1;
if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
- tdMsgVer = 3;
+ inLongMsgVer = 3;
} else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
- tdMsgVer = 4;
+ inLongMsgVer = 4;
}
for (Map.Entry<String, HashMap<String, List<ProxyMessage>>> topicEntry : messageMap.entrySet()) {
for (Map.Entry<String, List<ProxyMessage>> streamIdEntry : topicEntry.getValue().entrySet()) {
- TDMsg1 tdMsg = TDMsg1.newTDMsg(this.isCompressed, tdMsgVer);
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg(this.isCompressed, inLongMsgVer);
Map<String, String> headers = new HashMap<String, String>();
for (ProxyMessage message : streamIdEntry.getValue()) {
if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) {
message.getAttributeMap().put(AttributeConstants.MESSAGE_COUNT, String.valueOf(1));
- tdMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
+ inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
} else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
- tdMsg.addMsg(message.getData());
+ inLongMsg.addMsg(message.getData());
} else {
- tdMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
+ inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
}
}
- long pkgTimeInMillis = tdMsg.getCreatetime();
+ long pkgTimeInMillis = inLongMsg.getCreatetime();
String pkgTimeStr = dateFormator.get().format(pkgTimeInMillis);
- if (tdMsgVer == 4) {
+ if (inLongMsgVer == 4) {
if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) {
pkgTimeStr = commonAttrMap.get(ConfigConstants.PKG_TIME_KEY);
} else {
@@ -421,7 +421,7 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
String proxyMetricMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
headers.put(ConfigConstants.MSG_COUNTER_KEY, proxyMetricMsgCnt);
- byte[] data = tdMsg.buildArray();
+ byte[] data = inLongMsg.buildArray();
headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length));
String sequenceId = commonAttrMap.get(AttributeConstants.SEQUENCE_ID);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
index d5f0ca1..1263bc0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
@@ -51,7 +51,7 @@ import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.protocol.DataFlowInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveFileFormat;
import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveTimePartitionInfo;
@@ -226,7 +226,7 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
DeserializationInfo deserializationInfo = null;
boolean isDbType = BizConstant.DATA_SOURCE_DB.equals(storageInfo.getDataSourceType());
if (!isDbType) {
- // FILE and auto push source, the data format is TEXT or KEY-VALUE, temporarily use TDMsgCsv
+ // FILE and auto push source, the data format is TEXT or KEY-VALUE, temporarily use InLongMsgCsv
String dataType = storageInfo.getDataType();
if (BizConstant.DATA_TYPE_TEXT.equalsIgnoreCase(dataType)
|| BizConstant.DATA_TYPE_KEY_VALUE.equalsIgnoreCase(dataType)) {
@@ -238,7 +238,7 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
escape = info.getDataEscapeChar().charAt(0);
}*/
// Whether to delete the first separator, the default is false for the time being
- deserializationInfo = new TDMsgCsvDeserializationInfo(storageInfo.getInlongStreamId(), separator);
+ deserializationInfo = new InLongMsgCsvDeserializationInfo(storageInfo.getInlongStreamId(), separator);
}
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
index f911311..844f506 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
@@ -32,11 +32,11 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
property = "type")
@JsonSubTypes({
@Type(value = CsvDeserializationInfo.class, name = "csv"),
- @Type(value = TDMsgCsvDeserializationInfo.class, name = "tdmsg_csv"),
- @Type(value = TDMsgCsv2DeserializationInfo.class, name = "tdmsg_csv2"),
- @Type(value = TDMsgKvDeserializationInfo.class, name = "tdmsg_kv"),
- @Type(value = TDMsgTlogCsvDeserializationInfo.class, name = "tdmsg_tlog_csv"),
- @Type(value = TDMsgTlogKvDeserializationInfo.class, name = "tdmsg_tlog_kv")
+ @Type(value = InLongMsgCsvDeserializationInfo.class, name = "inlongmsg_csv"),
+ @Type(value = InLongMsgCsv2DeserializationInfo.class, name = "inlongmsg_csv2"),
+ @Type(value = InLongMsgKvDeserializationInfo.class, name = "inlongmsg_kv"),
+ @Type(value = InLongMsgTlogCsvDeserializationInfo.class, name = "inlongmsg_tlog_csv"),
+ @Type(value = InLongMsgTlogKvDeserializationInfo.class, name = "inlongmsg_tlog_kv")
})
public interface DeserializationInfo extends Serializable {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgCsv2DeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
similarity index 88%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgCsv2DeserializationInfo.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
index 4039ee8..0806717 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgCsv2DeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
@@ -21,16 +21,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
/**
- * It represents CSV2 format of TDMsg(m=9).
+ * It represents CSV2 format of InLongMsg(m=9).
*/
-public class TDMsgCsv2DeserializationInfo extends TDMsgDeserializationInfo {
+public class InLongMsgCsv2DeserializationInfo extends InLongMsgDeserializationInfo {
private static final long serialVersionUID = 2188769102604850019L;
private final char delimiter;
@JsonCreator
- public TDMsgCsv2DeserializationInfo(
+ public InLongMsgCsv2DeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("delimiter") char delimiter) {
super(tid);
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgCsvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
similarity index 90%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgCsvDeserializationInfo.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
index 322a8d0..890cea3 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgCsvDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
@@ -23,9 +23,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInc
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
/**
- * It represents CSV format of TDMsg(m=0).
+ * It represents CSV format of InLongMsg(m=0).
*/
-public class TDMsgCsvDeserializationInfo extends TDMsgDeserializationInfo {
+public class InLongMsgCsvDeserializationInfo extends InLongMsgDeserializationInfo {
private static final long serialVersionUID = 1499370571949888870L;
@@ -34,14 +34,14 @@ public class TDMsgCsvDeserializationInfo extends TDMsgDeserializationInfo {
@JsonInclude(Include.NON_NULL)
private final boolean deleteHeadDelimiter;
- public TDMsgCsvDeserializationInfo(
+ public InLongMsgCsvDeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("delimiter") char delimiter) {
this(tid, delimiter, true);
}
@JsonCreator
- public TDMsgCsvDeserializationInfo(
+ public InLongMsgCsvDeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("delete_head_delimiter") boolean deleteHeadDelimiter) {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
similarity index 86%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgDeserializationInfo.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
index 1b36161..e311beb 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
@@ -22,15 +22,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
/**
- * TDMsgDeserializationInfo.
+ * InLongMsgDeserializationInfo.
*/
-public abstract class TDMsgDeserializationInfo implements DeserializationInfo {
+public abstract class InLongMsgDeserializationInfo implements DeserializationInfo {
private static final long serialVersionUID = 3707412713264864315L;
private final String tid;
- public TDMsgDeserializationInfo(@JsonProperty("tid") String tid) {
+ public InLongMsgDeserializationInfo(@JsonProperty("tid") String tid) {
this.tid = checkNotNull(tid);
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgKvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
similarity index 90%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgKvDeserializationInfo.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
index 068b25e..c61825d 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgKvDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
@@ -20,9 +20,9 @@ package org.apache.inlong.sort.protocol.deserialization;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
/**
- * It represents KV format of TDMsg(m=5).
+ * It represents KV format of InLongMsg(m=5).
*/
-public class TDMsgKvDeserializationInfo extends TDMsgDeserializationInfo {
+public class InLongMsgKvDeserializationInfo extends InLongMsgDeserializationInfo {
private static final long serialVersionUID = 8431516458466278968L;
@@ -30,7 +30,7 @@ public class TDMsgKvDeserializationInfo extends TDMsgDeserializationInfo {
private final char kvDelimiter;
- public TDMsgKvDeserializationInfo(
+ public InLongMsgKvDeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("entry_delimiter") char entryDelimiter,
@JsonProperty("kv_delimiter") char kvDelimiter
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgTlogCsvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
similarity index 88%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgTlogCsvDeserializationInfo.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
index 34822a8..53426ff 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgTlogCsvDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
@@ -21,16 +21,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
/**
- * It represents TLog CSV format of TDMsg(m=10).
+ * It represents TLog CSV format of InLongMsg(m=10).
*/
-public class TDMsgTlogCsvDeserializationInfo extends TDMsgDeserializationInfo {
+public class InLongMsgTlogCsvDeserializationInfo extends InLongMsgDeserializationInfo {
private static final long serialVersionUID = -6585242216925992303L;
private final char delimiter;
@JsonCreator
- public TDMsgTlogCsvDeserializationInfo(
+ public InLongMsgTlogCsvDeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("delimiter") char delimiter) {
super(tid);
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgTlogKvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
similarity index 90%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgTlogKvDeserializationInfo.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
index fb28f8e..ace1038 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgTlogKvDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
@@ -20,9 +20,9 @@ package org.apache.inlong.sort.protocol.deserialization;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
/**
- * It represents TLog KV format of TDMsg(m=15).
+ * It represents TLog KV format of InLongMsg(m=15).
*/
-public class TDMsgTlogKvDeserializationInfo extends TDMsgDeserializationInfo {
+public class InLongMsgTlogKvDeserializationInfo extends InLongMsgDeserializationInfo {
private static final long serialVersionUID = 3299931901024581425L;
@@ -32,7 +32,7 @@ public class TDMsgTlogKvDeserializationInfo extends TDMsgDeserializationInfo {
private final char kvDelimiter;
- public TDMsgTlogKvDeserializationInfo(
+ public InLongMsgTlogKvDeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("entry_delimiter") char entryDelimiter,
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/DeserializationInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/DeserializationInfoTest.java
index 5959752..e903e9b 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/DeserializationInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/DeserializationInfoTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessin
import org.apache.inlong.sort.protocol.deserialization.CsvDeserializationInfo;
import java.io.IOException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo;
import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo.PartitionStrategy;
import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
@@ -44,7 +44,7 @@ public class DeserializationInfoTest {
DataFlowInfo dataFlowInfo = new DataFlowInfo(
1,
new TubeSourceInfo("topic" + System.currentTimeMillis(), "ma", "cg",
- new TDMsgCsvDeserializationInfo("tid", ','), new FieldInfo[0]),
+ new InLongMsgCsvDeserializationInfo("tid", ','), new FieldInfo[0]),
new ClickHouseSinkInfo("url", "dn", "tn", "un", "pw",
false, PartitionStrategy.HASH, "pk", new FieldInfo[0], new String[0],
100, 100, 100));
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/source/PulsarSourceInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/source/PulsarSourceInfoTest.java
index f5ffd67..1ebd6f2 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/source/PulsarSourceInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/source/PulsarSourceInfoTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
import org.junit.Assert;
import org.junit.Test;
@@ -35,7 +35,7 @@ public class PulsarSourceInfoTest {
new FieldInfo("f1", StringFormatInfo.INSTANCE),
new FieldInfo("f2", StringFormatInfo.INSTANCE)
};
- DeserializationInfo deserializationInfo = new TDMsgCsvDeserializationInfo("stream", ',');
+ DeserializationInfo deserializationInfo = new InLongMsgCsvDeserializationInfo("stream", ',');
PulsarSourceInfo pulsarSourceInfo = new PulsarSourceInfo(
"http://127.0.0.1:8080",
@@ -54,7 +54,7 @@ public class PulsarSourceInfoTest {
Assert.assertTrue(pulsarSourceInfo.getServiceUrl().equals("pulsar://127.0.0.1:6650"));
Assert.assertTrue(pulsarSourceInfo.getTopic().equals("business"));
Assert.assertTrue(pulsarSourceInfo.getSubscriptionName().equals("consumer"));
- Assert.assertTrue(pulsarSourceInfo.getDeserializationInfo() instanceof TDMsgCsvDeserializationInfo);
+ Assert.assertTrue(pulsarSourceInfo.getDeserializationInfo() instanceof InLongMsgCsvDeserializationInfo);
Assert.assertTrue(pulsarSourceInfo.getAuthentication() == null);
} catch (JsonProcessingException e) {
Assert.fail();
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index d5f4b38..f958a5c 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -128,13 +128,13 @@
<dependency>
<groupId>org.apache.inlong</groupId>
- <artifactId>sort-format-tdmsg-base</artifactId>
+ <artifactId>sort-format-inlongmsg-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
- <artifactId>sort-format-tdmsg-csv</artifactId>
+ <artifactId>sort-format-inlongmsg-csv</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgMixedSerializedRecord.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/InLongMsgMixedSerializedRecord.java
similarity index 82%
rename from inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgMixedSerializedRecord.java
rename to inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/InLongMsgMixedSerializedRecord.java
index 52164dc..362f71a 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgMixedSerializedRecord.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/InLongMsgMixedSerializedRecord.java
@@ -20,9 +20,9 @@ package org.apache.inlong.sort.flink;
import static org.apache.inlong.sort.configuration.Constants.UNKNOWN_DATAFLOW_ID;
/**
- * Data flow id might not been got from mixed TDMsg data stream.
+ * Data flow id might not been got from mixed InLongMsg data stream.
*/
-public class TDMsgMixedSerializedRecord extends SerializedRecord {
+public class InLongMsgMixedSerializedRecord extends SerializedRecord {
private static final long serialVersionUID = 4075321919886376829L;
@@ -31,11 +31,11 @@ public class TDMsgMixedSerializedRecord extends SerializedRecord {
/**
* Just satisfy requirement of Flink Pojo definition.
*/
- public TDMsgMixedSerializedRecord() {
+ public InLongMsgMixedSerializedRecord() {
super();
}
- public TDMsgMixedSerializedRecord(String topic, long timestampMillis, byte[] data) {
+ public InLongMsgMixedSerializedRecord(String topic, long timestampMillis, byte[] data) {
super(UNKNOWN_DATAFLOW_ID, timestampMillis, data);
this.topic = topic;
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java
index 5aa92f1..b0853e2 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java
@@ -30,7 +30,7 @@ import org.apache.inlong.sort.configuration.Configuration;
import org.apache.inlong.sort.configuration.Constants;
import org.apache.inlong.sort.flink.Record;
import org.apache.inlong.sort.flink.SerializedRecord;
-import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
+import org.apache.inlong.sort.flink.InLongMsgMixedSerializedRecord;
import org.apache.inlong.sort.flink.metrics.MetricData;
import org.apache.inlong.sort.flink.metrics.MetricData.MetricSource;
import org.apache.inlong.sort.flink.metrics.MetricData.MetricType;
@@ -61,7 +61,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
private transient Object schemaLock;
- private transient MultiTenancyTDMsgMixedDeserializer multiTenancyTdMsgMixedDeserializer;
+ private transient MultiTenancyInLongMsgMixedDeserializer multiTenancyInLongMsgMixedDeserializer;
private transient MultiTenancyDeserializer multiTenancyDeserializer;
@@ -81,7 +81,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
schemaLock = new Object();
- multiTenancyTdMsgMixedDeserializer = new MultiTenancyTDMsgMixedDeserializer();
+ multiTenancyInLongMsgMixedDeserializer = new MultiTenancyInLongMsgMixedDeserializer();
multiTenancyDeserializer = new MultiTenancyDeserializer();
fieldMappingTransformer = new FieldMappingTransformer();
recordTransformer = new RecordTransformer(config.getInteger(Constants.ETL_RECORD_SERIALIZATION_BUFFER_SIZE));
@@ -164,10 +164,11 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
collector.collect(serializedSinkRecord);
});
- if (serializedRecord instanceof TDMsgMixedSerializedRecord) {
- final TDMsgMixedSerializedRecord tdmsgRecord = (TDMsgMixedSerializedRecord) serializedRecord;
+ if (serializedRecord instanceof InLongMsgMixedSerializedRecord) {
+ final InLongMsgMixedSerializedRecord
+ inlongmsgRecord = (InLongMsgMixedSerializedRecord) serializedRecord;
synchronized (schemaLock) {
- multiTenancyTdMsgMixedDeserializer.deserialize(tdmsgRecord, transformCollector);
+ multiTenancyInLongMsgMixedDeserializer.deserialize(inlongmsgRecord, transformCollector);
}
} else {
synchronized (schemaLock) {
@@ -196,7 +197,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
@Override
public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
synchronized (schemaLock) {
- multiTenancyTdMsgMixedDeserializer.addDataFlow(dataFlowInfo);
+ multiTenancyInLongMsgMixedDeserializer.addDataFlow(dataFlowInfo);
multiTenancyDeserializer.addDataFlow(dataFlowInfo);
fieldMappingTransformer.addDataFlow(dataFlowInfo);
recordTransformer.addDataFlow(dataFlowInfo);
@@ -210,7 +211,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
@Override
public void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
synchronized (schemaLock) {
- multiTenancyTdMsgMixedDeserializer.updateDataFlow(dataFlowInfo);
+ multiTenancyInLongMsgMixedDeserializer.updateDataFlow(dataFlowInfo);
multiTenancyDeserializer.updateDataFlow(dataFlowInfo);
fieldMappingTransformer.updateDataFlow(dataFlowInfo);
recordTransformer.updateDataFlow(dataFlowInfo);
@@ -224,7 +225,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
@Override
public void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
synchronized (schemaLock) {
- multiTenancyTdMsgMixedDeserializer.removeDataFlow(dataFlowInfo);
+ multiTenancyInLongMsgMixedDeserializer.removeDataFlow(dataFlowInfo);
multiTenancyDeserializer.removeDataFlow(dataFlowInfo);
fieldMappingTransformer.removeDataFlow(dataFlowInfo);
recordTransformer.removeDataFlow(dataFlowInfo);
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgDeserializer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/InLongMsgDeserializer.java
similarity index 81%
rename from inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgDeserializer.java
rename to inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/InLongMsgDeserializer.java
index bda542e..9aea0f7 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgDeserializer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/InLongMsgDeserializer.java
@@ -20,12 +20,12 @@ package org.apache.inlong.sort.flink.deserialization;
import org.apache.flink.util.Collector;
import org.apache.inlong.sort.flink.Record;
import org.apache.inlong.sort.flink.SerializedRecord;
-import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
-public class TDMsgDeserializer implements Deserializer<SerializedRecord, Record> {
- private final AbstractTDMsgFormatDeserializer innerDeserializer;
+public class InLongMsgDeserializer implements Deserializer<SerializedRecord, Record> {
+ private final AbstractInLongMsgFormatDeserializer innerDeserializer;
- public TDMsgDeserializer(AbstractTDMsgFormatDeserializer innerDeserializer) {
+ public InLongMsgDeserializer(AbstractInLongMsgFormatDeserializer innerDeserializer) {
this.innerDeserializer = innerDeserializer;
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/InLongMsgMixedDeserializer.java
similarity index 69%
rename from inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java
rename to inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/InLongMsgMixedDeserializer.java
index d396cb7..47c8df2 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/InLongMsgMixedDeserializer.java
@@ -24,36 +24,36 @@ import java.util.Map;
import java.util.Set;
import org.apache.flink.util.Collector;
import org.apache.inlong.sort.flink.Record;
-import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
-import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgFormatDeserializer;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgMixedFormatConverter;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
+import org.apache.inlong.sort.flink.InLongMsgMixedSerializedRecord;
+import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverter;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
/**
- * A deserializer to handle mixed TDMsg records of one topic.
+ * A deserializer to handle mixed InLongMsg records of one topic.
*/
-public class TDMsgMixedDeserializer implements Deserializer<TDMsgMixedSerializedRecord, Record> {
+public class InLongMsgMixedDeserializer implements Deserializer<InLongMsgMixedSerializedRecord, Record> {
/**
* Each topic should have same preDeserializer, so just keep one.
*/
- private AbstractTDMsgFormatDeserializer preDeserializer;
+ private AbstractInLongMsgFormatDeserializer preDeserializer;
/**
* Tid -> deserializer.
*/
- private final Map<String, TDMsgMixedFormatConverter> deserializers = new HashMap<>();
+ private final Map<String, InLongMsgMixedFormatConverter> deserializers = new HashMap<>();
/**
* Tid -> data flow ids.
*/
private final Map<String, Set<Long>> interface2DataFlowsMap = new HashMap<>();
- public TDMsgMixedDeserializer() {
+ public InLongMsgMixedDeserializer() {
}
- public void updateDataFlow(long dataFlowId, String tid, AbstractTDMsgFormatDeserializer preDeserializer,
- TDMsgMixedFormatConverter deserializer) {
+ public void updateDataFlow(long dataFlowId, String tid, AbstractInLongMsgFormatDeserializer preDeserializer,
+ InLongMsgMixedFormatConverter deserializer) {
// always updates preDeserializer
this.preDeserializer = preDeserializer;
deserializers.put(tid, deserializer);
@@ -76,14 +76,15 @@ public class TDMsgMixedDeserializer implements Deserializer<TDMsgMixedSerialized
}
@Override
- public void deserialize(TDMsgMixedSerializedRecord tdMsgRecord, Collector<Record> collector) throws Exception {
- preDeserializer.flatMap(tdMsgRecord.getData(), new CallbackCollector<>(mixedRow -> {
- final String tid = TDMsgUtils.getTidFromMixedRow(mixedRow);
+ public void deserialize(InLongMsgMixedSerializedRecord inLongMsgRecord,
+ Collector<Record> collector) throws Exception {
+ preDeserializer.flatMap(inLongMsgRecord.getData(), new CallbackCollector<>(mixedRow -> {
+ final String tid = InLongMsgUtils.getTidFromMixedRow(mixedRow);
final Set<Long> dataFlowIds = interface2DataFlowsMap.get(tid);
if (dataFlowIds.isEmpty()) {
throw new Exception("No data flow found for tid:" + tid);
}
- final TDMsgMixedFormatConverter deserializer = deserializers.get(tid);
+ final InLongMsgMixedFormatConverter deserializer = deserializers.get(tid);
if (deserializer == null) {
throw new Exception("No data flow found for tid:" + tid);
}
@@ -97,12 +98,12 @@ public class TDMsgMixedDeserializer implements Deserializer<TDMsgMixedSerialized
}
@VisibleForTesting
- AbstractTDMsgFormatDeserializer getPreDeserializer() {
+ AbstractInLongMsgFormatDeserializer getPreDeserializer() {
return preDeserializer;
}
@VisibleForTesting
- Map<String, TDMsgMixedFormatConverter> getDeserializers() {
+ Map<String, InLongMsgMixedFormatConverter> getDeserializers() {
return deserializers;
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyDeserializer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyDeserializer.java
index 3e75aa5..f8b8215 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyDeserializer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyDeserializer.java
@@ -17,8 +17,8 @@
package org.apache.inlong.sort.flink.deserialization;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
@@ -28,12 +28,12 @@ import org.apache.inlong.sort.flink.Record;
import org.apache.inlong.sort.flink.SerializedRecord;
import org.apache.inlong.sort.formats.base.TableFormatConstants;
import org.apache.inlong.sort.formats.common.RowFormatInfo;
-import org.apache.inlong.sort.formats.tdmsgcsv.TDMsgCsvFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvFormatDeserializer;
import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
import org.apache.inlong.sort.protocol.DataFlowInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
import org.apache.inlong.sort.util.CommonUtils;
public class MultiTenancyDeserializer implements DataFlowInfoListener, Deserializer<SerializedRecord, Record> {
@@ -79,20 +79,21 @@ public class MultiTenancyDeserializer implements DataFlowInfoListener, Deseriali
final RowFormatInfo rowFormatInfo = CommonUtils.generateRowFormatInfo(fields);
final Deserializer<SerializedRecord, Record> deserializer;
- if (deserializationInfo instanceof TDMsgCsvDeserializationInfo) {
- TDMsgCsvDeserializationInfo tdMsgCsvDeserializationInfo = (TDMsgCsvDeserializationInfo) deserializationInfo;
- TDMsgCsvFormatDeserializer tdMsgCsvFormatDeserializer = new TDMsgCsvFormatDeserializer(
+ if (deserializationInfo instanceof InLongMsgCsvDeserializationInfo) {
+ InLongMsgCsvDeserializationInfo
+ inLongMsgCsvDeserializationInfo = (InLongMsgCsvDeserializationInfo) deserializationInfo;
+ InLongMsgCsvFormatDeserializer inLongMsgCsvFormatDeserializer = new InLongMsgCsvFormatDeserializer(
rowFormatInfo,
DEFAULT_TIME_FIELD_NAME,
DEFAULT_ATTRIBUTES_FIELD_NAME,
TableFormatConstants.DEFAULT_CHARSET,
- tdMsgCsvDeserializationInfo.getDelimiter(),
+ inLongMsgCsvDeserializationInfo.getDelimiter(),
null,
null,
null,
- tdMsgCsvDeserializationInfo.isDeleteHeadDelimiter(),
+ inLongMsgCsvDeserializationInfo.isDeleteHeadDelimiter(),
TableFormatConstants.DEFAULT_IGNORE_ERRORS);
- deserializer = new TDMsgDeserializer(tdMsgCsvFormatDeserializer);
+ deserializer = new InLongMsgDeserializer(inLongMsgCsvFormatDeserializer);
} else {
// TODO, support more formats here
throw new UnsupportedOperationException(
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyInLongMsgMixedDeserializer.java
similarity index 54%
rename from inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializer.java
rename to inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyInLongMsgMixedDeserializer.java
index 080c4f7..c572a7d 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyInLongMsgMixedDeserializer.java
@@ -17,8 +17,8 @@
package org.apache.inlong.sort.flink.deserialization;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
import com.google.common.annotations.VisibleForTesting;
import java.nio.charset.StandardCharsets;
@@ -27,32 +27,32 @@ import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.util.Collector;
import org.apache.inlong.sort.flink.Record;
-import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
+import org.apache.inlong.sort.flink.InLongMsgMixedSerializedRecord;
import org.apache.inlong.sort.formats.common.RowFormatInfo;
-import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgMixedFormatDeserializer;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgMixedFormatConverter;
-import org.apache.inlong.sort.formats.tdmsgcsv.TDMsgCsvMixedFormatConverter;
-import org.apache.inlong.sort.formats.tdmsgcsv.TDMsgCsvMixedFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverter;
+import org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvMixedFormatConverter;
+import org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvMixedFormatDeserializer;
import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
import org.apache.inlong.sort.protocol.DataFlowInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgDeserializationInfo;
import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
import org.apache.inlong.sort.util.CommonUtils;
/**
- * A deserializer to handle mixed TDMsg records.
+ * A deserializer to handle mixed InLongMsg records.
*/
-public class MultiTenancyTDMsgMixedDeserializer implements DataFlowInfoListener,
- Deserializer<TDMsgMixedSerializedRecord, Record> {
+public class MultiTenancyInLongMsgMixedDeserializer implements DataFlowInfoListener,
+ Deserializer<InLongMsgMixedSerializedRecord, Record> {
/**
* Maps topic to mixed deserializer.
*/
- private final Map<String, TDMsgMixedDeserializer> mixedDeserializerMap = new HashMap<>();
+ private final Map<String, InLongMsgMixedDeserializer> mixedDeserializerMap = new HashMap<>();
@Override
public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
@@ -61,16 +61,17 @@ public class MultiTenancyTDMsgMixedDeserializer implements DataFlowInfoListener,
@Override
public void updateDataFlow(DataFlowInfo dataFlowInfo) {
- if (!isTDMsgDataFlow(dataFlowInfo)) {
+ if (!isInLongMsgDataFlow(dataFlowInfo)) {
return;
}
- final TDMsgDeserializationInfo tdMsgDeserializationInfo =
- (TDMsgDeserializationInfo) dataFlowInfo.getSourceInfo().getDeserializationInfo();
+ final InLongMsgDeserializationInfo inLongMsgDeserializationInfo =
+ (InLongMsgDeserializationInfo) dataFlowInfo.getSourceInfo().getDeserializationInfo();
- Pair<AbstractTDMsgMixedFormatDeserializer, TDMsgMixedFormatConverter> allDeserializer = generateDeserializer(
- dataFlowInfo.getSourceInfo().getFields(), tdMsgDeserializationInfo);
- final AbstractTDMsgMixedFormatDeserializer preDeserializer = allDeserializer.getLeft();
- final TDMsgMixedFormatConverter deserializer = allDeserializer.getRight();
+ Pair<AbstractInLongMsgMixedFormatDeserializer, InLongMsgMixedFormatConverter>
+ allDeserializer = generateDeserializer(
+ dataFlowInfo.getSourceInfo().getFields(), inLongMsgDeserializationInfo);
+ final AbstractInLongMsgMixedFormatDeserializer preDeserializer = allDeserializer.getLeft();
+ final InLongMsgMixedFormatConverter deserializer = allDeserializer.getRight();
final String topic;
if (dataFlowInfo.getSourceInfo() instanceof TubeSourceInfo) {
@@ -81,23 +82,23 @@ public class MultiTenancyTDMsgMixedDeserializer implements DataFlowInfoListener,
throw new UnsupportedOperationException("Unknown source type " + dataFlowInfo.getSourceInfo());
}
- final TDMsgMixedDeserializer mixedDeserializer = mixedDeserializerMap
- .computeIfAbsent(topic, key -> new TDMsgMixedDeserializer());
+ final InLongMsgMixedDeserializer mixedDeserializer = mixedDeserializerMap
+ .computeIfAbsent(topic, key -> new InLongMsgMixedDeserializer());
mixedDeserializer.updateDataFlow(
- dataFlowInfo.getId(), tdMsgDeserializationInfo.getTid(), preDeserializer, deserializer);
+ dataFlowInfo.getId(), inLongMsgDeserializationInfo.getTid(), preDeserializer, deserializer);
}
@Override
public void removeDataFlow(DataFlowInfo dataFlowInfo) {
- if (!isTDMsgDataFlow(dataFlowInfo)) {
+ if (!isInLongMsgDataFlow(dataFlowInfo)) {
return;
}
final TubeSourceInfo tubeSourceInfo = (TubeSourceInfo) dataFlowInfo.getSourceInfo();
- final TDMsgDeserializationInfo tdMsgDeserializationInfo = (TDMsgDeserializationInfo) tubeSourceInfo
+ final InLongMsgDeserializationInfo inLongMsgDeserializationInfo = (InLongMsgDeserializationInfo) tubeSourceInfo
.getDeserializationInfo();
- final TDMsgMixedDeserializer mixedDeserializer = mixedDeserializerMap.get(tubeSourceInfo.getTopic());
+ final InLongMsgMixedDeserializer mixedDeserializer = mixedDeserializerMap.get(tubeSourceInfo.getTopic());
if (mixedDeserializer != null) {
- mixedDeserializer.removeDataFlow(dataFlowInfo.getId(), tdMsgDeserializationInfo.getTid());
+ mixedDeserializer.removeDataFlow(dataFlowInfo.getId(), inLongMsgDeserializationInfo.getTid());
if (mixedDeserializer.isEmpty()) {
mixedDeserializerMap.remove(tubeSourceInfo.getTopic());
}
@@ -105,14 +106,14 @@ public class MultiTenancyTDMsgMixedDeserializer implements DataFlowInfoListener,
}
@VisibleForTesting
- static boolean isTDMsgDataFlow(DataFlowInfo dataFlowInfo) {
+ static boolean isInLongMsgDataFlow(DataFlowInfo dataFlowInfo) {
final DeserializationInfo deserializationInfo = dataFlowInfo.getSourceInfo().getDeserializationInfo();
- return deserializationInfo instanceof TDMsgDeserializationInfo;
+ return deserializationInfo instanceof InLongMsgDeserializationInfo;
}
- public void deserialize(TDMsgMixedSerializedRecord record, Collector<Record> collector) throws Exception {
+ public void deserialize(InLongMsgMixedSerializedRecord record, Collector<Record> collector) throws Exception {
final String topic = record.getTopic();
- final TDMsgMixedDeserializer mixedDeserializer = mixedDeserializerMap.get(topic);
+ final InLongMsgMixedDeserializer mixedDeserializer = mixedDeserializerMap.get(topic);
if (mixedDeserializer == null) {
throw new Exception("No schema found for topic:" + topic);
}
@@ -120,26 +121,26 @@ public class MultiTenancyTDMsgMixedDeserializer implements DataFlowInfoListener,
}
@VisibleForTesting
- Pair<AbstractTDMsgMixedFormatDeserializer, TDMsgMixedFormatConverter> generateDeserializer(
+ Pair<AbstractInLongMsgMixedFormatDeserializer, InLongMsgMixedFormatConverter> generateDeserializer(
FieldInfo[] fields,
- TDMsgDeserializationInfo tdMsgDeserializationInfo) {
+ InLongMsgDeserializationInfo inLongMsgDeserializationInfo) {
final RowFormatInfo rowFormatInfo =
CommonUtils.generateRowFormatInfo(fields);
- final AbstractTDMsgMixedFormatDeserializer preDeserializer;
- final TDMsgMixedFormatConverter deserializer;
- if (tdMsgDeserializationInfo instanceof TDMsgCsvDeserializationInfo) {
- final TDMsgCsvDeserializationInfo csvDeserializationInfo =
- (TDMsgCsvDeserializationInfo) tdMsgDeserializationInfo;
- preDeserializer = new TDMsgCsvMixedFormatDeserializer(
+ final AbstractInLongMsgMixedFormatDeserializer preDeserializer;
+ final InLongMsgMixedFormatConverter deserializer;
+ if (inLongMsgDeserializationInfo instanceof InLongMsgCsvDeserializationInfo) {
+ final InLongMsgCsvDeserializationInfo csvDeserializationInfo =
+ (InLongMsgCsvDeserializationInfo) inLongMsgDeserializationInfo;
+ preDeserializer = new InLongMsgCsvMixedFormatDeserializer(
StandardCharsets.UTF_8.name(),
csvDeserializationInfo.getDelimiter(),
null,
null,
csvDeserializationInfo.isDeleteHeadDelimiter(),
false);
- deserializer = new TDMsgCsvMixedFormatConverter(
+ deserializer = new InLongMsgCsvMixedFormatConverter(
rowFormatInfo,
DEFAULT_TIME_FIELD_NAME,
DEFAULT_ATTRIBUTES_FIELD_NAME,
@@ -147,7 +148,7 @@ public class MultiTenancyTDMsgMixedDeserializer implements DataFlowInfoListener,
false);
} else {
throw new UnsupportedOperationException(
- "Not supported yet " + tdMsgDeserializationInfo.getClass().getSimpleName());
+ "Not supported yet " + inLongMsgDeserializationInfo.getClass().getSimpleName());
}
return Pair.of(preDeserializer, deserializer);
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
index 20cdd26..31956fa 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
@@ -39,7 +39,7 @@ import org.apache.flink.util.TimeUtils;
import org.apache.inlong.sort.configuration.Configuration;
import org.apache.inlong.sort.configuration.Constants;
import org.apache.inlong.sort.flink.SerializedRecord;
-import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
+import org.apache.inlong.sort.flink.InLongMsgMixedSerializedRecord;
import org.apache.inlong.sort.meta.MetaManager;
import org.apache.inlong.sort.util.CommonUtils;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
@@ -386,7 +386,7 @@ public class MultiTenancyTubeConsumer {
synchronized (context.getCheckpointLock()) {
for (Message message : consumeResult.getMessageList()) {
// TODO, optimize for single tid or no tid topic
- context.collect(new TDMsgMixedSerializedRecord(
+ context.collect(new InLongMsgMixedSerializedRecord(
topic, System.currentTimeMillis(), message.getData()));
}
final String partitionKey = consumeResult.getPartitionKey();
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/TubeSubscriptionDescription.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/TubeSubscriptionDescription.java
index df37077..8634ffc 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/TubeSubscriptionDescription.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/TubeSubscriptionDescription.java
@@ -20,7 +20,7 @@ package org.apache.inlong.sort.flink.tubemq;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgDeserializationInfo;
import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
import java.io.Serializable;
import java.util.HashMap;
@@ -47,7 +47,7 @@ public class TubeSubscriptionDescription implements Serializable {
private String consumerGroup;
/**
- * Each topic might include multiple data flow if it's packed by TDMsg.
+ * Each topic might include multiple data flow if it's packed by InLongMsg.
*/
private final Map<Long, TubeDataFlowDescription> dataFlows = new HashMap<>();
@@ -194,8 +194,8 @@ public class TubeSubscriptionDescription implements Serializable {
public static TubeDataFlowDescription generate(long dataFlowId, TubeSourceInfo tubeSourceInfo) {
String tid = null;
- if (tubeSourceInfo.getDeserializationInfo() instanceof TDMsgDeserializationInfo) {
- tid = ((TDMsgDeserializationInfo) tubeSourceInfo.getDeserializationInfo()).getTid();
+ if (tubeSourceInfo.getDeserializationInfo() instanceof InLongMsgDeserializationInfo) {
+ tid = ((InLongMsgDeserializationInfo) tubeSourceInfo.getDeserializationInfo()).getTid();
}
return new TubeDataFlowDescription(dataFlowId, tid);
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/util/CommonUtils.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/util/CommonUtils.java
index 2edb554..41e0954 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/util/CommonUtils.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/util/CommonUtils.java
@@ -32,7 +32,7 @@ import org.apache.inlong.sort.formats.common.RowFormatInfo;
import org.apache.inlong.sort.protocol.DataFlowInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgDeserializationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,8 +102,8 @@ public class CommonUtils {
// Get stream id
final DeserializationInfo deserializationInfo = dataFlowInfo.getSourceInfo().getDeserializationInfo();
- if (deserializationInfo instanceof TDMsgDeserializationInfo) {
- streamId = ((TDMsgDeserializationInfo) deserializationInfo).getTid();
+ if (deserializationInfo instanceof InLongMsgDeserializationInfo) {
+ streamId = ((InLongMsgDeserializationInfo) deserializationInfo).getTid();
}
}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializerTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/InLongMsgMixedDeserializerTest.java
similarity index 75%
rename from inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializerTest.java
rename to inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/InLongMsgMixedDeserializerTest.java
index 7ed9c9b..e573499 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializerTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/InLongMsgMixedDeserializerTest.java
@@ -27,25 +27,26 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.inlong.sort.flink.Record;
-import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
-import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgFormatDeserializer;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgBody;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgHead;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgMixedFormatConverter;
+import org.apache.inlong.sort.flink.InLongMsgMixedSerializedRecord;
+import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverter;
import org.apache.inlong.sort.util.TestLogger;
import org.apache.inlong.sort.util.TestingUtils.TestingCollector;
import org.junit.Before;
import org.junit.Test;
/**
- * Unit test for {@link TDMsgMixedDeserializer}.
+ * Unit test for {@link InLongMsgMixedDeserializer}.
*/
-public class TDMsgMixedDeserializerTest extends TestLogger {
+public class InLongMsgMixedDeserializerTest extends TestLogger {
private final long dataFlowId1 = 1L;
private final long dataFlowId2 = 2L;
private final String tid = "tid";
- private final TestingAbstractTDMsgFormatDeserializer preDeserializer = new TestingAbstractTDMsgFormatDeserializer();
+ private final TestingAbstractInLongMsgFormatDeserializer
+ preDeserializer = new TestingAbstractInLongMsgFormatDeserializer();
private final TestingConverter deserializer = new TestingConverter();
private final TestingCollector<Record> collector = new TestingCollector<>();
@@ -57,10 +58,11 @@ public class TDMsgMixedDeserializerTest extends TestLogger {
@Test
public void testUpdateAndRemoveDataFlow() {
- final TDMsgMixedDeserializer mixedDeserializer = new TDMsgMixedDeserializer();
+ final InLongMsgMixedDeserializer mixedDeserializer = new InLongMsgMixedDeserializer();
assertTrue(mixedDeserializer.isEmpty());
- mixedDeserializer.updateDataFlow(dataFlowId1, tid, new TestingAbstractTDMsgFormatDeserializer(), deserializer);
+ mixedDeserializer.updateDataFlow(dataFlowId1, tid,
+ new TestingAbstractInLongMsgFormatDeserializer(), deserializer);
mixedDeserializer.updateDataFlow(dataFlowId2, tid, preDeserializer, deserializer);
assertFalse(mixedDeserializer.isEmpty());
@@ -83,7 +85,7 @@ public class TDMsgMixedDeserializerTest extends TestLogger {
@Test
public void testDeserialize() throws Exception {
- final TDMsgMixedDeserializer mixedDeserializer = new TDMsgMixedDeserializer();
+ final InLongMsgMixedDeserializer mixedDeserializer = new InLongMsgMixedDeserializer();
mixedDeserializer.updateDataFlow(dataFlowId1, tid, preDeserializer, deserializer);
mixedDeserializer.updateDataFlow(dataFlowId2, tid, preDeserializer, deserializer);
@@ -93,7 +95,7 @@ public class TDMsgMixedDeserializerTest extends TestLogger {
row.setField(2, tid);
preDeserializer.records.add(row);
- mixedDeserializer.deserialize(new TDMsgMixedSerializedRecord(), collector);
+ mixedDeserializer.deserialize(new InLongMsgMixedSerializedRecord(), collector);
assertEquals(2, collector.results.size());
assertEquals(dataFlowId1, collector.results.get(0).getDataflowId());
assertEquals(tid, collector.results.get(0).getRow().getField(2));
@@ -101,13 +103,13 @@ public class TDMsgMixedDeserializerTest extends TestLogger {
assertEquals(tid, collector.results.get(1).getRow().getField(2));
}
- private static class TestingAbstractTDMsgFormatDeserializer extends AbstractTDMsgFormatDeserializer {
+ private static class TestingAbstractInLongMsgFormatDeserializer extends AbstractInLongMsgFormatDeserializer {
private static final long serialVersionUID = -5356395595688009428L;
public final Queue<Row> records = new ArrayDeque<>();
- public TestingAbstractTDMsgFormatDeserializer() {
+ public TestingAbstractInLongMsgFormatDeserializer() {
super(false);
}
@@ -117,17 +119,17 @@ public class TDMsgMixedDeserializerTest extends TestLogger {
}
@Override
- protected TDMsgHead parseHead(String s) throws Exception {
+ protected InLongMsgHead parseHead(String s) throws Exception {
return null;
}
@Override
- protected TDMsgBody parseBody(byte[] bytes) throws Exception {
+ protected InLongMsgBody parseBody(byte[] bytes) throws Exception {
return null;
}
@Override
- protected Row convertRow(TDMsgHead tdMsgHead, TDMsgBody tdMsgBody) throws Exception {
+ protected Row convertRow(InLongMsgHead inLongMsgHead, InLongMsgBody inLongMsgBody) throws Exception {
return null;
}
@@ -137,7 +139,7 @@ public class TDMsgMixedDeserializerTest extends TestLogger {
}
}
- private static class TestingConverter implements TDMsgMixedFormatConverter {
+ private static class TestingConverter implements InLongMsgMixedFormatConverter {
private static final long serialVersionUID = -2239130770704933846L;
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyInLongMsgMixedDeserializerTest.java
similarity index 71%
rename from inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
rename to inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyInLongMsgMixedDeserializerTest.java
index fddb22c..1f65a9e 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyInLongMsgMixedDeserializerTest.java
@@ -25,15 +25,15 @@ import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
-import org.apache.inlong.commons.msg.TDMsg1;
+import org.apache.inlong.commons.msg.InLongMsg;
import org.apache.inlong.sort.flink.Record;
-import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
+import org.apache.inlong.sort.flink.InLongMsgMixedSerializedRecord;
import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
import org.apache.inlong.sort.protocol.DataFlowInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
import org.apache.inlong.sort.util.TestLogger;
import org.apache.inlong.sort.util.TestingUtils.EmptySinkInfo;
@@ -42,30 +42,30 @@ import org.apache.inlong.sort.util.TestingUtils.TestingCollector;
import org.junit.Test;
/**
- * Unit test for {@link MultiTenancyTDMsgMixedDeserializer}.
+ * Unit test for {@link MultiTenancyInLongMsgMixedDeserializer}.
*/
-public class MultiTenancyTDMsgMixedDeserializerTest extends TestLogger {
+public class MultiTenancyInLongMsgMixedDeserializerTest extends TestLogger {
@Test
- public void testIsTDMsgDataFlow() {
+ public void testIsInLongMsgDataFlow() {
final TubeSourceInfo tubeSourceInfo = new TubeSourceInfo(
"topic",
"address",
null,
- new TDMsgCsvDeserializationInfo("tid", ',', false),
+ new InLongMsgCsvDeserializationInfo("tid", ',', false),
new FieldInfo[0]);
final EmptySinkInfo sinkInfo = new EmptySinkInfo();
final DataFlowInfo dataFlowInfo = new DataFlowInfo(1L, tubeSourceInfo, sinkInfo);
- assertTrue(MultiTenancyTDMsgMixedDeserializer.isTDMsgDataFlow(dataFlowInfo));
+ assertTrue(MultiTenancyInLongMsgMixedDeserializer.isInLongMsgDataFlow(dataFlowInfo));
- final DataFlowInfo nonTDMsgDataFlow = new DataFlowInfo(2L, new EmptySourceInfo(), sinkInfo);
- assertFalse(MultiTenancyTDMsgMixedDeserializer.isTDMsgDataFlow(nonTDMsgDataFlow));
+ final DataFlowInfo nonInLongMsgDataFlow = new DataFlowInfo(2L, new EmptySourceInfo(), sinkInfo);
+ assertFalse(MultiTenancyInLongMsgMixedDeserializer.isInLongMsgDataFlow(nonInLongMsgDataFlow));
}
@Test
public void testDeserialize() throws Exception {
- final MultiTenancyTDMsgMixedDeserializer deserializer = new MultiTenancyTDMsgMixedDeserializer();
+ final MultiTenancyInLongMsgMixedDeserializer deserializer = new MultiTenancyInLongMsgMixedDeserializer();
final FieldInfo stringField = new FieldInfo("not_important", new StringFormatInfo());
final FieldInfo longField = new FieldInfo("id", new LongFormatInfo());
@@ -73,19 +73,19 @@ public class MultiTenancyTDMsgMixedDeserializerTest extends TestLogger {
"topic",
"address",
null,
- new TDMsgCsvDeserializationInfo("tid", '|', false),
+ new InLongMsgCsvDeserializationInfo("tid", '|', false),
new FieldInfo[]{stringField, longField});
final EmptySinkInfo sinkInfo = new EmptySinkInfo();
final DataFlowInfo dataFlowInfo = new DataFlowInfo(1L, tubeSourceInfo, sinkInfo);
deserializer.addDataFlow(dataFlowInfo);
- final TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
- final String attrs = "m=0&" + TDMsgUtils.TDMSG_ATTR_STREAM_ID + "=tid&t=20210513";
+ final InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ final String attrs = "m=0&" + InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID + "=tid&t=20210513";
final String body1 = "tianqiwan|29";
- tdMsg1.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body1.getBytes());
final TestingCollector<Record> collector = new TestingCollector<>();
- deserializer.deserialize(new TDMsgMixedSerializedRecord("topic", 0, tdMsg1.buildArray()), collector);
+ deserializer.deserialize(new InLongMsgMixedSerializedRecord("topic", 0, inLongMsg.buildArray()), collector);
assertEquals(1, collector.results.size());
assertEquals(1L, collector.results.get(0).getDataflowId());
@@ -94,7 +94,7 @@ public class MultiTenancyTDMsgMixedDeserializerTest extends TestLogger {
assertEquals(new Timestamp(time), collector.results.get(0).getRow().getField(0));
final Map<String, String> attributes = new HashMap<>();
attributes.put("m", "0");
- attributes.put(TDMsgUtils.TDMSG_ATTR_STREAM_ID, "tid");
+ attributes.put(InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID, "tid");
attributes.put("t", "20210513");
assertEquals(attributes, collector.results.get(0).getRow().getField(1));
assertEquals("tianqiwan", collector.results.get(0).getRow().getField(2));
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/tubemq/MultiTopicTubeSourceFunctionTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/tubemq/MultiTopicTubeSourceFunctionTest.java
index 2821c2a..dc19a2d 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/tubemq/MultiTopicTubeSourceFunctionTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/tubemq/MultiTopicTubeSourceFunctionTest.java
@@ -27,7 +27,7 @@ import org.apache.inlong.sort.configuration.Constants;
import org.apache.inlong.sort.flink.tubemq.MultiTopicTubeSourceFunction.SourceEvent;
import org.apache.inlong.sort.flink.tubemq.MultiTopicTubeSourceFunction.SourceEventType;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
import org.apache.inlong.sort.util.TestLogger;
import java.util.ArrayList;
@@ -69,7 +69,7 @@ public class MultiTopicTubeSourceFunctionTest extends TestLogger {
topic,
masterAddress,
null,
- new TDMsgCsvDeserializationInfo(tid, ',', true),
+ new InLongMsgCsvDeserializationInfo(tid, ',', true),
new FieldInfo[0]);
}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/tubemq/TubeSubscriptionDescriptionTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/tubemq/TubeSubscriptionDescriptionTest.java
index 32644b9..09f6a2c 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/tubemq/TubeSubscriptionDescriptionTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/tubemq/TubeSubscriptionDescriptionTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
import org.apache.inlong.sort.util.TestLogger;
import java.util.ArrayList;
@@ -50,7 +50,7 @@ public class TubeSubscriptionDescriptionTest extends TestLogger {
topic,
masterAddress,
null,
- new TDMsgCsvDeserializationInfo(tid, ',', true),
+ new InLongMsgCsvDeserializationInfo(tid, ',', true),
new FieldInfo[0]);
description.addDataFlow(dataFlowId, tubeSourceInfo);
@@ -68,7 +68,7 @@ public class TubeSubscriptionDescriptionTest extends TestLogger {
topic,
masterAddress,
null,
- new TDMsgCsvDeserializationInfo(tid, ',', true),
+ new InLongMsgCsvDeserializationInfo(tid, ',', true),
new FieldInfo[0]);
description.addDataFlow(dataFlowId, tubeSourceInfo1);
final String tid2 = "10002";
@@ -77,7 +77,7 @@ public class TubeSubscriptionDescriptionTest extends TestLogger {
topic,
masterAddress,
consumerGroup,
- new TDMsgCsvDeserializationInfo(tid2, ',', true),
+ new InLongMsgCsvDeserializationInfo(tid2, ',', true),
new FieldInfo[0]);
final long dataFlowId2 = 10086L;
description.addDataFlow(dataFlowId2, tubeSourceInfo2);
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/pom.xml b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
similarity index 97%
rename from inlong-sort/sort-formats/format-tdmsg-base/pom.xml
rename to inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
index 35447a9..d68f6b4 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/pom.xml
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
@@ -31,8 +31,8 @@
<relativePath>..</relativePath>
</parent>
- <artifactId>sort-format-tdmsg-base</artifactId>
- <name>Apache InLong - Sort Format-tdmsg-base</name>
+ <artifactId>sort-format-inlongmsg-base</artifactId>
+ <name>Apache InLong - Sort Format-inlongmsg-base</name>
<packaging>jar</packaging>
<dependencies>
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/AbstractTDMsgFormatDeserializer.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
similarity index 73%
rename from inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/AbstractTDMsgFormatDeserializer.java
rename to inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
index 32d0d87..fddcd88 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/AbstractTDMsgFormatDeserializer.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.tdmsg;
+package org.apache.inlong.sort.formats.inlongmsg;
import java.util.Arrays;
import java.util.Iterator;
@@ -24,19 +24,19 @@ import java.util.Objects;
import javax.annotation.Nonnull;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
-import org.apache.inlong.commons.msg.TDMsg1;
+import org.apache.inlong.commons.msg.InLongMsg;
import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * The base for all tdmsg format deserializers.
+ * The base for all inlongmsg format deserializers.
*/
-public abstract class AbstractTDMsgFormatDeserializer implements TableFormatDeserializer {
+public abstract class AbstractInLongMsgFormatDeserializer implements TableFormatDeserializer {
private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(AbstractTDMsgFormatDeserializer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractInLongMsgFormatDeserializer.class);
/**
* True if ignore errors in the deserialization.
@@ -44,39 +44,39 @@ public abstract class AbstractTDMsgFormatDeserializer implements TableFormatDese
@Nonnull
protected final Boolean ignoreErrors;
- public AbstractTDMsgFormatDeserializer(@Nonnull Boolean ignoreErrors) {
+ public AbstractInLongMsgFormatDeserializer(@Nonnull Boolean ignoreErrors) {
this.ignoreErrors = ignoreErrors;
}
/**
- * Parses the head of the tdmsg record.
+ * Parses the head of the inlongmsg record.
*/
- protected abstract TDMsgHead parseHead(String attr) throws Exception;
+ protected abstract InLongMsgHead parseHead(String attr) throws Exception;
/**
- * Parses the body of the tdmsg record.
+ * Parses the body of the inlongmsg record.
*/
- protected abstract TDMsgBody parseBody(byte[] bytes) throws Exception;
+ protected abstract InLongMsgBody parseBody(byte[] bytes) throws Exception;
/**
- * Converts the tdmsg record into a row.
+ * Converts the inlongmsg record into a row.
*/
- protected abstract Row convertRow(TDMsgHead head, TDMsgBody body) throws Exception;
+ protected abstract Row convertRow(InLongMsgHead head, InLongMsgBody body) throws Exception;
@Override
public void flatMap(
byte[] bytes,
Collector<Row> collector
) throws Exception {
- TDMsg1 tdMsg = TDMsg1.parseFrom(bytes);
+ InLongMsg inLongMsg = InLongMsg.parseFrom(bytes);
- for (String attr : tdMsg.getAttrs()) {
- Iterator<byte[]> iterator = tdMsg.getIterator(attr);
+ for (String attr : inLongMsg.getAttrs()) {
+ Iterator<byte[]> iterator = inLongMsg.getIterator(attr);
if (iterator == null) {
continue;
}
- TDMsgHead head;
+ InLongMsgHead head;
try {
head = parseHead(attr);
} catch (Exception e) {
@@ -95,7 +95,7 @@ public abstract class AbstractTDMsgFormatDeserializer implements TableFormatDese
continue;
}
- TDMsgBody body;
+ InLongMsgBody body;
try {
body = parseBody(bodyBytes);
} catch (Exception e) {
@@ -113,7 +113,7 @@ public abstract class AbstractTDMsgFormatDeserializer implements TableFormatDese
row = convertRow(head, body);
} catch (Exception e) {
if (ignoreErrors) {
- LOG.warn("Cannot properly convert the tdmsg ({}, {}) " + "to row.", head, body, e);
+ LOG.warn("Cannot properly convert the inlongmsg ({}, {}) " + "to row.", head, body, e);
continue;
} else {
throw e;
@@ -137,7 +137,7 @@ public abstract class AbstractTDMsgFormatDeserializer implements TableFormatDese
return false;
}
- AbstractTDMsgFormatDeserializer that = (AbstractTDMsgFormatDeserializer) o;
+ AbstractInLongMsgFormatDeserializer that = (AbstractInLongMsgFormatDeserializer) o;
return ignoreErrors.equals(that.ignoreErrors);
}
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/AbstractTDMsgMixedFormatDeserializer.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatDeserializer.java
similarity index 74%
rename from inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/AbstractTDMsgMixedFormatDeserializer.java
rename to inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatDeserializer.java
index 3329cc6..95d0a3a 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/AbstractTDMsgMixedFormatDeserializer.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatDeserializer.java
@@ -16,17 +16,17 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.tdmsg;
+package org.apache.inlong.sort.formats.inlongmsg;
import javax.annotation.Nonnull;
/**
- * The base for all tdmsg mixed format deserializers.
+ * The base for all inlongmsg mixed format deserializers.
*/
-public abstract class AbstractTDMsgMixedFormatDeserializer
- extends AbstractTDMsgFormatDeserializer {
+public abstract class AbstractInLongMsgMixedFormatDeserializer
+ extends AbstractInLongMsgFormatDeserializer {
- public AbstractTDMsgMixedFormatDeserializer(@Nonnull Boolean ignoreErrors) {
+ public AbstractInLongMsgMixedFormatDeserializer(@Nonnull Boolean ignoreErrors) {
super(ignoreErrors);
}
}
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgBody.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
similarity index 84%
rename from inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgBody.java
rename to inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
index 3c204f1..bb00c35 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgBody.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
@@ -16,18 +16,18 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.tdmsg;
+package org.apache.inlong.sort.formats.inlongmsg;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import org.apache.inlong.commons.msg.TDMsg1;
+import org.apache.inlong.commons.msg.InLongMsg;
/**
- * The body deserialized from {@link TDMsg1}.
+ * The body deserialized from {@link InLongMsg}.
*/
-public class TDMsgBody implements Serializable {
+public class InLongMsgBody implements Serializable {
private static final long serialVersionUID = 1L;
@@ -51,7 +51,7 @@ public class TDMsgBody implements Serializable {
*/
private final Map<String, String> entries;
- public TDMsgBody(
+ public InLongMsgBody(
byte[] data,
String tid,
List<String> fields,
@@ -89,8 +89,8 @@ public class TDMsgBody implements Serializable {
return false;
}
- TDMsgBody tdMsgBody = (TDMsgBody) o;
- return Arrays.equals(data, tdMsgBody.data);
+ InLongMsgBody inLongMsgBody = (InLongMsgBody) o;
+ return Arrays.equals(data, inLongMsgBody.data);
}
@Override
@@ -100,7 +100,7 @@ public class TDMsgBody implements Serializable {
@Override
public String toString() {
- return "TDMsgBody{" + "data=" + Arrays.toString(data) + ", tid='" + tid + '\''
+ return "InLongMsgBody{" + "data=" + Arrays.toString(data) + ", tid='" + tid + '\''
+ ", fields=" + fields + ", entries=" + entries + '}';
}
}
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgHead.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgHead.java
similarity index 90%
rename from inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgHead.java
rename to inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgHead.java
index 662c8a0..a753457 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgHead.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgHead.java
@@ -16,19 +16,20 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.tdmsg;
+package org.apache.inlong.sort.formats.inlongmsg;
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import org.apache.inlong.commons.msg.TDMsg1;
+
+import org.apache.inlong.commons.msg.InLongMsg;
/**
- * The head deserialized from {@link TDMsg1}.
+ * The head deserialized from {@link InLongMsg}.
*/
-public class TDMsgHead implements Serializable {
+public class InLongMsgHead implements Serializable {
private static final long serialVersionUID = 1L;
@@ -52,7 +53,7 @@ public class TDMsgHead implements Serializable {
*/
private final List<String> predefinedFields;
- public TDMsgHead(
+ public InLongMsgHead(
Map<String, String> attributes,
String tid,
Timestamp time,
@@ -90,7 +91,7 @@ public class TDMsgHead implements Serializable {
return false;
}
- TDMsgHead that = (TDMsgHead) o;
+ InLongMsgHead that = (InLongMsgHead) o;
return Objects.equals(attributes, that.attributes)
&& Objects.equals(tid, that.tid)
&& Objects.equals(time, that.time)
@@ -104,7 +105,7 @@ public class TDMsgHead implements Serializable {
@Override
public String toString() {
- return "TDMsgHead{"
+ return "InLongMsgHead{"
+ "attributes=" + attributes
+ ", tid='" + tid + '\''
+ ", time=" + time
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgMixedFormatConverter.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverter.java
similarity index 87%
rename from inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgMixedFormatConverter.java
rename to inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverter.java
index 55d3b16..25f6843 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgMixedFormatConverter.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverter.java
@@ -16,16 +16,16 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.tdmsg;
+package org.apache.inlong.sort.formats.inlongmsg;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.types.Row;
/**
- * The converter for a mixed tdmsg format.
+ * The converter for a mixed inlongmsg format.
*/
-public interface TDMsgMixedFormatConverter
+public interface InLongMsgMixedFormatConverter
extends FlatMapFunction<Row, Row>, ResultTypeQueryable<Row> {
}
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgMixedFormatFactory.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatFactory.java
similarity index 79%
rename from inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgMixedFormatFactory.java
rename to inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatFactory.java
index 1f07eb9..48ed703 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgMixedFormatFactory.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatFactory.java
@@ -16,25 +16,25 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.tdmsg;
+package org.apache.inlong.sort.formats.inlongmsg;
import java.util.Map;
import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
/**
- * Factory for creating configured instances of {@link TDMsgMixedFormatConverter}.
+ * Factory for creating configured instances of {@link InLongMsgMixedFormatConverter}.
*/
-public interface TDMsgMixedFormatFactory {
+public interface InLongMsgMixedFormatFactory {
/**
- * Creates and configures a {@link TDMsgMixedFormatConverter} using the given
+ * Creates and configures a {@link InLongMsgMixedFormatConverter} using the given
* properties.
*
* @param properties The normalized properties describing the format.
* @return The configured serialization schema or null if the factory cannot
* provide an instance of the class.
*/
- TDMsgMixedFormatConverter createMixedFormatConverter(
+ InLongMsgMixedFormatConverter createMixedFormatConverter(
final Map<String, String> properties
);
@@ -46,7 +46,7 @@ public interface TDMsgMixedFormatFactory {
* @return The configured serialization schema or null if the factory cannot
* provide an instance of the class.
*/
- AbstractTDMsgMixedFormatDeserializer createMixedFormatDeserializer(
+ AbstractInLongMsgMixedFormatDeserializer createMixedFormatDeserializer(
final Map<String, String> properties
);
}
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgMixedValidator.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedValidator.java
similarity index 91%
rename from inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgMixedValidator.java
rename to inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedValidator.java
index e26d2eb..1e9514c 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgMixedValidator.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedValidator.java
@@ -16,16 +16,16 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.tdmsg;
+package org.apache.inlong.sort.formats.inlongmsg;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptorValidator;
import org.apache.inlong.sort.formats.base.TableFormatConstants;
/**
- * Validator for mixed tdmsg formats.
+ * Validator for mixed inlongmsg formats.
*/
-public class TDMsgMixedValidator extends FormatDescriptorValidator {
+public class InLongMsgMixedValidator extends FormatDescriptorValidator {
@Override
public void validate(DescriptorProperties properties) {
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgUtils.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
similarity index 90%
rename from inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgUtils.java
rename to inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
index 33d715f..e2b8e8f 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgUtils.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.tdmsg;
+package org.apache.inlong.sort.formats.inlongmsg;
import static org.apache.flink.table.factories.TableFormatFactoryBase.deriveSchema;
@@ -36,7 +36,7 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
-import org.apache.inlong.commons.msg.TDMsg1;
+import org.apache.inlong.commons.msg.InLongMsg;
import org.apache.inlong.sort.formats.base.TableFormatConstants;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
import org.apache.inlong.sort.formats.common.FormatInfo;
@@ -44,24 +44,24 @@ import org.apache.inlong.sort.formats.common.RowFormatInfo;
import org.apache.inlong.sort.formats.util.StringUtils;
/**
- * A utility class for parsing TDMsg {@link TDMsg1}.
+ * A utility class for parsing InLongMsg {@link InLongMsg}.
*/
-public class TDMsgUtils {
+public class InLongMsgUtils {
- public static final char TDMSG_ATTR_ENTRY_DELIMITER = '&';
- public static final char TDMSG_ATTR_KV_DELIMITER = '=';
+ public static final char INLONGMSG_ATTR_ENTRY_DELIMITER = '&';
+ public static final char INLONGMSG_ATTR_KV_DELIMITER = '=';
// keys in attributes
- public static final String TDMSG_ATTR_STREAM_ID = "streamId";
- public static final String TDMSG_ATTR_TIME_T = "t";
- public static final String TDMSG_ATTR_TIME_DT = "dt";
- public static final String TDMSG_ATTR_ADD_COLUMN_PREFIX = "__addcol";
+ public static final String INLONGMSG_ATTR_STREAM_ID = "streamId";
+ public static final String INLONGMSG_ATTR_TIME_T = "t";
+ public static final String INLONGMSG_ATTR_TIME_DT = "dt";
+ public static final String INLONGMSG_ATTR_ADD_COLUMN_PREFIX = "__addcol";
public static final String FORMAT_TIME_FIELD_NAME = "format.time-field-name";
public static final String FORMAT_ATTRIBUTES_FIELD_NAME = "format.attributes-field-name";
- public static final String DEFAULT_TIME_FIELD_NAME = "tdmsg_time";
- public static final String DEFAULT_ATTRIBUTES_FIELD_NAME = "tdmsg_attributes";
+ public static final String DEFAULT_TIME_FIELD_NAME = "inlongmsg_time";
+ public static final String DEFAULT_ATTRIBUTES_FIELD_NAME = "inlongmsg_attributes";
public static final TypeInformation<Row> MIXED_ROW_TYPE =
Types.ROW_NAMED(
@@ -111,8 +111,8 @@ public class TDMsgUtils {
public static Map<String, String> parseAttr(String attr) {
return StringUtils.splitKv(
attr,
- TDMSG_ATTR_ENTRY_DELIMITER,
- TDMSG_ATTR_KV_DELIMITER,
+ INLONGMSG_ATTR_ENTRY_DELIMITER,
+ INLONGMSG_ATTR_KV_DELIMITER,
null,
null
);
@@ -154,15 +154,15 @@ public class TDMsgUtils {
public static List<String> getPredefinedFields(Map<String, String> head) {
Map<Integer, String> predefinedFields = new HashMap<>();
for (String key : head.keySet()) {
- if (!key.startsWith(TDMSG_ATTR_ADD_COLUMN_PREFIX)) {
+ if (!key.startsWith(INLONGMSG_ATTR_ADD_COLUMN_PREFIX)) {
continue;
}
int index =
Integer.parseInt(
key.substring(
- TDMSG_ATTR_ADD_COLUMN_PREFIX.length(),
- key.indexOf('_', TDMSG_ATTR_ADD_COLUMN_PREFIX.length())
+ INLONGMSG_ATTR_ADD_COLUMN_PREFIX.length(),
+ key.indexOf('_', INLONGMSG_ATTR_ADD_COLUMN_PREFIX.length())
)
);
@@ -179,8 +179,8 @@ public class TDMsgUtils {
}
public static Row buildMixedRow(
- TDMsgHead head,
- TDMsgBody body,
+ InLongMsgHead head,
+ InLongMsgBody body,
String tid
) {
Row row = new Row(7);
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgValidator.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgValidator.java
similarity index 91%
rename from inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgValidator.java
rename to inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgValidator.java
index 486bf2f..c2a51ee 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgValidator.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgValidator.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.tdmsg;
+package org.apache.inlong.sort.formats.inlongmsg;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptorValidator;
@@ -24,9 +24,9 @@ import org.apache.inlong.sort.formats.base.TableFormatConstants;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
/**
- * Validator for mixed tdmsg formats.
+ * Validator for mixed inlongmsg formats.
*/
-public class TDMsgValidator extends FormatDescriptorValidator {
+public class InLongMsgValidator extends FormatDescriptorValidator {
@Override
public void validate(DescriptorProperties properties) {
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/pom.xml b/inlong-sort/sort-formats/format-inlongmsg-csv/pom.xml
similarity index 96%
rename from inlong-sort/sort-formats/format-tdmsg-csv/pom.xml
rename to inlong-sort/sort-formats/format-inlongmsg-csv/pom.xml
index 712a0b2..0f341ec 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/pom.xml
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/pom.xml
@@ -31,8 +31,8 @@
<relativePath>..</relativePath>
</parent>
- <artifactId>sort-format-tdmsg-csv</artifactId>
- <name>Apache InLong - Sort Format-tdmsg-csv</name>
+ <artifactId>sort-format-inlongmsg-csv</artifactId>
+ <name>Apache InLong - Sort Format-inlongmsg-csv</name>
<packaging>jar</packaging>
<dependencies>
@@ -55,7 +55,7 @@
<dependency>
<groupId>org.apache.inlong</groupId>
- <artifactId>sort-format-tdmsg-base</artifactId>
+ <artifactId>sort-format-inlongmsg-base</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsv.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
similarity index 81%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsv.java
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
index ee5685b..f39ec6b 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsv.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
@@ -16,12 +16,12 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.tdmsgcsv;
+package org.apache.inlong.sort.formats.inlongmsgcsv;
import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
import java.nio.charset.Charset;
import java.util.Map;
@@ -36,14 +36,14 @@ import org.apache.inlong.sort.formats.base.TableFormatConstants;
* for Comma-Separated Values (CSV) Files") proposed by the Internet Engineering
* Task Force (IETF).
*/
-public class TDMsgCsv extends FormatDescriptor {
+public class InLongMsgCsv extends FormatDescriptor {
- public static final String FORMAT_TYPE_VALUE = "tdmsgcsv";
+ public static final String FORMAT_TYPE_VALUE = "inlongmsgcsv";
private DescriptorProperties internalProperties =
new DescriptorProperties(true);
- public TDMsgCsv() {
+ public InLongMsgCsv() {
super(FORMAT_TYPE_VALUE, 1);
}
@@ -52,7 +52,7 @@ public class TDMsgCsv extends FormatDescriptor {
*
* @param delimiter the field delimiter character
*/
- public TDMsgCsv delimiter(char delimiter) {
+ public InLongMsgCsv delimiter(char delimiter) {
internalProperties.putCharacter(TableFormatConstants.FORMAT_DELIMITER, delimiter);
return this;
}
@@ -62,7 +62,7 @@ public class TDMsgCsv extends FormatDescriptor {
*
* @param escapeCharacter escaping character (e.g. backslash).
*/
- public TDMsgCsv escapeCharacter(char escapeCharacter) {
+ public InLongMsgCsv escapeCharacter(char escapeCharacter) {
internalProperties
.putCharacter(TableFormatConstants.FORMAT_ESCAPE_CHARACTER, escapeCharacter);
return this;
@@ -73,7 +73,7 @@ public class TDMsgCsv extends FormatDescriptor {
*
* @param quoteCharacter quoting character (e.g. quotation).
*/
- public TDMsgCsv quoteCharacter(char quoteCharacter) {
+ public InLongMsgCsv quoteCharacter(char quoteCharacter) {
internalProperties.putCharacter(TableFormatConstants.FORMAT_QUOTE_CHARACTER, quoteCharacter);
return this;
}
@@ -84,7 +84,7 @@ public class TDMsgCsv extends FormatDescriptor {
*
* @param nullLiteral null literal (e.g. "null" or "n/a")
*/
- public TDMsgCsv nullLiteral(String nullLiteral) {
+ public InLongMsgCsv nullLiteral(String nullLiteral) {
checkNotNull(nullLiteral);
internalProperties.putString(TableFormatConstants.FORMAT_NULL_LITERAL, nullLiteral);
return this;
@@ -95,7 +95,7 @@ public class TDMsgCsv extends FormatDescriptor {
*
* @param charset The charset of the text.
*/
- public TDMsgCsv charset(Charset charset) {
+ public InLongMsgCsv charset(Charset charset) {
checkNotNull(charset);
internalProperties.putString(TableFormatConstants.FORMAT_CHARSET, charset.name());
return this;
@@ -104,15 +104,15 @@ public class TDMsgCsv extends FormatDescriptor {
/**
* Retains the delimiter at the first character.
*/
- public TDMsgCsv retainHeadDelimiter() {
- internalProperties.putBoolean(TDMsgCsvUtils.FORMAT_DELETE_HEAD_DELIMITER, false);
+ public InLongMsgCsv retainHeadDelimiter() {
+ internalProperties.putBoolean(InLongMsgCsvUtils.FORMAT_DELETE_HEAD_DELIMITER, false);
return this;
}
/**
* Ignores the errors in the serialization and deserialization.
*/
- public TDMsgCsv ignoreErrors() {
+ public InLongMsgCsv ignoreErrors() {
internalProperties.putBoolean(TableFormatConstants.FORMAT_IGNORE_ERRORS, true);
return this;
}
@@ -122,7 +122,7 @@ public class TDMsgCsv extends FormatDescriptor {
*
* @param schema format schema string.
*/
- public TDMsgCsv schema(String schema) {
+ public InLongMsgCsv schema(String schema) {
checkNotNull(schema);
internalProperties.putString(TableFormatConstants.FORMAT_SCHEMA, schema);
return this;
@@ -139,7 +139,7 @@ public class TDMsgCsv extends FormatDescriptor {
* field. A "from" definition is interpreted as a field renaming in the
* format.
*/
- public TDMsgCsv deriveSchema() {
+ public InLongMsgCsv deriveSchema() {
internalProperties.putBoolean(FORMAT_DERIVE_SCHEMA, true);
return this;
}
@@ -149,7 +149,7 @@ public class TDMsgCsv extends FormatDescriptor {
*
* @param timeFieldName The name of the time field.
*/
- public TDMsgCsv timeFieldName(String timeFieldName) {
+ public InLongMsgCsv timeFieldName(String timeFieldName) {
checkNotNull(timeFieldName);
internalProperties.putString(FORMAT_TIME_FIELD_NAME, timeFieldName);
return this;
@@ -160,7 +160,7 @@ public class TDMsgCsv extends FormatDescriptor {
*
* @param attributesFieldName The name of the attributes field.
*/
- public TDMsgCsv attributesFieldName(String attributesFieldName) {
+ public InLongMsgCsv attributesFieldName(String attributesFieldName) {
checkNotNull(attributesFieldName);
internalProperties.putString(FORMAT_ATTRIBUTES_FIELD_NAME, attributesFieldName);
return this;
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializer.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
similarity index 79%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializer.java
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
index bb5351e..ca2bab0 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializer.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
@@ -16,10 +16,10 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.tdmsgcsv;
+package org.apache.inlong.sort.formats.inlongmsgcsv;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
import java.util.Objects;
import javax.annotation.Nonnull;
@@ -28,15 +28,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
import org.apache.inlong.sort.formats.base.TableFormatConstants;
import org.apache.inlong.sort.formats.common.RowFormatInfo;
-import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgFormatDeserializer;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgBody;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgHead;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
/**
- * The deserializer for the records in TDMsgCsv format.
+ * The deserializer for the records in InLongMsgCsv format.
*/
-public final class TDMsgCsvFormatDeserializer extends AbstractTDMsgFormatDeserializer {
+public final class InLongMsgCsvFormatDeserializer extends AbstractInLongMsgFormatDeserializer {
private static final long serialVersionUID = 1L;
@@ -93,7 +93,7 @@ public final class TDMsgCsvFormatDeserializer extends AbstractTDMsgFormatDeseria
*/
private final boolean deleteHeadDelimiter;
- public TDMsgCsvFormatDeserializer(
+ public InLongMsgCsvFormatDeserializer(
@Nonnull RowFormatInfo rowFormatInfo,
@Nonnull String timeFieldName,
@Nonnull String attributesFieldName,
@@ -118,7 +118,7 @@ public final class TDMsgCsvFormatDeserializer extends AbstractTDMsgFormatDeseria
this.deleteHeadDelimiter = deleteHeadDelimiter;
}
- public TDMsgCsvFormatDeserializer(
+ public InLongMsgCsvFormatDeserializer(
@Nonnull RowFormatInfo rowFormatInfo
) {
this(
@@ -130,24 +130,24 @@ public final class TDMsgCsvFormatDeserializer extends AbstractTDMsgFormatDeseria
null,
null,
null,
- TDMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER,
+ InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER,
TableFormatConstants.DEFAULT_IGNORE_ERRORS
);
}
@Override
public TypeInformation<Row> getProducedType() {
- return TDMsgUtils.buildRowType(timeFieldName, attributesFieldName, rowFormatInfo);
+ return InLongMsgUtils.buildRowType(timeFieldName, attributesFieldName, rowFormatInfo);
}
@Override
- protected TDMsgHead parseHead(String attr) {
- return TDMsgCsvUtils.parseHead(attr);
+ protected InLongMsgHead parseHead(String attr) {
+ return InLongMsgCsvUtils.parseHead(attr);
}
@Override
- protected TDMsgBody parseBody(byte[] bytes) {
- return TDMsgCsvUtils.parseBody(
+ protected InLongMsgBody parseBody(byte[] bytes) {
+ return InLongMsgCsvUtils.parseBody(
bytes,
charset,
delimiter,
@@ -158,8 +158,8 @@ public final class TDMsgCsvFormatDeserializer extends AbstractTDMsgFormatDeseria
}
@Override
- protected Row convertRow(TDMsgHead head, TDMsgBody body) {
- return TDMsgCsvUtils.buildRow(
+ protected Row convertRow(InLongMsgHead head, InLongMsgBody body) {
+ return InLongMsgCsvUtils.buildRow(
rowFormatInfo,
nullLiteral,
head.getTime(),
@@ -183,7 +183,7 @@ public final class TDMsgCsvFormatDeserializer extends AbstractTDMsgFormatDeseria
return false;
}
- TDMsgCsvFormatDeserializer that = (TDMsgCsvFormatDeserializer) o;
+ InLongMsgCsvFormatDeserializer that = (InLongMsgCsvFormatDeserializer) o;
return deleteHeadDelimiter == that.deleteHeadDelimiter
&& rowFormatInfo.equals(that.rowFormatInfo)
&& timeFieldName.equals(that.timeFieldName)
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatFactory.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
similarity index 79%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatFactory.java
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
index 144d2af..ccbd9a6 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatFactory.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.tdmsgcsv;
+package org.apache.inlong.sort.formats.inlongmsgcsv;
import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
@@ -27,11 +27,11 @@ import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ES
import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL;
import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_QUOTE_CHARACTER;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.FORMAT_TIME_FIELD_NAME;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.getDataFormatInfo;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.validateFieldNames;
-import static org.apache.inlong.sort.formats.tdmsgcsv.TDMsgCsvUtils.FORMAT_DELETE_HEAD_DELIMITER;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getDataFormatInfo;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateFieldNames;
+import static org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.FORMAT_DELETE_HEAD_DELIMITER;
import java.util.ArrayList;
import java.util.List;
@@ -43,21 +43,21 @@ import org.apache.inlong.sort.formats.base.TableFormatConstants;
import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
import org.apache.inlong.sort.formats.base.TableFormatDeserializerFactory;
import org.apache.inlong.sort.formats.common.RowFormatInfo;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgMixedFormatFactory;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgMixedValidator;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgValidator;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatFactory;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedValidator;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgValidator;
/**
- * Table format factory for providing configured instances of TDMsgCsv-to-row
+ * Table format factory for providing configured instances of InLongMsgCsv-to-row
* serializer and deserializer.
*/
-public final class TDMsgCsvFormatFactory
+public final class InLongMsgCsvFormatFactory
extends TableFormatFactoryBase<Row>
- implements TableFormatDeserializerFactory, TDMsgMixedFormatFactory {
+ implements TableFormatDeserializerFactory, InLongMsgMixedFormatFactory {
- public TDMsgCsvFormatFactory() {
- super(TDMsgCsv.FORMAT_TYPE_VALUE, 1, true);
+ public InLongMsgCsvFormatFactory() {
+ super(InLongMsgCsv.FORMAT_TYPE_VALUE, 1, true);
}
@Override
@@ -84,7 +84,7 @@ public final class TDMsgCsvFormatFactory
new DescriptorProperties(true);
descriptorProperties.putProperties(properties);
- final TDMsgValidator validator = new TDMsgValidator();
+ final InLongMsgValidator validator = new InLongMsgValidator();
validator.validate(descriptorProperties);
RowFormatInfo rowFormatInfo = getDataFormatInfo(descriptorProperties);
@@ -92,11 +92,11 @@ public final class TDMsgCsvFormatFactory
String timeFieldName =
descriptorProperties
.getOptionalString(FORMAT_TIME_FIELD_NAME)
- .orElse(TDMsgUtils.DEFAULT_TIME_FIELD_NAME);
+ .orElse(InLongMsgUtils.DEFAULT_TIME_FIELD_NAME);
String attributesFieldName =
descriptorProperties
.getOptionalString(FORMAT_ATTRIBUTES_FIELD_NAME)
- .orElse(TDMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME);
+ .orElse(InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME);
validateFieldNames(timeFieldName, attributesFieldName, rowFormatInfo);
@@ -123,13 +123,13 @@ public final class TDMsgCsvFormatFactory
Boolean deleteHeadDelimiter =
descriptorProperties
.getOptionalBoolean(FORMAT_DELETE_HEAD_DELIMITER)
- .orElse(TDMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER);
+ .orElse(InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER);
boolean ignoreErrors =
descriptorProperties
.getOptionalBoolean(FORMAT_IGNORE_ERRORS)
.orElse(DEFAULT_IGNORE_ERRORS);
- return new TDMsgCsvFormatDeserializer(
+ return new InLongMsgCsvFormatDeserializer(
rowFormatInfo,
timeFieldName,
attributesFieldName,
@@ -144,14 +144,14 @@ public final class TDMsgCsvFormatFactory
}
@Override
- public TDMsgCsvMixedFormatDeserializer createMixedFormatDeserializer(
+ public InLongMsgCsvMixedFormatDeserializer createMixedFormatDeserializer(
Map<String, String> properties
) {
final DescriptorProperties descriptorProperties =
new DescriptorProperties(true);
descriptorProperties.putProperties(properties);
- final TDMsgMixedValidator validator = new TDMsgMixedValidator();
+ final InLongMsgMixedValidator validator = new InLongMsgMixedValidator();
validator.validate(descriptorProperties);
String charset =
@@ -173,13 +173,13 @@ public final class TDMsgCsvFormatFactory
Boolean deleteHeadDelimiter =
descriptorProperties
.getOptionalBoolean(FORMAT_DELETE_HEAD_DELIMITER)
- .orElse(TDMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER);
+ .orElse(InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER);
boolean ignoreErrors =
descriptorProperties
.getOptionalBoolean(FORMAT_IGNORE_ERRORS)
.orElse(DEFAULT_IGNORE_ERRORS);
- return new TDMsgCsvMixedFormatDeserializer(
+ return new InLongMsgCsvMixedFormatDeserializer(
charset,
delimiter,
escapeCharacter,
@@ -190,7 +190,7 @@ public final class TDMsgCsvFormatFactory
}
@Override
- public TDMsgCsvMixedFormatConverter createMixedFormatConverter(
+ public InLongMsgCsvMixedFormatConverter createMixedFormatConverter(
Map<String, String> properties
) {
final DescriptorProperties descriptorProperties =
@@ -202,11 +202,11 @@ public final class TDMsgCsvFormatFactory
String timeFieldName =
descriptorProperties
.getOptionalString(FORMAT_TIME_FIELD_NAME)
- .orElse(TDMsgUtils.DEFAULT_TIME_FIELD_NAME);
+ .orElse(InLongMsgUtils.DEFAULT_TIME_FIELD_NAME);
String attributesFieldName =
descriptorProperties
.getOptionalString(FORMAT_ATTRIBUTES_FIELD_NAME)
- .orElse(TDMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME);
+ .orElse(InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME);
validateFieldNames(timeFieldName, attributesFieldName, rowFormatInfo);
@@ -219,7 +219,7 @@ public final class TDMsgCsvFormatFactory
.getOptionalBoolean(FORMAT_IGNORE_ERRORS)
.orElse(DEFAULT_IGNORE_ERRORS);
- return new TDMsgCsvMixedFormatConverter(
+ return new InLongMsgCsvMixedFormatConverter(
rowFormatInfo,
timeFieldName,
attributesFieldName,
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvMixedFormatConverter.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
similarity index 75%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvMixedFormatConverter.java
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
index d0e5e2e..6bc995c 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvMixedFormatConverter.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.tdmsgcsv;
+package org.apache.inlong.sort.formats.inlongmsgcsv;
import java.sql.Timestamp;
import java.util.List;
@@ -27,19 +27,19 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.inlong.sort.formats.common.RowFormatInfo;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgMixedFormatConverter;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverter;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Converter used to deserialize a mixed row in tdmsg-csv format.
+ * Converter used to deserialize a mixed row in inlongmsg-csv format.
*/
-public class TDMsgCsvMixedFormatConverter implements TDMsgMixedFormatConverter {
+public class InLongMsgCsvMixedFormatConverter implements InLongMsgMixedFormatConverter {
private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(TDMsgCsvMixedFormatConverter.class);
+ private static final Logger LOG = LoggerFactory.getLogger(InLongMsgCsvMixedFormatConverter.class);
/**
* The schema of the rows.
@@ -69,7 +69,7 @@ public class TDMsgCsvMixedFormatConverter implements TDMsgMixedFormatConverter {
*/
private final boolean ignoreErrors;
- public TDMsgCsvMixedFormatConverter(
+ public InLongMsgCsvMixedFormatConverter(
@Nonnull RowFormatInfo rowFormatInfo,
@Nonnull String timeFieldName,
@Nonnull String attributesFieldName,
@@ -85,7 +85,7 @@ public class TDMsgCsvMixedFormatConverter implements TDMsgMixedFormatConverter {
@Override
public TypeInformation<Row> getProducedType() {
- return TDMsgUtils.buildRowType(timeFieldName, attributesFieldName, rowFormatInfo);
+ return InLongMsgUtils.buildRowType(timeFieldName, attributesFieldName, rowFormatInfo);
}
@Override
@@ -94,12 +94,12 @@ public class TDMsgCsvMixedFormatConverter implements TDMsgMixedFormatConverter {
Row row;
try {
- Timestamp time = TDMsgUtils.getTimeFromMixedRow(mixedRow);
- Map<String, String> attributes = TDMsgUtils.getAttributesFromMixedRow(mixedRow);
- List<String> predefinedFields = TDMsgUtils.getPredefinedFieldsFromMixedRow(mixedRow);
- List<String> fields = TDMsgUtils.getFieldsFromMixedRow(mixedRow);
+ Timestamp time = InLongMsgUtils.getTimeFromMixedRow(mixedRow);
+ Map<String, String> attributes = InLongMsgUtils.getAttributesFromMixedRow(mixedRow);
+ List<String> predefinedFields = InLongMsgUtils.getPredefinedFieldsFromMixedRow(mixedRow);
+ List<String> fields = InLongMsgUtils.getFieldsFromMixedRow(mixedRow);
- row = TDMsgCsvUtils.buildRow(rowFormatInfo, nullLiteral, time, attributes,
+ row = InLongMsgCsvUtils.buildRow(rowFormatInfo, nullLiteral, time, attributes,
predefinedFields, fields);
} catch (Exception e) {
if (ignoreErrors) {
@@ -124,7 +124,7 @@ public class TDMsgCsvMixedFormatConverter implements TDMsgMixedFormatConverter {
return false;
}
- TDMsgCsvMixedFormatConverter that = (TDMsgCsvMixedFormatConverter) o;
+ InLongMsgCsvMixedFormatConverter that = (InLongMsgCsvMixedFormatConverter) o;
return ignoreErrors == that.ignoreErrors
&& rowFormatInfo.equals(that.rowFormatInfo)
&& timeFieldName.equals(that.timeFieldName)
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvMixedFormatDeserializer.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
similarity index 75%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvMixedFormatDeserializer.java
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
index 2352fde..3d20422 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvMixedFormatDeserializer.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.tdmsgcsv;
+package org.apache.inlong.sort.formats.inlongmsgcsv;
import java.util.Objects;
import javax.annotation.Nonnull;
@@ -24,15 +24,15 @@ import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
import org.apache.inlong.sort.formats.base.TableFormatConstants;
-import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgMixedFormatDeserializer;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgBody;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgHead;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
/**
- * The deserializer for the records in TDMsgCsv format.
+ * The deserializer for the records in InLongMsgCsv format.
*/
-public final class TDMsgCsvMixedFormatDeserializer extends AbstractTDMsgMixedFormatDeserializer {
+public final class InLongMsgCsvMixedFormatDeserializer extends AbstractInLongMsgMixedFormatDeserializer {
private static final long serialVersionUID = 1L;
@@ -65,7 +65,7 @@ public final class TDMsgCsvMixedFormatDeserializer extends AbstractTDMsgMixedFor
*/
private final boolean deleteHeadDelimiter;
- public TDMsgCsvMixedFormatDeserializer(
+ public InLongMsgCsvMixedFormatDeserializer(
@Nonnull String charset,
@Nonnull Character delimiter,
@Nullable Character escapeChar,
@@ -82,30 +82,30 @@ public final class TDMsgCsvMixedFormatDeserializer extends AbstractTDMsgMixedFor
this.deleteHeadDelimiter = deleteHeadDelimiter;
}
- public TDMsgCsvMixedFormatDeserializer() {
+ public InLongMsgCsvMixedFormatDeserializer() {
this(
TableFormatConstants.DEFAULT_CHARSET,
TableFormatConstants.DEFAULT_DELIMITER,
null,
null,
- TDMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER,
+ InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER,
TableFormatConstants.DEFAULT_IGNORE_ERRORS
);
}
@Override
public TypeInformation<Row> getProducedType() {
- return TDMsgUtils.MIXED_ROW_TYPE;
+ return InLongMsgUtils.MIXED_ROW_TYPE;
}
@Override
- protected TDMsgHead parseHead(String attr) {
- return TDMsgCsvUtils.parseHead(attr);
+ protected InLongMsgHead parseHead(String attr) {
+ return InLongMsgCsvUtils.parseHead(attr);
}
@Override
- protected TDMsgBody parseBody(byte[] bytes) {
- return TDMsgCsvUtils.parseBody(
+ protected InLongMsgBody parseBody(byte[] bytes) {
+ return InLongMsgCsvUtils.parseBody(
bytes,
charset,
delimiter,
@@ -116,8 +116,8 @@ public final class TDMsgCsvMixedFormatDeserializer extends AbstractTDMsgMixedFor
}
@Override
- protected Row convertRow(TDMsgHead head, TDMsgBody body) {
- return TDMsgUtils.buildMixedRow(head, body, head.getTid());
+ protected Row convertRow(InLongMsgHead head, InLongMsgBody body) {
+ return InLongMsgUtils.buildMixedRow(head, body, head.getTid());
}
@Override
@@ -134,7 +134,7 @@ public final class TDMsgCsvMixedFormatDeserializer extends AbstractTDMsgMixedFor
return false;
}
- TDMsgCsvMixedFormatDeserializer that = (TDMsgCsvMixedFormatDeserializer) o;
+ InLongMsgCsvMixedFormatDeserializer that = (InLongMsgCsvMixedFormatDeserializer) o;
return deleteHeadDelimiter == that.deleteHeadDelimiter
&& charset.equals(that.charset)
&& delimiter.equals(that.delimiter)
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvUtils.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
similarity index 73%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvUtils.java
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
index f935e6d..84c32bb 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvUtils.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -16,15 +16,15 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.tdmsgcsv;
+package org.apache.inlong.sort.formats.inlongmsgcsv;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_STREAM_ID;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_TIME_DT;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_TIME_T;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.getPredefinedFields;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.parseAttr;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.parseDateTime;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.parseEpochTime;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
import java.nio.charset.Charset;
import java.sql.Timestamp;
@@ -36,57 +36,57 @@ import org.apache.flink.types.Row;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.formats.common.RowFormatInfo;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgBody;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgHead;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
import org.apache.inlong.sort.formats.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Utilities for {@link TDMsgCsv}.
+ * Utilities for {@link InLongMsgCsv}.
*/
-public class TDMsgCsvUtils {
+public class InLongMsgCsvUtils {
- private static final Logger LOG = LoggerFactory.getLogger(TDMsgUtils.class);
+ private static final Logger LOG = LoggerFactory.getLogger(InLongMsgUtils.class);
public static final String FORMAT_DELETE_HEAD_DELIMITER = "format.delete-head-delimiter";
public static final boolean DEFAULT_DELETE_HEAD_DELIMITER = true;
- public static TDMsgHead parseHead(String attr) {
+ public static InLongMsgHead parseHead(String attr) {
Map<String, String> attributes = parseAttr(attr);
// Extracts interface from the attributes.
String streamId;
- if (attributes.containsKey(TDMSG_ATTR_STREAM_ID)) {
- streamId = attributes.get(TDMSG_ATTR_STREAM_ID);
+ if (attributes.containsKey(INLONGMSG_ATTR_STREAM_ID)) {
+ streamId = attributes.get(INLONGMSG_ATTR_STREAM_ID);
} else {
- throw new IllegalArgumentException("Could not find " + TDMSG_ATTR_STREAM_ID + " in attributes!");
+ throw new IllegalArgumentException("Could not find " + INLONGMSG_ATTR_STREAM_ID + " in attributes!");
}
// Extracts time from the attributes
Timestamp time;
- if (attributes.containsKey(TDMSG_ATTR_TIME_T)) {
- String date = attributes.get(TDMSG_ATTR_TIME_T).trim();
+ if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) {
+ String date = attributes.get(INLONGMSG_ATTR_TIME_T).trim();
time = parseDateTime(date);
- } else if (attributes.containsKey(TDMSG_ATTR_TIME_DT)) {
- String epoch = attributes.get(TDMSG_ATTR_TIME_DT).trim();
+ } else if (attributes.containsKey(INLONGMSG_ATTR_TIME_DT)) {
+ String epoch = attributes.get(INLONGMSG_ATTR_TIME_DT).trim();
time = parseEpochTime(epoch);
} else {
throw new IllegalArgumentException(
- "Could not find " + TDMSG_ATTR_TIME_T
- + " or " + TDMSG_ATTR_TIME_DT + " in attributes!");
+ "Could not find " + INLONGMSG_ATTR_TIME_T
+ + " or " + INLONGMSG_ATTR_TIME_DT + " in attributes!");
}
// Extracts predefined fields from the attributes
List<String> predefinedFields = getPredefinedFields(attributes);
- return new TDMsgHead(attributes, streamId, time, predefinedFields);
+ return new InLongMsgHead(attributes, streamId, time, predefinedFields);
}
- public static TDMsgBody parseBody(
+ public static InLongMsgBody parseBody(
byte[] bytes,
String charset,
char delimiter,
@@ -104,7 +104,7 @@ public class TDMsgCsvUtils {
String[] fieldTexts = StringUtils.splitCsv(bodyText, delimiter, escapeChar, quoteChar);
- return new TDMsgBody(
+ return new InLongMsgBody(
bytes,
null,
Arrays.asList(fieldTexts),
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
similarity index 91%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index 3cd93f0..52a23d9 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.inlong.sort.formats.tdmsgcsv.TDMsgCsvFormatFactory
+org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvFormatFactory
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializerTest.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
similarity index 73%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializerTest.java
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
index 91026f7..9c93c14 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializerTest.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
@@ -16,11 +16,11 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.tdmsgcsv;
+package org.apache.inlong.sort.formats.inlongmsgcsv;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_TIME_FIELD_NAME;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_STREAM_ID;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
import static org.junit.Assert.assertEquals;
import java.nio.charset.Charset;
@@ -36,7 +36,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
-import org.apache.inlong.commons.msg.TDMsg1;
+import org.apache.inlong.commons.msg.InLongMsg;
import org.apache.inlong.sort.formats.base.TableFormatConstants;
import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.formats.common.IntFormatInfo;
@@ -45,9 +45,9 @@ import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.junit.Test;
/**
- * Unit tests for {@link TDMsgCsvFormatDeserializer}.
+ * Unit tests for {@link InLongMsgCsvFormatDeserializer}.
*/
-public class TDMsgCsvFormatDeserializerTest {
+public class InLongMsgCsvFormatDeserializerTest {
private static final RowFormatInfo TEST_ROW_INFO =
new RowFormatInfo(
@@ -64,8 +64,8 @@ public class TDMsgCsvFormatDeserializerTest {
@Test
public void testRowType() {
- TDMsgCsvFormatDeserializer deserializer =
- new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer(TEST_ROW_INFO);
TypeInformation<Row> expectedRowType =
Types.ROW_NAMED(
@@ -95,19 +95,19 @@ public class TDMsgCsvFormatDeserializerTest {
@Test
public void testNormal() throws Exception {
- final TDMsgCsvFormatDeserializer deserializer =
- new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
+ final InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer(TEST_ROW_INFO);
- TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
- String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&" + INLONGMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
String body1 = "123,field11,field12,field13";
String body2 = "123,field21,field22,field23";
- tdMsg1.addMsg(attrs, body1.getBytes());
- tdMsg1.addMsg(attrs, body2.getBytes());
+ inLongMsg.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body2.getBytes());
Map<String, String> expectedAttributes = new HashMap<>();
expectedAttributes.put("m", "0");
- expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
+ expectedAttributes.put(INLONGMSG_ATTR_STREAM_ID, "testInterfaceId");
expectedAttributes.put("t", "20200322");
expectedAttributes.put("__addcol1__", "1");
expectedAttributes.put("__addcol2__", "2");
@@ -136,7 +136,7 @@ public class TDMsgCsvFormatDeserializerTest {
testRowDeserialization(
deserializer,
- tdMsg1.buildArray(),
+ inLongMsg.buildArray(),
Arrays.asList(expectedRow1, expectedRow2)
);
}
@@ -144,19 +144,19 @@ public class TDMsgCsvFormatDeserializerTest {
@Test
public void testEmptyField() throws Exception {
- final TDMsgCsvFormatDeserializer deserializer =
- new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
+ final InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer(TEST_ROW_INFO);
- TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
- String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&" + INLONGMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
String body1 = "123,field11,field12,";
String body2 = "123,field21,,field23";
- tdMsg1.addMsg(attrs, body1.getBytes());
- tdMsg1.addMsg(attrs, body2.getBytes());
+ inLongMsg.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body2.getBytes());
Map<String, String> expectedAttributes = new HashMap<>();
expectedAttributes.put("m", "0");
- expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
+ expectedAttributes.put(INLONGMSG_ATTR_STREAM_ID, "testInterfaceId");
expectedAttributes.put("t", "20200322");
expectedAttributes.put("__addcol1__", "1");
expectedAttributes.put("__addcol2__", "2");
@@ -185,7 +185,7 @@ public class TDMsgCsvFormatDeserializerTest {
testRowDeserialization(
deserializer,
- tdMsg1.buildArray(),
+ inLongMsg.buildArray(),
Arrays.asList(expectedRow1, expectedRow2)
);
}
@@ -193,19 +193,19 @@ public class TDMsgCsvFormatDeserializerTest {
@Test
public void testNoPredefinedFields() throws Exception {
- final TDMsgCsvFormatDeserializer deserializer =
- new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
+ final InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer(TEST_ROW_INFO);
- TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
- String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322";
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&" + INLONGMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322";
String body1 = "1,2,123,field11,field12,";
String body2 = "1,2,123,field21,,field23";
- tdMsg1.addMsg(attrs, body1.getBytes());
- tdMsg1.addMsg(attrs, body2.getBytes());
+ inLongMsg.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body2.getBytes());
Map<String, String> expectedAttributes = new HashMap<>();
expectedAttributes.put("m", "0");
- expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
+ expectedAttributes.put(INLONGMSG_ATTR_STREAM_ID, "testInterfaceId");
expectedAttributes.put("t", "20200322");
Row expectedRow1 = Row.of(
@@ -232,15 +232,15 @@ public class TDMsgCsvFormatDeserializerTest {
testRowDeserialization(
deserializer,
- tdMsg1.buildArray(),
+ inLongMsg.buildArray(),
Arrays.asList(expectedRow1, expectedRow2)
);
}
@Test
public void testIgnoreAttributeErrors() throws Exception {
- TDMsgCsvFormatDeserializer deserializer =
- new TDMsgCsvFormatDeserializer(
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer(
TEST_ROW_INFO,
DEFAULT_TIME_FIELD_NAME,
DEFAULT_ATTRIBUTES_FIELD_NAME,
@@ -253,22 +253,22 @@ public class TDMsgCsvFormatDeserializerTest {
true
);
- TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
String attrs = "m=0&&&&";
String body1 = "123,field11,field12,field13";
- tdMsg1.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body1.getBytes());
testRowDeserialization(
deserializer,
- tdMsg1.buildArray(),
+ inLongMsg.buildArray(),
Collections.emptyList()
);
}
@Test
public void testIgnoreBodyErrors() throws Exception {
- final TDMsgCsvFormatDeserializer deserializer =
- new TDMsgCsvFormatDeserializer(
+ final InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer(
TEST_ROW_INFO,
DEFAULT_TIME_FIELD_NAME,
DEFAULT_ATTRIBUTES_FIELD_NAME,
@@ -281,16 +281,16 @@ public class TDMsgCsvFormatDeserializerTest {
true
);
- TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
- String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&" + INLONGMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
String body1 = "aaa,field11,field12,field13";
String body2 = "123,field21,field22,field23";
- tdMsg1.addMsg(attrs, body1.getBytes());
- tdMsg1.addMsg(attrs, body2.getBytes());
+ inLongMsg.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body2.getBytes());
Map<String, String> expectedAttributes = new HashMap<>();
expectedAttributes.put("m", "0");
- expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
+ expectedAttributes.put(INLONGMSG_ATTR_STREAM_ID, "testInterfaceId");
expectedAttributes.put("t", "20200322");
expectedAttributes.put("__addcol1__", "1");
expectedAttributes.put("__addcol2__", "2");
@@ -308,15 +308,15 @@ public class TDMsgCsvFormatDeserializerTest {
testRowDeserialization(
deserializer,
- tdMsg1.buildArray(),
+ inLongMsg.buildArray(),
Collections.singletonList(expectedRow2)
);
}
@Test
public void testDeleteHeadDelimiter() throws Exception {
- final TDMsgCsvFormatDeserializer deserializer =
- new TDMsgCsvFormatDeserializer(
+ final InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer(
TEST_ROW_INFO,
DEFAULT_TIME_FIELD_NAME,
DEFAULT_ATTRIBUTES_FIELD_NAME,
@@ -329,15 +329,15 @@ public class TDMsgCsvFormatDeserializerTest {
true
);
- TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
- String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322";
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&" + INLONGMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322";
String body = ",1,2,3,field1,field2,field3";
- tdMsg1.addMsg(attrs, body.getBytes());
+ inLongMsg.addMsg(attrs, body.getBytes());
Map<String, String> expectedAttributes = new HashMap<>();
expectedAttributes.put("m", "0");
- expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
+ expectedAttributes.put(INLONGMSG_ATTR_STREAM_ID, "testInterfaceId");
expectedAttributes.put("t", "20200322");
Row expectedRow = Row.of(
@@ -353,15 +353,15 @@ public class TDMsgCsvFormatDeserializerTest {
testRowDeserialization(
deserializer,
- tdMsg1.buildArray(),
+ inLongMsg.buildArray(),
Collections.singletonList(expectedRow)
);
}
@Test
public void testRetainHeadDelimiter() throws Exception {
- final TDMsgCsvFormatDeserializer deserializer =
- new TDMsgCsvFormatDeserializer(
+ final InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer(
TEST_ROW_INFO,
DEFAULT_TIME_FIELD_NAME,
DEFAULT_ATTRIBUTES_FIELD_NAME,
@@ -374,15 +374,15 @@ public class TDMsgCsvFormatDeserializerTest {
false
);
- TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
- String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322";
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&" + INLONGMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322";
String body = ",1,2,field1,field2,field3";
- tdMsg1.addMsg(attrs, body.getBytes());
+ inLongMsg.addMsg(attrs, body.getBytes());
Map<String, String> expectedAttributes = new HashMap<>();
expectedAttributes.put("m", "0");
- expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
+ expectedAttributes.put(INLONGMSG_ATTR_STREAM_ID, "testInterfaceId");
expectedAttributes.put("t", "20200322");
Row expectedRow = Row.of(
@@ -398,26 +398,26 @@ public class TDMsgCsvFormatDeserializerTest {
testRowDeserialization(
deserializer,
- tdMsg1.buildArray(),
+ inLongMsg.buildArray(),
Collections.singletonList(expectedRow)
);
}
@Test
public void testUnmatchedFields1() throws Exception {
- final TDMsgCsvFormatDeserializer deserializer =
- new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
+ final InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer(TEST_ROW_INFO);
- TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
- String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&" + INLONGMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
String body1 = "123,field11,field12";
String body2 = "123,field21,field22,field23,field24";
- tdMsg1.addMsg(attrs, body1.getBytes());
- tdMsg1.addMsg(attrs, body2.getBytes());
+ inLongMsg.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body2.getBytes());
Map<String, String> expectedAttributes = new HashMap<>();
expectedAttributes.put("m", "0");
- expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
+ expectedAttributes.put(INLONGMSG_ATTR_STREAM_ID, "testInterfaceId");
expectedAttributes.put("t", "20200322");
expectedAttributes.put("__addcol1__", "1");
expectedAttributes.put("__addcol2__", "2");
@@ -446,25 +446,25 @@ public class TDMsgCsvFormatDeserializerTest {
testRowDeserialization(
deserializer,
- tdMsg1.buildArray(),
+ inLongMsg.buildArray(),
Arrays.asList(expectedRow1, expectedRow2)
);
}
@Test
public void testUnmatchedFields2() throws Exception {
- final TDMsgCsvFormatDeserializer deserializer =
- new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
+ final InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer(TEST_ROW_INFO);
- TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
- String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&"
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&" + INLONGMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&"
+ "__addcol2__=2&__addcol3__=3&__addcol4__=4&__addcol5__=5&__addcol6__=6&__addcol7__=7";
String body = "field11,field12";
- tdMsg1.addMsg(attrs, body.getBytes());
+ inLongMsg.addMsg(attrs, body.getBytes());
Map<String, String> expectedAttributes = new HashMap<>();
expectedAttributes.put("m", "0");
- expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
+ expectedAttributes.put(INLONGMSG_ATTR_STREAM_ID, "testInterfaceId");
expectedAttributes.put("t", "20200322");
expectedAttributes.put("__addcol1__", "1");
expectedAttributes.put("__addcol2__", "2");
@@ -487,13 +487,13 @@ public class TDMsgCsvFormatDeserializerTest {
testRowDeserialization(
deserializer,
- tdMsg1.buildArray(),
+ inLongMsg.buildArray(),
Collections.singletonList(expectedRow)
);
}
private void testRowDeserialization(
- TDMsgCsvFormatDeserializer deserializer,
+ InLongMsgCsvFormatDeserializer deserializer,
byte[] bytes,
List<Row> expectedRows
) throws Exception {
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatFactoryTest.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
similarity index 86%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatFactoryTest.java
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
index 3c86df8..f3bff15 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatFactoryTest.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.tdmsgcsv;
+package org.apache.inlong.sort.formats.inlongmsgcsv;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -37,13 +37,13 @@ import org.apache.inlong.sort.formats.common.FormatUtils;
import org.apache.inlong.sort.formats.common.IntFormatInfo;
import org.apache.inlong.sort.formats.common.RowFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
import org.junit.Test;
/**
- * Tests for {@link TDMsgCsvFormatFactory}.
+ * Tests for {@link InLongMsgCsvFormatFactory}.
*/
-public class TDMsgCsvFormatFactoryTest {
+public class InLongMsgCsvFormatFactoryTest {
private static final TypeInformation<Row> SCHEMA =
Types.ROW(
@@ -70,7 +70,7 @@ public class TDMsgCsvFormatFactoryTest {
@Test
public void testCreateTableFormatDeserializer() throws Exception {
final Map<String, String> properties =
- new TDMsgCsv()
+ new InLongMsgCsv()
.schema(FormatUtils.marshall(TEST_FORMAT_SCHEMA))
.delimiter(';')
.charset(StandardCharsets.ISO_8859_1)
@@ -80,11 +80,11 @@ public class TDMsgCsvFormatFactoryTest {
.toProperties();
assertNotNull(properties);
- final TDMsgCsvFormatDeserializer expectedDeser =
- new TDMsgCsvFormatDeserializer(
+ final InLongMsgCsvFormatDeserializer expectedDeser =
+ new InLongMsgCsvFormatDeserializer(
TEST_FORMAT_SCHEMA,
- TDMsgUtils.DEFAULT_TIME_FIELD_NAME,
- TDMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME,
+ InLongMsgUtils.DEFAULT_TIME_FIELD_NAME,
+ InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME,
StandardCharsets.ISO_8859_1.name(),
';',
'\\',
@@ -111,10 +111,10 @@ public class TDMsgCsvFormatFactoryTest {
.schema(TableSchema.fromTypeInfo(SCHEMA))
.toProperties()
);
- properties.putAll(new TDMsgCsv().deriveSchema().toProperties());
+ properties.putAll(new InLongMsgCsv().deriveSchema().toProperties());
- final TDMsgCsvFormatDeserializer expectedDeser =
- new TDMsgCsvFormatDeserializer(TEST_FORMAT_SCHEMA);
+ final InLongMsgCsvFormatDeserializer expectedDeser =
+ new InLongMsgCsvFormatDeserializer(TEST_FORMAT_SCHEMA);
final TableFormatDeserializer actualDeser =
TableFormatUtils.getTableFormatDeserializer(
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvTest.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvTest.java
similarity index 91%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvTest.java
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvTest.java
index 9478430..831f9c9 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvTest.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.tdmsgcsv;
+package org.apache.inlong.sort.formats.inlongmsgcsv;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -27,13 +27,13 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.descriptors.Descriptor;
import org.apache.flink.table.descriptors.DescriptorTestBase;
import org.apache.flink.table.descriptors.DescriptorValidator;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgValidator;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgValidator;
import org.junit.Test;
/**
- * Tests for the {@link TDMsgCsv} descriptor.
+ * Tests for the {@link InLongMsgCsv} descriptor.
*/
-public class TDMsgCsvTest extends DescriptorTestBase {
+public class InLongMsgCsvTest extends DescriptorTestBase {
private static final String TEST_SCHEMA =
"{"
@@ -54,7 +54,7 @@ public class TDMsgCsvTest extends DescriptorTestBase {
+ "}";
private static final Descriptor CUSTOM_DESCRIPTOR_WITH_SCHEMA =
- new TDMsgCsv()
+ new InLongMsgCsv()
.schema(TEST_SCHEMA)
.timeFieldName("time")
.attributesFieldName("attributes")
@@ -67,7 +67,7 @@ public class TDMsgCsvTest extends DescriptorTestBase {
.ignoreErrors();
private static final Descriptor MINIMAL_DESCRIPTOR_WITH_DERIVED_SCHEMA =
- new TDMsgCsv()
+ new InLongMsgCsv()
.deriveSchema();
@Test(expected = ValidationException.class)
@@ -100,7 +100,7 @@ public class TDMsgCsvTest extends DescriptorTestBase {
@Override
public List<Map<String, String>> properties() {
final Map<String, String> props1 = new HashMap<>();
- props1.put("format.type", "tdmsgcsv");
+ props1.put("format.type", "inlongmsgcsv");
props1.put("format.property-version", "1");
props1.put("format.schema", TEST_SCHEMA);
props1.put("format.time-field-name", "time");
@@ -114,7 +114,7 @@ public class TDMsgCsvTest extends DescriptorTestBase {
props1.put("format.ignore-errors", "true");
final Map<String, String> props2 = new HashMap<>();
- props2.put("format.type", "tdmsgcsv");
+ props2.put("format.type", "inlongmsgcsv");
props2.put("format.property-version", "1");
props2.put("format.derive-schema", "true");
@@ -123,6 +123,6 @@ public class TDMsgCsvTest extends DescriptorTestBase {
@Override
public DescriptorValidator validator() {
- return new TDMsgValidator();
+ return new InLongMsgValidator();
}
}
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/test/resources/log4j-test.properties b/inlong-sort/sort-formats/format-inlongmsg-csv/src/test/resources/log4j-test.properties
similarity index 100%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/test/resources/log4j-test.properties
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/test/resources/log4j-test.properties
diff --git a/inlong-sort/sort-formats/pom.xml b/inlong-sort/sort-formats/pom.xml
index 0349dd3..d6aad64 100644
--- a/inlong-sort/sort-formats/pom.xml
+++ b/inlong-sort/sort-formats/pom.xml
@@ -54,8 +54,8 @@
<module>format-base</module>
<module>format-csv</module>
<module>format-kv</module>
- <module>format-tdmsg-base</module>
- <module>format-tdmsg-csv</module>
+ <module>format-inlongmsg-base</module>
+ <module>format-inlongmsg-csv</module>
</modules>
<properties>
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_consumer.cc b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_consumer.cc
index 9049ab0..fa8841c 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_consumer.cc
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_consumer.cc
@@ -1,147 +1,148 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <stdio.h>
-#include <string.h>
-#include <unistd.h>
-#include <signal.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <time.h>
-#include <fcntl.h>
-#include <errno.h>
-#include <libgen.h>
-#include <sys/time.h>
-#include <chrono>
-#include <set>
-#include <string>
-#include <thread>
-#include "tubemq/tubemq_client.h"
-#include "tubemq/tubemq_config.h"
-#include "tubemq/tubemq_errcode.h"
-#include "tubemq/tubemq_message.h"
-#include "tubemq/tubemq_return.h"
-
-
-using namespace std;
-using namespace tubemq;
-
-using std::set;
-using std::string;
-using tubemq::ConsumerConfig;
-using tubemq::ConsumerResult;
-using tubemq::TubeMQConsumer;
-
-
-
-
-
-
-int main(int argc, char* argv[]) {
- bool result;
- string err_info;
- if (argc < 4) {
- printf("\n must ./comd master_addr group_name topic_name [config_file_path]");
- return -1;
- }
- // set parameters
- string master_addr = argv[1];
- string group_name = argv[2];
- string topic_name = argv[3];
- string conf_file = "../conf/client.conf";
- if (argc > 4) {
- conf_file = argv[4];
- }
-
- TubeMQConsumer consumer_1;
- set<string> topic_list;
- topic_list.insert(topic_name);
- ConsumerConfig consumer_config;
- TubeMQServiceConfig serviceConfig;
-
- consumer_config.SetRpcReadTimeoutMs(20000);
- result = consumer_config.SetMasterAddrInfo(err_info, master_addr);
- if (!result) {
- printf("\n Set Master AddrInfo failure: %s", err_info.c_str());
- return -1;
- }
- result = consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
- if (!result) {
- printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
- return -1;
- }
- result = StartTubeMQService(err_info, serviceConfig);
- if (!result) {
- printf("\n StartTubeMQService failure: %s", err_info.c_str());
- return -1;
- }
-
- result = consumer_1.Start(err_info, consumer_config);
- if (!result) {
- printf("\n Initial consumer failure, error is: %s ", err_info.c_str());
- return -2;
- }
-
- ConsumerResult gentRet;
- ConsumerResult confirm_result;
- int64_t start_time = time(NULL);
- do {
- // 1. get Message;
- result = consumer_1.GetMessage(gentRet);
- if (result) {
- // 2.1.1 if success, process message
- list<Message> msgs = gentRet.GetMessageList();
- printf("\n GetMessage success, msssage count =%ld ", msgs.size());
- // 2.1.2 confirm message result
- consumer_1.Confirm(gentRet.GetConfirmContext(), true, confirm_result);
- } else {
- // 2.2.1 if failure, check error code
- // print error message if errcode not in
- // [no partitions assigned, all partitions in use,
- // or all partitons idle, reach max position]
- if (!(gentRet.GetErrCode() == err_code::kErrNotFound
- || gentRet.GetErrCode() == err_code::kErrNoPartAssigned
- || gentRet.GetErrCode() == err_code::kErrAllPartInUse
- || gentRet.GetErrCode() == err_code::kErrAllPartWaiting)) {
- printf("\n GetMessage failure, err_code=%d, err_msg is: %s ",
- gentRet.GetErrCode(), gentRet.GetErrMessage().c_str());
- }
- }
- // used for test, consume 10 minutes only
- if (time(NULL) - start_time > 10 * 60) {
- break;
- }
- } while (true);
-
- getchar(); // for test hold the test thread
- consumer_1.ShutDown();
-
- getchar(); // for test hold the test thread
- result = StopTubeMQService(err_info);
- if (!result) {
- printf("\n *** StopTubeMQService failure, reason is %s ", err_info.c_str());
- }
-
- printf("\n finishe test exist ");
- return 0;
-}
-
-
-
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <time.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <libgen.h>
+#include <sys/time.h>
+#include <chrono>
+#include <set>
+#include <string>
+#include <thread>
+#include "tubemq/tubemq_client.h"
+#include "tubemq/tubemq_config.h"
+#include "tubemq/tubemq_errcode.h"
+#include "tubemq/tubemq_message.h"
+#include "tubemq/tubemq_return.h"
+
+
+using namespace std;
+using namespace tubemq;
+
+using std::set;
+using std::string;
+using tubemq::ConsumerConfig;
+using tubemq::ConsumerResult;
+using tubemq::TubeMQConsumer;
+
+
+
+
+
+
+int main(int argc, char* argv[]) {
+ bool result;
+ string err_info;
+ if (argc < 4) {
+ printf("\n must ./comd master_addr group_name topic_name [config_file_path]");
+ return -1;
+ }
+ // set parameters
+ string master_addr = argv[1];
+ string group_name = argv[2];
+ string topic_name = argv[3];
+ string conf_file = "../conf/client.conf";
+ if (argc > 4) {
+ conf_file = argv[4];
+ }
+
+ TubeMQConsumer consumer_1;
+ set<string> topic_list;
+ topic_list.insert(topic_name);
+ ConsumerConfig consumer_config;
+ TubeMQServiceConfig serviceConfig;
+
+ consumer_config.SetRpcReadTimeoutMs(20000);
+ result = consumer_config.SetMasterAddrInfo(err_info, master_addr);
+ if (!result) {
+ printf("\n Set Master AddrInfo failure: %s", err_info.c_str());
+ return -1;
+ }
+ result = consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
+ if (!result) {
+ printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
+ return -1;
+ }
+ result = StartTubeMQService(err_info, serviceConfig);
+ if (!result) {
+ printf("\n StartTubeMQService failure: %s", err_info.c_str());
+ return -1;
+ }
+
+ result = consumer_1.Start(err_info, consumer_config);
+ if (!result) {
+
+ printf("\n Initial consumer failure, error is: %s ", err_info.c_str());
+ return -2;
+ }
+
+ ConsumerResult gentRet;
+ ConsumerResult confirm_result;
+ int64_t start_time = time(NULL);
+ do {
+ // 1. get Message;
+ result = consumer_1.GetMessage(gentRet);
+ if (result) {
+ // 2.1.1 if success, process message
+ list<Message> msgs = gentRet.GetMessageList();
+ printf("\n GetMessage success, msssage count =%ld ", msgs.size());
+ // 2.1.2 confirm message result
+ consumer_1.Confirm(gentRet.GetConfirmContext(), true, confirm_result);
+ } else {
+ // 2.2.1 if failure, check error code
+ // print error message if errcode not in
+ // [no partitions assigned, all partitions in use,
+ // or all partitons idle, reach max position]
+ if (!(gentRet.GetErrCode() == err_code::kErrNotFound
+ || gentRet.GetErrCode() == err_code::kErrNoPartAssigned
+ || gentRet.GetErrCode() == err_code::kErrAllPartInUse
+ || gentRet.GetErrCode() == err_code::kErrAllPartWaiting)) {
+ printf("\n GetMessage failure, err_code=%d, err_msg is: %s ",
+ gentRet.GetErrCode(), gentRet.GetErrMessage().c_str());
+ }
+ }
+ // used for test, consume 10 minutes only
+ if (time(NULL) - start_time > 10 * 60) {
+ break;
+ }
+ } while (true);
+
+ getchar(); // for test hold the test thread
+ consumer_1.ShutDown();
+
+ getchar(); // for test hold the test thread
+ result = StopTubeMQService(err_info);
+ if (!result) {
+ printf("\n *** StopTubeMQService failure, reason is %s ", err_info.c_str());
+ }
+
+ printf("\n finishe test exist ");
+ return 0;
+}
+
+
+
+
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multi_thread_filter.cc b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multi_thread_filter.cc
index c20fb15..cbd4a45 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multi_thread_filter.cc
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multi_thread_filter.cc
@@ -1,266 +1,267 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <stdio.h>
-#include <string.h>
-#include <unistd.h>
-#include <signal.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <time.h>
-#include <fcntl.h>
-#include <errno.h>
-#include <libgen.h>
-#include <sys/time.h>
-#include <chrono>
-#include <map>
-#include <list>
-#include <set>
-#include <string>
-#include <thread>
-#include "tubemq/tubemq_atomic.h"
-#include "tubemq/tubemq_client.h"
-#include "tubemq/tubemq_config.h"
-#include "tubemq/tubemq_errcode.h"
-#include "tubemq/tubemq_message.h"
-#include "tubemq/tubemq_return.h"
-#include "tubemq/tubemq_tdmsg.h"
-
-
-
-
-using namespace std;
-using namespace tubemq;
-
-TubeMQConsumer consumer_1;
-
-
-
-AtomicLong last_print_time(0);
-AtomicLong last_msg_count(0);
-AtomicLong last_print_count(0);
-
-
-
-bool parse_TDMsg1_type_msg(const list<Message>& messageSet) {
- string err_info;
- list<Message>::const_iterator it_list;
- map<string, string>::const_iterator it_attr;
- list<DataItem>::const_iterator it_item;
- map<string, list<DataItem> >::const_iterator it_map;
- for (it_list = messageSet.begin(); it_list != messageSet.end(); ++it_list) {
- printf("\nMessage id is %ld, topic is %s",
- it_list->GetMessageId(), it_list->GetTopic().c_str());
- TubeMQTDMsg tubemq_tdmsg;
- if (tubemq_tdmsg.ParseTDMsg(it_list->GetData(), it_list->GetDataLength(), err_info)) {
- printf("\n parse data success, version is %d, createTime is %ld",
- tubemq_tdmsg.GetVersion(), tubemq_tdmsg.GetCreateTime());
- map<string, list<DataItem> > data_map = tubemq_tdmsg.GetAttr2DataMap();
- for (it_map = data_map.begin(); it_map != data_map.end(); ++it_map) {
- map<string, string> key_vals;
- if (!tubemq_tdmsg.ParseAttrValue(it_map->first, key_vals, err_info)) {
- printf("\n parse attribute error, attr is %s, reason is %s",
- (it_map->first).c_str(), err_info.c_str());
- continue;
- }
- printf("\n parsed attribute:");
- for (it_attr = key_vals.begin(); it_attr != key_vals.end(); ++it_attr) {
- printf("\nkey is %s, value is %s",
- (it_attr->first).c_str(), (it_attr->second).c_str());
- }
- list<DataItem> data_items = it_map->second;
- printf("\n parsed msg count is %ld", data_items.size());
- for (it_item = data_items.begin(); it_item != data_items.end(); ++it_item) {
- printf("\n parsed msg data' length is %d, value is: %s",
- it_item->GetLength(), it_item->GetData());
- }
- }
- } else {
- printf("\n \n parse data error, reason is %s\n", err_info.c_str());
- break;
- }
- }
- return true;
-}
-
-
-bool parse_raw_type_msg(const list<Message>& messageSet) {
- int64_t last_time = last_print_time.Get();
- int64_t cur_count = last_msg_count.AddAndGet(messageSet.size());
- int64_t cur_time = time(NULL);
- if (cur_count - last_print_count.Get() >= 50000
- || cur_time - last_time > 90) {
- if (last_print_time.CompareAndSet(last_time, cur_time)) {
- printf("\n %ld Current message count=%ld", cur_time, last_msg_count.Get());
- last_print_count.Set(cur_count);
- }
- }
- return true;
-}
-
-void thread_task_pull(int32_t thread_no) {
- bool result;
- ConsumerResult gentRet;
- ConsumerResult confirm_result;
- printf("\n thread_task_pull start: %d", thread_no);
- do {
- // 1. get Message;
- result = consumer_1.GetMessage(gentRet);
- if (result) {
- // 2.1.1 if success, process message
- list<Message> msgs = gentRet.GetMessageList();
- // 2.1.2 confirm message result
- consumer_1.Confirm(gentRet.GetConfirmContext(), true, confirm_result);
- parse_TDMsg1_type_msg(msgs);
- // parse_raw_type_msg(msgs);
- } else {
- // 2.2.1 if failure, check error code
- // print error message if errcode not in
- // [no partitions assigned, all partitions in use,
- // or all partitons idle, reach max position]
- if (!(gentRet.GetErrCode() == err_code::kErrNotFound
- || gentRet.GetErrCode() == err_code::kErrNoPartAssigned
- || gentRet.GetErrCode() == err_code::kErrAllPartInUse
- || gentRet.GetErrCode() == err_code::kErrAllPartWaiting)) {
- if (gentRet.GetErrCode() == err_code::kErrMQServiceStop
- || gentRet.GetErrCode() == err_code::kErrClientStop) {
- break;
- }
- printf("\n GetMessage failure, err_code=%d, err_msg is: %s ",
- gentRet.GetErrCode(), gentRet.GetErrMessage().c_str());
- }
- }
- } while (true);
- printf("\n thread_task_pull finished: %d", thread_no);
-}
-
-
-int main(int argc, char* argv[]) {
- bool result;
- string err_info;
- if (argc < 4) {
- printf("\n must ./comd master_addr group_name topic_name [config_file_path]");
- return -1;
- }
- // set parameters
- string master_addr = argv[1];
- string group_name = argv[2];
- string topic_name = argv[3];
- string conf_file = "../conf/client.conf";
- if (argc > 4) {
- conf_file = argv[4];
- }
- int32_t thread_num = 15;
- ConsumerConfig consumer_config;
- TubeMQServiceConfig serviceConfig;
- consumer_config.SetRpcReadTimeoutMs(20000);
- result = consumer_config.SetMasterAddrInfo(err_info, master_addr);
- if (!result) {
- printf("\n Set Master AddrInfo failure: %s ", err_info.c_str());
- return -1;
- }
-
- // non-filter consume begin
- set<string> topic_list;
- topic_list.insert(topic_name);
- result = consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
- if (!result) {
- printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
- return -1;
- }
- // non-filter consume end
-
-
-/*
- // filter consume begin
- set<string> filters;
- filters.insert("aaa");
- filters.insert("bbb");
- filters.insert("xxb");
- map<string, set<string> > subscribed_topic_and_filter_map;
- subscribed_topic_and_filter_map["test_1"] = filters;
- result = consumer_config.SetGroupConsumeTarget(err_info,
- group_name, subscribed_topic_and_filter_map);
- if (!result) {
- printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
- return -1;
- }
- // filter consume end
-
- // bound consume begin
- set<string> filters;
- filters.insert("aaa");
- filters.insert("bbb");
- filters.insert("xxb");
- map<string, set<string> > subscribed_topic_and_filter_map;
- subscribed_topic_and_filter_map[topic_name] = filters;
- string session_key = "test";
- uint32_t source_count = 2;
- bool is_select_big = true;
- map<string, int64_t> part_offset_map;
- part_offset_map["181895251:test_1:0"] = 0;
- part_offset_map["181895251:test_1:10"] = 0;
- part_offset_map["181895251:test_1:20"] = 0;
- part_offset_map["181895251:test_1:30"] = 0;
- result = consumer_config.SetGroupConsumeTarget(err_info, group_name,
- subscribed_topic_and_filter_map, session_key,
- source_count, is_select_big, part_offset_map);
- if (!result) {
- printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
- return -1;
- }
-// bound consume end
-*/
-
- result = StartTubeMQService(err_info, serviceConfig);
- if (!result) {
- printf("\n StartTubeMQService failure: %s", err_info.c_str());
- return -1;
- }
-
- result = consumer_1.Start(err_info, consumer_config);
- if (!result) {
- consumer_1.ShutDown();
- printf("\n Initial consumer failure, error is: %s ", err_info.c_str());
- return -2;
- }
- std::thread pull_threads[thread_num];
- for (int32_t i = 0; i < thread_num; i++) {
- pull_threads[i] = std::thread(thread_task_pull, i);
- }
-
- getchar(); // for test hold the test thread
- consumer_1.ShutDown();
- //
- for (int32_t i = 0; i < thread_num; i++) {
- if (pull_threads[i].joinable()) {
- pull_threads[i].join();
- }
- }
-
- getchar(); // for test hold the test thread
- result = StopTubeMQService(err_info);
- if (!result) {
- printf("\n *** StopTubeMQService failure, reason is %s", err_info.c_str());
- }
-
- printf("\n finishe test exist");
- return 0;
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <time.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <libgen.h>
+#include <sys/time.h>
+#include <chrono>
+#include <map>
+#include <list>
+#include <set>
+#include <string>
+#include <thread>
+#include "tubemq/tubemq_atomic.h"
+#include "tubemq/tubemq_client.h"
+#include "tubemq/tubemq_config.h"
+#include "tubemq/tubemq_errcode.h"
+#include "tubemq/tubemq_message.h"
+#include "tubemq/tubemq_return.h"
+#include "tubemq/tubemq_tdmsg.h"
+
+
+
+
+using namespace std;
+using namespace tubemq;
+
+TubeMQConsumer consumer_1;
+
+
+
+AtomicLong last_print_time(0);
+AtomicLong last_msg_count(0);
+AtomicLong last_print_count(0);
+
+
+
+bool parse_TDMsg1_type_msg(const list<Message>& messageSet) {
+ string err_info;
+ list<Message>::const_iterator it_list;
+ map<string, string>::const_iterator it_attr;
+ list<DataItem>::const_iterator it_item;
+ map<string, list<DataItem> >::const_iterator it_map;
+ for (it_list = messageSet.begin(); it_list != messageSet.end(); ++it_list) {
+ printf("\nMessage id is %ld, topic is %s",
+ it_list->GetMessageId(), it_list->GetTopic().c_str());
+ TubeMQTDMsg tubemq_tdmsg;
+ if (tubemq_tdmsg.ParseTDMsg(it_list->GetData(), it_list->GetDataLength(), err_info)) {
+ printf("\n parse data success, version is %d, createTime is %ld",
+ tubemq_tdmsg.GetVersion(), tubemq_tdmsg.GetCreateTime());
+ map<string, list<DataItem> > data_map = tubemq_tdmsg.GetAttr2DataMap();
+ for (it_map = data_map.begin(); it_map != data_map.end(); ++it_map) {
+ map<string, string> key_vals;
+ if (!tubemq_tdmsg.ParseAttrValue(it_map->first, key_vals, err_info)) {
+ printf("\n parse attribute error, attr is %s, reason is %s",
+ (it_map->first).c_str(), err_info.c_str());
+ continue;
+ }
+ printf("\n parsed attribute:");
+ for (it_attr = key_vals.begin(); it_attr != key_vals.end(); ++it_attr) {
+ printf("\nkey is %s, value is %s",
+ (it_attr->first).c_str(), (it_attr->second).c_str());
+ }
+ list<DataItem> data_items = it_map->second;
+ printf("\n parsed msg count is %ld", data_items.size());
+ for (it_item = data_items.begin(); it_item != data_items.end(); ++it_item) {
+ printf("\n parsed msg data' length is %d, value is: %s",
+ it_item->GetLength(), it_item->GetData());
+ }
+ }
+ } else {
+ printf("\n \n parse data error, reason is %s\n", err_info.c_str());
+ break;
+ }
+ }
+ return true;
+}
+
+
+bool parse_raw_type_msg(const list<Message>& messageSet) {
+ int64_t last_time = last_print_time.Get();
+ int64_t cur_count = last_msg_count.AddAndGet(messageSet.size());
+ int64_t cur_time = time(NULL);
+ if (cur_count - last_print_count.Get() >= 50000
+ || cur_time - last_time > 90) {
+ if (last_print_time.CompareAndSet(last_time, cur_time)) {
+ printf("\n %ld Current message count=%ld", cur_time, last_msg_count.Get());
+ last_print_count.Set(cur_count);
+ }
+ }
+ return true;
+}
+
+void thread_task_pull(int32_t thread_no) {
+ bool result;
+ ConsumerResult gentRet;
+ ConsumerResult confirm_result;
+ printf("\n thread_task_pull start: %d", thread_no);
+ do {
+ // 1. get Message;
+ result = consumer_1.GetMessage(gentRet);
+ if (result) {
+ // 2.1.1 if success, process message
+ list<Message> msgs = gentRet.GetMessageList();
+ // 2.1.2 confirm message result
+ consumer_1.Confirm(gentRet.GetConfirmContext(), true, confirm_result);
+ parse_TDMsg1_type_msg(msgs);
+ // parse_raw_type_msg(msgs);
+ } else {
+ // 2.2.1 if failure, check error code
+ // print error message if errcode not in
+ // [no partitions assigned, all partitions in use,
+ // or all partitons idle, reach max position]
+ if (!(gentRet.GetErrCode() == err_code::kErrNotFound
+ || gentRet.GetErrCode() == err_code::kErrNoPartAssigned
+ || gentRet.GetErrCode() == err_code::kErrAllPartInUse
+ || gentRet.GetErrCode() == err_code::kErrAllPartWaiting)) {
+ if (gentRet.GetErrCode() == err_code::kErrMQServiceStop
+ || gentRet.GetErrCode() == err_code::kErrClientStop) {
+ break;
+ }
+ printf("\n GetMessage failure, err_code=%d, err_msg is: %s ",
+ gentRet.GetErrCode(), gentRet.GetErrMessage().c_str());
+ }
+ }
+ } while (true);
+ printf("\n thread_task_pull finished: %d", thread_no);
+}
+
+
+int main(int argc, char* argv[]) {
+ bool result;
+ string err_info;
+ if (argc < 4) {
+ printf("\n must ./comd master_addr group_name topic_name [config_file_path]");
+ return -1;
+ }
+ // set parameters
+ string master_addr = argv[1];
+ string group_name = argv[2];
+ string topic_name = argv[3];
+ string conf_file = "../conf/client.conf";
+ if (argc > 4) {
+ conf_file = argv[4];
+ }
+ int32_t thread_num = 15;
+ ConsumerConfig consumer_config;
+ TubeMQServiceConfig serviceConfig;
+ consumer_config.SetRpcReadTimeoutMs(20000);
+ result = consumer_config.SetMasterAddrInfo(err_info, master_addr);
+ if (!result) {
+ printf("\n Set Master AddrInfo failure: %s ", err_info.c_str());
+ return -1;
+ }
+
+ // non-filter consume begin
+ set<string> topic_list;
+ topic_list.insert(topic_name);
+ result = consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
+ if (!result) {
+ printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
+ return -1;
+ }
+ // non-filter consume end
+
+
+/*
+ // filter consume begin
+ set<string> filters;
+ filters.insert("aaa");
+ filters.insert("bbb");
+ filters.insert("xxb");
+ map<string, set<string> > subscribed_topic_and_filter_map;
+ subscribed_topic_and_filter_map["test_1"] = filters;
+ result = consumer_config.SetGroupConsumeTarget(err_info,
+ group_name, subscribed_topic_and_filter_map);
+ if (!result) {
+ printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
+ return -1;
+ }
+ // filter consume end
+
+ // bound consume begin
+ set<string> filters;
+ filters.insert("aaa");
+ filters.insert("bbb");
+ filters.insert("xxb");
+ map<string, set<string> > subscribed_topic_and_filter_map;
+ subscribed_topic_and_filter_map[topic_name] = filters;
+ string session_key = "test";
+ uint32_t source_count = 2;
+ bool is_select_big = true;
+ map<string, int64_t> part_offset_map;
+ part_offset_map["181895251:test_1:0"] = 0;
+ part_offset_map["181895251:test_1:10"] = 0;
+ part_offset_map["181895251:test_1:20"] = 0;
+ part_offset_map["181895251:test_1:30"] = 0;
+ result = consumer_config.SetGroupConsumeTarget(err_info, group_name,
+ subscribed_topic_and_filter_map, session_key,
+ source_count, is_select_big, part_offset_map);
+ if (!result) {
+ printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
+ return -1;
+ }
+// bound consume end
+*/
+
+ result = StartTubeMQService(err_info, serviceConfig);
+ if (!result) {
+ printf("\n StartTubeMQService failure: %s", err_info.c_str());
+ return -1;
+ }
+
+ result = consumer_1.Start(err_info, consumer_config);
+ if (!result) {
+
+ consumer_1.ShutDown();
+ printf("\n Initial consumer failure, error is: %s ", err_info.c_str());
+ return -2;
+ }
+ std::thread pull_threads[thread_num];
+ for (int32_t i = 0; i < thread_num; i++) {
+ pull_threads[i] = std::thread(thread_task_pull, i);
+ }
+
+ getchar(); // for test hold the test thread
+ consumer_1.ShutDown();
+ //
+ for (int32_t i = 0; i < thread_num; i++) {
+ if (pull_threads[i].joinable()) {
+ pull_threads[i].join();
+ }
+ }
+
+ getchar(); // for test hold the test thread
+ result = StopTubeMQService(err_info);
+ if (!result) {
+ printf("\n *** StopTubeMQService failure, reason is %s", err_info.c_str());
+ }
+
+ printf("\n finishe test exist");
+ return 0;
+}
+
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multithread_pull.cc b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multithread_pull.cc
index 1e33cea..ef09691 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multithread_pull.cc
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multithread_pull.cc
@@ -1,177 +1,178 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <errno.h>
-#include <fcntl.h>
-#include <libgen.h>
-#include <signal.h>
-#include <stdio.h>
-#include <string.h>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <unistd.h>
-#include <time.h>
-#include <string>
-#include <chrono>
-#include <set>
-#include <thread>
-#include "tubemq/tubemq_atomic.h"
-#include "tubemq/tubemq_client.h"
-#include "tubemq/tubemq_config.h"
-#include "tubemq/tubemq_errcode.h"
-#include "tubemq/tubemq_message.h"
-#include "tubemq/tubemq_return.h"
-
-
-using namespace std;
-using namespace tubemq;
-
-TubeMQConsumer consumer_1;
-
-
-
-AtomicLong last_print_time(0);
-AtomicLong last_msg_count(0);
-AtomicLong last_print_count(0);
-
-
-
-void calc_message_count(int64_t msg_count) {
- int64_t last_time = last_print_time.Get();
- int64_t cur_count = last_msg_count.AddAndGet(msg_count);
- int64_t cur_time = time(NULL);
- if (cur_count - last_print_count.Get() >= 50000
- || cur_time - last_time > 90) {
- if (last_print_time.CompareAndSet(last_time, cur_time)) {
- printf("\n %ld Current message count=%ld", cur_time, last_msg_count.Get());
- last_print_count.Set(cur_count);
- }
- }
-}
-
-void thread_task_pull(int32_t thread_no) {
- bool result;
- int64_t msg_count = 0;
- ConsumerResult gentRet;
- ConsumerResult confirm_result;
- printf("\n thread_task_pull start: %d", thread_no);
- do {
- msg_count = 0;
- // 1. get Message;
- result = consumer_1.GetMessage(gentRet);
- if (result) {
- // 2.1.1 if success, process message
- list<Message> msgs = gentRet.GetMessageList();
- msg_count = msgs.size();
- // 2.1.2 confirm message result
- consumer_1.Confirm(gentRet.GetConfirmContext(), true, confirm_result);
- } else {
- // 2.2.1 if failure, check error code
- // print error message if errcode not in
- // [no partitions assigned, all partitions in use,
- // or all partitons idle, reach max position]
- if (!(gentRet.GetErrCode() == err_code::kErrNotFound
- || gentRet.GetErrCode() == err_code::kErrNoPartAssigned
- || gentRet.GetErrCode() == err_code::kErrAllPartInUse
- || gentRet.GetErrCode() == err_code::kErrAllPartWaiting)) {
- if (gentRet.GetErrCode() == err_code::kErrMQServiceStop
- || gentRet.GetErrCode() == err_code::kErrClientStop) {
- break;
- }
- printf("\n GetMessage failure, err_code=%d, err_msg is: %s ",
- gentRet.GetErrCode(), gentRet.GetErrMessage().c_str());
- }
- }
- calc_message_count(msg_count);
- } while (true);
- printf("\n thread_task_pull finished: %d", thread_no);
-}
-
-
-int main(int argc, char* argv[]) {
- bool result;
- string err_info;
-
- if (argc < 4) {
- printf("\n must ./comd master_addr group_name topic_name [config_file_path]");
- return -1;
- }
- // set parameters
- string master_addr = argv[1];
- string group_name = argv[2];
- string topic_name = argv[3];
- string conf_file = "../conf/client.conf";
- if (argc > 4) {
- conf_file = argv[4];
- }
-
- int32_t thread_num = 15;
- set<string> topic_list;
- topic_list.insert(topic_name);
- ConsumerConfig consumer_config;
- TubeMQServiceConfig serviceConfig;
-
- consumer_config.SetRpcReadTimeoutMs(20000);
- result = consumer_config.SetMasterAddrInfo(err_info, master_addr);
- if (!result) {
- printf("\n Set Master AddrInfo failure: %s ", err_info.c_str());
- return -1;
- }
- result = consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
- if (!result) {
- printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
- return -1;
- }
- result = StartTubeMQService(err_info, serviceConfig);
- if (!result) {
- printf("\n StartTubeMQService failure: %s", err_info.c_str());
- return -1;
- }
-
- result = consumer_1.Start(err_info, consumer_config);
- if (!result) {
- printf("\n Initial consumer failure, error is: %s ", err_info.c_str());
- return -2;
- }
-
- std::thread pull_threads[thread_num];
- for (int32_t i = 0; i < thread_num; i++) {
- pull_threads[i] = std::thread(thread_task_pull, i);
- }
-
- getchar(); // for test hold the test thread
- consumer_1.ShutDown();
- //
- for (int32_t i = 0; i < thread_num; i++) {
- if (pull_threads[i].joinable()) {
- pull_threads[i].join();
- }
- }
-
- getchar(); // for test hold the test thread
- result = StopTubeMQService(err_info);
- if (!result) {
- printf("\n *** StopTubeMQService failure, reason is %s", err_info.c_str());
- }
-
- printf("\n finishe test exist");
- return 0;
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <errno.h>
+#include <fcntl.h>
+#include <libgen.h>
+#include <signal.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <time.h>
+#include <string>
+#include <chrono>
+#include <set>
+#include <thread>
+#include "tubemq/tubemq_atomic.h"
+#include "tubemq/tubemq_client.h"
+#include "tubemq/tubemq_config.h"
+#include "tubemq/tubemq_errcode.h"
+#include "tubemq/tubemq_message.h"
+#include "tubemq/tubemq_return.h"
+
+
+using namespace std;
+using namespace tubemq;
+
+TubeMQConsumer consumer_1;
+
+
+
+AtomicLong last_print_time(0);
+AtomicLong last_msg_count(0);
+AtomicLong last_print_count(0);
+
+
+
+void calc_message_count(int64_t msg_count) {
+ int64_t last_time = last_print_time.Get();
+ int64_t cur_count = last_msg_count.AddAndGet(msg_count);
+ int64_t cur_time = time(NULL);
+ if (cur_count - last_print_count.Get() >= 50000
+ || cur_time - last_time > 90) {
+ if (last_print_time.CompareAndSet(last_time, cur_time)) {
+ printf("\n %ld Current message count=%ld", cur_time, last_msg_count.Get());
+ last_print_count.Set(cur_count);
+ }
+ }
+}
+
+void thread_task_pull(int32_t thread_no) {
+ bool result;
+ int64_t msg_count = 0;
+ ConsumerResult gentRet;
+ ConsumerResult confirm_result;
+ printf("\n thread_task_pull start: %d", thread_no);
+ do {
+ msg_count = 0;
+ // 1. get Message;
+ result = consumer_1.GetMessage(gentRet);
+ if (result) {
+ // 2.1.1 if success, process message
+ list<Message> msgs = gentRet.GetMessageList();
+ msg_count = msgs.size();
+ // 2.1.2 confirm message result
+ consumer_1.Confirm(gentRet.GetConfirmContext(), true, confirm_result);
+ } else {
+ // 2.2.1 if failure, check error code
+ // print error message if errcode not in
+ // [no partitions assigned, all partitions in use,
+ // or all partitons idle, reach max position]
+ if (!(gentRet.GetErrCode() == err_code::kErrNotFound
+ || gentRet.GetErrCode() == err_code::kErrNoPartAssigned
+ || gentRet.GetErrCode() == err_code::kErrAllPartInUse
+ || gentRet.GetErrCode() == err_code::kErrAllPartWaiting)) {
+ if (gentRet.GetErrCode() == err_code::kErrMQServiceStop
+ || gentRet.GetErrCode() == err_code::kErrClientStop) {
+ break;
+ }
+ printf("\n GetMessage failure, err_code=%d, err_msg is: %s ",
+ gentRet.GetErrCode(), gentRet.GetErrMessage().c_str());
+ }
+ }
+ calc_message_count(msg_count);
+ } while (true);
+ printf("\n thread_task_pull finished: %d", thread_no);
+}
+
+
+int main(int argc, char* argv[]) {
+ bool result;
+ string err_info;
+
+ if (argc < 4) {
+ printf("\n must ./comd master_addr group_name topic_name [config_file_path]");
+ return -1;
+ }
+ // set parameters
+ string master_addr = argv[1];
+ string group_name = argv[2];
+ string topic_name = argv[3];
+ string conf_file = "../conf/client.conf";
+ if (argc > 4) {
+ conf_file = argv[4];
+ }
+
+ int32_t thread_num = 15;
+ set<string> topic_list;
+ topic_list.insert(topic_name);
+ ConsumerConfig consumer_config;
+ TubeMQServiceConfig serviceConfig;
+
+ consumer_config.SetRpcReadTimeoutMs(20000);
+ result = consumer_config.SetMasterAddrInfo(err_info, master_addr);
+ if (!result) {
+ printf("\n Set Master AddrInfo failure: %s ", err_info.c_str());
+ return -1;
+ }
+ result = consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
+ if (!result) {
+ printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
+ return -1;
+ }
+ result = StartTubeMQService(err_info, serviceConfig);
+ if (!result) {
+ printf("\n StartTubeMQService failure: %s", err_info.c_str());
+ return -1;
+ }
+
+ result = consumer_1.Start(err_info, consumer_config);
+ if (!result) {
+
+ printf("\n Initial consumer failure, error is: %s ", err_info.c_str());
+ return -2;
+ }
+
+ std::thread pull_threads[thread_num];
+ for (int32_t i = 0; i < thread_num; i++) {
+ pull_threads[i] = std::thread(thread_task_pull, i);
+ }
+
+ getchar(); // for test hold the test thread
+ consumer_1.ShutDown();
+ //
+ for (int32_t i = 0; i < thread_num; i++) {
+ if (pull_threads[i].joinable()) {
+ pull_threads[i].join();
+ }
+ }
+
+ getchar(); // for test hold the test thread
+ result = StopTubeMQService(err_info);
+ if (!result) {
+ printf("\n *** StopTubeMQService failure, reason is %s", err_info.c_str());
+ }
+
+ printf("\n finishe test exist");
+ return 0;
+}
+
diff --git a/licenses/inlong-dataproxy/LICENSE-binary b/licenses/inlong-dataproxy/LICENSE-binary
index ffa1b91..4cac0c0 100644
--- a/licenses/inlong-dataproxy/LICENSE-binary
+++ b/licenses/inlong-dataproxy/LICENSE-binary
@@ -1,4 +1,4 @@
-
+
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
diff --git a/licenses/inlong-dataproxy/licenses-binary/LICENSE-google-protobuf-java-format.txt b/licenses/inlong-dataproxy/licenses-binary/LICENSE-google-protobuf-java-format.txt
index 083dfff..f9ea500 100644
--- a/licenses/inlong-dataproxy/licenses-binary/LICENSE-google-protobuf-java-format.txt
+++ b/licenses/inlong-dataproxy/licenses-binary/LICENSE-google-protobuf-java-format.txt
@@ -1,27 +1,27 @@
-
- Copyright (c) 2009, Orbitz World Wide
- All rights reserved.
-
- Redistribution and use in source and binary forms, with or without modification,
- are permitted provided that the following conditions are met:
-
- * Redistributions of source code must retain the above copyright notice,
- this list of conditions and the following disclaimer.
- * Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
- * Neither the name of the Orbitz World Wide nor the names of its contributors
- may be used to endorse or promote products derived from this software
- without specific prior written permission.
-
- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+ Copyright (c) 2009, Orbitz World Wide
+ All rights reserved.
+
+ Redistribution and use in source and binary forms, with or without modification,
+ are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+ * Neither the name of the Orbitz World Wide nor the names of its contributors
+ may be used to endorse or promote products derived from this software
+ without specific prior written permission.
+
+ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/licenses/inlong-tubemq/LICENSE-binary b/licenses/inlong-tubemq/LICENSE-binary
index 46a0b3a..af2b7c1 100644
--- a/licenses/inlong-tubemq/LICENSE-binary
+++ b/licenses/inlong-tubemq/LICENSE-binary
@@ -1,4 +1,4 @@
-
+
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
diff --git a/licenses/inlong-tubemq/licenses-binary/LICENSE-google-protobuf-java-format.txt b/licenses/inlong-tubemq/licenses-binary/LICENSE-google-protobuf-java-format.txt
index 083dfff..f9ea500 100644
--- a/licenses/inlong-tubemq/licenses-binary/LICENSE-google-protobuf-java-format.txt
+++ b/licenses/inlong-tubemq/licenses-binary/LICENSE-google-protobuf-java-format.txt
@@ -1,27 +1,27 @@
-
- Copyright (c) 2009, Orbitz World Wide
- All rights reserved.
-
- Redistribution and use in source and binary forms, with or without modification,
- are permitted provided that the following conditions are met:
-
- * Redistributions of source code must retain the above copyright notice,
- this list of conditions and the following disclaimer.
- * Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
- * Neither the name of the Orbitz World Wide nor the names of its contributors
- may be used to endorse or promote products derived from this software
- without specific prior written permission.
-
- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+ Copyright (c) 2009, Orbitz World Wide
+ All rights reserved.
+
+ Redistribution and use in source and binary forms, with or without modification,
+ are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+ * Neither the name of the Orbitz World Wide nor the names of its contributors
+ may be used to endorse or promote products derived from this software
+ without specific prior written permission.
+
+ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.