You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/17 12:29:01 UTC

[incubator-inlong] branch master updated: [INLONG-3173][Sort-Standalone] Unify SortClusterConfig in manager and sort-standalone (#3174)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1594016  [INLONG-3173][Sort-Standalone] Unify SortClusterConfig in manager and sort-standalone (#3174)
1594016 is described below

commit 1594016c5dac4a4029517ced46c38198f2c87ed7
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Thu Mar 17 20:28:51 2022 +0800

    [INLONG-3173][Sort-Standalone] Unify SortClusterConfig in manager and sort-standalone (#3174)
---
 .../pojo/sortstandalone/SortClusterConfig.java     |  36 +++--
 .../pojo/sortstandalone/SortClusterResponse.java   |  49 ++-----
 .../common/pojo/sortstandalone/SortTaskConfig.java |  47 ++-----
 .../inlong/manager/service/core/SortService.java   |   6 +-
 .../manager/service/core/impl/SortServiceImpl.java |  37 ++++--
 .../core/impl/SortTaskSinkParamServiceImpl.java    |   3 -
 .../web/controller/openapi/SortController.java     |   4 +-
 .../config/holder/SortClusterConfigHolder.java     |   4 +-
 .../ClassResourceSortClusterConfigLoader.java      |   4 +-
 .../loader/ManagerSortClusterConfigLoader.java     |  10 +-
 .../config/loader/SortClusterConfigLoader.java     |   2 +-
 .../standalone/config/pojo/SortClusterConfig.java  |  68 ----------
 .../config/pojo/SortClusterResponse.java           | 107 ---------------
 .../FlumeConfigGenerator.java}                     | 145 +++++----------------
 .../apache/inlong/sort/standalone/SortCluster.java |   4 +-
 .../apache/inlong/sort/standalone/SortTask.java    |   5 +-
 .../inlong/sort/standalone/sink/SinkContext.java   |   5 +-
 .../sort/standalone/sink/cls/ClsSinkContext.java   |   2 +-
 .../sink/elasticsearch/EsSinkContext.java          |   2 +-
 .../sort/standalone/sink/hive/HiveSinkContext.java |   2 +-
 .../sink/kafka/KafkaFederationSinkContext.java     |   2 +-
 .../sink/pulsar/PulsarFederationSinkContext.java   |   2 +-
 .../standalone/source/sortsdk/SortSdkSource.java   |   2 +-
 .../sink/elasticsearch/TestEsSinkContext.java      |   3 +-
 .../source/sortsdk/TestSortSdkSource.java          |   8 +-
 25 files changed, 131 insertions(+), 428 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterConfig.java
similarity index 51%
copy from inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterConfig.java
index 333c33d..2abcc79 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterConfig.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -15,24 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.standalone.config.loader;
+package org.apache.inlong.common.pojo.sortstandalone;
 
-import org.apache.flume.conf.Configurable;
-import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
-/**
- * 
- * SortClusterConfigLoader
- */
-public interface SortClusterConfigLoader extends Configurable {
-
-    String SORT_CLUSTER_CONFIG_TYPE = "sortClusterConfig.type";
-    String SORT_CLUSTER_CONFIG_MANAGER = "sortClusterConfig.managerUrl";
+import java.util.List;
 
-    /**
-     * load
-     * 
-     * @return
-     */
-    SortClusterConfig load();
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class SortClusterConfig {
+    String clusterName;
+    List<SortTaskConfig> sortTasks;
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/SortClusterConfigResponse.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterResponse.java
similarity index 52%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/SortClusterConfigResponse.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterResponse.java
index 7d3fcf6..679031e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/SortClusterConfigResponse.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterResponse.java
@@ -15,50 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.sort;
+package org.apache.inlong.common.pojo.sortstandalone;
 
-import io.swagger.annotations.ApiModel;
+import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
-
-import java.util.List;
-import java.util.Map;
+import lombok.NoArgsConstructor;
 
 @Data
 @Builder
-@ApiModel("Sort cluster config")
-public class SortClusterConfigResponse {
+@NoArgsConstructor
+@AllArgsConstructor
+public class SortClusterResponse {
+
+    public static final int SUCC = 0;
+    public static final int NOUPDATE = 1;
+    public static final int FAIL = -1;
+    public static final int REQ_PARAMS_ERROR = -101;
+
     String msg;
     int code;
     String md5;
-    List<SortTaskConfig> tasks;
-
-    @Builder
-    @Data
-    public static class SortClusterConfig {
-        String clusterName;
-        List<SortTaskConfig> sortTasks;
-    }
-
-    @Builder
-    @Data
-    public static class SortTaskConfig {
-        String taskName;
-        SinkType sinkType;
-        List<Map<String, String>> idParams;
-        Map<String, String> sinkParams;
-    }
+    SortClusterConfig data;
 
-    public enum SinkType {
-        /** kafka */
-        KAFKA,
-        /** pulsar */
-        PULSAR,
-        /** hive */
-        HIVE,
-        /** es */
-        ElasticSearch,
-        /** unknown */
-        UNKNOWN
-    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/SortClusterConfigResponse.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortTaskConfig.java
similarity index 52%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/SortClusterConfigResponse.java
rename to inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortTaskConfig.java
index 7d3fcf6..0b44fee 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/SortClusterConfigResponse.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortTaskConfig.java
@@ -15,50 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.sort;
+package org.apache.inlong.common.pojo.sortstandalone;
 
-import io.swagger.annotations.ApiModel;
+import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 import java.util.List;
 import java.util.Map;
 
 @Data
 @Builder
-@ApiModel("Sort cluster config")
-public class SortClusterConfigResponse {
-    String msg;
-    int code;
-    String md5;
-    List<SortTaskConfig> tasks;
-
-    @Builder
-    @Data
-    public static class SortClusterConfig {
-        String clusterName;
-        List<SortTaskConfig> sortTasks;
-    }
-
-    @Builder
-    @Data
-    public static class SortTaskConfig {
-        String taskName;
-        SinkType sinkType;
-        List<Map<String, String>> idParams;
-        Map<String, String> sinkParams;
-    }
-
-    public enum SinkType {
-        /** kafka */
-        KAFKA,
-        /** pulsar */
-        PULSAR,
-        /** hive */
-        HIVE,
-        /** es */
-        ElasticSearch,
-        /** unknown */
-        UNKNOWN
-    }
+@NoArgsConstructor
+@AllArgsConstructor
+public class SortTaskConfig {
+    String name;
+    String type;
+    List<Map<String, String>> idParams;
+    Map<String, String> sinkParams;
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
index 40a5421..f5743af 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.core;
 
-import org.apache.inlong.manager.common.pojo.sort.SortClusterConfigResponse;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
 import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse;
 
 /**
@@ -36,9 +36,9 @@ public interface SortService {
      *
      * @param clusterName Name of sort cluster.
      * @param md5 Last update md5.
-     * @return Response of sort cluster config {@link SortClusterConfigResponse}
+     * @return Response of sort cluster config {@link SortClusterResponse}
      */
-    SortClusterConfigResponse getClusterConfig(String clusterName, String md5);
+    SortClusterResponse getClusterConfig(String clusterName, String md5);
 
     /**
      * Get sort source config.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 4972e81..08b6566 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -19,13 +19,13 @@ package org.apache.inlong.manager.service.core.impl;
 
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.pojo.sort.SortClusterConfigResponse;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
 import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse;
 import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse.CacheZone;
 import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse.SortSourceConfig;
 import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
-import org.apache.inlong.manager.common.pojo.sort.SortClusterConfigResponse.SinkType;
-import org.apache.inlong.manager.common.pojo.sort.SortClusterConfigResponse.SortTaskConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.manager.service.core.SortClusterConfigService;
 import org.apache.inlong.manager.service.core.SortSourceService;
 import org.apache.inlong.manager.service.core.SortTaskIdParamService;
@@ -61,14 +61,14 @@ public class SortServiceImpl implements SortService {
     @Autowired private SortSourceService sortSourceService;
 
     @Override
-    public SortClusterConfigResponse getClusterConfig(String clusterName, String md5) {
+    public SortClusterResponse getClusterConfig(String clusterName, String md5) {
         LOGGER.info("start getClusterConfig");
 
         // check if cluster name is valid or not.
         if (StringUtils.isBlank(clusterName)) {
             String errMsg = "Blank cluster name, return nothing";
             LOGGER.info(errMsg);
-            return SortClusterConfigResponse.builder().msg(errMsg).build();
+            return SortClusterResponse.builder().msg(errMsg).build();
         }
 
         // check if there is any task.
@@ -77,7 +77,10 @@ public class SortServiceImpl implements SortService {
         if (tasks == null || tasks.isEmpty()) {
             String errMsg = "There is not any task for cluster" + clusterName;
             LOGGER.info(errMsg);
-            return SortClusterConfigResponse.builder().msg(errMsg).build();
+            return SortClusterResponse.builder()
+                    .code(RESPONSE_CODE_REQ_PARAMS_ERROR)
+                    .msg(errMsg)
+                    .build();
         }
 
         // add task configs
@@ -87,22 +90,34 @@ public class SortServiceImpl implements SortService {
         } catch (IllegalArgumentException ex) {
             String errMsg = "Got illegal sink type from db, " + ex.getMessage();
             LOGGER.info(errMsg);
-            return SortClusterConfigResponse.builder().msg(errMsg).build();
+            return SortClusterResponse.builder()
+                    .code(RESPONSE_CODE_FAIL)
+                    .msg(errMsg)
+                    .build();
         }
 
-        return SortClusterConfigResponse.builder().tasks(taskConfigs).msg("success").build();
+        SortClusterConfig clusterConfig = SortClusterConfig.builder()
+                .clusterName(clusterName)
+                .sortTasks(taskConfigs)
+                .build();
+
+        return SortClusterResponse.builder()
+                .code(RESPONSE_CODE_SUCCESS)
+                .data(clusterConfig)
+                .msg("success")
+                .build();
     }
 
     private SortTaskConfig getTaskConfig(SortClusterConfigEntity clusterConfig) {
-        SinkType sinkType = SinkType.valueOf(clusterConfig.getSinkType().toUpperCase());
+        String sinkType = clusterConfig.getSinkType().toUpperCase();
         List<Map<String, String>> idParams =
                 sortTaskIdParamService.selectByTaskName(clusterConfig.getTaskName());
         Map<String, String> sinkParams =
                 sortTaskSinkParamService
                         .selectByTaskNameAndType(clusterConfig.getTaskName(), clusterConfig.getSinkType());
         return SortTaskConfig.builder()
-                .taskName(clusterConfig.getTaskName())
-                .sinkType(sinkType)
+                .name(clusterConfig.getTaskName())
+                .type(sinkType)
                 .idParams(idParams)
                 .sinkParams(sinkParams)
                 .build();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java
index 39b7e8a..ea1bcad 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java
@@ -34,8 +34,6 @@ import java.util.Map;
 public class SortTaskSinkParamServiceImpl implements SortTaskSinkParamService {
     private static final Logger LOGGER = LoggerFactory.getLogger(SortTaskSinkParamServiceImpl.class);
 
-    private static final String KEY_SINK_TYPE = "type";
-
     @Autowired
     private SortTaskSinkParamEntityMapper sortTaskSinkParamEntityMapper;
 
@@ -45,7 +43,6 @@ public class SortTaskSinkParamServiceImpl implements SortTaskSinkParamService {
         List<SortTaskSinkParamEntity> taskSinkParamEntityList =
                 sortTaskSinkParamEntityMapper.selectByTaskNameAndType(taskName);
         Map<String, String> sinkParams = new HashMap<>();
-        sinkParams.put(KEY_SINK_TYPE, sinkType);
         taskSinkParamEntityList.forEach(entity -> sinkParams.put(entity.getParamKey(), entity.getParamValue()));
         return sinkParams;
     }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
index 2caa7bd..da2e55f 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.web.controller.openapi;
 
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
-import org.apache.inlong.manager.common.pojo.sort.SortClusterConfigResponse;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
 import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse;
 import org.apache.inlong.manager.service.core.SortService;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -38,7 +38,7 @@ public class SortController {
 
     @GetMapping("/getClusterConfig")
     @ApiOperation(value = "get sort cluster config")
-    public SortClusterConfigResponse getSortClusterConfig(
+    public SortClusterResponse getSortClusterConfig(
             @RequestParam String clusterName,
             @RequestParam String md5) {
         return sortService.getClusterConfig(clusterName, md5);
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
index d593db1..42aaf0b 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
@@ -27,10 +27,10 @@ import java.util.TimerTask;
 import org.apache.commons.lang.ClassUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Context;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sort.standalone.config.loader.ClassResourceSortClusterConfigLoader;
 import org.apache.inlong.sort.standalone.config.loader.SortClusterConfigLoader;
-import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 import org.slf4j.Logger;
 
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
index 6193680..a9ccf63 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
@@ -21,7 +21,7 @@ import java.io.UnsupportedEncodingException;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.flume.Context;
-import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 import org.slf4j.Logger;
 
@@ -53,7 +53,7 @@ public class ClassResourceSortClusterConfigLoader implements SortClusterConfigLo
         } catch (Exception e) {
             LOG.error("fail to load properties, file ={}, and e= {}", FILENAME, e);
         }
-        return new SortClusterConfig();
+        return SortClusterConfig.builder().build();
     }
 
     /**
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
index 249b5a4..eb9e8c1 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
@@ -28,9 +28,9 @@ import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
-import org.apache.inlong.sort.standalone.config.pojo.SortClusterResponse;
 import org.slf4j.Logger;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
@@ -101,8 +101,10 @@ public class ManagerSortClusterConfigLoader implements SortClusterConfigLoader {
             // get groupId <-> topic and m value.
 
             SortClusterResponse clusterResponse = gson.fromJson(returnStr, SortClusterResponse.class);
-            if (!clusterResponse.isResult()) {
-                LOG.info("Fail to get config info from url:{}, error code is {}", url, clusterResponse.getErrCode());
+            int errCode = clusterResponse.getCode();
+            if (errCode != SortClusterResponse.SUCC && errCode != SortClusterResponse.NOUPDATE) {
+                LOG.info("Fail to get config info from url:{}, error code is {}, msg is {}",
+                        url, clusterResponse.getCode(), clusterResponse.getMsg());
                 return null;
             }
 
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
index 333c33d..8081f87 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.sort.standalone.config.loader;
 
 import org.apache.flume.conf.Configurable;
-import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
 
 /**
  * 
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterConfig.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterConfig.java
deleted file mode 100644
index a720ae0..0000000
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterConfig.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.standalone.config.pojo;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * 
- * SortClusterConfig
- */
-public class SortClusterConfig {
-
-    private String clusterName;
-    private List<SortTaskConfig> sortTasks = new ArrayList<>();
-
-    /**
-     * get clusterName
-     * 
-     * @return the clusterName
-     */
-    public String getClusterName() {
-        return clusterName;
-    }
-
-    /**
-     * set clusterName
-     * 
-     * @param clusterName the clusterName to set
-     */
-    public void setClusterName(String clusterName) {
-        this.clusterName = clusterName;
-    }
-
-    /**
-     * get sortTasks
-     * 
-     * @return the sortTasks
-     */
-    public List<SortTaskConfig> getSortTasks() {
-        return sortTasks;
-    }
-
-    /**
-     * set sortTasks
-     * 
-     * @param sortTasks the sortTasks to set
-     */
-    public void setSortTasks(List<SortTaskConfig> sortTasks) {
-        this.sortTasks = sortTasks;
-    }
-
-}
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java
deleted file mode 100644
index 3d0faa6..0000000
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.standalone.config.pojo;
-
-/**
- * 
- * SortClusterResponse
- */
-public class SortClusterResponse {
-
-    public static final int SUCC = 0;
-    public static final int NOUPDATE = 1;
-    public static final int FAIL = -1;
-    public static final int REQ_PARAMS_ERROR = -101;
-
-    private boolean result;
-    private int errCode;
-    private String md5;
-    private SortClusterConfig data;
-
-    /**
-     * get result
-     * 
-     * @return the result
-     */
-    public boolean isResult() {
-        return result;
-    }
-
-    /**
-     * set result
-     * 
-     * @param result the result to set
-     */
-    public void setResult(boolean result) {
-        this.result = result;
-    }
-
-    /**
-     * get errCode
-     * 
-     * @return the errCode
-     */
-    public int getErrCode() {
-        return errCode;
-    }
-
-    /**
-     * set errCode
-     * 
-     * @param errCode the errCode to set
-     */
-    public void setErrCode(int errCode) {
-        this.errCode = errCode;
-    }
-
-    /**
-     * get md5
-     * 
-     * @return the md5
-     */
-    public String getMd5() {
-        return md5;
-    }
-
-    /**
-     * set md5
-     * 
-     * @param md5 the md5 to set
-     */
-    public void setMd5(String md5) {
-        this.md5 = md5;
-    }
-
-    /**
-     * get data
-     * 
-     * @return the data
-     */
-    public SortClusterConfig getData() {
-        return data;
-    }
-
-    /**
-     * set data
-     * 
-     * @param data the data to set
-     */
-    public void setData(SortClusterConfig data) {
-        this.data = data;
-    }
-}
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortTaskConfig.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
similarity index 61%
rename from inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortTaskConfig.java
rename to inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
index ca99a4b..8b30759 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortTaskConfig.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -15,127 +15,40 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.standalone.config.pojo;
+package org.apache.inlong.sort.standalone.utils;
+
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
-import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.pojo.type.SortType;
-
-/**
- * 
- * SortTaskConfig
- */
-public class SortTaskConfig {
+public class FlumeConfigGenerator {
 
     public static final String KEY_TASK_NAME = "taskName";
     public static final String KEY_SORT_CHANNEL_TYPE = "sortChannel.type";
     public static final String KEY_SORT_SINK_TYPE = "sortSink.type";
     public static final String KEY_SORT_SOURCE_TYPE = "sortSource.type";
 
-    private String name;
-    private SortType type;
-    private List<Map<String, String>> idParams = new ArrayList<>();
-    private Map<String, String> sinkParams = new HashMap<>();
-
-    /**
-     * get name
-     * 
-     * @return the name
-     */
-    public String getName() {
-        return name;
-    }
-
-    /**
-     * set name
-     * 
-     * @param name the name to set
-     */
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    /**
-     * get type
-     * 
-     * @return the type
-     */
-    public SortType getType() {
-        return type;
-    }
-
-    /**
-     * set type
-     * 
-     * @param type the type to set
-     */
-    public void setType(SortType type) {
-        this.type = type;
-    }
-
-    /**
-     * get idParams
-     * 
-     * @return the idParams
-     */
-    public List<Map<String, String>> getIdParams() {
-        return idParams;
-    }
-
-    /**
-     * set idParams
-     * 
-     * @param idParams the idParams to set
-     */
-    public void setIdParams(List<Map<String, String>> idParams) {
-        this.idParams = idParams;
-    }
-
-    /**
-     * get sinkParams
-     * 
-     * @return the sinkParams
-     */
-    public Map<String, String> getSinkParams() {
-        return sinkParams;
-    }
-
-    /**
-     * set sinkParams
-     * 
-     * @param sinkParams the sinkParams to set
-     */
-    public void setSinkParams(Map<String, String> sinkParams) {
-        this.sinkParams = sinkParams;
-    }
-
-    /**
-     * getFlumeConfProperties
-     * 
-     * @return Map
-     */
-    public Map<String, String> generateFlumeConfiguration() {
+    public static Map<String, String> generateFlumeConfiguration(SortTaskConfig taskConfig) {
         Map<String, String> flumeConf = new HashMap<>();
+        String name = taskConfig.getName();
+        Map<String, String> sinkParams = taskConfig.getSinkParams();
         // channels
-        this.appendChannels(flumeConf);
+        appendChannels(flumeConf, name);
         // sinks
-        this.appendSinks(flumeConf);
+        appendSinks(flumeConf, name, sinkParams);
         // sources
-        this.appendSources(flumeConf);
+        appendSources(flumeConf, name);
         return flumeConf;
     }
 
     /**
      * appendChannels
-     * 
+     *
      * @param flumeConf
      */
-    private void appendChannels(Map<String, String> flumeConf) {
+    private static void appendChannels(Map<String, String> flumeConf, String name) {
         StringBuilder builder = new StringBuilder();
         String channelName = name + "Channel";
         flumeConf.put(name + ".channels", channelName);
@@ -144,29 +57,33 @@ public class SortTaskConfig {
         String channelType = builder.append(prefix).append("type").toString();
         String channelClass = CommonPropertiesHolder.getString(KEY_SORT_CHANNEL_TYPE);
         flumeConf.put(channelType, channelClass);
-        this.appendCommon(flumeConf, prefix, null);
+        appendCommon(flumeConf, prefix, null, name);
     }
 
     /**
      * appendCommon
-     * 
+     *
      * @param flumeConf
      * @param prefix
      * @param componentParams
      */
-    private void appendCommon(Map<String, String> flumeConf, String prefix, Map<String, String> componentParams) {
+    private static void appendCommon(
+            Map<String, String> flumeConf,
+            String prefix,
+            Map<String, String> componentParams,
+            String name) {
         StringBuilder builder = new StringBuilder();
         String taskName = builder.append(prefix).append(KEY_TASK_NAME).toString();
         flumeConf.put(taskName, name);
         // CommonProperties
-        for (Entry<String, String> entry : CommonPropertiesHolder.get().entrySet()) {
+        for (Map.Entry<String, String> entry : CommonPropertiesHolder.get().entrySet()) {
             builder.setLength(0);
             String key = builder.append(prefix).append(entry.getKey()).toString();
             flumeConf.put(key, entry.getValue());
         }
         // componentParams
         if (componentParams != null) {
-            for (Entry<String, String> entry : componentParams.entrySet()) {
+            for (Map.Entry<String, String> entry : componentParams.entrySet()) {
                 builder.setLength(0);
                 String key = builder.append(prefix).append(entry.getKey()).toString();
                 flumeConf.put(key, entry.getValue());
@@ -176,10 +93,10 @@ public class SortTaskConfig {
 
     /**
      * appendSinks
-     * 
+     *
      * @param flumeConf
      */
-    private void appendSinks(Map<String, String> flumeConf) {
+    private static void appendSinks(Map<String, String> flumeConf, String name, Map<String, String> sinkParams) {
         // sinks
         String sinkName = name + "Sink";
         flumeConf.put(name + ".sinks", sinkName);
@@ -196,15 +113,15 @@ public class SortTaskConfig {
         String channelName = name + "Channel";
         flumeConf.put(channelKey, channelName);
         //
-        this.appendCommon(flumeConf, prefix, sinkParams);
+        appendCommon(flumeConf, prefix, sinkParams, name);
     }
 
     /**
      * appendSources
-     * 
+     *
      * @param flumeConf
      */
-    private void appendSources(Map<String, String> flumeConf) {
+    private static void appendSources(Map<String, String> flumeConf, String name) {
         // sources
         String sourceName = name + "Source";
         flumeConf.put(name + ".sources", sourceName);
@@ -225,6 +142,6 @@ public class SortTaskConfig {
         String selectorTypeKey = builder.append(prefix).append("selector.type").toString();
         flumeConf.put(selectorTypeKey, "org.apache.flume.channel.ReplicatingChannelSelector");
         //
-        this.appendCommon(flumeConf, prefix, null);
+        appendCommon(flumeConf, prefix, null, name);
     }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
index c91f985..67252e0 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
@@ -28,10 +28,10 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
-import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 import org.slf4j.Logger;
 
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
index 7748ae6..246e86e 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
@@ -28,8 +28,9 @@ import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.lifecycle.LifecycleSupervisor;
 import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
 import org.apache.flume.node.MaterializedConfiguration;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.apache.inlong.sort.standalone.utils.FlumeConfigGenerator;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 import org.slf4j.Logger;
 
@@ -77,7 +78,7 @@ public class SortTask {
         }
 
         //
-        Map<String, String> flumeConfiguration = config.generateFlumeConfiguration();
+        Map<String, String> flumeConfiguration = FlumeConfigGenerator.generateFlumeConfiguration(config);
         LOG.info("Start sort task:{},config:{}", taskName, flumeConfiguration);
         PropertiesConfigurationProvider configurationProvider = new PropertiesConfigurationProvider(
                 config.getName(), flumeConfiguration);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
index e8f14c7..a7d0357 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -26,10 +26,10 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.inlong.common.metric.MetricRegister;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
@@ -46,6 +46,7 @@ public class SinkContext {
     public static final String KEY_MAX_THREADS = "maxThreads";
     public static final String KEY_PROCESSINTERVAL = "processInterval";
     public static final String KEY_RELOADINTERVAL = "reloadInterval";
+    public static final String KEY_TASK_NAME = "taskName";
 
     protected final String clusterId;
     protected final String taskName;
@@ -75,7 +76,7 @@ public class SinkContext {
         this.sinkContext = context;
         this.channel = channel;
         this.clusterId = context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
-        this.taskName = context.getString(SortTaskConfig.KEY_TASK_NAME);
+        this.taskName = context.getString(KEY_TASK_NAME);
         this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
         this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL, 100);
         this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index 9872ab9..aa05fba 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -26,11 +26,11 @@ import org.apache.commons.lang.ClassUtils;
 import org.apache.commons.lang.math.NumberUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
 import org.apache.inlong.sort.standalone.sink.SinkContext;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index e89413f..1db9a2f 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -31,11 +31,11 @@ import org.apache.commons.lang.math.NumberUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.http.HttpHost;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
 import org.apache.inlong.sort.standalone.sink.SinkContext;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSinkContext.java
index 545de7a..37a5cc1 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSinkContext.java
@@ -33,10 +33,10 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.math.NumberUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
 import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
index caf612f..b82d729 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
@@ -19,10 +19,10 @@ package org.apache.inlong.sort.standalone.sink.kafka;
 
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
 import org.apache.inlong.sort.standalone.sink.SinkContext;
 import org.apache.inlong.sort.standalone.utils.Constants;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index 9142c2c..55ab5e3 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -24,10 +24,10 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
 import org.apache.inlong.sort.standalone.sink.SinkContext;
 import org.apache.inlong.sort.standalone.utils.Constants;
 import org.slf4j.Logger;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index 731c46d..4d67718 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -30,12 +30,12 @@ import org.apache.flume.Context;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.source.AbstractSource;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sdk.sort.api.SortClient;
 import org.apache.inlong.sdk.sort.api.SortClientConfig;
 import org.apache.inlong.sdk.sort.api.SortClientFactory;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
index 1cafb7c..3982bd6 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
@@ -31,7 +31,6 @@ import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.BufferQueueChannel;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
 import org.apache.inlong.sort.standalone.utils.Constants;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -65,7 +64,7 @@ public class TestEsSinkContext {
         PowerMockito.doNothing().when(MetricRegister.class, "register", any());
         Context context = CommonPropertiesHolder.getContext();
         String sinkName = CommonPropertiesHolder.getClusterId() + "Sink";
-        context.put(SortTaskConfig.KEY_TASK_NAME, "sid_es_es-rmrv7g7a_v3");
+        context.put("taskName", "sid_es_es-rmrv7g7a_v3");
         Channel channel = new BufferQueueChannel();
         EsSinkContext esSinkContext = new EsSinkContext(sinkName, context, channel, dispatchQueue);
         esSinkContext.reload();
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/source/sortsdk/TestSortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/source/sortsdk/TestSortSdkSource.java
index 7aee6b1..5bac874 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/source/sortsdk/TestSortSdkSource.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/source/sortsdk/TestSortSdkSource.java
@@ -19,9 +19,9 @@ package org.apache.inlong.sort.standalone.source.sortsdk;
 
 import org.apache.flume.Context;
 import org.apache.inlong.common.metric.MetricRegister;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
-import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -64,7 +64,7 @@ public class TestSortSdkSource {
     }
 
     private SortClusterConfig prepareSortClusterConfig(final int size) {
-        final SortClusterConfig testConfig = new SortClusterConfig();
+        final SortClusterConfig testConfig = SortClusterConfig.builder().build();
         testConfig.setClusterName("testConfig");
         testConfig.setSortTasks(prepareSortTaskConfig(size));
         return testConfig;
@@ -74,7 +74,7 @@ public class TestSortSdkSource {
         List<SortTaskConfig> configs = new ArrayList<>();
 
         for (int i = 0; i < size; i++) {
-            SortTaskConfig config = new SortTaskConfig();
+            SortTaskConfig config = SortTaskConfig.builder().build();
             config.setName("testConfig" + i);
             configs.add(config);
         }