You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/02 06:14:47 UTC
[inlong] branch master updated: [INLONG-5611][Manager] Set the sink status after updating the sink info (#5758)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c7d35a3f2 [INLONG-5611][Manager] Set the sink status after updating the sink info (#5758)
c7d35a3f2 is described below
commit c7d35a3f24ad7ca9097432c1c3429ad1ef13f6a2
Author: ciscozhou <45...@users.noreply.github.com>
AuthorDate: Fri Sep 2 14:14:42 2022 +0800
[INLONG-5611][Manager] Set the sink status after updating the sink info (#5758)
---
.../inlong/manager/common/enums/StreamStatus.java | 21 ++++++++-----
.../manager/dao/mapper/StreamSinkEntityMapper.java | 4 +--
.../resources/mappers/StreamSinkEntityMapper.xml | 33 ++------------------
.../mappers/StreamSinkFieldEntityMapper.xml | 4 +--
.../sink/ck/ClickHouseResourceOperator.java | 12 ++++----
.../manager/service/sink/AbstractSinkOperator.java | 10 +++---
.../manager/service/sink/StreamSinkOperator.java | 4 ++-
.../service/sink/StreamSinkServiceImpl.java | 36 +++++++++++++---------
.../service/stream/InlongStreamProcessService.java | 26 ++++++++--------
9 files changed, 67 insertions(+), 83 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java
index 151922eee..a522e7481 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java
@@ -24,7 +24,6 @@ public enum StreamStatus {
DRAFT(0, "draft"),
- // Stream related status
NEW(100, "new"),
CONFIG_ING(110, "in configure"),
CONFIG_FAILED(120, "configuration failed"),
@@ -47,12 +46,12 @@ public enum StreamStatus {
this.description = description;
}
- public Integer getCode() {
- return code;
- }
-
- public String getDescription() {
- return description;
+ /**
+ * Checks whether the given status allows the update.
+ */
+ public static boolean notAllowedUpdate(StreamStatus status) {
+ return status == StreamStatus.CONFIG_ING || status == StreamStatus.SUSPENDING
+ || status == StreamStatus.RESTARTING || status == StreamStatus.DELETING;
}
public static StreamStatus forCode(int code) {
@@ -64,4 +63,12 @@ public enum StreamStatus {
throw new IllegalStateException(String.format("Illegal code=%s for StreamStatus", code));
}
+ public Integer getCode() {
+ return code;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
index 111197bbe..9999371ff 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
@@ -129,9 +129,7 @@ public interface StreamSinkEntityMapper {
*/
List<SortSourceStreamInfo> selectAllStreams();
- int updateByPrimaryKeySelective(StreamSinkEntity record);
-
- int updateByPrimaryKey(StreamSinkEntity record);
+ int updateByIdSelective(StreamSinkEntity record);
int updateStatus(StreamSinkEntity entity);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index daacb78d2..bd9b72330 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -370,8 +370,7 @@
where is_deleted = 0
</select>
- <update id="updateByPrimaryKeySelective"
- parameterType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
+ <update id="updateByIdSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
update stream_sink
<set>
<if test="inlongGroupId != null">
@@ -411,17 +410,12 @@
operate_log = #{operateLog,jdbcType=VARCHAR},
</if>
<if test="status != null">
+ previous_status = status,
status = #{status,jdbcType=INTEGER},
</if>
- <if test="previousStatus != null">
- previous_status = #{previousStatus,jdbcType=INTEGER},
- </if>
<if test="isDeleted != null">
is_deleted = #{isDeleted,jdbcType=INTEGER},
</if>
- <if test="creator != null">
- creator = #{creator,jdbcType=VARCHAR},
- </if>
<if test="modifier != null">
modifier = #{modifier,jdbcType=VARCHAR},
</if>
@@ -430,29 +424,6 @@
where id = #{id,jdbcType=INTEGER}
and version = #{version,jdbcType=INTEGER}
</update>
- <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
- update stream_sink
- set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
- sink_type = #{sinkType,jdbcType=VARCHAR},
- sink_name = #{sinkName,jdbcType=VARCHAR},
- description = #{description,jdbcType=VARCHAR},
- enable_create_resource = #{enableCreateResource,jdbcType=TINYINT},
- inlong_cluster_name = #{inlongClusterName,jdbcType=VARCHAR},
- data_node_name = #{dataNodeName,jdbcType=VARCHAR},
- sort_task_name = #{sortTaskName,jdbcType=VARCHAR},
- sort_consumer_group = #{sortConsumerGroup,jdbcType=VARCHAR},
- ext_params = #{extParams,jdbcType=VARCHAR},
- operate_log = #{operateLog,jdbcType=VARCHAR},
- status = #{status,jdbcType=INTEGER},
- previous_status = #{previousStatus,jdbcType=INTEGER},
- is_deleted = #{isDeleted,jdbcType=INTEGER},
- creator = #{creator,jdbcType=VARCHAR},
- modifier = #{modifier,jdbcType=VARCHAR},
- version = #{version,jdbcType=INTEGER} + 1
- where id = #{id,jdbcType=INTEGER}
- and version = #{version,jdbcType=INTEGER}
- </update>
<update id="updateStatus" parameterType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
update stream_sink
set status = #{status,jdbcType=INTEGER},
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
index ece761b77..052b46556 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
@@ -69,7 +69,7 @@
</insert>
<insert id="insertAll">
insert into stream_sink_field (
- id, inlong_group_id,
+ inlong_group_id,
inlong_stream_id, sink_id,
sink_type, field_name,
field_type, field_comment,
@@ -82,7 +82,7 @@
values
<foreach collection="list" index="index" item="item" separator=",">
(
- #{item.id,jdbcType=INTEGER}, #{item.inlongGroupId,jdbcType=VARCHAR},
+ #{item.inlongGroupId,jdbcType=VARCHAR},
#{item.inlongStreamId,jdbcType=VARCHAR}, #{item.sinkId,jdbcType=INTEGER},
#{item.sinkType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
#{item.fieldType,jdbcType=VARCHAR}, #{item.fieldComment,jdbcType=VARCHAR},
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java
index e4d4ea84f..f8aa955e4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java
@@ -19,15 +19,15 @@ package org.apache.inlong.manager.service.resource.sink.ck;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
-import org.apache.inlong.manager.pojo.sink.ck.ClickHouseColumnInfo;
-import org.apache.inlong.manager.pojo.sink.ck.ClickHouseTableInfo;
-import org.apache.inlong.manager.pojo.sink.SinkInfo;
-import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSinkDTO;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.pojo.sink.ck.ClickHouseColumnInfo;
+import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSinkDTO;
+import org.apache.inlong.manager.pojo.sink.ck.ClickHouseTableInfo;
import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.slf4j.Logger;
@@ -135,7 +135,7 @@ public class ClickHouseResourceOperator implements SinkResourceOperator {
throw new WorkflowException(errMsg);
}
- LOGGER.info("success create ClickHouse table for data sind [" + sinkInfo.getId() + "]");
+ LOGGER.info("success create ClickHouse table for sink id [" + sinkInfo.getId() + "]");
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index ac227ca56..e4db70a34 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -110,7 +110,7 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
}
@Override
- public void updateOpt(SinkRequest request, String operator) {
+ public void updateOpt(SinkRequest request, SinkStatus nextStatus, String operator) {
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
@@ -123,9 +123,11 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
CommonBeanUtils.copyProperties(request, entity, true);
setTargetEntity(request, entity);
entity.setPreviousStatus(entity.getStatus());
- entity.setStatus(SinkStatus.CONFIG_ING.getCode());
+ if (nextStatus != null) {
+ entity.setStatus(nextStatus.getCode());
+ }
entity.setModifier(operator);
- int rowCount = sinkMapper.updateByPrimaryKeySelective(entity);
+ int rowCount = sinkMapper.updateByIdSelective(entity);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
LOGGER.error(errMsg);
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
@@ -199,7 +201,7 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
entity.setStatus(InlongConstants.DELETED_STATUS);
entity.setIsDeleted(entity.getId());
entity.setModifier(operator);
- int rowCount = sinkMapper.updateByPrimaryKeySelective(entity);
+ int rowCount = sinkMapper.updateByIdSelective(entity);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}",
entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSinkName(), entity.getVersion());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
index e23b6cef2..71ed90842 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.sink;
import com.github.pagehelper.Page;
+import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.sink.SinkField;
@@ -74,9 +75,10 @@ public interface StreamSinkOperator {
* Update the sink info.
*
* @param request sink info needs to update
+ * @param nextStatus next status
* @param operator name of the operator
*/
- void updateOpt(SinkRequest request, String operator);
+ void updateOpt(SinkRequest request, SinkStatus nextStatus, String operator);
/**
* Update the sink fields.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 2c537e848..8360dc4df 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -25,12 +25,13 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
@@ -74,6 +75,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Autowired
private GroupCheckService groupCheckService;
@Autowired
+ private InlongStreamEntityMapper streamMapper;
+ @Autowired
private StreamSinkEntityMapper sinkMapper;
@Autowired
private StreamSinkFieldEntityMapper sinkFieldMapper;
@@ -212,7 +215,11 @@ public class StreamSinkServiceImpl implements StreamSinkService {
String groupId = request.getInlongGroupId();
String streamId = request.getInlongStreamId();
String sinkName = request.getSinkName();
- final InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);
+ groupCheckService.checkGroupStatus(groupId, operator);
+
+ // Check whether the stream exist or not
+ InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+ Preconditions.checkNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
// Check whether the sink name exists with the same groupId and streamId
List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
@@ -223,24 +230,23 @@ public class StreamSinkServiceImpl implements StreamSinkService {
throw new BusinessException(String.format(err, sinkName, groupId, streamId));
}
}
- List<SinkField> fields = request.getSinkFieldList();
- // Remove id in sinkField when save
- if (CollectionUtils.isNotEmpty(fields)) {
- fields.forEach(sinkField -> sinkField.setId(null));
- }
+ SinkStatus nextStatus = null;
+ boolean streamSuccess = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());
+ if (streamSuccess || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
+ nextStatus = SinkStatus.CONFIG_ING;
+ }
StreamSinkOperator sinkOperator = operatorFactory.getInstance(request.getSinkType());
- sinkOperator.updateOpt(request, operator);
+ sinkOperator.updateOpt(request, nextStatus, operator);
- // The inlong group status is [Configuration successful], then asynchronously initiate
- // the [Single inlong stream resource creation] workflow
- if (GroupStatus.CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
+ // If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the [CREATE_STREAM_RESOURCE] process
+ if (streamSuccess) {
// To work around the circular reference check we manually instantiate and wire
if (streamProcessOperation == null) {
streamProcessOperation = new InlongStreamProcessService();
autowireCapableBeanFactory.autowireBean(streamProcessOperation);
}
- streamProcessOperation.startProcess(groupId, streamId, operator, true);
+ streamProcessOperation.startProcess(groupId, streamId, operator, false);
}
LOGGER.info("success to update sink info: {}", request);
return true;
@@ -289,7 +295,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
entity.setStatus(InlongConstants.DELETED_STATUS);
entity.setIsDeleted(id);
entity.setModifier(operator);
- int rowCount = sinkMapper.updateByPrimaryKeySelective(entity);
+ int rowCount = sinkMapper.updateByIdSelective(entity);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}",
entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSinkName(),
@@ -367,7 +373,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
entity.setPreviousStatus(entity.getStatus());
entity.setStatus(status);
entity.setModifier(operator);
- int rowCount = sinkMapper.updateByPrimaryKeySelective(entity);
+ int rowCount = sinkMapper.updateByIdSelective(entity);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}",
entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSinkName(),
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
index b95fdbbc6..06046eceb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
@@ -70,30 +71,27 @@ public class InlongStreamProcessService {
public boolean startProcess(String groupId, String streamId, String operator, boolean sync) {
log.info("begin to start stream process for groupId={} streamId={}", groupId, streamId);
InlongGroupInfo groupInfo = groupService.get(groupId);
- if (groupInfo == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
- }
+ Preconditions.checkNotNull(groupInfo, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL && groupStatus != GroupStatus.RESTARTED) {
throw new BusinessException(
- String.format("group status =%s not support start stream for groupId=%s", groupStatus, groupId));
+ String.format("group status=%s not support start stream for groupId=%s", groupStatus, groupId));
}
+
InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
- if (streamInfo == null) {
- throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
- }
+ Preconditions.checkNotNull(streamInfo, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
if (status == StreamStatus.CONFIG_ING) {
- log.warn("stream status={}, no need restart for groupId={}, streamId={}", status, groupId, streamId);
+ log.warn("stream status={}, not need restart for groupId={} streamId={}", status, groupId, streamId);
return true;
}
- // only new, failed, and success status support update
- if (status != StreamStatus.NEW && status != StreamStatus.CONFIG_FAILED
- && status != StreamStatus.CONFIG_SUCCESSFUL) {
- throw new BusinessException(
- String.format("stream status=%s not support start stream for groupId=%s streamId=%s",
- status, groupId, streamId));
+ if (StreamStatus.notAllowedUpdate(status)) {
+ String errMsg = String.format("stream status=%s not support start stream for groupId=%s streamId=%s",
+ status, groupId, streamId);
+ log.error(errMsg);
+ throw new BusinessException(errMsg);
}
+
StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.INIT);
ProcessName processName = ProcessName.CREATE_STREAM_RESOURCE;
if (sync) {