You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/17 12:29:01 UTC
[incubator-inlong] branch master updated: [INLONG-3173][Sort-Standalone] Unify SortClusterConfig in manager and sort-standalone (#3174)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1594016 [INLONG-3173][Sort-Standalone] Unify SortClusterConfig in manager and sort-standalone (#3174)
1594016 is described below
commit 1594016c5dac4a4029517ced46c38198f2c87ed7
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Thu Mar 17 20:28:51 2022 +0800
[INLONG-3173][Sort-Standalone] Unify SortClusterConfig in manager and sort-standalone (#3174)
---
.../pojo/sortstandalone/SortClusterConfig.java | 36 +++--
.../pojo/sortstandalone/SortClusterResponse.java | 49 ++-----
.../common/pojo/sortstandalone/SortTaskConfig.java | 47 ++-----
.../inlong/manager/service/core/SortService.java | 6 +-
.../manager/service/core/impl/SortServiceImpl.java | 37 ++++--
.../core/impl/SortTaskSinkParamServiceImpl.java | 3 -
.../web/controller/openapi/SortController.java | 4 +-
.../config/holder/SortClusterConfigHolder.java | 4 +-
.../ClassResourceSortClusterConfigLoader.java | 4 +-
.../loader/ManagerSortClusterConfigLoader.java | 10 +-
.../config/loader/SortClusterConfigLoader.java | 2 +-
.../standalone/config/pojo/SortClusterConfig.java | 68 ----------
.../config/pojo/SortClusterResponse.java | 107 ---------------
.../FlumeConfigGenerator.java} | 145 +++++----------------
.../apache/inlong/sort/standalone/SortCluster.java | 4 +-
.../apache/inlong/sort/standalone/SortTask.java | 5 +-
.../inlong/sort/standalone/sink/SinkContext.java | 5 +-
.../sort/standalone/sink/cls/ClsSinkContext.java | 2 +-
.../sink/elasticsearch/EsSinkContext.java | 2 +-
.../sort/standalone/sink/hive/HiveSinkContext.java | 2 +-
.../sink/kafka/KafkaFederationSinkContext.java | 2 +-
.../sink/pulsar/PulsarFederationSinkContext.java | 2 +-
.../standalone/source/sortsdk/SortSdkSource.java | 2 +-
.../sink/elasticsearch/TestEsSinkContext.java | 3 +-
.../source/sortsdk/TestSortSdkSource.java | 8 +-
25 files changed, 131 insertions(+), 428 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterConfig.java
similarity index 51%
copy from inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterConfig.java
index 333c33d..2abcc79 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterConfig.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -15,24 +15,20 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.standalone.config.loader;
+package org.apache.inlong.common.pojo.sortstandalone;
-import org.apache.flume.conf.Configurable;
-import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
-/**
- *
- * SortClusterConfigLoader
- */
-public interface SortClusterConfigLoader extends Configurable {
-
- String SORT_CLUSTER_CONFIG_TYPE = "sortClusterConfig.type";
- String SORT_CLUSTER_CONFIG_MANAGER = "sortClusterConfig.managerUrl";
+import java.util.List;
- /**
- * load
- *
- * @return
- */
- SortClusterConfig load();
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class SortClusterConfig {
+ String clusterName;
+ List<SortTaskConfig> sortTasks;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/SortClusterConfigResponse.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterResponse.java
similarity index 52%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/SortClusterConfigResponse.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterResponse.java
index 7d3fcf6..679031e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/SortClusterConfigResponse.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterResponse.java
@@ -15,50 +15,27 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.sort;
+package org.apache.inlong.common.pojo.sortstandalone;
-import io.swagger.annotations.ApiModel;
+import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
-
-import java.util.List;
-import java.util.Map;
+import lombok.NoArgsConstructor;
@Data
@Builder
-@ApiModel("Sort cluster config")
-public class SortClusterConfigResponse {
+@NoArgsConstructor
+@AllArgsConstructor
+public class SortClusterResponse {
+
+ public static final int SUCC = 0;
+ public static final int NOUPDATE = 1;
+ public static final int FAIL = -1;
+ public static final int REQ_PARAMS_ERROR = -101;
+
String msg;
int code;
String md5;
- List<SortTaskConfig> tasks;
-
- @Builder
- @Data
- public static class SortClusterConfig {
- String clusterName;
- List<SortTaskConfig> sortTasks;
- }
-
- @Builder
- @Data
- public static class SortTaskConfig {
- String taskName;
- SinkType sinkType;
- List<Map<String, String>> idParams;
- Map<String, String> sinkParams;
- }
+ SortClusterConfig data;
- public enum SinkType {
- /** kafka */
- KAFKA,
- /** pulsar */
- PULSAR,
- /** hive */
- HIVE,
- /** es */
- ElasticSearch,
- /** unknown */
- UNKNOWN
- }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/SortClusterConfigResponse.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortTaskConfig.java
similarity index 52%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/SortClusterConfigResponse.java
rename to inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortTaskConfig.java
index 7d3fcf6..0b44fee 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/SortClusterConfigResponse.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortTaskConfig.java
@@ -15,50 +15,23 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.sort;
+package org.apache.inlong.common.pojo.sortstandalone;
-import io.swagger.annotations.ApiModel;
+import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
import java.util.List;
import java.util.Map;
@Data
@Builder
-@ApiModel("Sort cluster config")
-public class SortClusterConfigResponse {
- String msg;
- int code;
- String md5;
- List<SortTaskConfig> tasks;
-
- @Builder
- @Data
- public static class SortClusterConfig {
- String clusterName;
- List<SortTaskConfig> sortTasks;
- }
-
- @Builder
- @Data
- public static class SortTaskConfig {
- String taskName;
- SinkType sinkType;
- List<Map<String, String>> idParams;
- Map<String, String> sinkParams;
- }
-
- public enum SinkType {
- /** kafka */
- KAFKA,
- /** pulsar */
- PULSAR,
- /** hive */
- HIVE,
- /** es */
- ElasticSearch,
- /** unknown */
- UNKNOWN
- }
+@NoArgsConstructor
+@AllArgsConstructor
+public class SortTaskConfig {
+ String name;
+ String type;
+ List<Map<String, String>> idParams;
+ Map<String, String> sinkParams;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
index 40a5421..f5743af 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core;
-import org.apache.inlong.manager.common.pojo.sort.SortClusterConfigResponse;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse;
/**
@@ -36,9 +36,9 @@ public interface SortService {
*
* @param clusterName Name of sort cluster.
* @param md5 Last update md5.
- * @return Response of sort cluster config {@link SortClusterConfigResponse}
+ * @return Response of sort cluster config {@link SortClusterResponse}
*/
- SortClusterConfigResponse getClusterConfig(String clusterName, String md5);
+ SortClusterResponse getClusterConfig(String clusterName, String md5);
/**
* Get sort source config.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 4972e81..08b6566 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -19,13 +19,13 @@ package org.apache.inlong.manager.service.core.impl;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.pojo.sort.SortClusterConfigResponse;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse;
import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse.CacheZone;
import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse.SortSourceConfig;
import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
-import org.apache.inlong.manager.common.pojo.sort.SortClusterConfigResponse.SinkType;
-import org.apache.inlong.manager.common.pojo.sort.SortClusterConfigResponse.SortTaskConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.manager.service.core.SortClusterConfigService;
import org.apache.inlong.manager.service.core.SortSourceService;
import org.apache.inlong.manager.service.core.SortTaskIdParamService;
@@ -61,14 +61,14 @@ public class SortServiceImpl implements SortService {
@Autowired private SortSourceService sortSourceService;
@Override
- public SortClusterConfigResponse getClusterConfig(String clusterName, String md5) {
+ public SortClusterResponse getClusterConfig(String clusterName, String md5) {
LOGGER.info("start getClusterConfig");
// check if cluster name is valid or not.
if (StringUtils.isBlank(clusterName)) {
String errMsg = "Blank cluster name, return nothing";
LOGGER.info(errMsg);
- return SortClusterConfigResponse.builder().msg(errMsg).build();
+ return SortClusterResponse.builder().msg(errMsg).build();
}
// check if there is any task.
@@ -77,7 +77,10 @@ public class SortServiceImpl implements SortService {
if (tasks == null || tasks.isEmpty()) {
String errMsg = "There is not any task for cluster" + clusterName;
LOGGER.info(errMsg);
- return SortClusterConfigResponse.builder().msg(errMsg).build();
+ return SortClusterResponse.builder()
+ .code(RESPONSE_CODE_REQ_PARAMS_ERROR)
+ .msg(errMsg)
+ .build();
}
// add task configs
@@ -87,22 +90,34 @@ public class SortServiceImpl implements SortService {
} catch (IllegalArgumentException ex) {
String errMsg = "Got illegal sink type from db, " + ex.getMessage();
LOGGER.info(errMsg);
- return SortClusterConfigResponse.builder().msg(errMsg).build();
+ return SortClusterResponse.builder()
+ .code(RESPONSE_CODE_FAIL)
+ .msg(errMsg)
+ .build();
}
- return SortClusterConfigResponse.builder().tasks(taskConfigs).msg("success").build();
+ SortClusterConfig clusterConfig = SortClusterConfig.builder()
+ .clusterName(clusterName)
+ .sortTasks(taskConfigs)
+ .build();
+
+ return SortClusterResponse.builder()
+ .code(RESPONSE_CODE_SUCCESS)
+ .data(clusterConfig)
+ .msg("success")
+ .build();
}
private SortTaskConfig getTaskConfig(SortClusterConfigEntity clusterConfig) {
- SinkType sinkType = SinkType.valueOf(clusterConfig.getSinkType().toUpperCase());
+ String sinkType = clusterConfig.getSinkType().toUpperCase();
List<Map<String, String>> idParams =
sortTaskIdParamService.selectByTaskName(clusterConfig.getTaskName());
Map<String, String> sinkParams =
sortTaskSinkParamService
.selectByTaskNameAndType(clusterConfig.getTaskName(), clusterConfig.getSinkType());
return SortTaskConfig.builder()
- .taskName(clusterConfig.getTaskName())
- .sinkType(sinkType)
+ .name(clusterConfig.getTaskName())
+ .type(sinkType)
.idParams(idParams)
.sinkParams(sinkParams)
.build();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java
index 39b7e8a..ea1bcad 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java
@@ -34,8 +34,6 @@ import java.util.Map;
public class SortTaskSinkParamServiceImpl implements SortTaskSinkParamService {
private static final Logger LOGGER = LoggerFactory.getLogger(SortTaskSinkParamServiceImpl.class);
- private static final String KEY_SINK_TYPE = "type";
-
@Autowired
private SortTaskSinkParamEntityMapper sortTaskSinkParamEntityMapper;
@@ -45,7 +43,6 @@ public class SortTaskSinkParamServiceImpl implements SortTaskSinkParamService {
List<SortTaskSinkParamEntity> taskSinkParamEntityList =
sortTaskSinkParamEntityMapper.selectByTaskNameAndType(taskName);
Map<String, String> sinkParams = new HashMap<>();
- sinkParams.put(KEY_SINK_TYPE, sinkType);
taskSinkParamEntityList.forEach(entity -> sinkParams.put(entity.getParamKey(), entity.getParamValue()));
return sinkParams;
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
index 2caa7bd..da2e55f 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.web.controller.openapi;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
-import org.apache.inlong.manager.common.pojo.sort.SortClusterConfigResponse;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse;
import org.apache.inlong.manager.service.core.SortService;
import org.springframework.beans.factory.annotation.Autowired;
@@ -38,7 +38,7 @@ public class SortController {
@GetMapping("/getClusterConfig")
@ApiOperation(value = "get sort cluster config")
- public SortClusterConfigResponse getSortClusterConfig(
+ public SortClusterResponse getSortClusterConfig(
@RequestParam String clusterName,
@RequestParam String md5) {
return sortService.getClusterConfig(clusterName, md5);
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
index d593db1..42aaf0b 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
@@ -27,10 +27,10 @@ import java.util.TimerTask;
import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sort.standalone.config.loader.ClassResourceSortClusterConfigLoader;
import org.apache.inlong.sort.standalone.config.loader.SortClusterConfigLoader;
-import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
index 6193680..a9ccf63 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
@@ -21,7 +21,7 @@ import java.io.UnsupportedEncodingException;
import org.apache.commons.io.IOUtils;
import org.apache.flume.Context;
-import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;
@@ -53,7 +53,7 @@ public class ClassResourceSortClusterConfigLoader implements SortClusterConfigLo
} catch (Exception e) {
LOG.error("fail to load properties, file ={}, and e= {}", FILENAME, e);
}
- return new SortClusterConfig();
+ return SortClusterConfig.builder().build();
}
/**
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
index 249b5a4..eb9e8c1 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
@@ -28,9 +28,9 @@ import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
-import org.apache.inlong.sort.standalone.config.pojo.SortClusterResponse;
import org.slf4j.Logger;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
@@ -101,8 +101,10 @@ public class ManagerSortClusterConfigLoader implements SortClusterConfigLoader {
// get groupId <-> topic and m value.
SortClusterResponse clusterResponse = gson.fromJson(returnStr, SortClusterResponse.class);
- if (!clusterResponse.isResult()) {
- LOG.info("Fail to get config info from url:{}, error code is {}", url, clusterResponse.getErrCode());
+ int errCode = clusterResponse.getCode();
+ if (errCode != SortClusterResponse.SUCC && errCode != SortClusterResponse.NOUPDATE) {
+ LOG.info("Fail to get config info from url:{}, error code is {}, msg is {}",
+ url, clusterResponse.getCode(), clusterResponse.getMsg());
return null;
}
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
index 333c33d..8081f87 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
@@ -18,7 +18,7 @@
package org.apache.inlong.sort.standalone.config.loader;
import org.apache.flume.conf.Configurable;
-import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
/**
*
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterConfig.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterConfig.java
deleted file mode 100644
index a720ae0..0000000
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterConfig.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.standalone.config.pojo;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- *
- * SortClusterConfig
- */
-public class SortClusterConfig {
-
- private String clusterName;
- private List<SortTaskConfig> sortTasks = new ArrayList<>();
-
- /**
- * get clusterName
- *
- * @return the clusterName
- */
- public String getClusterName() {
- return clusterName;
- }
-
- /**
- * set clusterName
- *
- * @param clusterName the clusterName to set
- */
- public void setClusterName(String clusterName) {
- this.clusterName = clusterName;
- }
-
- /**
- * get sortTasks
- *
- * @return the sortTasks
- */
- public List<SortTaskConfig> getSortTasks() {
- return sortTasks;
- }
-
- /**
- * set sortTasks
- *
- * @param sortTasks the sortTasks to set
- */
- public void setSortTasks(List<SortTaskConfig> sortTasks) {
- this.sortTasks = sortTasks;
- }
-
-}
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java
deleted file mode 100644
index 3d0faa6..0000000
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.standalone.config.pojo;
-
-/**
- *
- * SortClusterResponse
- */
-public class SortClusterResponse {
-
- public static final int SUCC = 0;
- public static final int NOUPDATE = 1;
- public static final int FAIL = -1;
- public static final int REQ_PARAMS_ERROR = -101;
-
- private boolean result;
- private int errCode;
- private String md5;
- private SortClusterConfig data;
-
- /**
- * get result
- *
- * @return the result
- */
- public boolean isResult() {
- return result;
- }
-
- /**
- * set result
- *
- * @param result the result to set
- */
- public void setResult(boolean result) {
- this.result = result;
- }
-
- /**
- * get errCode
- *
- * @return the errCode
- */
- public int getErrCode() {
- return errCode;
- }
-
- /**
- * set errCode
- *
- * @param errCode the errCode to set
- */
- public void setErrCode(int errCode) {
- this.errCode = errCode;
- }
-
- /**
- * get md5
- *
- * @return the md5
- */
- public String getMd5() {
- return md5;
- }
-
- /**
- * set md5
- *
- * @param md5 the md5 to set
- */
- public void setMd5(String md5) {
- this.md5 = md5;
- }
-
- /**
- * get data
- *
- * @return the data
- */
- public SortClusterConfig getData() {
- return data;
- }
-
- /**
- * set data
- *
- * @param data the data to set
- */
- public void setData(SortClusterConfig data) {
- this.data = data;
- }
-}
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortTaskConfig.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
similarity index 61%
rename from inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortTaskConfig.java
rename to inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
index ca99a4b..8b30759 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortTaskConfig.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -15,127 +15,40 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.standalone.config.pojo;
+package org.apache.inlong.sort.standalone.utils;
+
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.pojo.type.SortType;
-
-/**
- *
- * SortTaskConfig
- */
-public class SortTaskConfig {
+public class FlumeConfigGenerator {
public static final String KEY_TASK_NAME = "taskName";
public static final String KEY_SORT_CHANNEL_TYPE = "sortChannel.type";
public static final String KEY_SORT_SINK_TYPE = "sortSink.type";
public static final String KEY_SORT_SOURCE_TYPE = "sortSource.type";
- private String name;
- private SortType type;
- private List<Map<String, String>> idParams = new ArrayList<>();
- private Map<String, String> sinkParams = new HashMap<>();
-
- /**
- * get name
- *
- * @return the name
- */
- public String getName() {
- return name;
- }
-
- /**
- * set name
- *
- * @param name the name to set
- */
- public void setName(String name) {
- this.name = name;
- }
-
- /**
- * get type
- *
- * @return the type
- */
- public SortType getType() {
- return type;
- }
-
- /**
- * set type
- *
- * @param type the type to set
- */
- public void setType(SortType type) {
- this.type = type;
- }
-
- /**
- * get idParams
- *
- * @return the idParams
- */
- public List<Map<String, String>> getIdParams() {
- return idParams;
- }
-
- /**
- * set idParams
- *
- * @param idParams the idParams to set
- */
- public void setIdParams(List<Map<String, String>> idParams) {
- this.idParams = idParams;
- }
-
- /**
- * get sinkParams
- *
- * @return the sinkParams
- */
- public Map<String, String> getSinkParams() {
- return sinkParams;
- }
-
- /**
- * set sinkParams
- *
- * @param sinkParams the sinkParams to set
- */
- public void setSinkParams(Map<String, String> sinkParams) {
- this.sinkParams = sinkParams;
- }
-
- /**
- * getFlumeConfProperties
- *
- * @return Map
- */
- public Map<String, String> generateFlumeConfiguration() {
+ public static Map<String, String> generateFlumeConfiguration(SortTaskConfig taskConfig) {
Map<String, String> flumeConf = new HashMap<>();
+ String name = taskConfig.getName();
+ Map<String, String> sinkParams = taskConfig.getSinkParams();
// channels
- this.appendChannels(flumeConf);
+ appendChannels(flumeConf, name);
// sinks
- this.appendSinks(flumeConf);
+ appendSinks(flumeConf, name, sinkParams);
// sources
- this.appendSources(flumeConf);
+ appendSources(flumeConf, name);
return flumeConf;
}
/**
* appendChannels
- *
+ *
* @param flumeConf
*/
- private void appendChannels(Map<String, String> flumeConf) {
+ private static void appendChannels(Map<String, String> flumeConf, String name) {
StringBuilder builder = new StringBuilder();
String channelName = name + "Channel";
flumeConf.put(name + ".channels", channelName);
@@ -144,29 +57,33 @@ public class SortTaskConfig {
String channelType = builder.append(prefix).append("type").toString();
String channelClass = CommonPropertiesHolder.getString(KEY_SORT_CHANNEL_TYPE);
flumeConf.put(channelType, channelClass);
- this.appendCommon(flumeConf, prefix, null);
+ appendCommon(flumeConf, prefix, null, name);
}
/**
* appendCommon
- *
+ *
* @param flumeConf
* @param prefix
* @param componentParams
*/
- private void appendCommon(Map<String, String> flumeConf, String prefix, Map<String, String> componentParams) {
+ private static void appendCommon(
+ Map<String, String> flumeConf,
+ String prefix,
+ Map<String, String> componentParams,
+ String name) {
StringBuilder builder = new StringBuilder();
String taskName = builder.append(prefix).append(KEY_TASK_NAME).toString();
flumeConf.put(taskName, name);
// CommonProperties
- for (Entry<String, String> entry : CommonPropertiesHolder.get().entrySet()) {
+ for (Map.Entry<String, String> entry : CommonPropertiesHolder.get().entrySet()) {
builder.setLength(0);
String key = builder.append(prefix).append(entry.getKey()).toString();
flumeConf.put(key, entry.getValue());
}
// componentParams
if (componentParams != null) {
- for (Entry<String, String> entry : componentParams.entrySet()) {
+ for (Map.Entry<String, String> entry : componentParams.entrySet()) {
builder.setLength(0);
String key = builder.append(prefix).append(entry.getKey()).toString();
flumeConf.put(key, entry.getValue());
@@ -176,10 +93,10 @@ public class SortTaskConfig {
/**
* appendSinks
- *
+ *
* @param flumeConf
*/
- private void appendSinks(Map<String, String> flumeConf) {
+ private static void appendSinks(Map<String, String> flumeConf, String name, Map<String, String> sinkParams) {
// sinks
String sinkName = name + "Sink";
flumeConf.put(name + ".sinks", sinkName);
@@ -196,15 +113,15 @@ public class SortTaskConfig {
String channelName = name + "Channel";
flumeConf.put(channelKey, channelName);
//
- this.appendCommon(flumeConf, prefix, sinkParams);
+ appendCommon(flumeConf, prefix, sinkParams, name);
}
/**
* appendSources
- *
+ *
* @param flumeConf
*/
- private void appendSources(Map<String, String> flumeConf) {
+ private static void appendSources(Map<String, String> flumeConf, String name) {
// sources
String sourceName = name + "Source";
flumeConf.put(name + ".sources", sourceName);
@@ -225,6 +142,6 @@ public class SortTaskConfig {
String selectorTypeKey = builder.append(prefix).append("selector.type").toString();
flumeConf.put(selectorTypeKey, "org.apache.flume.channel.ReplicatingChannelSelector");
//
- this.appendCommon(flumeConf, prefix, null);
+ appendCommon(flumeConf, prefix, null, name);
}
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
index c91f985..67252e0 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
@@ -28,10 +28,10 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
-import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
index 7748ae6..246e86e 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
@@ -28,8 +28,9 @@ import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.lifecycle.LifecycleSupervisor;
import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
import org.apache.flume.node.MaterializedConfiguration;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.apache.inlong.sort.standalone.utils.FlumeConfigGenerator;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;
@@ -77,7 +78,7 @@ public class SortTask {
}
//
- Map<String, String> flumeConfiguration = config.generateFlumeConfiguration();
+ Map<String, String> flumeConfiguration = FlumeConfigGenerator.generateFlumeConfiguration(config);
LOG.info("Start sort task:{},config:{}", taskName, flumeConfiguration);
PropertiesConfigurationProvider configurationProvider = new PropertiesConfigurationProvider(
config.getName(), flumeConfiguration);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
index e8f14c7..a7d0357 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -26,10 +26,10 @@ import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.inlong.common.metric.MetricRegister;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
@@ -46,6 +46,7 @@ public class SinkContext {
public static final String KEY_MAX_THREADS = "maxThreads";
public static final String KEY_PROCESSINTERVAL = "processInterval";
public static final String KEY_RELOADINTERVAL = "reloadInterval";
+ public static final String KEY_TASK_NAME = "taskName";
protected final String clusterId;
protected final String taskName;
@@ -75,7 +76,7 @@ public class SinkContext {
this.sinkContext = context;
this.channel = channel;
this.clusterId = context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
- this.taskName = context.getString(SortTaskConfig.KEY_TASK_NAME);
+ this.taskName = context.getString(KEY_TASK_NAME);
this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL, 100);
this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index 9872ab9..aa05fba 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -26,11 +26,11 @@ import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.sink.SinkContext;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index e89413f..1db9a2f 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -31,11 +31,11 @@ import org.apache.commons.lang.math.NumberUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.http.HttpHost;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.sink.SinkContext;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSinkContext.java
index 545de7a..37a5cc1 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSinkContext.java
@@ -33,10 +33,10 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
index caf612f..b82d729 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
@@ -19,10 +19,10 @@ package org.apache.inlong.sort.standalone.sink.kafka;
import org.apache.flume.Channel;
import org.apache.flume.Context;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.utils.Constants;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index 9142c2c..55ab5e3 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -24,10 +24,10 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.flume.Channel;
import org.apache.flume.Context;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.utils.Constants;
import org.slf4j.Logger;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index 731c46d..4d67718 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -30,12 +30,12 @@ import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sdk.sort.api.SortClient;
import org.apache.inlong.sdk.sort.api.SortClientConfig;
import org.apache.inlong.sdk.sort.api.SortClientFactory;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
index 1cafb7c..3982bd6 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
@@ -31,7 +31,6 @@ import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sort.standalone.channel.BufferQueueChannel;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
import org.apache.inlong.sort.standalone.utils.Constants;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -65,7 +64,7 @@ public class TestEsSinkContext {
PowerMockito.doNothing().when(MetricRegister.class, "register", any());
Context context = CommonPropertiesHolder.getContext();
String sinkName = CommonPropertiesHolder.getClusterId() + "Sink";
- context.put(SortTaskConfig.KEY_TASK_NAME, "sid_es_es-rmrv7g7a_v3");
+ context.put("taskName", "sid_es_es-rmrv7g7a_v3");
Channel channel = new BufferQueueChannel();
EsSinkContext esSinkContext = new EsSinkContext(sinkName, context, channel, dispatchQueue);
esSinkContext.reload();
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/source/sortsdk/TestSortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/source/sortsdk/TestSortSdkSource.java
index 7aee6b1..5bac874 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/source/sortsdk/TestSortSdkSource.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/source/sortsdk/TestSortSdkSource.java
@@ -19,9 +19,9 @@ package org.apache.inlong.sort.standalone.source.sortsdk;
import org.apache.flume.Context;
import org.apache.inlong.common.metric.MetricRegister;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
-import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
-import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -64,7 +64,7 @@ public class TestSortSdkSource {
}
private SortClusterConfig prepareSortClusterConfig(final int size) {
- final SortClusterConfig testConfig = new SortClusterConfig();
+ final SortClusterConfig testConfig = SortClusterConfig.builder().build();
testConfig.setClusterName("testConfig");
testConfig.setSortTasks(prepareSortTaskConfig(size));
return testConfig;
@@ -74,7 +74,7 @@ public class TestSortSdkSource {
List<SortTaskConfig> configs = new ArrayList<>();
for (int i = 0; i < size; i++) {
- SortTaskConfig config = new SortTaskConfig();
+ SortTaskConfig config = SortTaskConfig.builder().build();
config.setName("testConfig" + i);
configs.add(config);
}