You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/21 11:32:13 UTC
[incubator-inlong] branch master updated: [INLONG-3264][Manager] Fix deadlock in stream source (#3265)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ba7e159 [INLONG-3264][Manager] Fix deadlock in stream source (#3265)
ba7e159 is described below
commit ba7e159d8c40f3dc4210f265814af72382175238
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Mon Mar 21 19:32:08 2022 +0800
[INLONG-3264][Manager] Fix deadlock in stream source (#3265)
---
.../inlong/manager/service/core/AgentService.java | 12 +-
.../service/core/impl/AgentServiceImpl.java | 129 ++++++++++++---------
.../service/source/StreamSourceServiceImpl.java | 27 +++--
.../manager-web/sql/apache_inlong_manager.sql | 3 +-
.../web/controller/openapi/AgentController.java | 3 +-
5 files changed, 104 insertions(+), 70 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
index 35130a6..f118187 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
@@ -43,12 +43,20 @@ public interface AgentService {
Boolean reportSnapshot(TaskSnapshotRequest request);
/**
- * Agent report the task result, and pull task config to operate.
+ * Agent report the task result.
*
* @param request Request of the task result.
* @return Task result.
*/
- TaskResult reportAndGetTask(TaskRequest request);
+ void report(TaskRequest request);
+
+ /**
+ * Pull task config to operate.
+ *
+ * @param request Request of the task result.
+ * @return Task result.
+ */
+ TaskResult getTaskResult(TaskRequest request);
@Deprecated
FileAgentTaskInfo getFileAgentTask(FileAgentCommandInfo info);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 0f4691d..c6c7c68 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.db.CommandEntity;
import org.apache.inlong.common.pojo.agent.CmdConfig;
@@ -40,6 +41,7 @@ import org.apache.inlong.manager.common.pojo.agent.FileAgentCommandInfo;
import org.apache.inlong.manager.common.pojo.agent.FileAgentCommandInfo.CommandInfoBean;
import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskConfig;
import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskInfo;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.DataSourceCmdConfigEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
@@ -59,6 +61,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
@@ -110,84 +113,92 @@ public class AgentServiceImpl implements AgentService {
}
@Override
- @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
- public TaskResult reportAndGetTask(TaskRequest request) {
+ @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED,
+ propagation = Propagation.REQUIRES_NEW)
+ public void report(TaskRequest request) {
LOGGER.info("begin to get agent task: {}", request);
- if (request == null || request.getAgentIp() == null) {
+ if (request == null) {
LOGGER.warn("agent request was empty, just return");
- return null;
+ return;
}
-
- this.updateTaskStatus(request);
-
- return this.getTaskResult(request);
- }
-
- /**
- * Update the task status by the request
- */
- private void updateTaskStatus(TaskRequest request) {
+ Preconditions.checkNotEmpty(request.getAgentIp(),
+ String.format("AgentIp should not be null in request=%s", request));
if (CollectionUtils.isEmpty(request.getCommandInfo())) {
LOGGER.warn("task result was empty, just return");
return;
}
-
for (CommandEntity command : request.getCommandInfo()) {
- Integer taskId = command.getTaskId();
- StreamSourceEntity current = sourceMapper.selectByIdForUpdate(taskId);
- if (current == null) {
- continue;
- }
+ updateCommandEntity(command);
+ // Other tasks with status 20x will change to 30x in next getTaskResult method
+ }
+ }
- LocalDateTime localDateTime = LocalDateTime.parse(command.getDeliveryTime(), TIME_FORMATTER);
- Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant();
- if (current.getModifyTime().getTime() - instant.toEpochMilli() > maxModifyTime) {
- LOGGER.warn("task {} receive result delay more than {} ms, skip it", taskId, maxModifyTime);
- continue;
- }
+ public void updateCommandEntity(CommandEntity command) {
+ Integer taskId = command.getTaskId();
+ StreamSourceEntity current = sourceMapper.selectByIdForUpdate(taskId);
+ if (current == null) {
+ return;
+ }
- int result = command.getCommandResult();
- int previousStatus = current.getStatus();
- int nextStatus = SourceState.SOURCE_NORMAL.getCode();
- // Change the status from 30x to normal / disable / frozen
- if (previousStatus / MODULUS_100 == ISSUED_STATUS) {
- if (Constants.RESULT_SUCCESS == result) {
- if (SourceState.TEMP_TO_NORMAL.contains(previousStatus)) {
- nextStatus = SourceState.SOURCE_NORMAL.getCode();
- } else if (SourceState.BEEN_ISSUED_DELETE.getCode() == previousStatus) {
- nextStatus = SourceState.SOURCE_DISABLE.getCode();
- } else if (SourceState.BEEN_ISSUED_FROZEN.getCode() == previousStatus) {
- nextStatus = SourceState.SOURCE_FROZEN.getCode();
- }
- } else if (Constants.RESULT_FAIL == result) {
- nextStatus = SourceState.SOURCE_FAILED.getCode();
- }
+ LocalDateTime localDateTime = LocalDateTime.parse(command.getDeliveryTime(), TIME_FORMATTER);
+ Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant();
+ if (current.getModifyTime().getTime() - instant.toEpochMilli() > maxModifyTime) {
+ LOGGER.warn("task {} receive result delay more than {} ms, skip it", taskId, maxModifyTime);
+ return;
+ }
- sourceMapper.updateStatus(taskId, nextStatus, current.getModifyTime());
- LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, taskId);
+ int result = command.getCommandResult();
+ int previousStatus = current.getStatus();
+ int nextStatus = SourceState.SOURCE_NORMAL.getCode();
+ // Change the status from 30x to normal / disable / frozen
+ if (previousStatus / MODULUS_100 == ISSUED_STATUS) {
+ if (Constants.RESULT_SUCCESS == result) {
+ if (SourceState.TEMP_TO_NORMAL.contains(previousStatus)) {
+ nextStatus = SourceState.SOURCE_NORMAL.getCode();
+ } else if (SourceState.BEEN_ISSUED_DELETE.getCode() == previousStatus) {
+ nextStatus = SourceState.SOURCE_DISABLE.getCode();
+ } else if (SourceState.BEEN_ISSUED_FROZEN.getCode() == previousStatus) {
+ nextStatus = SourceState.SOURCE_FROZEN.getCode();
+ }
+ } else if (Constants.RESULT_FAIL == result) {
+ nextStatus = SourceState.SOURCE_FAILED.getCode();
}
- // Other tasks with status 20x will change to 30x in next getTaskResult method
+
+ sourceMapper.updateStatus(taskId, nextStatus, current.getModifyTime());
+ LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, taskId);
}
}
/**
* Get task result by the request
*/
- private TaskResult getTaskResult(TaskRequest request) {
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED,
+ propagation = Propagation.REQUIRES_NEW)
+ public TaskResult getTaskResult(TaskRequest request) {
+ if (request == null) {
+ LOGGER.warn("agent request was empty, just return");
+ return null;
+ }
+ Preconditions.checkNotEmpty(request.getAgentIp(),
+ String.format("AgentIp should not be null in request=%s", request));
// Query the tasks that needed to add or active - without agentIp and uuid
List<Integer> addedStatusList = Arrays.asList(SourceState.TO_BE_ISSUED_ADD.getCode(),
SourceState.TO_BE_ISSUED_ACTIVE.getCode());
- List<StreamSourceEntity> addList = sourceMapper.selectByStatusForUpdate(addedStatusList);
+ List<StreamSourceEntity> entityList = sourceMapper.selectByStatusForUpdate(addedStatusList);
- // Query other tasks by agentIp and uuid - not included status with TO_BE_ISSUED_ADD and TO_BE_ISSUED_ACTIVE
- List<Integer> statusList = Arrays.asList(SourceState.TO_BE_ISSUED_DELETE.getCode(),
- SourceState.TO_BE_ISSUED_RETRY.getCode(), SourceState.TO_BE_ISSUED_BACKTRACK.getCode(),
- SourceState.TO_BE_ISSUED_FROZEN.getCode(), SourceState.TO_BE_ISSUED_CHECK.getCode(),
- SourceState.TO_BE_ISSUED_REDO_METRIC.getCode(), SourceState.TO_BE_ISSUED_MAKEUP.getCode());
String agentIp = request.getAgentIp();
String uuid = request.getUuid();
- List<StreamSourceEntity> entityList = sourceMapper.selectByStatusAndIpForUpdate(statusList, agentIp, uuid);
- entityList.addAll(addList);
+ if (StringUtils.isNotEmpty(agentIp)) {
+ // Query other tasks by agentIp and uuid - not included status with TO_BE_ISSUED_ADD and TO_BE_ISSUED_ACTIVE
+ List<Integer> statusList = Arrays.asList(SourceState.TO_BE_ISSUED_DELETE.getCode(),
+ SourceState.TO_BE_ISSUED_RETRY.getCode(), SourceState.TO_BE_ISSUED_BACKTRACK.getCode(),
+ SourceState.TO_BE_ISSUED_FROZEN.getCode(), SourceState.TO_BE_ISSUED_CHECK.getCode(),
+ SourceState.TO_BE_ISSUED_REDO_METRIC.getCode(), SourceState.TO_BE_ISSUED_MAKEUP.getCode());
+ List<StreamSourceEntity> agentAddList = sourceMapper.selectByStatusAndIpForUpdate(statusList, agentIp,
+ uuid);
+ entityList.addAll(agentAddList);
+ }
List<DataConfig> dataConfigs = Lists.newArrayList();
for (StreamSourceEntity entity : entityList) {
@@ -211,9 +222,13 @@ public class AgentServiceImpl implements AgentService {
List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
// Update agentIp and uuid for the added and active tasks
- for (StreamSourceEntity entity : addList) {
- sourceMapper.updateIpAndUuid(entity.getId(), agentIp, uuid, entity.getModifyTime());
- LOGGER.info("update stream source ip to [{}], uuid to [{}] for id [{}] ", agentIp, uuid, entity.getId());
+ for (StreamSourceEntity entity : entityList) {
+ if (StringUtils.isNotEmpty(agentIp)
+ && StringUtils.isEmpty(entity.getAgentIp())) {
+ sourceMapper.updateIpAndUuid(entity.getId(), agentIp, uuid, entity.getModifyTime());
+ LOGGER.info("update stream source ip to [{}], uuid to [{}] for id [{}] ", agentIp, uuid,
+ entity.getId());
+ }
}
return TaskResult.builder().dataConfigs(dataConfigs).cmdConfigs(cmdConfigs).build();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index b6ec0f1..3fa64ad 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
@@ -68,7 +69,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
private CommonOperateService commonOperateService;
@Override
- @Transactional(rollbackFor = Throwable.class)
+ @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
public Integer save(SourceRequest request, String operator) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("begin to save source info=" + request);
@@ -149,7 +150,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
}
@Override
- @Transactional(rollbackFor = Throwable.class)
+ @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
+ isolation = Isolation.READ_COMMITTED)
public boolean update(SourceRequest request, String operator) {
LOGGER.debug("begin to update source info=" + request);
this.checkParams(request);
@@ -168,7 +170,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
}
@Override
- @Transactional(rollbackFor = Throwable.class)
+ @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
+ isolation = Isolation.READ_COMMITTED)
public boolean updateStatus(Integer id, Integer targetStatus, String operator) {
sourceMapper.updateStatus(id, targetStatus, null);
LOGGER.info("success to update source status={} for id={} by {}", targetStatus, id, operator);
@@ -176,7 +179,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
}
@Override
- @Transactional(rollbackFor = Throwable.class)
+ @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
+ isolation = Isolation.READ_COMMITTED)
public boolean updateStatus(String groupId, String streamId, Integer targetStatus, String operator) {
sourceMapper.updateStatusByRelatedId(groupId, streamId, targetStatus);
LOGGER.info("success to update source status={} for groupId={}, streamId={} by {}",
@@ -185,7 +189,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
}
@Override
- @Transactional(rollbackFor = Throwable.class)
+ @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
+ isolation = Isolation.READ_COMMITTED)
public boolean delete(Integer id, String sourceType, String operator) {
LOGGER.info("begin to delete source by id={}, sourceType={}", id, sourceType);
Preconditions.checkNotNull(id, Constant.ID_IS_EMPTY);
@@ -204,7 +209,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
}
@Override
- @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
+ @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
+ isolation = Isolation.READ_COMMITTED)
public boolean restart(Integer id, String sourceType, String operator) {
LOGGER.info("begin to restart source by id={}, sourceType={}", id, sourceType);
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
@@ -221,7 +227,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
}
@Override
- @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
+ @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
+ isolation = Isolation.READ_COMMITTED)
public boolean stop(Integer id, String sourceType, String operator) {
LOGGER.info("begin to stop source by id={}, sourceType={}", id, sourceType);
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
@@ -238,7 +245,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
}
@Override
- @Transactional(rollbackFor = Throwable.class)
+ @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
+ isolation = Isolation.READ_COMMITTED)
public boolean logicDeleteAll(String groupId, String streamId, String operator) {
LOGGER.info("begin to logic delete all source info by groupId={}, streamId={}", groupId, streamId);
Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
@@ -272,7 +280,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
}
@Override
- @Transactional(rollbackFor = Throwable.class)
+ @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
+ isolation = Isolation.READ_COMMITTED)
public boolean deleteAll(String groupId, String streamId, String operator) {
LOGGER.info("begin to delete all source by groupId={}, streamId={}", groupId, streamId);
Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index f6fb2d5..fc2ee52 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -611,7 +611,8 @@ CREATE TABLE `stream_source`
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`)
+ UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`),
+ KEY `status` (`status`,`is_deleted`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source table';
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
index 3599da6..0744bfe 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
@@ -73,7 +73,8 @@ public class AgentController {
@PostMapping("/reportAndGetTask")
@ApiOperation(value = "Report source task snapshot")
public Response<TaskResult> reportAndGetTask(@RequestBody TaskRequest request) {
- return Response.success(agentService.reportAndGetTask(request));
+ agentService.report(request);
+ return Response.success(agentService.getTaskResult(request));
}
@Deprecated