You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/10 09:10:23 UTC

[incubator-inlong] branch master updated: [INLONG-3039][Manager] Add properties in dataflow info (#3040)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b4fea7e  [INLONG-3039][Manager] Add properties in dataflow info (#3040)
b4fea7e is described below

commit b4fea7e576ff5e328dbc40f15b76cfcdeaacec31
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Thu Mar 10 16:53:05 2022 +0800

    [INLONG-3039][Manager] Add properties in dataflow info (#3040)
---
 .../inlong/manager/client/api/ClientConfiguration.java      |  5 +++--
 .../org/apache/inlong/manager/client/api/DataFormat.java    |  4 +++-
 .../org/apache/inlong/manager/client/api/DataSeparator.java |  6 +++---
 .../apache/inlong/manager/client/api/FlinkSortBaseConf.java |  3 ++-
 .../org/apache/inlong/manager/client/api/InlongClient.java  |  5 +++--
 .../inlong/manager/client/api/InlongGroupContext.java       |  7 ++++---
 .../inlong/manager/client/api/InlongStreamBuilder.java      |  1 +
 .../org/apache/inlong/manager/client/api/MqBaseConf.java    |  3 ++-
 .../org/apache/inlong/manager/client/api/StreamField.java   |  3 ++-
 .../org/apache/inlong/manager/client/api/StreamSink.java    |  4 ++++
 .../inlong/manager/client/api/UserDefinedSortConf.java      |  3 ++-
 .../manager/client/api/auth/DefaultAuthentication.java      |  3 ++-
 .../manager/client/api/auth/SecretAuthentication.java       |  3 ++-
 .../manager/client/api/auth/SecretTokenAuthentication.java  |  3 ++-
 .../inlong/manager/client/api/auth/TokenAuthentication.java |  3 ++-
 .../inlong/manager/client/api/impl/BlankInlongGroup.java    |  3 ++-
 .../inlong/manager/client/api/impl/InlongClientImpl.java    | 13 +++++++------
 .../inlong/manager/client/api/impl/InlongGroupImpl.java     |  7 ++++---
 .../inlong/manager/client/api/impl/InlongStreamImpl.java    |  7 ++++---
 .../inlong/manager/client/api/inner/InnerGroupContext.java  |  5 +++--
 .../manager/client/api/inner/InnerInlongManagerClient.java  |  3 ++-
 .../inlong/manager/client/api/inner/InnerStreamContext.java |  5 +++--
 .../inlong/manager/client/api/sink/ClickHouseSink.java      |  4 ----
 .../org/apache/inlong/manager/client/api/sink/HiveSink.java |  9 ++++++---
 .../apache/inlong/manager/client/api/sink/KafkaSink.java    |  4 ----
 .../apache/inlong/manager/client/api/util/AssertUtil.java   |  3 ++-
 .../org/apache/inlong/manager/client/api/util/GsonUtil.java |  5 +++--
 .../inlong/manager/client/api/util/InlongGroupTransfer.java |  7 ++++---
 .../apache/inlong/manager/client/api/util/InlongParser.java |  9 +++++----
 .../manager/client/api/util/InlongStreamSinkTransfer.java   |  9 ++++++---
 .../manager/client/api/util/InlongStreamTransfer.java       |  5 +++--
 .../inlong/manager/common/pojo/sink/SinkListResponse.java   |  3 +++
 .../apache/inlong/manager/common/pojo/sink/SinkRequest.java |  4 ++++
 .../inlong/manager/common/pojo/sink/SinkResponse.java       |  4 ++++
 .../manager/common/pojo/sink/ck/ClickHouseSinkDTO.java      |  5 +++++
 .../inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java   |  5 +++++
 .../manager/common/pojo/sink/iceberg/IcebergSinkDTO.java    |  5 +++++
 .../inlong/manager/common/pojo/sink/kafka/KafkaSinkDTO.java |  7 ++++++-
 .../common/pojo/sink/kafka/KafkaSinkListResponse.java       |  2 +-
 .../manager/common/pojo/sink/kafka/KafkaSinkRequest.java    |  4 +---
 .../manager/common/pojo/sink/kafka/KafkaSinkResponse.java   |  2 +-
 .../inlong/manager/common/pojo/source/SourceRequest.java    |  2 +-
 .../inlong/manager/common/pojo/source/SourceResponse.java   |  2 +-
 .../manager/common/pojo/source/kafka/KafkaSourceDTO.java    |  2 +-
 .../common/pojo/source/kafka/KafkaSourceListResponse.java   |  2 +-
 .../service/thirdparty/sort/CreateSortConfigListener.java   |  8 +++++++-
 46 files changed, 137 insertions(+), 74 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/ClientConfiguration.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/ClientConfiguration.java
index 068fb69..4903841 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/ClientConfiguration.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/ClientConfiguration.java
@@ -17,13 +17,14 @@
 
 package org.apache.inlong.manager.client.api;
 
-import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.client.api.auth.Authentication;
 
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
 /**
  * A simple class to hold client configuration values.
  */
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
index 2d9960e..07ac2a9 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
@@ -17,13 +17,15 @@
 
 package org.apache.inlong.manager.client.api;
 
-import java.util.Locale;
 import lombok.Getter;
 
+import java.util.Locale;
+
 public enum DataFormat {
     CSV("csv"),
     AVRO("avro"),
     CANAL("canal"),
+    JSON("json"),
     NONE("none");
 
     @Getter
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java
index 7e0f99a..6b795a8 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java
@@ -23,9 +23,9 @@ public enum DataSeparator {
     COLON(":", 58),
     SEMICOLON(";", 59),
     DASH("-", 45),
-    SOH("\001",1),
-    STX("\002",2),
-    ETX("\003",3);
+    SOH("\001", 1),
+    STX("\002", 2),
+    ETX("\003", 3);
 
     private String seperator;
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/FlinkSortBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/FlinkSortBaseConf.java
index c077966..0cced82 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/FlinkSortBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/FlinkSortBaseConf.java
@@ -20,10 +20,11 @@ package org.apache.inlong.manager.client.api;
 import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.util.Map;
 import lombok.Data;
 import org.apache.inlong.manager.client.api.auth.Authentication;
 
+import java.util.Map;
+
 @Data
 @ApiModel("Base configuration for flink cluster")
 public class FlinkSortBaseConf extends SortBaseConf {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
index 18a03ef..bc13b16 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
@@ -17,9 +17,10 @@
 
 package org.apache.inlong.manager.client.api;
 
-import java.util.List;
 import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
 
+import java.util.List;
+
 /**
  * An interface to manipulate Inlong Cluster
  * <p/>
@@ -47,7 +48,7 @@ public interface InlongClient {
     /**
      * Create inlong client.
      *
-     * @param serviceUrl    the service url
+     * @param serviceUrl the service url
      * @param configuration the configuration
      * @return the inlong client
      */
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
index dd788f8..68789e1 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
@@ -18,9 +18,6 @@
 package org.apache.inlong.manager.client.api;
 
 import com.google.common.collect.Maps;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
 import lombok.Data;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
@@ -28,6 +25,10 @@ import org.apache.inlong.manager.client.api.util.AssertUtil;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
 @Data
 public class InlongGroupContext implements Serializable {
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamBuilder.java
index 6e202b9..9972c29 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamBuilder.java
@@ -31,6 +31,7 @@ public abstract class InlongStreamBuilder {
     /**
      * create sink in stream.
      * *
+     *
      * @return inlong stream builder
      */
     public abstract InlongStreamBuilder sink(StreamSink sink);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
index 619906b..d3993b2 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
@@ -19,10 +19,11 @@ package org.apache.inlong.manager.client.api;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.io.Serializable;
 import lombok.Data;
 import org.apache.inlong.manager.common.enums.MqType;
 
+import java.io.Serializable;
+
 @Data
 @ApiModel("Base configuration for message queue")
 public abstract class MqBaseConf implements Serializable {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
index 53d4363..cc1661a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
@@ -19,11 +19,12 @@ package org.apache.inlong.manager.client.api;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.util.Locale;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import java.util.Locale;
+
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
index df6f610..0c18cd5 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
@@ -23,6 +23,7 @@ import lombok.Data;
 import org.apache.inlong.manager.common.enums.SinkType;
 
 import java.util.List;
+import java.util.Map;
 
 @Data
 @ApiModel("Stream sink configuration")
@@ -31,6 +32,9 @@ public abstract class StreamSink {
     @ApiModelProperty(value = "DataSink name", required = true)
     private String sinkName;
 
+    @ApiModelProperty("Other properties if need")
+    private Map<String, Object> properties;
+
     public abstract SinkType getSinkType();
 
     public abstract List<SinkField> getSinkFields();
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/UserDefinedSortConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/UserDefinedSortConf.java
index 3b60272..fd1756b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/UserDefinedSortConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/UserDefinedSortConf.java
@@ -20,9 +20,10 @@ package org.apache.inlong.manager.client.api;
 import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.util.Map;
 import lombok.Data;
 
+import java.util.Map;
+
 @Data
 @ApiModel("Base configuration for user defined sort functions")
 public class UserDefinedSortConf extends SortBaseConf {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/DefaultAuthentication.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/DefaultAuthentication.java
index c6fad8a..e6ddf66 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/DefaultAuthentication.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/DefaultAuthentication.java
@@ -18,11 +18,12 @@
 package org.apache.inlong.manager.client.api.auth;
 
 import com.alibaba.fastjson.JSONObject;
-import java.util.Map;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.client.api.util.AssertUtil;
 
+import java.util.Map;
+
 @NoArgsConstructor
 public class DefaultAuthentication implements Authentication {
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/SecretAuthentication.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/SecretAuthentication.java
index 23d566f..6d46e6e 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/SecretAuthentication.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/SecretAuthentication.java
@@ -18,11 +18,12 @@
 package org.apache.inlong.manager.client.api.auth;
 
 import com.alibaba.fastjson.JSONObject;
-import java.util.Map;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.client.api.util.AssertUtil;
 
+import java.util.Map;
+
 @NoArgsConstructor
 public class SecretAuthentication implements Authentication {
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/SecretTokenAuthentication.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/SecretTokenAuthentication.java
index e0f8ddc..0837b02 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/SecretTokenAuthentication.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/SecretTokenAuthentication.java
@@ -18,11 +18,12 @@
 package org.apache.inlong.manager.client.api.auth;
 
 import com.alibaba.fastjson.JSONObject;
-import java.util.Map;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
 
+import java.util.Map;
+
 @NoArgsConstructor
 public class SecretTokenAuthentication extends SecretAuthentication {
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/TokenAuthentication.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/TokenAuthentication.java
index c813943..cd63bed 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/TokenAuthentication.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/TokenAuthentication.java
@@ -18,11 +18,12 @@
 package org.apache.inlong.manager.client.api.auth;
 
 import com.alibaba.fastjson.JSONObject;
-import java.util.Map;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.client.api.util.AssertUtil;
 
+import java.util.Map;
+
 @NoArgsConstructor
 public class TokenAuthentication implements Authentication {
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
index c0e076d..37c62cb 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.manager.client.api.impl;
 
-import java.util.List;
 import org.apache.inlong.manager.client.api.InlongGroup;
 import org.apache.inlong.manager.client.api.InlongGroupConf;
 import org.apache.inlong.manager.client.api.InlongGroupContext;
@@ -25,6 +24,8 @@ import org.apache.inlong.manager.client.api.InlongStream;
 import org.apache.inlong.manager.client.api.InlongStreamBuilder;
 import org.apache.inlong.manager.client.api.InlongStreamConf;
 
+import java.util.List;
+
 public class BlankInlongGroup implements InlongGroup {
 
     @Override
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 50d30ac..017b95b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -20,12 +20,6 @@ package org.apache.inlong.manager.client.api.impl;
 import com.github.pagehelper.PageInfo;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
@@ -39,6 +33,13 @@ import org.apache.inlong.manager.client.api.util.InlongGroupTransfer;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupResponse;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 @Slf4j
 public class InlongClientImpl implements InlongClient {
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 58cf865..08947d0 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -19,9 +19,6 @@ package org.apache.inlong.manager.client.api.impl;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -53,6 +50,10 @@ import org.apache.inlong.manager.common.pojo.workflow.TaskResponse;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 public class InlongGroupImpl implements InlongGroup {
 
     private InlongGroupConf groupConf;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index ce89b8c..1228871 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -18,9 +18,6 @@
 package org.apache.inlong.manager.client.api.impl;
 
 import com.google.common.collect.Maps;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
@@ -40,6 +37,10 @@ import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 @Data
 @EqualsAndHashCode(callSuper = true)
 @NoArgsConstructor
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java
index fbb8312..dfe0290 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java
@@ -18,8 +18,6 @@
 package org.apache.inlong.manager.client.api.inner;
 
 import com.google.common.collect.Maps;
-import java.util.List;
-import java.util.Map;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.commons.collections.MapUtils;
@@ -31,6 +29,9 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
 
+import java.util.List;
+import java.util.Map;
+
 @Data
 @NoArgsConstructor
 public class InnerGroupContext {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index 5d4ce4a..eb674c9 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.client.api.inner;
 import com.alibaba.fastjson.JSONObject;
 import com.github.pagehelper.PageInfo;
 import com.google.common.collect.Lists;
-import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import okhttp3.Call;
 import okhttp3.MediaType;
@@ -53,6 +52,8 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
 
+import java.util.List;
+
 /**
  * InnerInlongManagerClient is used to invoke http api of inlong manager.
  */
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
index 46979cf..5b85758 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
@@ -18,8 +18,6 @@
 package org.apache.inlong.manager.client.api.inner;
 
 import com.google.common.collect.Maps;
-import java.util.List;
-import java.util.Map;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
@@ -27,6 +25,9 @@ import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 
+import java.util.List;
+import java.util.Map;
+
 @Data
 @NoArgsConstructor
 public class InnerStreamContext {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/ClickHouseSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/ClickHouseSink.java
index 1bd24ad..b507b9e 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/ClickHouseSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/ClickHouseSink.java
@@ -30,7 +30,6 @@ import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
 import org.apache.inlong.manager.common.enums.SinkType;
 
 import java.util.List;
-import java.util.Map;
 
 @Data
 @EqualsAndHashCode(callSuper = true)
@@ -81,9 +80,6 @@ public class ClickHouseSink extends StreamSink {
     @ApiModelProperty("Field definitions for clickhouse")
     private List<SinkField> sinkFields;
 
-    @ApiModelProperty("Other properties if need")
-    private Map<String, String> properties;
-
     @Override
     public DataFormat getDataFormat() {
         return DataFormat.NONE;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
index d9c1dde..a5a96a0 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
@@ -33,7 +33,6 @@ import org.apache.inlong.manager.common.enums.SinkType;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
-import java.util.Map;
 
 @Data
 @EqualsAndHashCode(callSuper = true)
@@ -68,18 +67,22 @@ public class HiveSink extends StreamSink {
 
     @ApiModelProperty("Data separator, stored as ASCII code")
     private DataSeparator dataSeparator = DataSeparator.SOH;
+
     @ApiModelProperty("File format, support: TextFile, RCFile, SequenceFile, Avro")
     private FileFormat fileFormat;
+
     @ApiModelProperty("Create table or not")
     private boolean needCreated;
+
     @ApiModelProperty("Primary partition field, default null")
     private String primaryPartition;
+
     @ApiModelProperty("Secondary partition field, default null")
     private String secondaryPartition;
+
     @ApiModelProperty("Field definitions for hive")
     private List<SinkField> sinkFields;
-    @ApiModelProperty("Other properties if need")
-    private Map<String, String> properties;
+
     @ApiModelProperty("Data format type for stream sink")
     private DataFormat dataFormat;
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/KafkaSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/KafkaSink.java
index 935f383..801d2d8 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/KafkaSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/KafkaSink.java
@@ -29,7 +29,6 @@ import org.apache.inlong.manager.client.api.StreamSink;
 import org.apache.inlong.manager.common.enums.SinkType;
 
 import java.util.List;
-import java.util.Map;
 
 @Data
 @EqualsAndHashCode(callSuper = true)
@@ -55,7 +54,4 @@ public class KafkaSink extends StreamSink {
 
     @ApiModelProperty("Field definitions for kafka")
     private List<SinkField> sinkFields;
-
-    @ApiModelProperty("Other properties if need")
-    private Map<String, String> properties;
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/AssertUtil.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/AssertUtil.java
index ccd1806..86ac32d 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/AssertUtil.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/AssertUtil.java
@@ -17,9 +17,10 @@
 
 package org.apache.inlong.manager.client.api.util;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.Collection;
 import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
 
 public class AssertUtil {
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/GsonUtil.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/GsonUtil.java
index b048b58..771bf3c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/GsonUtil.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/GsonUtil.java
@@ -26,6 +26,9 @@ import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParseException;
 import com.google.gson.JsonSyntaxException;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
 import java.lang.reflect.Type;
 import java.text.SimpleDateFormat;
 import java.util.Date;
@@ -33,8 +36,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class GsonUtil {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index 32660d4..a79c36e 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -19,9 +19,6 @@ package org.apache.inlong.manager.client.api.util;
 
 import com.google.common.collect.Lists;
 import com.google.gson.reflect.TypeToken;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -46,6 +43,10 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupResponse;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.common.util.JsonUtils;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 public class InlongGroupTransfer {
 
     public static InlongGroupConf parseGroupResponse(InlongGroupResponse groupResponse) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index e1ec229..21e288b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -17,15 +17,11 @@
 
 package org.apache.inlong.manager.client.api.util;
 
-import static org.apache.inlong.manager.common.enums.SourceType.BINLOG;
-import static org.apache.inlong.manager.common.enums.SourceType.KAFKA;
-
 import com.github.pagehelper.PageInfo;
 import com.google.common.collect.Lists;
 import com.google.common.reflect.TypeToken;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
-import java.util.List;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.common.beans.Response;
 import org.apache.inlong.manager.common.enums.Constant;
@@ -54,6 +50,11 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
 
+import java.util.List;
+
+import static org.apache.inlong.manager.common.enums.SourceType.BINLOG;
+import static org.apache.inlong.manager.common.enums.SourceType.KAFKA;
+
 /**
  * Parser for Inlong entity
  */
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index 8755dfe..8c517c3 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -103,10 +103,10 @@ public class InlongStreamSinkTransfer {
         clickHouseSinkRequest.setKeyFieldNames(clickHouseSink.getKeyFieldNames());
         clickHouseSinkRequest.setPartitionKey(clickHouseSink.getPartitionKey());
         clickHouseSinkRequest.setPartitionStrategy(clickHouseSink.getPartitionStrategy());
-        clickHouseSinkRequest.setPartitionKey(clickHouseSink.getPartitionKey());
         clickHouseSinkRequest.setWriteMaxRetryTimes(clickHouseSink.getWriteMaxRetryTimes());
         clickHouseSinkRequest.setInlongGroupId(streamInfo.getInlongGroupId());
         clickHouseSinkRequest.setInlongStreamId(streamInfo.getInlongStreamId());
+        clickHouseSinkRequest.setProperties(clickHouseSink.getProperties());
         clickHouseSinkRequest.setEnableCreateResource(clickHouseSink.isNeedCreated() ? 1 : 0);
         if (CollectionUtils.isNotEmpty(clickHouseSink.getSinkFields())) {
             List<SinkFieldRequest> fieldRequests = createSinkFieldRequests(streamSink.getSinkFields());
@@ -138,7 +138,7 @@ public class InlongStreamSinkTransfer {
             clickHouseSink.setWriteMaxRetryTimes(sinkResponse.getWriteMaxRetryTimes());
             clickHouseSink.setDistributedTable(sinkResponse.getDistributedTable());
         }
-
+        clickHouseSink.setProperties(sinkResponse.getProperties());
         clickHouseSink.setNeedCreated(sinkResponse.getEnableCreateResource() == 1);
         if (CollectionUtils.isNotEmpty(sinkResponse.getFieldList())) {
             clickHouseSink.setSinkFields(convertToSinkFields(sinkResponse.getFieldList()));
@@ -167,6 +167,7 @@ public class InlongStreamSinkTransfer {
         kafkaSinkRequest.setInlongStreamId(streamInfo.getInlongStreamId());
         kafkaSinkRequest.setSerializationType(kafkaSink.getDataFormat().name());
         kafkaSinkRequest.setEnableCreateResource(kafkaSink.isNeedCreated() ? 1 : 0);
+        kafkaSinkRequest.setProperties(kafkaSink.getProperties());
         if (CollectionUtils.isNotEmpty(kafkaSink.getSinkFields())) {
             List<SinkFieldRequest> fieldRequests = createSinkFieldRequests(kafkaSink.getSinkFields());
             kafkaSinkRequest.setFieldList(fieldRequests);
@@ -190,6 +191,7 @@ public class InlongStreamSinkTransfer {
             kafkaSink.setTopicName(sinkResponse.getTopicName());
             kafkaSink.setDataFormat(DataFormat.forName(sinkResponse.getSerializationType()));
         }
+        kafkaSink.setProperties(sinkResponse.getProperties());
         kafkaSink.setNeedCreated(sinkResponse.getEnableCreateResource() == 1);
         if (CollectionUtils.isNotEmpty(sinkResponse.getFieldList())) {
             kafkaSink.setSinkFields(convertToSinkFields(sinkResponse.getFieldList()));
@@ -220,6 +222,7 @@ public class InlongStreamSinkTransfer {
         hiveSinkRequest.setPassword(defaultAuthentication.getPassword());
         hiveSinkRequest.setPrimaryPartition(hiveSink.getPrimaryPartition());
         hiveSinkRequest.setSecondaryPartition(hiveSink.getSecondaryPartition());
+        hiveSinkRequest.setProperties(hiveSink.getProperties());
         if (CollectionUtils.isNotEmpty(hiveSink.getSinkFields())) {
             List<SinkFieldRequest> fieldRequests = createSinkFieldRequests(streamSink.getSinkFields());
             hiveSinkRequest.setFieldList(fieldRequests);
@@ -275,7 +278,7 @@ public class InlongStreamSinkTransfer {
             hiveSink.setSecondaryPartition(sinkResponse.getSecondaryPartition());
             hiveSink.setPrimaryPartition(sinkResponse.getPrimaryPartition());
         }
-
+        hiveSink.setProperties(sinkResponse.getProperties());
         hiveSink.setSinkType(SinkType.HIVE);
         hiveSink.setNeedCreated(sinkResponse.getEnableCreateResource() == 1);
         if (CollectionUtils.isNotEmpty(sinkResponse.getFieldList())) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
index 12988b3..522affc 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.client.api.util;
 
-import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.client.api.InlongStreamConf;
@@ -27,6 +25,9 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 public class InlongStreamTransfer {
 
     public static InlongStreamInfo createStreamInfo(InlongStreamConf streamConf, InlongGroupInfo groupInfo) {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkListResponse.java
index cf218e2..4c46022 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkListResponse.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.common.pojo.sink;
 
 import com.fasterxml.jackson.annotation.JsonFormat;
 import io.swagger.annotations.ApiModelProperty;
+import java.util.Map;
 import lombok.Data;
 
 import java.util.Date;
@@ -59,4 +60,6 @@ public class SinkListResponse {
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
     private Date modifyTime;
 
+    @ApiModelProperty("Properties for sink")
+    private Map<String, Object> properties;
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkRequest.java
index 34cec44..68bbc24 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkRequest.java
@@ -19,8 +19,10 @@ package org.apache.inlong.manager.common.pojo.sink;
 
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
+import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
+import java.util.Map;
 import lombok.Data;
 
 import javax.validation.constraints.NotNull;
@@ -61,4 +63,6 @@ public class SinkRequest {
     @ApiModelProperty("Sink field list")
     private List<SinkFieldRequest> fieldList;
 
+    @ApiModelProperty("Properties for sink")
+    private Map<String, Object> properties = Maps.newHashMap();
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java
index f4d3a1e..e0683fa 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.pojo.sink;
 import com.fasterxml.jackson.annotation.JsonFormat;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
+import java.util.Map;
 import lombok.Data;
 
 import java.util.Date;
@@ -79,4 +80,7 @@ public class SinkResponse {
     @ApiModelProperty("Sink field list")
     private List<SinkFieldResponse> fieldList;
 
+    @ApiModelProperty("Properties for sink")
+    private Map<String, Object> properties;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
index 93f7d9b..bce8e70 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.pojo.sink.ck;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
+import java.util.Map;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -76,6 +77,9 @@ public class ClickHouseSinkDTO {
     @ApiModelProperty("Write max retry times")
     private Integer writeMaxRetryTimes;
 
+    @ApiModelProperty("Properties for clickhouse")
+    private Map<String, Object> properties;
+
     /**
      * Get the dto instance from the request
      */
@@ -93,6 +97,7 @@ public class ClickHouseSinkDTO {
                 .flushInterval(request.getFlushInterval())
                 .flushRecordNumber(request.getFlushRecordNumber())
                 .writeMaxRetryTimes(request.getWriteMaxRetryTimes())
+                .properties(request.getProperties())
                 .build();
     }
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
index ed5eef6..65f3ea8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.pojo.sink.hive;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
+import java.util.Map;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -85,6 +86,9 @@ public class HiveSinkDTO {
     @ApiModelProperty("Data field separator")
     private String dataSeparator;
 
+    @ApiModelProperty("Properties for hive")
+    private Map<String, Object> properties;
+
     /**
      * Get the dto instance from the request
      */
@@ -105,6 +109,7 @@ public class HiveSinkDTO {
                 .fileFormat(request.getFileFormat())
                 .dataEncoding(request.getDataEncoding())
                 .dataSeparator(request.getDataSeparator())
+                .properties(request.getProperties())
                 .build();
     }
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
index 0af7778..7d4737c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.pojo.sink.iceberg;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
+import java.util.Map;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -43,12 +44,16 @@ public class IcebergSinkDTO {
     @ApiModelProperty("table Location like hdfs://")
     private String tableLocation;
 
+    @ApiModelProperty("Properties for iceberg")
+    private Map<String, Object> properties;
+
     /**
      * Get the dto instance from the request
      */
     public static IcebergSinkDTO getFromRequest(IcebergSinkRequest request) {
         return IcebergSinkDTO.builder()
                 .tableLocation(request.getTableLocation())
+                .properties(request.getProperties())
                 .build();
     }
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkDTO.java
index 58b54e4..54b4fc6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkDTO.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.pojo.sink.kafka;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
+import java.util.Map;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -46,9 +47,12 @@ public class KafkaSinkDTO {
     @ApiModelProperty("Kafka topicName")
     private String topicName;
 
-    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
+    @ApiModelProperty("Data Serialization, support: json, canal, avro")
     private String serializationType;
 
+    @ApiModelProperty("Properties for kafka")
+    private Map<String, Object> properties;
+
     /**
      * Get the dto instance from the request
      */
@@ -57,6 +61,7 @@ public class KafkaSinkDTO {
                 .address(request.getAddress())
                 .topicName(request.getTopicName())
                 .serializationType(request.getSerializationType())
+                .properties(request.getProperties())
                 .build();
     }
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkListResponse.java
index 9aebfb9..eff2b46 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkListResponse.java
@@ -37,7 +37,7 @@ public class KafkaSinkListResponse extends SinkListResponse {
     @ApiModelProperty("Kafka topicName")
     private String topicName;
 
-    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
+    @ApiModelProperty("Data Serialization, support: json, canal, avro")
     private String serializationType;
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkRequest.java
index 98cefb5..74ca8e4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkRequest.java
@@ -42,8 +42,6 @@ public class KafkaSinkRequest extends SinkRequest {
     @ApiModelProperty("Kafka topicName")
     private String topicName;
 
-    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
+    @ApiModelProperty("Data Serialization, support: json, canal, avro")
     private String serializationType;
-
-
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java
index 665a74a..b60e2b1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java
@@ -44,7 +44,7 @@ public class KafkaSinkResponse extends SinkResponse {
     @ApiModelProperty("Kafka topicName")
     private String topicName;
 
-    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
+    @ApiModelProperty("Data Serialization, support: json, canal, avro")
     private String serializationType;
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
index ffd719f..069da43 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
@@ -72,6 +72,6 @@ public class SourceRequest {
     @ApiModelProperty("Snapshot of the source task")
     private String snapshot;
 
-    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro, etc")
+    @ApiModelProperty("Data Serialization, support: json, canal, avro, etc")
     private String serializationType;
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
index 67834d4..a35a36d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
@@ -84,6 +84,6 @@ public class SourceResponse {
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
     private Date modifyTime;
 
-    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro, etc")
+    @ApiModelProperty("Data Serialization, support: json, canal, avro, etc")
     private String serializationType;
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
index dffdda0..2b81870 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
@@ -61,7 +61,7 @@ public class KafkaSourceDTO {
             notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
     private String topicPartitionOffset;
 
-    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro, etc")
+    @ApiModelProperty("Data Serialization, support: json, canal, avro, etc")
     private String serializationType;
 
     /**
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
index 6c1c235..318eabb 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
@@ -46,7 +46,7 @@ public class KafkaSourceListResponse extends SourceListResponse {
     @ApiModelProperty("Limit the number of bytes read per second")
     private String byteSpeedLimit;
 
-    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro, etc")
+    @ApiModelProperty("Data Serialization, support: json, canal, avro, etc")
     private String serializationType = "none";
 
     @ApiModelProperty(value = "Topic partition offset",
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
index 969dd69..a752757 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
@@ -19,7 +19,9 @@ package org.apache.inlong.manager.service.thirdparty.sort;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
@@ -155,7 +157,11 @@ public class CreateSortConfigListener implements SortOperateListener {
         // Get sink info
         SinkInfo sinkInfo = SinkInfoUtils.createSinkInfo(sourceResponse, sinkResponse);
 
-        return new DataFlowInfo(sinkId, sourceInfo, sinkInfo);
+        Map<String, Object> properties = Maps.newHashMap();
+        if (MapUtils.isNotEmpty(sinkResponse.getProperties())) {
+            properties.putAll(sinkResponse.getProperties());
+        }
+        return new DataFlowInfo(sinkId, sourceInfo, sinkInfo, properties);
     }
 
     private InlongGroupInfo getGroupInfo(ProcessForm processForm) {