You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/02/25 12:00:16 UTC

[incubator-inlong] branch master updated: [INLONG-2717][Manager][Sort] Support none of middleware and Add DataTypeEnum (#2718)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 958b182  [INLONG-2717][Manager][Sort] Support none of middleware and Add DataTypeEnum (#2718)
958b182 is described below

commit 958b18211b39dd5f8dfe7d37cafbcf4683ca551c
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Fri Feb 25 20:00:11 2022 +0800

    [INLONG-2717][Manager][Sort] Support none of middleware and Add DataTypeEnum (#2718)
---
 .../apache/inlong/common/enums/DataTypeEnum.java   | 36 ++++++++------
 .../manager/client/api/FlinkSortBaseConf.java      |  5 +-
 .../inlong/manager/client/api/MqBaseConf.java      | 10 +++-
 .../inlong/manager/client/api/SortBaseConf.java    |  3 +-
 .../api/{MqBaseConf.java => TubeBaseConf.java}     | 28 +++++++----
 .../{MqBaseConf.java => UserDefinedSortConf.java}  | 20 ++++----
 .../client/api/util/InlongGroupTransfer.java       | 56 +++++++++++++++++++++-
 .../inlong/manager/common/enums/Constant.java      |  2 +
 .../common/settings/InlongGroupSettings.java       | 10 ++++
 .../thirdparty/sort/ZkDisabledEventSelector.java   |  7 ++-
 .../thirdparty/sort/ZkEnabledEventSelector.java    |  3 +-
 .../deserialization/DeserializationInfo.java       | 10 ++--
 12 files changed, 141 insertions(+), 49 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
similarity index 57%
copy from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
copy to inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
index b44b162..7d32aa3 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
@@ -15,24 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.client.api;
+package org.apache.inlong.common.enums;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import java.io.Serializable;
-import lombok.Data;
+import lombok.Getter;
 
-@Data
-@ApiModel("Base configuration for message queue")
-public abstract class MqBaseConf implements Serializable {
+public enum DataTypeEnum {
+    CSV("csv"),
+    AVRO("avro"),
+    JSON("json"),
+    CANAL("canal"),
+    DEBEZIUM_JSON("debezium_json");
 
-    public enum MqType {
-        PULSAR,
-        TUBE;
-    }
+    @Getter
+    private String name;
 
-    @ApiModelProperty("The number of partitions of Topic, 1-20")
-    private int topicPartitionNum = 3;
+    DataTypeEnum(String name) {
+        this.name = name;
+    }
 
-    public abstract MqType getType();
+    public static DataTypeEnum forName(String name) {
+        for (DataTypeEnum dataType : values()) {
+            if (dataType.getName().equals(name)) {
+                return dataType;
+            }
+        }
+        throw new IllegalArgumentException(String.format("Unsupport dataType for Inlong:%s", name));
+    }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/FlinkSortBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/FlinkSortBaseConf.java
index 706420a..c077966 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/FlinkSortBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/FlinkSortBaseConf.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.client.api;
 
+import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import java.util.Map;
@@ -39,6 +40,6 @@ public class FlinkSortBaseConf extends SortBaseConf {
     @ApiModelProperty("Region for flink cluster, for example:ap-beijing|ap-chengdu|ap-chongqing or null if not exists")
     private String region;
 
-    @ApiModelProperty("Other properties if need")
-    private Map<String, String> properties;
+    @ApiModelProperty("Other properties if needed")
+    private Map<String, String> properties = Maps.newHashMap();
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
index b44b162..8a500b3 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
@@ -26,9 +26,17 @@ import lombok.Data;
 @ApiModel("Base configuration for message queue")
 public abstract class MqBaseConf implements Serializable {
 
+    public static final MqBaseConf BLANK_MQ_CONF = new MqBaseConf() {
+        @Override
+        public MqType getType() {
+            return MqType.NONE;
+        }
+    };
+
     public enum MqType {
         PULSAR,
-        TUBE;
+        TUBE,
+        NONE;
     }
 
     @ApiModelProperty("The number of partitions of Topic, 1-20")
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SortBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SortBaseConf.java
index eb3b6b3..b670f5a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SortBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SortBaseConf.java
@@ -26,7 +26,8 @@ public abstract class SortBaseConf {
 
     public enum SortType {
         FLINK,
-        LOCAL;
+        LOCAL,
+        USER_DEFINED;
     }
 
     public abstract SortType getType();
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TubeBaseConf.java
similarity index 62%
copy from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
copy to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TubeBaseConf.java
index b44b162..274fb87 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TubeBaseConf.java
@@ -19,20 +19,28 @@ package org.apache.inlong.manager.client.api;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.io.Serializable;
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 @Data
-@ApiModel("Base configuration for message queue")
-public abstract class MqBaseConf implements Serializable {
+@AllArgsConstructor
+@NoArgsConstructor
+@ApiModel("Base configuration for Tube")
+public class TubeBaseConf extends MqBaseConf {
 
-    public enum MqType {
-        PULSAR,
-        TUBE;
-    }
+    @ApiModelProperty("Message queue type")
+    private MqType type = MqType.TUBE;
 
-    @ApiModelProperty("The number of partitions of Topic, 1-20")
-    private int topicPartitionNum = 3;
+    @ApiModelProperty("Tube manager URL")
+    private String tubeManagerUrl;
 
-    public abstract MqType getType();
+    @ApiModelProperty("Tube master URL")
+    private String tubeMasterUrl;
+
+    @ApiModelProperty("Tube Cluster Id")
+    private int tubeClusterId = 1;
+
+    @ApiModelProperty("GroupName for tube producer")
+    private String groupName;
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/UserDefinedSortConf.java
similarity index 65%
copy from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
copy to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/UserDefinedSortConf.java
index b44b162..3b60272 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/UserDefinedSortConf.java
@@ -17,22 +17,22 @@
 
 package org.apache.inlong.manager.client.api;
 
+import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.io.Serializable;
+import java.util.Map;
 import lombok.Data;
 
 @Data
-@ApiModel("Base configuration for message queue")
-public abstract class MqBaseConf implements Serializable {
+@ApiModel("Base configuration for user defined sort functions")
+public class UserDefinedSortConf extends SortBaseConf {
 
-    public enum MqType {
-        PULSAR,
-        TUBE;
-    }
+    @ApiModelProperty(value = "Sort type")
+    private SortType type = SortType.USER_DEFINED;
 
-    @ApiModelProperty("The number of partitions of Topic, 1-20")
-    private int topicPartitionNum = 3;
+    @ApiModelProperty("Name for user defined sort functions")
+    private String sortName;
 
-    public abstract MqType getType();
+    @ApiModelProperty("Properties for user defined sort functions if needed")
+    private Map<String, String> properties = Maps.newHashMap();
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index 079e46f..b614030 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -27,6 +27,8 @@ import org.apache.inlong.manager.client.api.MqBaseConf.MqType;
 import org.apache.inlong.manager.client.api.PulsarBaseConf;
 import org.apache.inlong.manager.client.api.SortBaseConf;
 import org.apache.inlong.manager.client.api.SortBaseConf.SortType;
+import org.apache.inlong.manager.client.api.TubeBaseConf;
+import org.apache.inlong.manager.client.api.UserDefinedSortConf;
 import org.apache.inlong.manager.client.api.auth.Authentication;
 import org.apache.inlong.manager.client.api.auth.Authentication.AuthType;
 import org.apache.inlong.manager.client.api.auth.SecretTokenAuthentication;
@@ -66,8 +68,12 @@ public class InlongGroupTransfer {
             List<InlongGroupExtInfo> extInfos = createPulsarExtInfo(pulsarBaseConf);
             groupInfo.getExtList().addAll(extInfos);
             groupInfo.setTopicPartitionNum(pulsarBaseConf.getTopicPartitionNum());
-        } else {
-            // todo tubemq
+        } else if (mqType == MqType.TUBE) {
+            TubeBaseConf tubeBaseConf = (TubeBaseConf) mqConf;
+            List<InlongGroupExtInfo> extInfos = createTubeExtInfo(tubeBaseConf);
+            groupInfo.setMqResourceObj(tubeBaseConf.getGroupName());
+            groupInfo.getExtList().addAll(extInfos);
+            groupInfo.setTopicPartitionNum(tubeBaseConf.getTopicPartitionNum());
         }
         SortBaseConf sortBaseConf = groupConf.getSortBaseConf();
         SortType sortType = sortBaseConf.getType();
@@ -75,6 +81,10 @@ public class InlongGroupTransfer {
             FlinkSortBaseConf flinkSortBaseConf = (FlinkSortBaseConf) sortBaseConf;
             List<InlongGroupExtInfo> sortExtInfos = createFlinkExtInfo(flinkSortBaseConf);
             groupInfo.getExtList().addAll(sortExtInfos);
+        } else if (sortType == SortType.USER_DEFINED) {
+            UserDefinedSortConf udf = (UserDefinedSortConf) sortBaseConf;
+            List<InlongGroupExtInfo> sortExtInfos = createUserDefinedSortExtInfo(udf);
+            groupInfo.getExtList().addAll(sortExtInfos);
         } else {
             //todo local
         }
@@ -128,8 +138,35 @@ public class InlongGroupTransfer {
         return extInfos;
     }
 
+    public static List<InlongGroupExtInfo> createTubeExtInfo(TubeBaseConf tubeBaseConf) {
+        List<InlongGroupExtInfo> extInfos = new ArrayList<>();
+        if (StringUtils.isNotEmpty(tubeBaseConf.getTubeManagerUrl())) { // guard the value actually stored
+            InlongGroupExtInfo tubeManagerUrl = new InlongGroupExtInfo();
+            tubeManagerUrl.setKeyName(InlongGroupSettings.TUBE_MANAGER_URL);
+            tubeManagerUrl.setKeyValue(tubeBaseConf.getTubeManagerUrl());
+            extInfos.add(tubeManagerUrl);
+        }
+        if (StringUtils.isNotEmpty(tubeBaseConf.getTubeMasterUrl())) {
+            InlongGroupExtInfo tubeMasterUrl = new InlongGroupExtInfo();
+            tubeMasterUrl.setKeyName(InlongGroupSettings.TUBE_MASTER_URL);
+            tubeMasterUrl.setKeyValue(tubeBaseConf.getTubeMasterUrl());
+            extInfos.add(tubeMasterUrl);
+        }
+        if (tubeBaseConf.getTubeClusterId() > 0) { // non-positive cluster ids are not exported
+            InlongGroupExtInfo tubeClusterId = new InlongGroupExtInfo();
+            tubeClusterId.setKeyName(InlongGroupSettings.TUBE_CLUSTER_ID);
+            tubeClusterId.setKeyValue(String.valueOf(tubeBaseConf.getTubeClusterId()));
+            extInfos.add(tubeClusterId);
+        }
+        return extInfos; // one ext entry per configured Tube setting; empty list when none is set
+    }
+
     public static List<InlongGroupExtInfo> createFlinkExtInfo(FlinkSortBaseConf flinkSortBaseConf) {
         List<InlongGroupExtInfo> extInfos = new ArrayList<>();
+        InlongGroupExtInfo sortType = new InlongGroupExtInfo();
+        sortType.setKeyName(InlongGroupSettings.SORT_TYPE);
+        sortType.setKeyValue(InlongGroupSettings.DEFAULT_SORT_TYPE);
+        extInfos.add(sortType);
         if (flinkSortBaseConf.getAuthentication() != null) {
             Authentication authentication = flinkSortBaseConf.getAuthentication();
             AuthType authType = authentication.getAuthType();
@@ -159,4 +196,19 @@ public class InlongGroupTransfer {
         }
         return extInfos;
     }
+
+    public static List<InlongGroupExtInfo> createUserDefinedSortExtInfo(UserDefinedSortConf userDefinedSortConf) {
+        List<InlongGroupExtInfo> extInfos = new ArrayList<>();
+        InlongGroupExtInfo sortType = new InlongGroupExtInfo();
+        sortType.setKeyName(InlongGroupSettings.SORT_TYPE);
+        sortType.setKeyValue(userDefinedSortConf.getSortName()); // the sort name is stored as the sort type
+        extInfos.add(sortType);
+        if (MapUtils.isNotEmpty(userDefinedSortConf.getProperties())) { // no entry when there are no properties
+            InlongGroupExtInfo flinkProperties = new InlongGroupExtInfo(); // holds user-defined sort properties
+            flinkProperties.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
+            flinkProperties.setKeyValue(JsonUtils.toJson(userDefinedSortConf.getProperties()));
+            extInfos.add(flinkProperties);
+        }
+        return extInfos; // always contains the sort-type entry, optionally followed by the properties entry
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
index 2617c8e..dce0bf8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
@@ -60,6 +60,8 @@ public class Constant {
 
     public static final String MIDDLEWARE_PULSAR = "PULSAR";
 
+    public static final String MIDDLEWARE_NONE = "NONE";
+
     public static final String SCHEMA_M0_DAY = "m0_day";
 
     public static final String CLUSTER_HIVE_TOPO = "HIVE_TOPO";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
index 76c0d8c..dd51641 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
@@ -29,6 +29,12 @@ public class InlongGroupSettings {
 
     public static String DEFAULT_PULSAR_AUTHENTICATION_TYPE = "token";
 
+    public static String TUBE_MANAGER_URL = "tube.manager.url";
+
+    public static String TUBE_MASTER_URL = "tube.master.url";
+
+    public static String TUBE_CLUSTER_ID = "tube.cluster.id";
+
     /**
      * oceanus need param start
      */
@@ -66,6 +72,10 @@ public class InlongGroupSettings {
 
     public static String FS_OFS_USER_APPID = "fs.ofs.user.appid";
 
+    public static String SORT_TYPE = "sort.type";
+
+    public static String DEFAULT_SORT_TYPE = "flink";
+
     public static String SORT_URL = "sort.url";
 
     public static String SORT_AUTHENTICATION = "sort.authentication";
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
index f1d237e..3c8be51 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.thirdparty.sort;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
@@ -36,11 +37,13 @@ public class ZkDisabledEventSelector implements EventSelector {
         if (processForm instanceof GroupResourceProcessForm) {
             GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
             InlongGroupRequest groupInfo = groupResourceForm.getGroupInfo();
-            return groupInfo.getZookeeperEnabled() == 0;
+            return groupInfo.getZookeeperEnabled() == 0
+                    && !groupInfo.getMiddlewareType().equals(Constant.MIDDLEWARE_NONE);
         } else if (processForm instanceof UpdateGroupProcessForm) {
             UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) processForm;
             InlongGroupRequest groupInfo = updateGroupProcessForm.getGroupInfo();
-            return groupInfo.getZookeeperEnabled() == 0;
+            return groupInfo.getZookeeperEnabled() == 0
+                    && !groupInfo.getMiddlewareType().equals(Constant.MIDDLEWARE_NONE);
         } else {
             return false;
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
index 88dca09..4818156 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.thirdparty.sort;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -37,7 +38,7 @@ public class ZkEnabledEventSelector implements EventSelector {
         }
         GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
         InlongGroupRequest groupInfo = groupResourceForm.getGroupInfo();
-        return groupInfo.getZookeeperEnabled() == 1;
+        return groupInfo.getZookeeperEnabled() == 1 && !groupInfo.getMiddlewareType().equals(Constant.MIDDLEWARE_NONE);
     }
 
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
index 85732fa..1ab13bc 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
@@ -36,11 +36,11 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
         @Type(value = JsonDeserializationInfo.class, name = "json"),
         @Type(value = CanalDeserializationInfo.class, name = "canal"),
         @Type(value = DebeziumDeserializationInfo.class, name = "debezium_json"),
-        @Type(value = InLongMsgCsvDeserializationInfo.class, name = "inlongmsg_csv"),
-        @Type(value = InLongMsgCsv2DeserializationInfo.class, name = "inlongmsg_csv2"),
-        @Type(value = InLongMsgKvDeserializationInfo.class, name = "inlongmsg_kv"),
-        @Type(value = InLongMsgTlogCsvDeserializationInfo.class, name = "inlongmsg_tlog_csv"),
-        @Type(value = InLongMsgTlogKvDeserializationInfo.class, name = "inlongmsg_tlog_kv")
+        @Type(value = InLongMsgCsvDeserializationInfo.class, name = "inlong_msg_csv"),
+        @Type(value = InLongMsgCsv2DeserializationInfo.class, name = "inlong_msg_csv2"),
+        @Type(value = InLongMsgKvDeserializationInfo.class, name = "inlong_msg_kv"),
+        @Type(value = InLongMsgTlogCsvDeserializationInfo.class, name = "inlong_msg_tlog_csv"),
+        @Type(value = InLongMsgTlogKvDeserializationInfo.class, name = "inlong_msg_tlog_kv")
 })
 public interface DeserializationInfo extends Serializable {