You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/10 13:45:36 UTC
[inlong] 03/04: [INLONG-6502][Manager] Optimize some log and code formats (#6503)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 4e511b880eb57c30a0908fb7abd9e5276a9c2219
Author: healchow <he...@gmail.com>
AuthorDate: Thu Nov 10 19:03:00 2022 +0800
[INLONG-6502][Manager] Optimize some log and code formats (#6503)
---
.../manager/common/consts/InlongConstants.java | 8 ++
.../inlong/manager/common/enums/ClusterStatus.java | 15 +--
.../inlong/manager/common/enums/GroupStatus.java | 8 +-
.../inlong/manager/dao/RestTemplateConfig.java | 5 +-
.../manager/plugin/flink/TaskRunService.java | 50 ++++----
.../plugin/listener/DeleteSortListener.java | 24 ++--
.../plugin/listener/DeleteStreamListener.java | 27 ++---
.../inlong/manager/plugin/util/FlinkUtils.java | 2 +-
.../manager/pojo/sink/mysql/MySQLSinkDTO.java | 13 +-
.../manager/service/core/SortConfigLoader.java | 32 +++--
.../manager/service/core/impl/AbstractService.java | 119 -------------------
.../service/group/InlongGroupProcessService.java | 23 ++--
.../service/heartbeat/HeartbeatManager.java | 2 +-
.../listener/queue/QueueResourceListener.java | 26 ++--
.../service/listener/sort/SortConfigListener.java | 6 +-
.../service/operationlog/OperationLogPool.java | 29 ++---
.../queue/kafka/KafkaResourceOperators.java | 23 ++--
.../resource/queue/pulsar/PulsarOperator.java | 2 +-
.../queue/pulsar/PulsarResourceOperator.java | 131 ++++++++++-----------
.../resource/sink/es/ElasticsearchConfig.java | 16 ++-
.../resource/sink/mysql/MySQLJdbcUtils.java | 88 +++++++-------
.../resource/sink/mysql/MySQLResourceOperator.java | 18 +--
.../resource/sort/DefaultSortConfigOperator.java | 13 +-
.../service/source/AbstractSourceOperator.java | 4 +-
.../service/source/SourceSnapshotOperator.java | 21 ++--
.../source/pulsar/PulsarSourceOperator.java | 1 +
.../service/stream/InlongStreamProcessService.java | 26 ++--
.../service/stream/InlongStreamServiceImpl.java | 5 +-
.../inlong/manager/service/RestTemplateConfig.java | 5 +-
.../resources/application-unit-test.properties | 6 +-
.../manager/web/auth/openapi/OpenAPIFilter.java | 2 +-
.../manager/web/auth/web/AuthenticationFilter.java | 2 +-
.../manager/web/config/RestTemplateConfig.java | 10 +-
.../src/main/resources/application-prod.properties | 3 +-
.../src/main/resources/application-test.properties | 5 +-
.../event/process/ProcessEventListener.java | 13 +-
.../workflow/processor/ServiceTaskProcessor.java | 18 +--
.../workflow/processor/StartEventProcessor.java | 24 ++--
38 files changed, 365 insertions(+), 460 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 28c2435d2..5d0b0a13d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -22,6 +22,14 @@ package org.apache.inlong.manager.common.consts;
*/
public class InlongConstants {
+ /**
+ * Thread pool related config.
+ */
+ public static final int CORE_POOL_SIZE = 10;
+ public static final int MAX_POOL_SIZE = 20;
+ public static final long ALIVE_TIME_MS = 100L;
+ public static final int QUEUE_SIZE = 10000;
+
/**
* Group config
*/
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterStatus.java
index 0cccd1589..e6f3b0d3a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterStatus.java
@@ -17,8 +17,6 @@
package org.apache.inlong.manager.common.enums;
-import org.apache.inlong.manager.common.exceptions.WorkflowException;
-
/**
* Status enum of cluster
*/
@@ -28,7 +26,7 @@ public enum ClusterStatus {
INITING(2);
- int status;
+ final int status;
ClusterStatus(int status) {
this.status = status;
@@ -37,15 +35,4 @@ public enum ClusterStatus {
public int getStatus() {
return status;
}
-
- public static ClusterStatus fromStatus(int status) {
- switch (status) {
- case 1:
- return NORMAL;
- case 2:
- return INITING;
- default:
- throw new WorkflowException("unknown status: " + status);
- }
- }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
index dafcfe747..e60b898d5 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
@@ -49,7 +49,7 @@ public enum GroupStatus {
DELETING(41, "deleting"),
DELETED(40, "deleted"),
- // GROUP_FINISH is used for batch task.
+ // FINISH is used for batch task.
FINISH(131, "finish");
private static final Map<GroupStatus, Set<GroupStatus>> GROUP_STATE_AUTOMATON = Maps.newHashMap();
@@ -60,8 +60,7 @@ public enum GroupStatus {
static {
GROUP_STATE_AUTOMATON.put(DRAFT, Sets.newHashSet(DRAFT, TO_BE_SUBMIT, DELETING));
GROUP_STATE_AUTOMATON.put(TO_BE_SUBMIT, Sets.newHashSet(TO_BE_SUBMIT, TO_BE_APPROVAL, DELETING));
- GROUP_STATE_AUTOMATON.put(TO_BE_APPROVAL,
- Sets.newHashSet(TO_BE_APPROVAL, APPROVE_REJECTED, APPROVE_PASSED, DELETING));
+ GROUP_STATE_AUTOMATON.put(TO_BE_APPROVAL, Sets.newHashSet(TO_BE_APPROVAL, APPROVE_REJECTED, APPROVE_PASSED));
GROUP_STATE_AUTOMATON.put(APPROVE_REJECTED, Sets.newHashSet(APPROVE_REJECTED, TO_BE_APPROVAL, DELETING));
GROUP_STATE_AUTOMATON.put(APPROVE_PASSED, Sets.newHashSet(APPROVE_PASSED, CONFIG_ING, DELETING));
@@ -149,6 +148,7 @@ public enum GroupStatus {
@Override
public String toString() {
- return name().toLowerCase(Locale.ROOT).replace("group_", "");
+ return name().toLowerCase(Locale.ROOT);
}
+
}
diff --git a/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/RestTemplateConfig.java b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/RestTemplateConfig.java
index 4a5e7b804..3cbb57840 100644
--- a/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/RestTemplateConfig.java
+++ b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/RestTemplateConfig.java
@@ -29,6 +29,7 @@ import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpContext;
+import org.apache.inlong.common.constant.ProtocolType;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -79,8 +80,8 @@ public class RestTemplateConfig {
public PoolingHttpClientConnectionManager httpClientConnectionManager() {
// Support HTTP, HTTPS
Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
- .register("http", PlainConnectionSocketFactory.getSocketFactory())
- .register("https", SSLConnectionSocketFactory.getSocketFactory())
+ .register(ProtocolType.HTTP, PlainConnectionSocketFactory.getSocketFactory())
+ .register(ProtocolType.HTTPS, SSLConnectionSocketFactory.getSocketFactory())
.build();
PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager(
registry);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java
index 51109e2c7..bbc9c64fd 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java
@@ -17,71 +17,61 @@
package org.apache.inlong.manager.plugin.flink;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
+
/**
* Task run service.
*/
public class TaskRunService {
- private static final ExecutorService executorService;
-
- private static final int CORE_POOL_SIZE = 16;
- private static final int MAXIMUM_POOL_SIZE = 32;
- private static final int QUEUE_SIZE = 10000;
- private static final long KEEP_ALIVE_TIME = 0L;
-
- static {
- executorService = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE,
- KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(QUEUE_SIZE));
- }
+ private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+ CORE_POOL_SIZE,
+ MAX_POOL_SIZE,
+ ALIVE_TIME_MS,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(QUEUE_SIZE),
+ new ThreadFactoryBuilder().setNameFormat("inlong-plugin-%s").build(),
+ new CallerRunsPolicy());
/**
* execute
- *
- * @param runnable
*/
public static void execute(Runnable runnable) {
- executorService.execute(runnable);
+ EXECUTOR_SERVICE.execute(runnable);
}
/**
* submit
- *
- * @param runnable
- * @return
*/
public static Future<?> submit(Runnable runnable) {
- return executorService.submit(runnable);
+ return EXECUTOR_SERVICE.submit(runnable);
}
/**
* submit
- *
- * @param runnable
- * @param defaultValue
- * @param <T>
- * @return
*/
public static <T> Future<T> submit(Runnable runnable, T defaultValue) {
- return executorService.submit(runnable, defaultValue);
+ return EXECUTOR_SERVICE.submit(runnable, defaultValue);
}
/**
* submit
- *
- * @param callable
- * @param <T>
- * @return
*/
public static <T> Future<T> submit(Callable<T> callable) {
- return executorService.submit(callable);
+ return EXECUTOR_SERVICE.submit(callable);
}
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index 6e65f30dc..7eae0e9ca 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -42,7 +42,7 @@ import java.util.Map;
import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
/**
- * Listener of delete sort.
+ * Listener of delete Sort task or config.
*/
@Slf4j
public class DeleteSortListener implements SortOperateListener {
@@ -57,17 +57,17 @@ public class DeleteSortListener implements SortOperateListener {
ProcessForm processForm = context.getProcessForm();
String groupId = processForm.getInlongGroupId();
if (!(processForm instanceof GroupResourceProcessForm)) {
- log.info("not add delete group listener, not GroupResourceProcessForm for groupId [{}]", groupId);
+ log.info("not add delete group listener, not GroupResourceProcessForm for groupId={}", groupId);
return false;
}
GroupResourceProcessForm groupProcessForm = (GroupResourceProcessForm) processForm;
if (groupProcessForm.getGroupOperateType() != GroupOperateType.DELETE) {
- log.info("not add delete group listener, as the operate was not DELETE for groupId [{}]", groupId);
+ log.info("not add delete group listener, as the operate was not DELETE for groupId={}", groupId);
return false;
}
- log.info("add delete group listener for groupId [{}]", groupId);
+ log.info("add delete group listener for groupId={}", groupId);
return true;
}
@@ -76,7 +76,7 @@ public class DeleteSortListener implements SortOperateListener {
ProcessForm processForm = context.getProcessForm();
String groupId = processForm.getInlongGroupId();
if (!(processForm instanceof GroupResourceProcessForm)) {
- String message = String.format("process form was not GroupResourceProcessForm for groupId [%s]", groupId);
+ String message = String.format("process form was not GroupResourceProcessForm for groupId=%s", groupId);
log.error(message);
return ListenerResult.fail(message);
}
@@ -90,10 +90,8 @@ public class DeleteSortListener implements SortOperateListener {
extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
- String message = String.format("delete sort failed for groupId [%s], as the sort properties is empty",
- groupId);
- log.error(message);
- return ListenerResult.fail(message);
+ log.warn("no need to delete sort for groupId={}, as the sort properties is empty", groupId);
+ return ListenerResult.success();
}
Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
@@ -102,7 +100,7 @@ public class DeleteSortListener implements SortOperateListener {
kvConf.putAll(result);
String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
if (StringUtils.isBlank(jobId)) {
- String message = String.format("sort job id is empty for groupId [%s]", groupId);
+ String message = String.format("sort job id is empty for groupId=%s", groupId);
return ListenerResult.fail(message);
}
@@ -115,16 +113,16 @@ public class DeleteSortListener implements SortOperateListener {
FlinkOperation flinkOperation = new FlinkOperation(flinkService);
try {
flinkOperation.delete(flinkInfo);
- log.info("job delete success for [{}]", jobId);
+ log.info("job delete success for jobId={}", jobId);
return ListenerResult.success();
} catch (Exception e) {
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
flinkOperation.pollJobStatus(flinkInfo);
- String message = String.format("delete sort failed for groupId [%s] ", groupId);
+ String message = String.format("delete sort failed for groupId=%s", groupId);
log.error(message, e);
- return ListenerResult.fail(message + e.getMessage());
+ return ListenerResult.fail(message + ": " + e.getMessage());
}
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index 7c50d9738..64006410b 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -59,19 +59,19 @@ public class DeleteStreamListener implements SortOperateListener {
ProcessForm processForm = context.getProcessForm();
String groupId = processForm.getInlongGroupId();
if (!(processForm instanceof StreamResourceProcessForm)) {
- log.info("not add delete stream listener, not StreamResourceProcessForm for groupId [{}]", groupId);
+ log.info("not add delete stream listener, not StreamResourceProcessForm for groupId={}", groupId);
return false;
}
StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
if (streamProcessForm.getGroupOperateType() != GroupOperateType.DELETE) {
- log.info("not add delete stream listener, as the operate was not DELETE for groupId [{}] streamId [{}]",
+ log.info("not add delete stream listener, as the operate was not DELETE for groupId={} streamId={}",
groupId, streamId);
return false;
}
- log.info("add delete stream listener for groupId [{}] streamId [{}]", groupId, streamId);
+ log.info("add delete stream listener for groupId={} streamId={}", groupId, streamId);
return true;
}
@@ -81,26 +81,23 @@ public class DeleteStreamListener implements SortOperateListener {
StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList();
- log.info("inlong group :{} ext info: {}", groupInfo.getInlongGroupId(), groupExtList);
+ log.info("inlong group: {} ext info: {}", groupInfo.getInlongGroupId(), groupExtList);
+
InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo();
List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
- log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
+ log.info("inlong stream: {} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
Map<String, String> kvConf = new HashMap<>();
groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
- streamExtList.forEach(extInfo -> {
- kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
- });
+ streamExtList.forEach(extInfo -> kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue()));
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
- String message = String.format(
- "delete sort failed for groupId [%s] and streamId [%s], as the sort properties is empty",
+ log.warn("no need to delete sort for groupId={} streamId={}, as the sort properties is empty",
groupId, streamId);
- log.error(message);
- return ListenerResult.fail(message);
+ return ListenerResult.success();
}
Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
@@ -109,7 +106,7 @@ public class DeleteStreamListener implements SortOperateListener {
kvConf.putAll(result);
String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
if (StringUtils.isBlank(jobId)) {
- String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);
+ String message = String.format("sort job id is empty for groupId=%s streamId=%s", groupId, streamId);
return ListenerResult.fail(message);
}
@@ -122,14 +119,14 @@ public class DeleteStreamListener implements SortOperateListener {
FlinkOperation flinkOperation = new FlinkOperation(flinkService);
try {
flinkOperation.delete(flinkInfo);
- log.info("job delete success for [{}]", jobId);
+ log.info("job delete success for jobId={}", jobId);
return ListenerResult.success();
} catch (Exception e) {
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
flinkOperation.pollJobStatus(flinkInfo);
- String message = String.format("delete sort failed for groupId [%s] streamId [%s]", groupId, streamId);
+ String message = String.format("delete sort failed for groupId=%s streamId=%s", groupId, streamId);
log.error(message, e);
return ListenerResult.fail(message + e.getMessage());
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index 25052df49..c3daefecf 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -101,7 +101,7 @@ public class FlinkUtils {
File baseDir = new File(baseDirName);
if (!baseDir.exists() || !baseDir.isDirectory()) {
- log.error("baseDirName find fail :{}", baseDirName);
+ log.error("baseDirName find fail: {}", baseDirName);
return result;
}
String tempName;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
index 21604f777..6363a8ec1 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
@@ -145,26 +145,25 @@ public class MySQLSinkDTO {
}
String connUri = jdbcUrl.substring(pos1 + 1);
- int pos;
if (connUri.startsWith("//")) {
- if ((pos = connUri.indexOf('/', 2)) != -1) {
+ int pos = connUri.indexOf('/', 2);
+ if (pos != -1) {
database = connUri.substring(pos + 1);
}
} else {
database = connUri;
}
+ if (Strings.isNullOrEmpty(database)) {
+ throw new IllegalArgumentException("Invalid JDBC URL: " + jdbcUrl);
+ }
+
if (database.contains("?")) {
database = database.substring(0, database.indexOf("?"));
}
-
if (database.contains(";")) {
database = database.substring(0, database.indexOf(";"));
}
-
- if (Strings.isNullOrEmpty(database)) {
- throw new IllegalArgumentException("Invalid JDBC url.");
- }
return database;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
index 6b0a02fc5..7f5469aa6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
@@ -34,64 +34,76 @@ import java.util.List;
* Loader for sort service to load configs thought Cursor
*/
public interface SortConfigLoader {
+
/**
* Load all clusters by cursor
- * @return List of clusters, including MQ cluster and DataNode cluster.
+ *
+ * @return list of clusters, including MQ cluster and DataProxy cluster
*/
List<SortSourceClusterInfo> loadAllClusters();
/**
* Load stream sinks by cursor
- * @return List of Stream sinks.
+ *
+ * @return list of stream sinks
*/
List<SortSourceStreamSinkInfo> loadAllStreamSinks();
/**
* Load groups by cursor
- * @return List of group info
+ *
+ * @return list of group info
*/
List<SortSourceGroupInfo> loadAllGroup();
/**
- * Load group backup info by cursor
- * @param keyName Key name
- * @return List of group backup info
+ * Load backup group info by cursor
+ *
+ * @param keyName key name
+ *
+ * @return list of backup group info
*/
List<InlongGroupExtEntity> loadGroupBackupInfo(String keyName);
/**
- * Load stream backup info by cursor
- * @param keyName Key name
- * @return List of stream backup info
+ * Load backup stream info by cursor
+ *
+ * @param keyName key name
+ * @return list of backup stream info
*/
List<InlongStreamExtEntity> loadStreamBackupInfo(String keyName);
/**
* Load all inlong stream info by cursor
- * @return List of stream info
+ *
+ * @return list of stream info
*/
List<SortSourceStreamInfo> loadAllStreams();
/**
* Load all inlong stream sink entity by cursor
+ *
* @return List of stream sink entity
*/
List<StreamSinkEntity> loadAllStreamSinkEntity();
/**
* Load all task info
+ *
* @return List of tasks
*/
List<SortTaskInfo> loadAllTask();
/**
* Load all data node entity
+ *
* @return List of data node
*/
List<DataNodeEntity> loadAllDataNodeEntity();
/**
* Load all fields info
+ *
* @return List of fields info
*/
List<SortFieldInfo> loadAllFields();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java
deleted file mode 100644
index 3b819f898..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.collections.CollectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Value;
-
-public abstract class AbstractService<T> implements AutoCloseable, InitializingBean {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(AbstractService.class);
-
-
- @Value("${msg.to.db.batch.size:10}")
- private int batchSize = 10;
-
- @Value("${msg.to.db.queue.size:10000}")
- private int queueSize = 10000;
-
- @Value("${msg.to.db.core.pool.size:2}")
- private int corePoolSize = 2;
-
- @Value("${msg.to.db.max.pool.size:2}")
- private int maximumPoolSize = 2;
-
- @Value("${msg.to.db.queue.pool.size:10}")
- private int syncSendQueueSize = 10;
-
- private volatile boolean isClose = false;
-
- private LinkedBlockingQueue<T> dataQueue;
-
- private ThreadPoolExecutor pool;
-
- /**
- * batch insert entities
- *
- * @param entryList entryList
- * @return boolean true/false
- */
- abstract boolean batchInsertEntities(List<T> entryList);
-
- /**
- * put Data
- *
- * @param data data
- * @return boolean true/false
- */
- public boolean putData(T data) {
- if (dataQueue == null) {
- return false;
- }
- return dataQueue.offer(data);
- }
-
- @Override
- public void close() {
- isClose = true;
- }
-
- @Override
- public void afterPropertiesSet() {
- dataQueue = new LinkedBlockingQueue(queueSize);
- pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
- 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(syncSendQueueSize),
- Executors.defaultThreadFactory());
- for (int i = 0; i < corePoolSize; i++) {
- pool.execute(new Task());
- }
- }
-
- class Task implements Runnable {
- @Override
- public void run() {
- while (!isClose) {
- try {
- List<T> entryList = new ArrayList<>();
- int count = 0;
- while (count < batchSize) {
- T data = dataQueue.poll(1, TimeUnit.MILLISECONDS);
- if (data != null) {
- entryList.add(data);
- }
- count++;
- }
- if (CollectionUtils.isNotEmpty(entryList)) {
- batchInsertEntities(entryList);
- }
- } catch (Exception e) {
- LOGGER.error("batchInsertEntities has exception = {}", e);
- }
- }
- }
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
index 8d950b706..de044f0aa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
@@ -48,6 +48,11 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
+
/**
* Operation to the inlong group process
*/
@@ -56,12 +61,12 @@ public class InlongGroupProcessService {
private static final Logger LOGGER = LoggerFactory.getLogger(InlongGroupProcessService.class);
- private final ExecutorService executorService = new ThreadPoolExecutor(
- 20,
- 40,
- 0L,
+ private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+ CORE_POOL_SIZE,
+ MAX_POOL_SIZE,
+ ALIVE_TIME_MS,
TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(),
+ new LinkedBlockingQueue<>(QUEUE_SIZE),
new ThreadFactoryBuilder().setNameFormat("inlong-group-process-%s").build(),
new CallerRunsPolicy());
@@ -107,7 +112,7 @@ public class InlongGroupProcessService {
groupService.updateStatus(groupId, GroupStatus.SUSPENDING.getCode(), operator);
InlongGroupInfo groupInfo = groupService.get(groupId);
GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.SUSPEND);
- executorService.execute(() -> workflowService.start(ProcessName.SUSPEND_GROUP_PROCESS, operator, form));
+ EXECUTOR_SERVICE.execute(() -> workflowService.start(ProcessName.SUSPEND_GROUP_PROCESS, operator, form));
LOGGER.info("success to suspend process asynchronously for groupId={} by operator={}", groupId, operator);
return groupId;
@@ -147,7 +152,7 @@ public class InlongGroupProcessService {
groupService.updateStatus(groupId, GroupStatus.RESTARTING.getCode(), operator);
InlongGroupInfo groupInfo = groupService.get(groupId);
GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.RESTART);
- executorService.execute(() -> workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, form));
+ EXECUTOR_SERVICE.execute(() -> workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, form));
LOGGER.info("success to restart process asynchronously for groupId={} by operator={}", groupId, operator);
return groupId;
@@ -181,7 +186,7 @@ public class InlongGroupProcessService {
*/
public String deleteProcessAsync(String groupId, String operator) {
LOGGER.info("begin to delete process asynchronously for groupId={} by operator={}", groupId, operator);
- executorService.execute(() -> {
+ EXECUTOR_SERVICE.execute(() -> {
try {
invokeDeleteProcess(groupId, operator);
} catch (Exception ex) {
@@ -255,7 +260,7 @@ public class InlongGroupProcessService {
List<WorkflowProcessEntity> entities = workflowQueryService.listProcessEntity(processQuery);
entities.sort(Comparator.comparingInt(WorkflowProcessEntity::getId));
WorkflowProcessEntity lastProcess = entities.get(entities.size() - 1);
- executorService.execute(() -> {
+ EXECUTOR_SERVICE.execute(() -> {
workflowService.continueProcess(lastProcess.getId(), operator, "Reset group status");
});
return true;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index 010afee02..16a96a8f1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -79,7 +79,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
}
}).build();
- // The expire time of cluster info cache must be greater than heartbeat cache
+ // The expiry time of cluster info cache must be greater than heartbeat cache
// because the eviction handler needs to query cluster info cache
clusterInfoCache = Caffeine.newBuilder()
.expireAfterAccess(expireTime * 2L, TimeUnit.SECONDS)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
index 390c11b48..152832ba1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
@@ -48,6 +48,10 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
import static org.apache.inlong.manager.common.enums.GroupOperateType.INIT;
import static org.apache.inlong.manager.common.enums.ProcessName.CREATE_STREAM_RESOURCE;
@@ -59,21 +63,23 @@ import static org.apache.inlong.manager.common.enums.ProcessName.CREATE_STREAM_R
@Service
public class QueueResourceListener implements QueueOperateListener {
+ private static final Integer TIMEOUT_SECONDS = 180;
+
private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
- 20,
- 40,
- 10L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
- new ThreadFactoryBuilder().setNameFormat("inlong-stream-process-%s").build(),
+ CORE_POOL_SIZE,
+ MAX_POOL_SIZE,
+ ALIVE_TIME_MS,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(QUEUE_SIZE),
+ new ThreadFactoryBuilder().setNameFormat("inlong-mq-process-%s").build(),
new CallerRunsPolicy());
@Autowired
private InlongGroupService groupService;
@Autowired
- private QueueResourceOperatorFactory queueOperatorFactory;
- @Autowired
private WorkflowService workflowService;
+ @Autowired
+ private QueueResourceOperatorFactory queueOperatorFactory;
@Override
public TaskEvent event() {
@@ -152,7 +158,9 @@ public class QueueResourceListener implements QueueOperateListener {
}
});
try {
- future.get(180, TimeUnit.SECONDS);
+ // wait for the current process to complete before starting the next stream;
+ // otherwise, an exception is thrown and the next stream process will not be started.
+ future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (Exception e) {
String msg = "failed to execute stream process in asynchronously ";
log.error(msg, e);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
index 08a310ddb..39a3b78dc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
@@ -77,15 +77,17 @@ public class SortConfigListener implements SortOperateListener {
GroupOperateType operateType = form.getGroupOperateType();
if (operateType == GroupOperateType.SUSPEND || operateType == GroupOperateType.DELETE) {
- LOGGER.info("not build sort config for groupId={}, as the group operate type={}", groupId, operateType);
+ LOGGER.info("no need to build sort config for groupId={} as the operate type is {}", groupId, operateType);
return ListenerResult.success();
}
+
InlongGroupInfo groupInfo = form.getGroupInfo();
List<InlongStreamInfo> streamInfos = form.getStreamInfos();
if (CollectionUtils.isEmpty(streamInfos)) {
- LOGGER.warn("do not build sort config for groupId={}, as the stream is empty", groupId);
+ LOGGER.warn("no need to build sort config for groupId={}, as not found any stream", groupId);
return ListenerResult.success();
}
+
int sinkCount = streamInfos.stream()
.map(stream -> stream.getSinkList() == null ? 0 : stream.getSinkList().size())
.reduce(0, Integer::sum);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogPool.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogPool.java
index 85b8e7eb7..cf7bd4828 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogPool.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogPool.java
@@ -36,6 +36,9 @@ import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
+
/**
* Operation log thread pool
*/
@@ -43,24 +46,23 @@ import java.util.stream.IntStream;
@Component
public class OperationLogPool {
- private static final int BUFFER_SIZE = 500;
- private static final int MAX_WAIT_TIME_SECOND = 30;
- private static final int MAX_QUEUE_SIZE = 10000;
+ private static final int TIMEOUT_SECOND = 30;
private static final int THREAD_NUM = 3;
- private static final ArrayBlockingQueue<OperationLogEntity> OPERATION_POOL = new ArrayBlockingQueue<>(
- MAX_QUEUE_SIZE);
+ private static final int BUFFER_SIZE = 500;
+
+ private static final ArrayBlockingQueue<OperationLogEntity> OPERATION_POOL = new ArrayBlockingQueue<>(QUEUE_SIZE);
- private final ExecutorService executorService = new ThreadPoolExecutor(
+ private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
THREAD_NUM,
THREAD_NUM,
- 0L,
+ ALIVE_TIME_MS,
TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(),
- new ThreadFactoryBuilder().setNameFormat("async-operation-log-%s").build(),
+ new LinkedBlockingQueue<>(QUEUE_SIZE),
+ new ThreadFactoryBuilder().setNameFormat("inlong-operation-log-%s").build(),
new CallerRunsPolicy());
@Autowired
- private OperationLogEntityMapper operationLogEntityMapper;
+ private OperationLogEntityMapper operationLogMapper;
public static void publish(OperationLogEntity operation) {
if (!OPERATION_POOL.offer(operation)) {
@@ -71,7 +73,7 @@ public class OperationLogPool {
@PostConstruct
public void init() {
IntStream.range(0, THREAD_NUM).forEach(
- i -> executorService.submit(this::saveOperationLog)
+ i -> EXECUTOR_SERVICE.submit(this::saveOperationLog)
);
}
@@ -79,14 +81,13 @@ public class OperationLogPool {
List<OperationLogEntity> buffer = new ArrayList<>(BUFFER_SIZE);
while (true) {
buffer.clear();
- int size = 0;
try {
- size = Queues.drain(OPERATION_POOL, buffer, BUFFER_SIZE, MAX_WAIT_TIME_SECOND, TimeUnit.SECONDS);
+ int size = Queues.drain(OPERATION_POOL, buffer, BUFFER_SIZE, TIMEOUT_SECOND, TimeUnit.SECONDS);
if (buffer.isEmpty()) {
continue;
}
long startTime = System.currentTimeMillis();
- operationLogEntityMapper.insertBatch(buffer);
+ operationLogMapper.insertBatch(buffer);
log.info("receive {} logs and saved cost {} ms", size, System.currentTimeMillis() - startTime);
} catch (InterruptedException e) {
log.error("save operation log interrupted", e);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
index 2f65aa735..2157acb1d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
@@ -51,14 +51,14 @@ public class KafkaResourceOperators implements QueueResourceOperator {
*/
private static final String KAFKA_CONSUMER_GROUP = "%s_%s_consumer_group";
- @Autowired
- private InlongClusterService clusterService;
@Autowired
private KafkaOperator kafkaOperator;
@Autowired
private InlongStreamService streamService;
@Autowired
private InlongConsumeService consumeService;
+ @Autowired
+ private InlongClusterService clusterService;
@Override
public boolean accept(String mqType) {
@@ -67,7 +67,8 @@ public class KafkaResourceOperators implements QueueResourceOperator {
@Override
public void createQueueForGroup(@NotNull InlongGroupInfo groupInfo, @NotBlank String operator) {
- log.info("skip to create kafka topic for groupId={}", groupInfo.getInlongGroupId());
+ log.info("skip to create kafka topic for groupId={}, just create in each inlong stream",
+ groupInfo.getInlongGroupId());
}
@Override
@@ -84,7 +85,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
return;
}
for (InlongStreamBriefInfo streamInfo : streamInfoList) {
- this.deleteKafkaTopic(groupInfo, streamInfo.getInlongStreamId());
+ this.deleteKafkaTopic(groupInfo, streamInfo.getMqResource());
}
} catch (Exception e) {
log.error("failed to delete kafka resource for groupId=" + groupId, e);
@@ -94,8 +95,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
}
@Override
- public void createQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
- String operator) {
+ public void createQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, String operator) {
Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
Preconditions.checkNotNull(streamInfo, "inlong stream info cannot be null");
Preconditions.checkNotNull(operator, "operator cannot be null");
@@ -117,8 +117,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
}
@Override
- public void deleteQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
- String operator) {
+ public void deleteQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, String operator) {
Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
Preconditions.checkNotNull(streamInfo, "inlong stream info cannot be null");
@@ -141,7 +140,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
* Create Kafka Topic and Subscription, and save the consumer group info.
*/
private void createKafkaTopic(InlongKafkaInfo kafkaInfo, String topicName) throws Exception {
- // 1. create kafka topic
+ // create Kafka topic
ClusterInfo clusterInfo = clusterService.getOne(kafkaInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
kafkaOperator.createTopic(kafkaInfo, (KafkaClusterInfo) clusterInfo, topicName);
@@ -153,7 +152,8 @@ public class KafkaResourceOperators implements QueueResourceOperator {
}
// Kafka consumers do not need to register in advance
- // 2. insert the consumer group info
+
+ // save the consumer group info for the Kafka topic
String consumeGroup = String.format(KAFKA_CONSUMER_GROUP, kafkaInfo.getInlongClusterTag(), topicName);
Integer id = consumeService.saveBySystem(kafkaInfo, topicName, consumeGroup);
log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
@@ -163,9 +163,8 @@ public class KafkaResourceOperators implements QueueResourceOperator {
/**
* Delete Kafka Topic and Subscription, and delete the consumer group info.
*/
- private void deleteKafkaTopic(InlongGroupInfo groupInfo, String streamId) {
+ private void deleteKafkaTopic(InlongGroupInfo groupInfo, String topicName) {
ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
- String topicName = groupInfo.getInlongGroupId() + "_" + streamId;
kafkaOperator.forceDeleteTopic((KafkaClusterInfo) clusterInfo, topicName);
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 3ebeeca37..b82074d74 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -353,7 +353,7 @@ public class PulsarOperator {
LOGGER.error("check if the subscription exists for topic={} error, continue retry", topic, e);
if (count == RETRY_TIMES) {
LOGGER.error("after {} times retry, still check subscription exception for topic {}", count, topic);
- throw new BusinessException("check if the subscription exists error");
+ throw new BusinessException("check if the subscription exists error: " + e.getMessage());
}
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index 2e379b285..69a38ec6e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
@@ -44,7 +45,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
-import java.util.stream.Collectors;
/**
* Operator for create Pulsar Tenant, Namespace, Topic and Subscription
@@ -80,72 +80,67 @@ public class PulsarResourceOperator implements QueueResourceOperator {
Preconditions.checkNotNull(operator, "operator cannot be null");
String groupId = groupInfo.getInlongGroupId();
- log.info("begin to create pulsar resource for groupId={}", groupId);
+ String clusterTag = groupInfo.getInlongClusterTag();
+ log.info("begin to create pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag);
// get pulsar cluster via the inlong cluster tag from the inlong group
- String clusterTag = groupInfo.getInlongClusterTag();
- List<PulsarClusterInfo> pulsarClusters =
- clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR).stream()
- .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
- .collect(Collectors.toList());
- for (PulsarClusterInfo pulsarCluster : pulsarClusters) {
+ List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
+ for (ClusterInfo clusterInfo : clusterInfos) {
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
- String clusterName = pulsarCluster.getName();
// create pulsar tenant and namespace
String tenant = pulsarCluster.getTenant();
if (StringUtils.isEmpty(tenant)) {
tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
}
- String namespace = groupInfo.getMqResource();
- InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+
// if the group was not successful, need create tenant and namespace
if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) {
pulsarOperator.createTenant(pulsarAdmin, tenant);
- log.info("success to create pulsar tenant for groupId={}, tenant={}, cluster={}",
- groupId, tenant, clusterName);
- pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
- log.info("success to create pulsar namespace for groupId={}, namespace={}, cluster={}",
- groupId, namespace, clusterName);
+ String namespace = groupInfo.getMqResource();
+ pulsarOperator.createNamespace(pulsarAdmin, (InlongPulsarInfo) groupInfo, tenant, namespace);
+
+ log.info("success to create pulsar resource for groupId={}, tenant={}, namespace={}, cluster={}",
+ groupId, tenant, namespace, pulsarCluster);
}
} catch (Exception e) {
- String msg = String.format("failed to create pulsar resource for groupId=%s, cluster=%s", groupId,
- pulsarCluster.toString());
- log.error(msg, e);
+ String msg = "failed to create pulsar resource for groupId=" + groupId;
+ log.error(msg + ", cluster=" + pulsarCluster, e);
throw new WorkflowListenerException(msg + ": " + e.getMessage());
}
}
- log.info("success to create pulsar resource for groupId={}", groupId);
+
+ log.info("success to create pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag);
}
@Override
public void deleteQueueForGroup(InlongGroupInfo groupInfo, String operator) {
Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
-
String groupId = groupInfo.getInlongGroupId();
- log.info("begin to delete pulsar resource for groupId={}", groupId);
-
- List<PulsarClusterInfo> pulsarClusters =
- clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream()
- .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
- .collect(Collectors.toList());
- for (PulsarClusterInfo clusterInfo : pulsarClusters) {
- String clusterName = clusterInfo.getName();
+ String clusterTag = groupInfo.getInlongClusterTag();
+ log.info("begin to delete pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag);
+
+ List<InlongStreamBriefInfo> streamInfos = streamService.getTopicList(groupId);
+ if (CollectionUtils.isEmpty(streamInfos)) {
+ log.warn("skip to delete pulsar resource as no streams for groupId={}", groupId);
+ return;
+ }
+
+ List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
+ for (ClusterInfo clusterInfo : clusterInfos) {
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
try {
- List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
- if (streamInfoList == null || streamInfoList.isEmpty()) {
- log.warn("skip to create pulsar topic and subscription as no streams for groupId={}, cluster={}",
- groupId, clusterName);
- return;
- }
- for (InlongStreamBriefInfo streamInfo : streamInfoList) {
- this.deletePulsarTopic(groupInfo, clusterInfo, streamInfo.getMqResource());
+ for (InlongStreamBriefInfo streamInfo : streamInfos) {
+ this.deletePulsarTopic(groupInfo, pulsarCluster, streamInfo.getMqResource());
}
} catch (Exception e) {
- log.error("failed to delete pulsar resource for groupId={}, cluster={}", groupId, clusterName, e);
- throw new WorkflowListenerException("failed to delete pulsar resource: " + e.getMessage());
+ String msg = "failed to delete pulsar resource for groupId=" + groupId;
+ log.error(msg + ", cluster=" + pulsarCluster, e);
+ throw new WorkflowListenerException(msg + ": " + e.getMessage());
}
}
- log.info("success to delete pulsar resource for groupId={}", groupId);
+
+ log.info("success to delete pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag);
}
@Override
@@ -156,22 +151,25 @@ public class PulsarResourceOperator implements QueueResourceOperator {
String groupId = streamInfo.getInlongGroupId();
String streamId = streamInfo.getInlongStreamId();
- log.info("begin to create pulsar resource for groupId={}, streamId={}", groupId, streamId);
+ String clusterTag = groupInfo.getInlongClusterTag();
+ log.info("begin to create pulsar resource for groupId={}, streamId={}, clusterTag={}",
+ groupId, streamId, clusterTag);
- List<PulsarClusterInfo> pulsarClusters =
- clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream()
- .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
- .collect(Collectors.toList());
- for (PulsarClusterInfo pulsarCluster : pulsarClusters) {
+ // get pulsar cluster via the inlong cluster tag from the inlong group
+ List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
+ for (ClusterInfo clusterInfo : clusterInfos) {
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
try {
// create pulsar topic and subscription
- this.createTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource());
- this.createSubscription((InlongPulsarInfo) groupInfo, pulsarCluster,
- streamInfo.getMqResource(), streamId);
+ String topicName = streamInfo.getMqResource();
+ this.createTopic((InlongPulsarInfo) groupInfo, pulsarCluster, topicName);
+ this.createSubscription((InlongPulsarInfo) groupInfo, pulsarCluster, topicName, streamId);
+
+ log.info("success to create pulsar resource for groupId={}, streamId={}, topic={}, cluster={}",
+ groupId, streamId, topicName, pulsarCluster);
} catch (Exception e) {
- String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s, cluster=%s",
- groupId, streamId, pulsarCluster.getName());
- log.error(msg, e);
+ String msg = "failed to create pulsar resource for groupId=" + groupId + ", streamId=" + streamId;
+ log.error(msg + ", cluster=" + pulsarCluster, e);
throw new WorkflowListenerException(msg + ": " + e.getMessage());
}
}
@@ -186,25 +184,24 @@ public class PulsarResourceOperator implements QueueResourceOperator {
String groupId = streamInfo.getInlongGroupId();
String streamId = streamInfo.getInlongStreamId();
- log.info("begin to delete pulsar resource for groupId={} streamId={}", groupId, streamId);
-
- List<PulsarClusterInfo> pulsarClusters =
- clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream()
- .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
- .collect(Collectors.toList());
- for (PulsarClusterInfo clusterInfo : pulsarClusters) {
- String clusterName = clusterInfo.getName();
+ String clusterTag = groupInfo.getInlongClusterTag();
+ log.info("begin to delete pulsar resource for groupId={}, streamId={}, clusterTag={}",
+ groupId, streamId, clusterTag);
+
+ List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
+ for (ClusterInfo clusterInfo : clusterInfos) {
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
try {
- this.deletePulsarTopic(groupInfo, clusterInfo, streamInfo.getMqResource());
- log.info("success to delete pulsar topic for groupId={}, streamId={}, cluster={}",
- groupId, streamId, clusterName);
+ this.deletePulsarTopic(groupInfo, pulsarCluster, streamInfo.getMqResource());
+ log.info("success to delete pulsar topic for groupId={}, streamId={}, topic={}, cluster={}",
+ groupId, streamId, streamInfo.getMqResource(), pulsarCluster);
} catch (Exception e) {
- String msg = String.format("failed to delete pulsar topic for groupId=%s, streamId=%s, cluster=%s",
- groupId, streamId, clusterName);
- log.error(msg, e);
- throw new WorkflowListenerException(msg);
+ String msg = "failed to delete pulsar topic for groupId=" + groupId + ", streamId=" + streamId;
+ log.error(msg + ", cluster=" + pulsarCluster, e);
+ throw new WorkflowListenerException(msg + ": " + e.getMessage());
}
}
+
log.info("success to delete pulsar resource for groupId={}, streamId={}", groupId, streamId);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
index 6cccff706..c2df68f05 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
@@ -17,17 +17,14 @@
package org.apache.inlong.manager.service.resource.sink.es;
-import java.util.ArrayList;
-import java.util.List;
-
import lombok.Data;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.inlong.common.constant.ProtocolType;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
@@ -36,6 +33,9 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* Elasticsearch config information, including host, port, etc.
*/
@@ -43,6 +43,8 @@ import org.springframework.stereotype.Component;
@Component
public class ElasticsearchConfig {
+ private static final Logger logger = LoggerFactory.getLogger(ElasticsearchConfig.class);
+ private static RestHighLevelClient highLevelClient;
@Value("${es.index.search.hostname}")
private String host;
@Value("${es.index.search.port}")
@@ -54,10 +56,6 @@ public class ElasticsearchConfig {
@Value("${es.auth.password}")
private String password;
- private static final Logger logger = LoggerFactory.getLogger(ElasticsearchConfig.class);
-
- private static RestHighLevelClient highLevelClient;
-
/**
* highLevelClient
*
@@ -75,7 +73,7 @@ public class ElasticsearchConfig {
for (String host : hostArrays) {
if (StringUtils.isNotBlank(host)) {
host = host.trim();
- hosts.add(new HttpHost(host, port, "http"));
+ hosts.add(new HttpHost(host, port, ProtocolType.HTTP));
}
}
RestClientBuilder clientBuilder = RestClient.builder(hosts.toArray(new HttpHost[0]));
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java
index 441779c5c..d463ca9ab 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java
@@ -38,10 +38,8 @@ import java.util.Objects;
public class MySQLJdbcUtils {
private static final String MYSQL_JDBC_PREFIX = "jdbc:mysql";
-
private static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
-
- private static final Logger LOG = LoggerFactory.getLogger(MySQLJdbcUtils.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(MySQLJdbcUtils.class);
/**
* Get MySQL connection from the url and user.
@@ -52,46 +50,46 @@ public class MySQLJdbcUtils {
* @return {@link Connection}
* @throws Exception on get connection error
*/
- public static Connection getConnection(final String url, final String user, final String password)
- throws Exception {
+ public static Connection getConnection(String url, String user, String password) throws Exception {
if (StringUtils.isBlank(url) || !url.startsWith(MYSQL_JDBC_PREFIX)) {
- throw new Exception("MySQL server URL was invalid, it should start with jdbc:mysql");
+ throw new Exception("MySQL JDBC URL was invalid, it should start with jdbc:mysql");
}
+
Connection conn;
try {
Class.forName(MYSQL_DRIVER_CLASS);
conn = DriverManager.getConnection(url, user, password);
} catch (Exception e) {
- final String errorMsg = "get MySQL connection error, please check MySQL JDBC url, username or password!";
- LOG.error(errorMsg, e);
+ String errorMsg = "get MySQL connection error, please check MySQL JDBC url, username or password!";
+ LOGGER.error(errorMsg, e);
throw new Exception(errorMsg + " other error msg: " + e.getMessage());
}
if (Objects.isNull(conn)) {
throw new Exception("get MySQL connection failed, please contact administrator.");
}
- LOG.info("get MySQL connection success, url={}", url);
+ LOGGER.info("get MySQL connection success for url={}", url);
return conn;
}
/**
* Execute SQL command on MySQL.
*
- * @param conn JDBC Connection {@link Connection}
- * @param sql SQL string to be executed
+ * @param conn JDBC {@link Connection}
+ * @param sql SQL to be executed
* @throws Exception on execute SQL error
*/
public static void executeSql(final Connection conn, final String sql) throws Exception {
try (Statement stmt = conn.createStatement()) {
stmt.execute(sql);
- LOG.info("execute sql [{}] success", sql);
+ LOGGER.info("execute sql [{}] success", sql);
}
}
/**
* Execute batch query SQL on MySQL.
*
- * @param conn JDBC Connection {@link Connection}
- * @param sqls SQL string to be executed
+ * @param conn JDBC {@link Connection}
+ * @param sqls SQL to be executed
* @throws Exception on get execute SQL batch error
*/
public static void executeSqlBatch(final Connection conn, final List<String> sqls) throws Exception {
@@ -101,7 +99,7 @@ public class MySQLJdbcUtils {
stmt.execute(entry);
}
conn.commit();
- LOG.info("execute sql [{}] success", sqls);
+ LOGGER.info("execute sql [{}] success", sqls);
} finally {
conn.setAutoCommit(true);
}
@@ -110,7 +108,7 @@ public class MySQLJdbcUtils {
/**
* Create MySQL database
*
- * @param conn JDBC Connection {@link Connection}
+ * @param conn JDBC {@link Connection}
* @param dbName database name
* @throws Exception on create database error
*/
@@ -118,58 +116,58 @@ public class MySQLJdbcUtils {
if (!checkDbExist(conn, dbName)) {
final String createDbSql = MySQLSqlBuilder.buildCreateDbSql(dbName);
executeSql(conn, createDbSql);
- LOG.info("execute sql [{}] success", createDbSql);
+ LOGGER.info("execute sql [{}] success", createDbSql);
} else {
- LOG.info("The database [{}] are exists", dbName);
+ LOGGER.info("The database [{}] are exists", dbName);
}
}
/**
* Check database from the MySQL information_schema.
*
- * @param conn JDBC Connection {@link Connection}
- * @param dbName MySQL database name
+ * @param conn JDBC {@link Connection}
+ * @param dbName database name
* @return true if table exist, otherwise false
* @throws Exception on check database exist error
*/
public static boolean checkDbExist(final Connection conn, final String dbName) throws Exception {
- boolean result = false;
final String checkDbSql = MySQLSqlBuilder.getCheckDatabase(dbName);
try (Statement stmt = conn.createStatement();
ResultSet resultSet = stmt.executeQuery(checkDbSql)) {
if (Objects.nonNull(resultSet)) {
if (resultSet.next()) {
+ LOGGER.info("check db exist for db={}, result=true", dbName);
return true;
}
}
}
- LOG.info("check db exist for db={}, result={}", dbName, result);
- return result;
+ LOGGER.info("check db exist for db={}, result=false", dbName);
+ return false;
}
/**
* Create MySQL table by MySQLTableInfo
*
- * @param conn JDBC Connection {@link Connection}
- * @param tableInfo MySQL table info {@link MySQLTableInfo}
+ * @param conn JDBC {@link Connection}
+ * @param tableInfo table info {@link MySQLTableInfo}
* @throws Exception on create table error
*/
public static void createTable(final Connection conn, final MySQLTableInfo tableInfo) throws Exception {
if (checkTablesExist(conn, tableInfo.getDbName(), tableInfo.getTableName())) {
- LOG.info("The table [{}] are exists", tableInfo.getTableName());
+ LOGGER.info("The table [{}] are exists", tableInfo.getTableName());
} else {
final String createTableSql = MySQLSqlBuilder.buildCreateTableSql(tableInfo);
executeSql(conn, createTableSql);
- LOG.info("execute sql [{}] success", createTableSql);
+ LOGGER.info("execute sql [{}] success", createTableSql);
}
}
/**
* Check tables from the MySQL information_schema.
*
- * @param conn JDBC Connection {@link Connection}
- * @param dbName MySQL database name
- * @param tableName MySQL table name
+ * @param conn JDBC {@link Connection}
+ * @param dbName database name
+ * @param tableName table name
* @return true if table exist, otherwise false
* @throws Exception on check table exist error
*/
@@ -185,23 +183,22 @@ public class MySQLJdbcUtils {
}
}
}
- LOG.info("check table exist for db={} table={}, result={}", dbName, tableName, result);
+ LOGGER.info("check table exist for db={} table={}, result={}", dbName, tableName, result);
return result;
}
/**
- * Check whether the column exists in the table.
+ * Check whether the column exists in the MySQL table.
*
* @param conn JDBC Connection {@link Connection}
- * @param dbName MySQL database name
- * @param tableName MySQL table name
- * @param column MySQL table column name
+ * @param dbName database name
+ * @param tableName table name
+ * @param column table column name
* @return true if column exist in the table, otherwise false
* @throws Exception on check column exist error
*/
public static boolean checkColumnExist(final Connection conn, final String dbName, final String tableName,
- final String column)
- throws Exception {
+ final String column) throws Exception {
boolean result = false;
final String checkTableSql = MySQLSqlBuilder.getCheckColumn(dbName, tableName, column);
try (Statement stmt = conn.createStatement();
@@ -212,16 +209,16 @@ public class MySQLJdbcUtils {
}
}
}
- LOG.info("check column exist for db={} table={}, result={} column={}", dbName, tableName, result, column);
+ LOGGER.info("check column exist for db={} table={}, result={} column={}", dbName, tableName, result, column);
return result;
}
/**
- * Query all columns of the tableName.
+ * Query all MySQL table columns by the given tableName.
*
- * @param conn JDBC Connection {@link Connection}
- * @param dbName MySQL database name
- * @param tableName MySQL table name
+ * @param conn JDBC {@link Connection}
+ * @param dbName database name
+ * @param tableName table name
* @return {@link List}
* @throws Exception on get columns error
*/
@@ -247,9 +244,9 @@ public class MySQLJdbcUtils {
* Add columns for MySQL table.
*
* @param conn JDBC Connection {@link Connection}
- * @param dbName MySQL database name
- * @param tableName MySQL table name
- * @param columns MySQL columns to be added
+ * @param dbName database name
+ * @param tableName table name
+ * @param columns columns to be added
* @throws Exception on add columns error
*/
public static void addColumns(final Connection conn, final String dbName, final String tableName,
@@ -264,4 +261,5 @@ public class MySQLJdbcUtils {
final List<String> addColumnSql = MySQLSqlBuilder.buildAddColumnsSql(dbName, tableName, columnInfos);
executeSqlBatch(conn, addColumnSql);
}
+
}
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
index d60232ac0..6c859cf2a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
@@ -19,15 +19,15 @@ package org.apache.inlong.manager.service.resource.sink.mysql;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLColumnInfo;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLTableInfo;
-import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
-import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.slf4j.Logger;
@@ -90,19 +90,19 @@ public class MySQLResourceOperator implements SinkResourceOperator {
columnList.add(columnInfo);
}
- final MySQLSinkDTO mySQLSink = MySQLSinkDTO.getFromJson(sinkInfo.getExtParams());
- final MySQLTableInfo tableInfo = MySQLSinkDTO.getTableInfo(mySQLSink, columnList);
-
- try (Connection conn = MySQLJdbcUtils.getConnection(mySQLSink.getJdbcUrl(), mySQLSink.getUsername(),
- mySQLSink.getPassword())) {
+ MySQLSinkDTO sinkDTO = MySQLSinkDTO.getFromJson(sinkInfo.getExtParams());
+ MySQLTableInfo tableInfo = MySQLSinkDTO.getTableInfo(sinkDTO, columnList);
+ try (Connection conn = MySQLJdbcUtils.getConnection(sinkDTO.getJdbcUrl(), sinkDTO.getUsername(),
+ sinkDTO.getPassword())) {
// 1. create database if not exists
MySQLJdbcUtils.createDb(conn, tableInfo.getDbName());
// 2. table not exists, create it
MySQLJdbcUtils.createTable(conn, tableInfo);
// 3. table exists, add columns - skip the exists columns
MySQLJdbcUtils.addColumns(conn, tableInfo.getDbName(), tableInfo.getTableName(), columnList);
+
// 4. update the sink status to success
- final String info = "success to create MySQL resource";
+ String info = "success to create MySQL resource";
sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
LOG.info(info + " for sinkInfo={}", sinkInfo);
} catch (Throwable e) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 2449b835e..08ec23226 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -78,16 +78,16 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
public void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, boolean isStream)
throws Exception {
if (isStream) {
- LOGGER.warn("stream workflow no need to build sort config for disable zk");
+ LOGGER.warn("no need to build sort config for stream process when disable zk");
return;
}
if (groupInfo == null || CollectionUtils.isEmpty(streamInfos)) {
- LOGGER.warn("group info is null or stream infos is empty, no need to build sort config for disable zk");
+ LOGGER.warn("no need to build sort config as the group is null or streams is empty when disable zk");
return;
}
- GroupInfo configInfo = this.getGroupInfo(groupInfo, streamInfos);
- String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
+ GroupInfo sortConfigInfo = this.getGroupInfo(groupInfo, streamInfos);
+ String dataflow = OBJECT_MAPPER.writeValueAsString(sortConfigInfo);
this.addToGroupExt(groupInfo, dataflow);
if (LOGGER.isDebugEnabled()) {
@@ -127,9 +127,8 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
if (InlongConstants.STANDARD_MODE.equals(groupInfo.getLightweight())) {
if (CollectionUtils.isNotEmpty(transformResponses)) {
relations = NodeRelationUtils.createNodeRelations(inlongStream);
-
- // in standard mode, replace upstream source node and transform input fields node to mq node
- // mq node name, which is inlong stream id
+ // in standard mode, replace upstream source node and transform input fields node
+ // to MQ node (which is inlong stream id)
String mqNodeName = sources.get(0).getSourceName();
Set<String> nodeNameSet = getInputNodeNames(sources, transformResponses);
adjustTransformField(transformResponses, nodeNameSet, mqNodeName);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 6247822b3..a3e68b8b5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -236,7 +236,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
}
}
- private void updateFieldOpt(StreamSourceEntity entity, List<StreamField> fieldInfos) {
+ protected void updateFieldOpt(StreamSourceEntity entity, List<StreamField> fieldInfos) {
Integer sourceId = entity.getId();
if (CollectionUtils.isEmpty(fieldInfos)) {
return;
@@ -270,7 +270,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
streamFieldMapper.insertAll(list);
}
- private void saveFieldOpt(StreamSourceEntity entity, List<StreamField> fieldInfos) {
+ protected void saveFieldOpt(StreamSourceEntity entity, List<StreamField> fieldInfos) {
LOGGER.info("begin to save source fields={}", fieldInfos);
if (CollectionUtils.isEmpty(fieldInfos)) {
return;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperator.java
index ddcdc0dde..e94cf2276 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperator.java
@@ -36,7 +36,6 @@ import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -44,6 +43,11 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
+
/**
* Operate the source snapshot
*/
@@ -51,14 +55,15 @@ import java.util.concurrent.TimeUnit;
public class SourceSnapshotOperator implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(SourceSnapshotOperator.class);
- private final ExecutorService executorService = new ThreadPoolExecutor(
- 1,
- 1,
- 10L,
- TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(100),
+ private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+ CORE_POOL_SIZE,
+ MAX_POOL_SIZE,
+ ALIVE_TIME_MS,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(QUEUE_SIZE),
new ThreadFactoryBuilder().setNameFormat("stream-source-snapshot-%s").build(),
new CallerRunsPolicy());
+
@Autowired
private StreamSourceEntityMapper sourceMapper;
@@ -101,7 +106,7 @@ public class SourceSnapshotOperator implements AutoCloseable {
snapshotQueue = new LinkedBlockingQueue<>(queueSize);
}
SaveSnapshotTaskRunnable taskRunnable = new SaveSnapshotTaskRunnable();
- this.executorService.execute(taskRunnable);
+ EXECUTOR_SERVICE.execute(taskRunnable);
LOGGER.info("source snapshot operate thread started successfully");
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 12ebe6200..c4af55daa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -142,6 +142,7 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
&& StringUtils.isNotEmpty(sourceInfo.getSerializationType())) {
pulsarSource.setSerializationType(sourceInfo.getSerializationType());
}
+ // currently, only reuse the primary key from Kafka source
if (SourceType.KAFKA.equals(sourceInfo.getSourceType())) {
pulsarSource.setPrimaryKey(((KafkaSource) sourceInfo).getPrimaryKey());
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
index eaa05ba41..43fff8cdd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
@@ -42,6 +42,11 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
+
/**
* Operation related to inlong stream process
*/
@@ -49,12 +54,12 @@ import java.util.concurrent.TimeUnit;
@Service
public class InlongStreamProcessService {
- private final ExecutorService executorService = new ThreadPoolExecutor(
- 20,
- 40,
- 0L,
+ private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+ CORE_POOL_SIZE,
+ MAX_POOL_SIZE,
+ ALIVE_TIME_MS,
TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(),
+ new LinkedBlockingQueue<>(QUEUE_SIZE),
new ThreadFactoryBuilder().setNameFormat("inlong-stream-process-%s").build(),
new CallerRunsPolicy());
@@ -100,7 +105,7 @@ public class InlongStreamProcessService {
ProcessStatus processStatus = workflowResult.getProcessInfo().getStatus();
return processStatus == ProcessStatus.COMPLETED;
} else {
- executorService.execute(() -> workflowService.start(processName, operator, processForm));
+ EXECUTOR_SERVICE.execute(() -> workflowService.start(processName, operator, processForm));
return true;
}
}
@@ -143,7 +148,7 @@ public class InlongStreamProcessService {
ProcessStatus processStatus = workflowResult.getProcessInfo().getStatus();
return processStatus == ProcessStatus.COMPLETED;
} else {
- executorService.execute(() -> workflowService.start(processName, operator, processForm));
+ EXECUTOR_SERVICE.execute(() -> workflowService.start(processName, operator, processForm));
return true;
}
}
@@ -158,8 +163,7 @@ public class InlongStreamProcessService {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
}
GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
- if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL
- && groupStatus != GroupStatus.RESTARTED) {
+ if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL && groupStatus != GroupStatus.RESTARTED) {
throw new BusinessException(
String.format("group status=%s not support restart stream for groupId=%s", groupStatus, groupId));
}
@@ -185,7 +189,7 @@ public class InlongStreamProcessService {
ProcessStatus processStatus = workflowResult.getProcessInfo().getStatus();
return processStatus == ProcessStatus.COMPLETED;
} else {
- executorService.execute(() -> workflowService.start(processName, operator, processForm));
+ EXECUTOR_SERVICE.execute(() -> workflowService.start(processName, operator, processForm));
return true;
}
}
@@ -235,7 +239,7 @@ public class InlongStreamProcessService {
return false;
}
} else {
- executorService.execute(() -> {
+ EXECUTOR_SERVICE.execute(() -> {
WorkflowResult workflowResult = workflowService.start(processName, operator, processForm);
ProcessStatus processStatus = workflowResult.getProcessInfo().getStatus();
if (processStatus == ProcessStatus.COMPLETED) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 2afa702ca..633d46fca 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -504,7 +504,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
/**
* Update field information
- * <p/>First physically delete the existing field information, and then add the field information of this batch
+ * <p>
+ * First physically delete the existing field information, and then add the field information of this batch
*/
@Transactional(rollbackFor = Throwable.class)
void updateField(String groupId, String streamId, List<StreamField> fieldList) {
@@ -525,7 +526,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
if (CollectionUtils.isEmpty(infoList)) {
return;
}
- infoList.stream().forEach(streamField -> streamField.setId(null));
+ infoList.forEach(streamField -> streamField.setId(null));
List<InlongStreamFieldEntity> list = CommonBeanUtils.copyListProperties(infoList,
InlongStreamFieldEntity::new);
for (InlongStreamFieldEntity entity : list) {
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/RestTemplateConfig.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/RestTemplateConfig.java
index 9306364bc..d07626cae 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/RestTemplateConfig.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/RestTemplateConfig.java
@@ -29,6 +29,7 @@ import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpContext;
+import org.apache.inlong.common.constant.ProtocolType;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -79,8 +80,8 @@ public class RestTemplateConfig {
public PoolingHttpClientConnectionManager httpClientConnectionManager() {
// Support HTTP, HTTPS
Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
- .register("http", PlainConnectionSocketFactory.getSocketFactory())
- .register("https", SSLConnectionSocketFactory.getSocketFactory())
+ .register(ProtocolType.HTTP, PlainConnectionSocketFactory.getSocketFactory())
+ .register(ProtocolType.HTTPS, SSLConnectionSocketFactory.getSocketFactory())
.build();
PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager(
registry);
diff --git a/inlong-manager/manager-test/src/main/resources/application-unit-test.properties b/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
index b02b1d211..862eba470 100644
--- a/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
+++ b/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
@@ -18,7 +18,7 @@
#
# Log level
-logging.level.root=INFO
+logging.level.root=info
logging.level.org.apache.inlong.manager=debug
spring.sql.init.platform=h2
@@ -31,7 +31,7 @@ spring.datasource.username=root
spring.datasource.password=""
# Audit configuration
-# Audit query source that decide what data source to query, currently only supports [MYSQL|ELASTICSEARCH]
+# Audit query source that decides which data source to query, currently only supports [MYSQL|ELASTICSEARCH|CLICKHOUSE]
audit.query.source=MYSQL
# Elasticsearch config
@@ -47,7 +47,7 @@ es.auth.user=admin
es.auth.password=inlong
# ClickHouse config
-# ClickHouse url
+# ClickHouse jdbcUrl
audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
# ClickHouse username
audit.ck.username=default
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/openapi/OpenAPIFilter.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/openapi/OpenAPIFilter.java
index 9eae25d0e..3b0f6d107 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/openapi/OpenAPIFilter.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/openapi/OpenAPIFilter.java
@@ -62,7 +62,7 @@ public class OpenAPIFilter implements Filter {
SecretToken token = parseBasicAuth(httpServletRequest);
subject.login(token);
} catch (Exception ex) {
- LOGGER.error("login error, msg: {}", ex.getMessage());
+ LOGGER.error("login error: {}", ex.getMessage());
((HttpServletResponse) servletResponse).sendError(HttpServletResponse.SC_FORBIDDEN, ex.getMessage());
return;
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/web/AuthenticationFilter.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/web/AuthenticationFilter.java
index 1e0b8c976..5408daefc 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/web/AuthenticationFilter.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/web/AuthenticationFilter.java
@@ -70,7 +70,7 @@ public class AuthenticationFilter implements Filter {
UsernamePasswordToken token = getPasswordToken(servletRequest);
subject.login(token);
} catch (Exception ex) {
- LOGGER.error("login error, msg: {}", ex.getMessage());
+ LOGGER.error("login error: {}", ex.getMessage());
((HttpServletResponse) servletResponse).sendError(HttpServletResponse.SC_FORBIDDEN, ex.getMessage());
return;
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/config/RestTemplateConfig.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/config/RestTemplateConfig.java
index 68022eef1..79380d0d8 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/config/RestTemplateConfig.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/config/RestTemplateConfig.java
@@ -17,8 +17,6 @@
package org.apache.inlong.manager.web.config;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
import lombok.Data;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
@@ -31,6 +29,7 @@ import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpContext;
+import org.apache.inlong.common.constant.ProtocolType;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -43,6 +42,9 @@ import org.springframework.util.ObjectUtils;
import org.springframework.web.client.DefaultResponseErrorHandler;
import org.springframework.web.client.RestTemplate;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
@Data
@Configuration
@ConditionalOnMissingBean(RestTemplate.class)
@@ -84,8 +86,8 @@ public class RestTemplateConfig {
public PoolingHttpClientConnectionManager httpClientConnectionManager() {
// Support HTTP, HTTPS
Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
- .register("http", PlainConnectionSocketFactory.getSocketFactory())
- .register("https", SSLConnectionSocketFactory.getSocketFactory())
+ .register(ProtocolType.HTTP, PlainConnectionSocketFactory.getSocketFactory())
+ .register(ProtocolType.HTTPS, SSLConnectionSocketFactory.getSocketFactory())
.build();
PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager(
registry);
diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index b11bfdf1f..dfac2d818 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -44,6 +44,7 @@ spring.datasource.druid.testOnReturn=false
spring.datasource.druid.filters=stat,wall
# Open the mergeSql function through the connectProperties property, Slow SQL records
spring.datasource.druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
+
# Exclude ElasticsearchRestClientAutoConfiguration
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration
@@ -64,7 +65,7 @@ es.auth.user=admin
es.auth.password=inlong
# ClickHouse config
-# ClickHouse url
+# ClickHouse jdbcUrl
audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
# ClickHouse username
audit.ck.username=default
diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 203124fc2..53dd907c1 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -32,7 +32,7 @@ spring.datasource.druid.initialSize=20
spring.datasource.druid.minIdle=20
spring.datasource.druid.maxActive=300
# Configure the timeout period to wait for the connection to be acquired
-spring.datasource.druid.maxWait=6000
+spring.datasource.druid.maxWait=600000
# Configure the minimum survival time of a connection in the pool, in milliseconds
spring.datasource.druid.minEvictableIdleTimeMillis=3600000
# Detect when applying for connection. It is recommended to configure it to true, which does not affect performance and ensures safety
@@ -45,6 +45,7 @@ spring.datasource.druid.testOnReturn=false
spring.datasource.druid.filters=stat,wall
# Open the mergeSql function through the connectProperties property, Slow SQL records
spring.datasource.druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
+
# Exclude ElasticsearchRestClientAutoConfiguration
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration
@@ -65,7 +66,7 @@ es.auth.user=admin
es.auth.password=inlong
# ClickHouse config
-# ClickHouse url
+# ClickHouse jdbcUrl
audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
# ClickHouse username
audit.ck.username=default
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java
index 581024506..2bb692925 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java
@@ -29,6 +29,11 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
+
/**
* Process event listener
*/
@@ -43,11 +48,11 @@ public interface ProcessEventListener extends EventListener<ProcessEvent> {
* Async process common thread pool
*/
ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
- 20,
- 40,
- 0L,
+ CORE_POOL_SIZE,
+ MAX_POOL_SIZE,
+ ALIVE_TIME_MS,
TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(),
+ new LinkedBlockingQueue<>(QUEUE_SIZE),
new ThreadFactoryBuilder().setNameFormat("inlong-workflow-%s").build(),
new CallerRunsPolicy());
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
index ac61cfb5b..d157e9ecb 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
@@ -105,21 +105,23 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
WorkflowContext.ActionContext actionContext = context.getActionContext();
if (actionContext == null) {
resetActionContext(context);
+ actionContext = context.getActionContext();
}
- WorkflowTaskEntity workflowTaskEntity = actionContext.getTaskEntity();
- Preconditions.checkTrue(ALLOW_COMPLETE_STATE.contains(TaskStatus.valueOf(workflowTaskEntity.getStatus())),
- "task status should allow complete");
+ WorkflowTaskEntity taskEntity = actionContext.getTaskEntity();
+ Preconditions.checkTrue(ALLOW_COMPLETE_STATE.contains(TaskStatus.valueOf(taskEntity.getStatus())),
+ String.format("task status %s not allowed to complete", taskEntity.getStatus()));
+
try {
ListenerResult listenerResult = this.taskEventNotifier.notify(TaskEvent.COMPLETE, context);
if (!listenerResult.isSuccess()) {
- failedTask(context, workflowTaskEntity);
+ failedTask(context, taskEntity);
} else {
- completeTaskEntity(context, workflowTaskEntity, TaskStatus.COMPLETED);
+ completeTaskEntity(context, taskEntity, TaskStatus.COMPLETED);
}
return listenerResult.isSuccess();
} catch (Exception e) {
- log.error("Complete service task failed", e);
- failedTask(context, workflowTaskEntity);
+ log.error("failed to complete service task: " + taskEntity, e);
+ failedTask(context, taskEntity);
return false;
}
}
@@ -139,12 +141,14 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
taskQuery.setProcessId(processId);
taskQuery.setName(serviceName);
List<WorkflowTaskEntity> taskEntities = taskEntityMapper.selectByQuery(taskQuery);
+
WorkflowTaskEntity taskEntity;
if (CollectionUtils.isEmpty(taskEntities)) {
taskEntity = saveTaskEntity(serviceTask, context);
} else {
taskEntity = taskEntities.get(0);
}
+
ActionContext actionContext = new WorkflowContext.ActionContext()
.setTask((WorkflowTask) context.getCurrentElement())
.setAction(WorkflowAction.COMPLETE)
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
index d3b26786e..2b98103a8 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
@@ -17,10 +17,10 @@
package org.apache.inlong.manager.workflow.processor;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.ProcessEvent;
import org.apache.inlong.manager.common.enums.ProcessStatus;
-import org.apache.inlong.manager.common.exceptions.JsonException;
+import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
@@ -40,11 +40,10 @@ import java.util.Date;
/**
* Start event handler
*/
+@Slf4j
@Service
public class StartEventProcessor extends AbstractNextableElementProcessor<StartEvent> {
- @Autowired
- private ObjectMapper objectMapper;
@Autowired
private ProcessEventNotifier processEventNotifier;
@Autowired
@@ -61,13 +60,16 @@ public class StartEventProcessor extends AbstractNextableElementProcessor<StartE
WorkflowProcess process = context.getProcess();
ProcessForm form = context.getProcessForm();
if (process.getFormClass() != null) {
- Preconditions.checkNotNull(form, "form cannot be null");
+ Preconditions.checkNotNull(form, "process form cannot be null");
Preconditions.checkTrue(form.getClass().isAssignableFrom(process.getFormClass()),
- "form type not match, should be class " + process.getFormClass());
+ String.format("form type %s should match the process form type %s",
+ form.getClass(), process.getFormClass()));
form.validate();
} else {
- Preconditions.checkNull(form, "no form required");
+ log.warn("not need to provide the form info");
+ context.setProcessForm(null);
}
+
WorkflowProcessEntity processEntity = saveProcessEntity(applicant, process, form);
context.setProcessEntity(processEntity);
context.setActionContext(new WorkflowContext.ActionContext().setAction(WorkflowAction.START));
@@ -91,18 +93,16 @@ public class StartEventProcessor extends AbstractNextableElementProcessor<StartE
processEntity.setDisplayName(process.getDisplayName());
processEntity.setType(process.getType());
processEntity.setTitle(form.getTitle());
+
processEntity.setInlongGroupId(form.getInlongGroupId());
if (form instanceof StreamResourceProcessForm) {
StreamResourceProcessForm streamForm = (StreamResourceProcessForm) form;
processEntity.setInlongStreamId(streamForm.getStreamInfo().getInlongStreamId());
}
+
processEntity.setApplicant(applicant);
processEntity.setStatus(ProcessStatus.PROCESSING.name());
- try {
- processEntity.setFormData(objectMapper.writeValueAsString(form));
- } catch (Exception e) {
- throw new JsonException("write form to json error: ", e);
- }
+ processEntity.setFormData(JsonUtils.toJsonString(form));
processEntity.setStartTime(new Date());
processEntity.setHidden(process.getHidden());