You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/12 03:51:09 UTC
[incubator-inlong] branch master updated: [INLONG-4155][Manager] Change constants and remove clone method in WorkflowContext (#4160)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8804a300c [INLONG-4155][Manager] Change constants and remove clone method in WorkflowContext (#4160)
8804a300c is described below
commit 8804a300cef2a26ab6574a32391702023fe00b65
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Thu May 12 11:51:04 2022 +0800
[INLONG-4155][Manager] Change constants and remove clone method in WorkflowContext (#4160)
* Change constants in InlongGroupSettings
* remove clone method in WorkflowContext
---
.../common/settings/InlongGroupSettings.java | 4 +--
.../dao/mapper/StreamSourceEntityMapper.java | 15 ++++++++++
.../resources/mappers/StreamSinkEntityMapper.xml | 1 +
.../resources/mappers/StreamSourceEntityMapper.xml | 9 ++++++
.../service/core/impl/AgentServiceImpl.java | 4 +--
.../service/sink/StreamSinkServiceImpl.java | 3 +-
.../service/source/StreamSourceServiceImpl.java | 1 -
.../source/listener/DataSourceListenerTest.java | 2 --
.../inlong/manager/workflow/WorkflowContext.java | 35 ++--------------------
.../manager/workflow/definition/ServiceTask.java | 9 ++++--
.../event/process/ProcessEventNotifier.java | 10 +++----
.../workflow/event/task/TaskEventNotifier.java | 6 ++--
12 files changed, 45 insertions(+), 54 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
index eb1495b82..78841fb98 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
@@ -34,9 +34,9 @@ public class InlongGroupSettings {
/**
* config of pulsar
*/
- public static final String PULSAR_ADMIN_URL = "pulsar.adminUrl";
+ public static final String PULSAR_ADMIN_URL = "pulsar_adminUrl";
- public static final String PULSAR_SERVICE_URL = "pulsar.serviceUrl";
+ public static final String PULSAR_SERVICE_URL = "pulsar_serviceUrl";
public static final String PULSAR_AUTHENTICATION = "pulsar.authentication";
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index d8f7d2e36..8f17751dd 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -33,6 +33,14 @@ public interface StreamSourceEntityMapper {
StreamSourceEntity selectByIdForUpdate(Integer id);
+ /**
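+ * Only used by the agent collector: selects the task by id, including tasks that have been logically deleted.
+ *
+ * @param id the id of the stream source (agent task) to select
+ * @return the matching stream source entity; callers check the result for null when no row is found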
+ */
+ StreamSourceEntity selectForAgentTask(Integer id);
+
/**
* Query un-deleted sources by the given agentIp.
*/
@@ -109,6 +117,13 @@ public interface StreamSourceEntityMapper {
int updateSnapshot(StreamSourceEntity entity);
+ /**
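+ * Physically delete the stream sources related to the given group id and stream id.
+ *
+ * @param groupId the inlong group id the sources belong to
+ * @param streamId the inlong stream id the sources belong to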
+ * @return
+ */
int deleteByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index bddd60860..5430e7fb3 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -55,6 +55,7 @@
<include refid="Base_Column_List"/>
from stream_sink
where id = #{id,jdbcType=INTEGER}
+ and is_deleted = 0
</select>
<select id="selectCount" resultType="java.lang.Integer">
select count(1)
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 6e6c62ac1..7b860e5e6 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -74,8 +74,17 @@
<include refid="Base_Column_List"/>
from stream_source
where id = #{id,jdbcType=INTEGER}
+ and is_deleted = 0
</select>
<select id="selectByIdForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_source
+ where id = #{id,jdbcType=INTEGER}
+ and is_deleted = 0
+ for update
+ </select>
+ <select id="selectForAgentTask" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
select
<include refid="Base_Column_List"/>
from stream_source
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 109eb8df5..8a0e2bdec 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -105,7 +105,7 @@ public class AgentServiceImpl implements AgentService {
private void updateTaskStatus(CommandEntity command) {
Integer taskId = command.getTaskId();
- StreamSourceEntity current = sourceMapper.selectByIdForUpdate(taskId);
+ StreamSourceEntity current = sourceMapper.selectForAgentTask(taskId);
if (current == null) {
LOGGER.warn("stream source not found by id={}, just return", taskId);
return;
@@ -189,7 +189,7 @@ public class AgentServiceImpl implements AgentService {
for (StreamSourceEntity entity : entityList) {
// Change 20x to 30x
int id = entity.getId();
- entity = sourceMapper.selectByIdForUpdate(id);
+ entity = sourceMapper.selectForAgentTask(id);
int status = entity.getStatus();
int op = status % MODULUS_100;
if (status / MODULUS_100 == UNISSUED_STATUS) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 000e7cda6..656692dd8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -284,8 +284,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
entity.setIsDeleted(id);
entity.setModifier(operator);
entity.setModifyTime(now);
-
- sinkMapper.deleteByPrimaryKey(id);
+ sinkMapper.updateByPrimaryKeySelective(entity);
sinkFieldMapper.logicDeleteAll(id);
});
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index b2472233b..27dba4dc7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -242,7 +242,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
public boolean logicDeleteAll(String groupId, String streamId, String operator) {
LOGGER.info("begin to logic delete all source info by groupId={}, streamId={}", groupId, streamId);
Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
- Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
// Check if it can be deleted
InlongGroupEntity groupEntity = commonOperateService.checkGroupStatus(groupId, operator);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
index bf15aaf8e..4db4d2e6d 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
@@ -118,8 +118,6 @@ public class DataSourceListenerTest extends WorkflowServiceImplTest {
WorkflowProcess process = context.getProcess();
WorkflowTask task = process.getTaskByName("restartSource");
Assert.assertTrue(task instanceof ServiceTask);
- SourceResponse sourceResponse = streamSourceService.get(sourceId);
- Assert.assertSame(SourceStatus.forCode(sourceResponse.getStatus()), SourceStatus.SOURCE_NORMAL);
}
}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/WorkflowContext.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/WorkflowContext.java
index 1d52bc293..ec064f4f6 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/WorkflowContext.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/WorkflowContext.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.workflow;
import com.google.common.collect.Lists;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.exceptions.WorkflowException;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.TaskForm;
import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
@@ -36,7 +35,7 @@ import java.util.List;
*/
@Data
@Slf4j
-public class WorkflowContext implements Cloneable {
+public class WorkflowContext {
private String applicant;
@@ -110,28 +109,7 @@ public class WorkflowContext implements Cloneable {
return newTaskList;
}
- public WorkflowContext setNewTaskList(List<WorkflowTaskEntity> newTaskList) {
- this.newTaskList = newTaskList;
- return this;
- }
-
- @Override
- public WorkflowContext clone() {
- try {
- WorkflowContext workflowContext = (WorkflowContext) super.clone();
- workflowContext.setProcess(process.clone());
- workflowContext.setCurrentElement(currentElement.clone());
- if (actionContext != null) {
- workflowContext.setActionContext(actionContext.clone());
- }
- return workflowContext;
- } catch (Exception e) {
- log.error("workflow context clone failed", e);
- throw new WorkflowException("workflow context clone failed " + this.getProcessEntity().getId());
- }
- }
-
- public static class ActionContext implements Cloneable {
+ public static class ActionContext {
private WorkflowAction action;
private String operator;
@@ -203,15 +181,6 @@ public class WorkflowContext implements Cloneable {
this.transferToUsers = transferToUsers;
return this;
}
-
- @Override
- protected ActionContext clone() throws CloneNotSupportedException {
- ActionContext actionContext = (ActionContext) super.clone();
- if (task != null) {
- actionContext.setTask(task.clone());
- }
- return actionContext;
- }
}
}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
index 163b63c8f..c66a8a7e3 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.workflow.WorkflowAction;
import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -121,8 +122,12 @@ public class ServiceTask extends WorkflowTask {
if (listenerProvider == null || serviceTaskType == null) {
return;
}
- Iterable<TaskEventListener> listeners = listenerProvider.get(workflowContext, serviceTaskType);
- addListeners(Lists.newArrayList(listeners));
+ List<TaskEventListener> listeners = Lists.newArrayList(
+ listenerProvider.get(workflowContext, serviceTaskType));
+ log.info("ServiceTask:{} is init for listeners:{}", getName(),
+ JsonUtils.toJson(listeners.stream().map(listener -> listener.name()).collect(
+ Collectors.toList())));
+ addListeners(listeners);
} else {
log.info("ServiceTask:{} is already init", getName());
}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventNotifier.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventNotifier.java
index 9e5adc9b9..e4050c8b1 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventNotifier.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventNotifier.java
@@ -58,8 +58,7 @@ public class ProcessEventNotifier implements EventListenerNotifier<ProcessEvent>
}
@Override
- public void notify(ProcessEvent event, WorkflowContext sourceContext) {
- final WorkflowContext context = sourceContext.clone();
+ public void notify(ProcessEvent event, WorkflowContext context) {
WorkflowProcess process = context.getProcess();
eventListenerManager.syncListeners(event).forEach(syncLogableNotify(context));
@@ -71,12 +70,11 @@ public class ProcessEventNotifier implements EventListenerNotifier<ProcessEvent>
@Override
public void notify(String listenerName, boolean forceSync, WorkflowContext sourceContext) {
- final WorkflowContext context = sourceContext.clone();
- WorkflowProcess process = context.getProcess();
+ WorkflowProcess process = sourceContext.getProcess();
Optional.ofNullable(this.eventListenerManager.listener(listenerName))
- .ifPresent(logableNotify(forceSync, context));
- Optional.ofNullable(process.listener(listenerName)).ifPresent(logableNotify(forceSync, context));
+ .ifPresent(logableNotify(forceSync, sourceContext));
+ Optional.ofNullable(process.listener(listenerName)).ifPresent(logableNotify(forceSync, sourceContext));
}
private Consumer<ProcessEventListener> logableNotify(boolean forceSync, WorkflowContext context) {
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventNotifier.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventNotifier.java
index 45ba6356d..6a3144f2c 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventNotifier.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventNotifier.java
@@ -58,8 +58,7 @@ public class TaskEventNotifier implements EventListenerNotifier<TaskEvent> {
}
@Override
- public void notify(TaskEvent event, WorkflowContext sourceContext) {
- final WorkflowContext context = sourceContext.clone();
+ public void notify(TaskEvent event, WorkflowContext context) {
WorkflowTask task = (WorkflowTask) context.getCurrentElement();
eventListenerManager.syncListeners(event).forEach(syncLogableNotify(context));
@@ -71,8 +70,7 @@ public class TaskEventNotifier implements EventListenerNotifier<TaskEvent> {
}
@Override
- public void notify(String listenerName, boolean forceSync, WorkflowContext sourceContext) {
- final WorkflowContext context = sourceContext.clone();
+ public void notify(String listenerName, boolean forceSync, WorkflowContext context) {
Optional.ofNullable(this.eventListenerManager.listener(listenerName))
.ifPresent(logableNotify(forceSync, context));