You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/07/09 13:56:09 UTC
[incubator-inlong] branch master updated: [INLONG-696] modify the status of the entities after approval
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9e856b1 [INLONG-696] modify the status of the entities after approval
9e856b1 is described below
commit 9e856b19240d24bd1eaed740b769af5fa1d34294
Author: healzhou <he...@tencent.com>
AuthorDate: Fri Jul 9 21:40:33 2021 +0800
[INLONG-696] modify the status of the entities after approval
---
.../manager/common/enums/BizErrorCodeEnum.java | 5 --
.../inlong/manager/common/enums/EntityStatus.java | 76 +++++++++-------------
.../pojo/datastorage/BaseStorageClusterInfo.java | 34 ----------
inlong-manager/manager-dao/pom.xml | 14 ++++
.../manager/dao/mapper/DataStreamEntityMapper.java | 3 +
.../resources/mappers/DataStreamEntityMapper.xml | 13 +++-
.../resources/mappers/StorageHiveEntityMapper.xml | 6 +-
inlong-manager/manager-service/pom.xml | 19 ++++++
.../manager/service/core/BusinessService.java | 2 +-
.../manager/service/core/DataStreamService.java | 12 ++++
.../core/impl/BusinessProcessOperation.java | 2 +-
.../service/core/impl/BusinessServiceImpl.java | 26 ++++----
.../service/core/impl/DataStreamServiceImpl.java | 19 ++++--
.../service/core/impl/StorageHiveOperation.java | 2 +-
.../service/core/impl/StorageServiceImpl.java | 4 +-
.../service/thirdpart/hive/HiveTableOperator.java | 7 +-
.../sort/PushHiveConfigToSortEventListener.java | 52 ++++++++-------
.../CreateResourceWorkflowDefinition.java | 20 +++---
.../newbusiness/NewBusinessApproveForm.java | 14 ++--
.../newbusiness/NewBusinessWorkflowDefinition.java | 22 +++----
...ener.java => ApproveCancelProcessListener.java} | 4 +-
...kListener.java => ApprovePassTaskListener.java} | 10 +--
...ener.java => ApproveRejectProcessListener.java} | 4 +-
.../listener/CompleteProcessListener.java | 66 -------------------
.../CreateResourceCompleteProcessListener.java | 14 +++-
.../CreateResourceFailedProcessListener.java | 9 ++-
.../NewConsumptionWorkflowDefinition.java | 24 +++----
...er.java => ConsumptionApproveTaskListener.java} | 2 +-
....java => ConsumptionCancelProcessListener.java} | 4 +-
...ava => ConsumptionCompleteProcessListener.java} | 2 +-
....java => ConsumptionRejectProcessListener.java} | 12 +---
.../SingleStreamCompleteProcessListener.java | 13 +++-
.../SingleStreamFailedProcessListener.java | 14 +++-
.../manager/workflow/core/TransactionHelper.java | 11 ++--
inlong-manager/pom.xml | 6 ++
35 files changed, 258 insertions(+), 289 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizErrorCodeEnum.java
index 3084971..0955664 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizErrorCodeEnum.java
@@ -38,7 +38,6 @@ public enum BizErrorCodeEnum {
CLUSTER_NOT_FOUND(1101, "Cluster information does not exist"),
-
DATA_STREAM_NOT_FOUND(1201, "Data stream does not exist/no operation permission"),
DATA_STREAM_ID_DUPLICATE(1202, "The current business has a data stream with the same ID"),
DATA_STREAM_OPT_NOT_ALLOWED(1203,
@@ -72,10 +71,6 @@ public enum BizErrorCodeEnum {
STORAGE_HIVE_FIELD_SAVE_FAILED(1404, "Failed to save/update HIVE data storage field"),
STORAGE_OPT_NOT_ALLOWED(1405,
"The current business status does not allow adding/modifying/deleting data storage information"),
- STORAGE_CLUSTER_UPDATE_NOT_ALLOWED(1406,
- "Current business status does not allow modification of storage cluster information"),
- STORAGE_APP_GROUP_UPDATE_NOT_ALLOWED(1407,
- "Current business status does not allow modification of storage application group information"),
STORAGE_DB_NAME_UPDATE_NOT_ALLOWED(1408,
"The current business status does not allow modification of the storage target database name"),
STORAGE_TB_NAME_UPDATE_NOT_ALLOWED(1409,
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/EntityStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/EntityStatus.java
index 3a2dce0..fb3ed22 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/EntityStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/EntityStatus.java
@@ -36,25 +36,25 @@ public enum EntityStatus {
DELETED(40, "deleted"),
// Business related status
- BIZ_WAIT_APPLYING(100, "waiting to applying"),
- BIZ_WAIT_APPROVE(101, "waiting to approve"),
- BIZ_APPROVE_REJECT(102, "approval reject"),
- BIZ_APPROVE_PASS(103, "approval pass"),
- BIZ_CONFIG_ING(110, "configuring"),
- BIZ_CONFIG_FAILURE(120, "failed to config"),
- BIZ_CONFIG_SUCCESS(130, "successfully config"),
+ BIZ_WAIT_SUBMIT(100, "waiting for submit"),
+ BIZ_WAIT_APPROVAL(101, "waiting for approval"),
+ BIZ_APPROVE_REJECTED(102, "approval rejected"),
+ BIZ_APPROVE_PASSED(103, "approval passed"),
+ BIZ_CONFIG_ING(110, "in configure"),
+ BIZ_CONFIG_FAILED(120, "configuration failed"),
+ BIZ_CONFIG_SUCCESSFUL(130, "configuration successful"),
// Data stream related status
DATA_STREAM_NEW(100, "new"),
- DATA_STREAM_CONFIG_ING(110, "configuring"),
- DATA_STREAM_CONFIG_FAILURE(120, "failed to config"),
- DATA_STREAM_CONFIG_SUCCESS(130, "successfully config"),
+ DATA_STREAM_CONFIG_ING(110, "in configure"),
+ DATA_STREAM_CONFIG_FAILED(120, "configuration failed"),
+ DATA_STREAM_CONFIG_SUCCESSFUL(130, "configuration successful"),
// Data storage related status
DATA_STORAGE_NEW(100, "new"),
- DATA_STORAGE_CONFIG_ING(110, "configuring"),
- DATA_STORAGE_CONFIG_FAILURE(120, "failed to config"),
- DATA_STORAGE_CONFIG_SUCCESS(130, "successfully config"),
+ DATA_STORAGE_CONFIG_ING(110, "in configure"),
+ DATA_STORAGE_CONFIG_FAILED(120, "configuration failed"),
+ DATA_STORAGE_CONFIG_SUCCESSFUL(130, "configuration successful"),
// Data source related status
DATA_RESOURCE_NEW(200, "new"),
@@ -67,69 +67,51 @@ public enum EntityStatus {
AGENT_WAIT_DELETE(204, "wait delete"),
AGENT_WAIT_UPDATE(205, "wait update"),
- AGENT_ISSUED_CREATE(300, "created and issued"),
- AGENT_ISSUED_STOP(302, "stop has been issued"),
- AGENT_ISSUED_START(303, "start has been issued"),
- AGENT_ISSUED_DELETE(304, "deletion has been issued"),
- AGENT_ISSUED_UPDATE(305, "modification has been issued"),
-
- DEPLOY_WAIT(311, "waiting to change deployment"),
- DEPLOY_ING(312, "deploying"),
- DEPLOY_FAILURE(313, "deployment failed"),
- DEPLOY_SUCCESS(314, "deployed successfully"),
-
- ISSUE_WAIT(321, "waiting to issue"),
- ISSUE_ING(322, "issuing"),
- ISSUE_FAILURE(323, "failed to issue"),
- ISSUE_SUCCESS(324, "successfully issued"),
;
/**
* The status of the business that can initiate the approval process:
- * <p/>[BIZ_WAIT_APPLYING] [BIZ_APPROVE_REJECT] [BIZ_CONFIG_FAILURE] [BIZ_CONFIG_SUCCESS]
+ * <p/>[BIZ_WAIT_SUBMIT] [BIZ_APPROVE_REJECTED] [BIZ_CONFIG_FAILED] [BIZ_CONFIG_SUCCESSFUL]
*/
public static final List<Integer> ALLOW_START_WORKFLOW_STATUS = Arrays.asList(
- BIZ_WAIT_APPLYING.getCode(), BIZ_APPROVE_REJECT.getCode(), BIZ_CONFIG_FAILURE.getCode(),
- BIZ_CONFIG_SUCCESS.getCode());
+ BIZ_WAIT_SUBMIT.getCode(), BIZ_APPROVE_REJECTED.getCode(), BIZ_CONFIG_FAILED.getCode(),
+ BIZ_CONFIG_SUCCESSFUL.getCode());
/**
* The status of the business that can be modified:
- * <p/>[DRAFT] [BIZ_WAIT_APPLYING] [BIZ_APPROVE_REJECT] [BIZ_CONFIG_FAILURE] [BIZ_CONFIG_SUCCESS]
+ * <p/>[DRAFT] [BIZ_WAIT_SUBMIT] [BIZ_APPROVE_REJECTED] [BIZ_CONFIG_FAILED] [BIZ_CONFIG_SUCCESSFUL]
* <p/>[BIZ_CONFIG_ING] status cannot be modified
*/
public static final List<Integer> ALLOW_UPDATE_BIZ_STATUS = Arrays.asList(
- DRAFT.getCode(), BIZ_WAIT_APPLYING.getCode(), BIZ_APPROVE_REJECT.getCode(),
- BIZ_CONFIG_FAILURE.getCode(), BIZ_CONFIG_SUCCESS.getCode());
+ DRAFT.getCode(), BIZ_WAIT_SUBMIT.getCode(), BIZ_APPROVE_REJECTED.getCode(),
+ BIZ_CONFIG_FAILED.getCode(), BIZ_CONFIG_SUCCESSFUL.getCode());
/**
- * The status of the service that can be deleted:
- * <p/>[DRAFT] [BIZ_WAIT_APPLYING] [BIZ_APPROVE_REJECT] [BIZ_CONFIG_FAILURE] [BIZ_CONFIG_SUCCESS]
- * <p/>[BIZ_CONFIG_FAILURE] [BIZ_CONFIG_SUCCESS] status cannot be deleted
+ * The status of the business that can be deleted:
+ * <p/>[DRAFT] [BIZ_WAIT_SUBMIT] [BIZ_APPROVE_REJECTED] [BIZ_CONFIG_ING] [BIZ_CONFIG_FAILED] [BIZ_CONFIG_SUCCESSFUL]
+ * <p/>[BIZ_WAIT_APPROVAL] [BIZ_APPROVE_PASSED] status cannot be deleted
*/
public static final List<Integer> ALLOW_DELETE_BIZ_STATUS = Arrays.asList(
- DRAFT.getCode(), BIZ_WAIT_APPLYING.getCode(), BIZ_APPROVE_REJECT.getCode(),
- BIZ_CONFIG_FAILURE.getCode(), BIZ_CONFIG_SUCCESS.getCode());
+ DRAFT.getCode(), BIZ_WAIT_SUBMIT.getCode(), BIZ_APPROVE_REJECTED.getCode(),
+ BIZ_CONFIG_ING.getCode(), BIZ_CONFIG_FAILED.getCode(), BIZ_CONFIG_SUCCESSFUL.getCode());
/**
* The business can cascade to delete the status of the associated data:
- * <p/>[DRAFT] [BIZ_WAIT_APPLYING]
*/
public static final List<Integer> ALLOW_DELETE_BIZ_CASCADE_STATUS = Arrays.asList(
- DRAFT.getCode(), BIZ_WAIT_APPLYING.getCode());
+ DRAFT.getCode(), BIZ_WAIT_SUBMIT.getCode());
/**
- * Status of business approval:
- * <p/>[BIZ_CONFIG_FAILURE] [BIZ_CONFIG_SUCCESS]
+ * Status of business approval
*/
public static final List<Integer> BIZ_APPROVE_PASS_STATUS = Arrays.asList(
- BIZ_CONFIG_FAILURE.getCode(), BIZ_CONFIG_SUCCESS.getCode());
+ BIZ_CONFIG_FAILED.getCode(), BIZ_CONFIG_SUCCESSFUL.getCode());
/**
- * Temporary business status, adding, deleting and modifying operations are not allowed:
- * <p/>[BIZ_WAIT_APPROVE] [BIZ_CONFIG_ING]
+ * Temporary business status, adding, deleting and modifying operations are not allowed
*/
public static final List<Integer> BIZ_TEMP_STATUS = Arrays.asList(
- BIZ_WAIT_APPROVE.getCode(), BIZ_CONFIG_ING.getCode());
+ BIZ_WAIT_APPROVAL.getCode(), BIZ_CONFIG_ING.getCode());
private final Integer code;
private final String description;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageClusterInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageClusterInfo.java
deleted file mode 100644
index 225cc3b..0000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageClusterInfo.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.datastorage;
-
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-/**
- * Basic cluster information
- */
-@Data
-public class BaseStorageClusterInfo {
-
- private Integer id;
-
- @ApiModelProperty("Cluster name")
- private String name;
-
-}
diff --git a/inlong-manager/manager-dao/pom.xml b/inlong-manager/manager-dao/pom.xml
index 56c46fc..52d774e 100644
--- a/inlong-manager/manager-dao/pom.xml
+++ b/inlong-manager/manager-dao/pom.xml
@@ -129,6 +129,20 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>hive-llap-tez</artifactId>
+ <groupId>org.apache.hive</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hive-vector-code-gen</artifactId>
+ <groupId>org.apache.hive</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java
index e50a3d8..7e74009 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java
@@ -80,4 +80,7 @@ public interface DataStreamEntityMapper {
*/
int deleteAllByBid(@Param("bid") String bid);
+ int updateStatusByIdentifier(@Param("bid") String bid, @Param("dsid") String dsid, @Param("status") Integer status,
+ @Param("modifier") String modifier);
+
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml
index 24e7b0b..064f845 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml
@@ -44,7 +44,7 @@
</resultMap>
<resultMap id="dataStreamFullInfo"
- type="org.apache.inlong.manager.common.pojo.datastream.DataStreamInfoToHiveConfig">
+ type="org.apache.inlong.manager.common.pojo.datastream.DataStreamInfoToHiveConfig">
<result column="id" jdbcType="VARCHAR" property="id"/>
<result column="data_stream_identifier" jdbcType="VARCHAR" property="dataStreamIdentifier"/>
<result column="name" jdbcType="VARCHAR" property="businessIdentifier"/>
@@ -493,6 +493,17 @@
and data_stream_identifier = #{dataStreamIdentifier, jdbcType=VARCHAR}
and is_deleted = 0
</update>
+ <update id="updateStatusByIdentifier">
+ update data_stream
+ set status = #{status, jdbcType=INTEGER}, modifier = #{modifier, jdbcType=VARCHAR}
+ <where>
+ business_identifier = #{bid, jdbcType=VARCHAR}
+ <if test="dsid != null and dsid != ''">
+ and data_stream_identifier = #{dsid, jdbcType=VARCHAR}
+ </if>
+ and is_deleted = 0
+ </where>
+ </update>
<select id="queryStreamToHiveBaseInfoByBid" resultMap="dataStreamFullInfo">
SELECT h.id,
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml
index ad9a088..43b1316 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml
@@ -86,9 +86,7 @@
from storage_hive
<where>
is_deleted = 0
- <if test="bid != null and bid != ''">
- and business_identifier = #{bid, jdbcType=VARCHAR}
- </if>
+ and business_identifier = #{bid, jdbcType=VARCHAR}
<if test="dsid != null and dsid != ''">
and data_stream_identifier = #{dsid, jdbcType=VARCHAR}
</if>
@@ -432,7 +430,7 @@
file_format = #{fileFormat,jdbcType=VARCHAR},
field_splitter = #{fieldSplitter,jdbcType=VARCHAR},
encoding_type = #{encodingType,jdbcType=VARCHAR},
- hdfs_default_fs = #{hdfsDefaultFs,jdbcType=VARCHAR},
+ hdfs_default_fs = #{hdfsDefaultFs,jdbcType=VARCHAR},
warehouse_dir = #{warehouseDir,jdbcType=VARCHAR},
usage_interval = #{usageInterval,jdbcType=VARCHAR},
storage_period = #{storagePeriod,jdbcType=INTEGER},
diff --git a/inlong-manager/manager-service/pom.xml b/inlong-manager/manager-service/pom.xml
index c60baf1..ce04fab 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -116,6 +116,25 @@
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
+ <exclusion>
+ <artifactId>hive-classification</artifactId>
+ <groupId>org.apache.hive</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>hive-llap-tez</artifactId>
+ <groupId>org.apache.hive</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hive-vector-code-gen</artifactId>
+ <groupId>org.apache.hive</groupId>
+ </exclusion>
</exclusions>
</dependency>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/BusinessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/BusinessService.java
index df7756b..40af7c7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/BusinessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/BusinessService.java
@@ -68,7 +68,7 @@ public interface BusinessService {
* Modify the status of the specified business
*
* @param businessIdentifier Business identifier
- * @param status Modified state
+ * @param status Modified status
* @param operator Current operator
* @return whether succeed
*/
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
index 8d19480..8bcf83a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
@@ -179,4 +179,16 @@ public interface DataStreamService {
*/
boolean updateAfterApprove(List<DataStreamApproveInfo> streamApproveList, String operator);
+ /**
+ * Update stream status
+ *
+ * @param bid Business identifier
+ * @param dsid Data stream identifier
+ * @param status Modified status
+ * @param operator Name of the operator performing the update
+ * @return whether succeed
+ * @apiNote If dsid is null or empty, update all data streams associated with bid
+ */
+ boolean updateStatus(String bid, String dsid, Integer status, String operator);
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessProcessOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessProcessOperation.java
index 4fd6f14..090fb25 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessProcessOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessProcessOperation.java
@@ -80,7 +80,7 @@ public class BusinessProcessOperation {
// Modify business status and other information
entity.setModifier(operator);
entity.setModifyTime(new Date());
- entity.setStatus(EntityStatus.BIZ_WAIT_APPROVE.getCode());
+ entity.setStatus(EntityStatus.BIZ_WAIT_APPROVAL.getCode());
int success = businessMapper.updateByIdentifierSelective(entity);
Preconditions.checkTrue(success == 1, "failed to update business during assign and start process");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
index e59621a..5b5931e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
@@ -94,8 +94,8 @@ public class BusinessServiceImpl implements BusinessService {
// Only M0 is currently supported
entity.setSchemaName(BizConstant.SCHEMA_M0_DAY);
- // After saving, the status is set to [BIZ_WAIT_APPLYING]
- entity.setStatus(EntityStatus.BIZ_WAIT_APPLYING.getCode());
+ // After saving, the status is set to [BIZ_WAIT_SUBMIT]
+ entity.setStatus(EntityStatus.BIZ_WAIT_SUBMIT.getCode());
entity.setCreator(operator);
entity.setModifier(operator);
@@ -161,8 +161,8 @@ public class BusinessServiceImpl implements BusinessService {
this.checkBizCanUpdate(entity, businessInfo);
CommonBeanUtils.copyProperties(businessInfo, entity, true);
- if (EntityStatus.BIZ_CONFIG_FAILURE.getCode().equals(entity.getStatus())) {
- entity.setStatus(EntityStatus.BIZ_WAIT_APPLYING.getCode());
+ if (EntityStatus.BIZ_CONFIG_FAILED.getCode().equals(entity.getStatus())) {
+ entity.setStatus(EntityStatus.BIZ_WAIT_SUBMIT.getCode());
}
entity.setModifier(operator);
businessMapper.updateByIdentifierSelective(entity);
@@ -191,7 +191,7 @@ public class BusinessServiceImpl implements BusinessService {
throw new BusinessException(BizErrorCodeEnum.BUSINESS_UPDATE_NOT_ALLOWED);
}
- // Non-[draft] status, no bid modification allowed
+ // Non-[DRAFT] status, no bid modification allowed
boolean updateBid = !EntityStatus.DRAFT.getCode().equals(oldStatus)
&& !Objects.equals(entity.getBusinessIdentifier(), businessInfo.getBusinessIdentifier());
if (updateBid) {
@@ -200,7 +200,7 @@ public class BusinessServiceImpl implements BusinessService {
}
// [Configuration successful] Status, bid and middleware type are not allowed to be modified
- if (EntityStatus.BIZ_CONFIG_SUCCESS.getCode().equals(oldStatus)) {
+ if (EntityStatus.BIZ_CONFIG_SUCCESSFUL.getCode().equals(oldStatus)) {
if (!Objects.equals(entity.getBusinessIdentifier(), businessInfo.getBusinessIdentifier())) {
LOGGER.error("current status was not allowed to update business identifier");
throw new BusinessException(BizErrorCodeEnum.BUSINESS_BID_UPDATE_NOT_ALLOWED);
@@ -241,17 +241,17 @@ public class BusinessServiceImpl implements BusinessService {
throw new BusinessException(BizErrorCodeEnum.BUSINESS_DELETE_NOT_ALLOWED);
}
- // [DRAFT] [BIZ_WAIT_APPLYING] status, all associated data can be logically deleted directly
+ // [DRAFT] [BIZ_WAIT_SUBMIT] status, all associated data can be logically deleted directly
if (EntityStatus.ALLOW_DELETE_BIZ_CASCADE_STATUS.contains(entity.getStatus())) {
// Logically delete data streams, data sources and data storage information
streamService.logicDeleteAllByBid(entity.getBusinessIdentifier(), operator);
} else {
- // In other states, you need to delete the associated "data stream" data before you can delete it
- // When deleting a data stream, you also need to check whether there is
- // an associated "data source" and "data storage" under it
+ // In other states, you need to delete the associated "data stream" first.
+ // When deleting a data stream, you also need to check whether there are
+ // some associated "data source" and "data storage"
int count = streamService.selectCountByBid(bid);
if (count >= 1) {
- LOGGER.error("bid={} have [{}] data stream, deleted failed", bid, count);
+ LOGGER.error("bid={} have [{}] data streams, deleted failed", bid, count);
throw new BusinessException(BizErrorCodeEnum.BUSINESS_HAS_DATA_STREAM);
}
}
@@ -291,9 +291,9 @@ public class BusinessServiceImpl implements BusinessService {
countVO.setTotalCount(countVO.getTotalCount() + count);
if (status == EntityStatus.BIZ_CONFIG_ING.getCode()) {
countVO.setWaitAssignCount(countVO.getWaitAssignCount() + count);
- } else if (status == EntityStatus.BIZ_WAIT_APPROVE.getCode()) {
+ } else if (status == EntityStatus.BIZ_WAIT_APPROVAL.getCode()) {
countVO.setWaitApproveCount(countVO.getWaitApproveCount() + count);
- } else if (status == EntityStatus.BIZ_APPROVE_REJECT.getCode()) {
+ } else if (status == EntityStatus.BIZ_APPROVE_REJECTED.getCode()) {
countVO.setRejectCount(countVO.getRejectCount() + count);
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
index 71b0797..ac83a2c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
@@ -621,6 +621,17 @@ public class DataStreamServiceImpl implements DataStreamService {
return true;
}
+ @Override
+ public boolean updateStatus(String bid, String dsid, Integer status, String operator) {
+ LOGGER.debug("begin to update status by bid={}, dsid={}", bid, dsid);
+
+ // businessMapper.updateStatusByIdentifier(bid, status, operator);
+ streamMapper.updateStatusByIdentifier(bid, dsid, status, operator);
+
+ LOGGER.info("success to update stream after approve");
+ return true;
+ }
+
/**
* Update extended information
* <p/>First physically delete the existing extended information, and then add this batch of extended information
@@ -716,16 +727,16 @@ public class DataStreamServiceImpl implements DataStreamService {
}
// Fields that are not allowed to be modified when the business [configuration is successful]
- if (EntityStatus.BIZ_CONFIG_SUCCESS.getCode().equals(bizStatus)) {
+ if (EntityStatus.BIZ_CONFIG_SUCCESSFUL.getCode().equals(bizStatus)) {
checkUpdatedFields(streamEntity, streamInfo);
}
// Business [Waiting to submit] [Approval rejected] [Configuration failed], if there is a
// data source/data storage, the fields that are not allowed to be modified
List<Integer> statusList = Arrays.asList(
- EntityStatus.BIZ_WAIT_APPLYING.getCode(),
- EntityStatus.BIZ_APPROVE_REJECT.getCode(),
- EntityStatus.BIZ_CONFIG_FAILURE.getCode());
+ EntityStatus.BIZ_WAIT_SUBMIT.getCode(),
+ EntityStatus.BIZ_APPROVE_REJECTED.getCode(),
+ EntityStatus.BIZ_CONFIG_FAILED.getCode());
if (statusList.contains(bizStatus)) {
String bid = streamInfo.getBusinessIdentifier();
String dsid = streamInfo.getDataStreamIdentifier();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java
index d37f394..9bdd6bb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java
@@ -295,7 +295,7 @@ public class StorageHiveOperation extends StorageBaseOperation {
// When the business status is [Configuration successful], modification and deletion are not allowed,
// only adding is allowed, and the order of existing fields cannot be changed
- if (EntityStatus.BIZ_CONFIG_SUCCESS.getCode().equals(bizStatus)) {
+ if (EntityStatus.BIZ_CONFIG_SUCCESSFUL.getCode().equals(bizStatus)) {
List<StorageHiveFieldEntity> existsFieldList = hiveFieldMapper.selectByStorageId(storageId);
if (existsFieldList.size() > fieldInfoList.size()) {
LOGGER.error("current status was not allowed to update hive field");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java
index 18043c7..c168f6d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java
@@ -87,7 +87,7 @@ public class StorageServiceImpl extends StorageBaseOperation implements StorageS
// If the business status is [Configuration Successful], then asynchronously initiate
// the [Single data stream Resource Creation] workflow
- if (EntityStatus.BIZ_CONFIG_SUCCESS.getCode().equals(businessEntity.getStatus())) {
+ if (EntityStatus.BIZ_CONFIG_SUCCESSFUL.getCode().equals(businessEntity.getStatus())) {
super.executorService.execute(new WorkflowStartRunnable(operator, businessEntity, dsid));
}
@@ -204,7 +204,7 @@ public class StorageServiceImpl extends StorageBaseOperation implements StorageS
// The business status is [Configuration successful], then asynchronously initiate
// the [Single data stream resource creation] workflow
- if (EntityStatus.BIZ_CONFIG_SUCCESS.getCode().equals(businessEntity.getStatus())) {
+ if (EntityStatus.BIZ_CONFIG_SUCCESSFUL.getCode().equals(businessEntity.getStatus())) {
super.executorService.execute(new WorkflowStartRunnable(operator, businessEntity, dsid));
}
LOGGER.info("success to update storage info");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java
index c54772e..d2a4b8f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java
@@ -57,7 +57,6 @@ public class HiveTableOperator {
}
HiveTableQueryBean tableBean = getTableQueryBean(hiveConfig);
-
try {
// create database if not exists
dataSourceService.createDb(tableBean);
@@ -76,10 +75,12 @@ public class HiveTableOperator {
dataSourceService.createColumn(tableBean);
}
}
- } catch (Exception e) {
+ storageService.updateHiveStatusById(hiveConfig.getId(),
+ EntityStatus.DATA_STORAGE_CONFIG_SUCCESSFUL.getCode(), "create hive table success");
+ } catch (Throwable e) {
log.error("create hive table error, ", e);
storageService.updateHiveStatusById(hiveConfig.getId(),
- EntityStatus.DATA_STORAGE_CONFIG_FAILURE.getCode(), e.getMessage());
+ EntityStatus.DATA_STORAGE_CONFIG_FAILED.getCode(), e.getMessage());
throw new WorkflowException("create hive table failed, reason: " + e.getMessage());
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
index 99e7024..4cac764 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
@@ -26,7 +26,7 @@ import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.StorageHiveEntity;
import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
import org.apache.inlong.manager.service.core.DataStreamService;
-import org.apache.inlong.manager.service.core.impl.StorageHiveOperation;
+import org.apache.inlong.manager.service.core.StorageService;
import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
@@ -56,14 +56,11 @@ import lombok.extern.slf4j.Slf4j;
public class PushHiveConfigToSortEventListener implements TaskEventListener {
@Autowired
+ private StorageService storageService;
+ @Autowired
private StorageHiveEntityMapper storageHiveMapper;
-
@Autowired
private DataStreamService dataStreamService;
-
- @Autowired
- private StorageHiveOperation storageHiveOperation;
-
@Autowired
private ClusterBean clusterBean;
@@ -81,11 +78,14 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
BusinessInfo businessInfo = form.getBusinessInfo();
String bid = businessInfo.getBusinessIdentifier();
+ // if dsid is not null, only push the config that belongs to both the bid and the dsid
+ String dsid = form.getDataStreamIdentifier();
- List<StorageHiveEntity> storageHiveEntities = storageHiveMapper.selectByIdentifier(bid, null);
+ List<StorageHiveEntity> storageHiveEntities = storageHiveMapper.selectByIdentifier(bid, dsid);
for (StorageHiveEntity hiveEntity : storageHiveEntities) {
- Integer hiveStorageId = hiveEntity.getId();
- StorageHiveInfo hiveStorage = (StorageHiveInfo) storageHiveOperation.getHiveStorage(hiveStorageId);
+ Integer storageId = hiveEntity.getId();
+ StorageHiveInfo hiveStorage = (StorageHiveInfo) storageService
+ .getById(BizConstant.STORAGE_TYPE_HIVE, storageId);
if (log.isDebugEnabled()) {
log.debug("hive storage info: {}", hiveStorage);
}
@@ -94,17 +94,16 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
if (log.isDebugEnabled()) {
log.debug("try to push hive config to sort: {}", JsonUtils.toJson(dataFlowInfo));
}
-
try {
String zkUrl = clusterBean.getZkUrl();
String zkRoot = clusterBean.getZkRoot();
// push data flow info to zk
String sortClusterName = clusterBean.getAppName();
- ZkTools.updateDataFlowInfo(dataFlowInfo, sortClusterName, hiveStorageId, zkUrl, zkRoot);
+ ZkTools.updateDataFlowInfo(dataFlowInfo, sortClusterName, storageId, zkUrl, zkRoot);
// add storage id to zk
- ZkTools.addDataFlowToCluster(sortClusterName, hiveStorageId, zkUrl, zkRoot);
+ ZkTools.addDataFlowToCluster(sortClusterName, storageId, zkUrl, zkRoot);
} catch (Exception e) {
- log.error("add or update data stream information to zk failed, storageId={} ", hiveStorageId, e);
+ log.error("add or update data stream information to zk failed, storageId={} ", storageId, e);
throw new WorkflowListenerException("push hive config to sort failed, reason: " + e.getMessage());
}
}
@@ -113,6 +112,21 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
}
private DataFlowInfo getDataFlowInfo(BusinessInfo businessInfo, StorageHiveInfo hiveStorage) {
+ Stream<FieldInfo> hiveFields = hiveStorage.getHiveFieldList().stream().map(field -> {
+ FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(field.getFieldType().toLowerCase());
+ return new FieldInfo(field.getFieldName(), formatInfo);
+ });
+
+ List<FieldInfo> sinkFields = hiveFields.collect(Collectors.toList());
+ FieldInfo partitionFieldInfo = new FieldInfo(hiveStorage.getPrimaryPartition(),
+ new TimestampFormatInfo("MILLIS"));
+ sinkFields.add(partitionFieldInfo);
+
+ String hiveServerUrl = hiveStorage.getJdbcUrl();
+ if (hiveServerUrl != null && !hiveServerUrl.startsWith("jdbc:hive2://")) {
+ hiveServerUrl = "jdbc:hive2://" + hiveServerUrl;
+ }
+
// dataPath = hdfsUrl + / + warehouseDir + / + dbName + .db/ + tableName
StringBuilder dataPathBuilder = new StringBuilder();
String hdfsUrl = hiveStorage.getHdfsDefaultFs();
@@ -138,20 +152,10 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
splitter = hiveStorage.getFieldSplitter().charAt(0);
}
- Stream<FieldInfo> hiveFields = hiveStorage.getHiveFieldList().stream().map(field -> {
- FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(field.getFieldType().toLowerCase());
- return new FieldInfo(field.getFieldName(), formatInfo);
- });
-
- List<FieldInfo> sinkFields = hiveFields.collect(Collectors.toList());
- FieldInfo partitionFieldInfo = new FieldInfo(hiveStorage.getPrimaryPartition(),
- new TimestampFormatInfo("MILLIS"));
- sinkFields.add(partitionFieldInfo);
-
// encapsulate hive sink
HiveSinkInfo hiveSinkInfo = new HiveSinkInfo(
sinkFields.toArray(new FieldInfo[0]),
- hiveStorage.getJdbcUrl(),
+ hiveServerUrl,
hiveStorage.getDbName(),
hiveStorage.getTableName(),
hiveStorage.getUsername(),
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
index 5085df6..9c81be9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
@@ -116,21 +116,21 @@ public class CreateResourceWorkflowDefinition implements WorkflowDefinition {
createTubeTopicTask.addListener(createTubeTopicTaskEventListener);
process.addTask(createTubeTopicTask);
- ServiceTask createTubeConsumerGroupTask = new ServiceTask();
- createTubeConsumerGroupTask.setSkipResolver(c -> {
+ ServiceTask createConsumerGroupForSortTask = new ServiceTask();
+ createConsumerGroupForSortTask.setSkipResolver(c -> {
CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) c.getProcessForm();
BusinessInfo businessInfo = form.getBusinessInfo();
if (BizConstant.MIDDLEWARE_TYPE_TUBE.equalsIgnoreCase(businessInfo.getMiddlewareType())) {
return false;
}
- log.warn("no need to create tube resource for bid={}", form.getBusinessId());
+ log.warn("no need to create tube resource for bid={}, as the middleware type is {}",
+ form.getBusinessId(), businessInfo.getMiddlewareType());
return true;
});
- createTubeConsumerGroupTask.setName("createTubeSortConsumerGroup");
- createTubeConsumerGroupTask.setDisplayName("Create Tube Consumer Group");
- createTubeConsumerGroupTask.addListener(createTubeConsumeGroupTaskEventListener);
- process.addTask(createTubeConsumerGroupTask);
-
+ createConsumerGroupForSortTask.setName("createConsumerGroupForSort");
+ createConsumerGroupForSortTask.setDisplayName("Create Consumer Group For Sort");
+ createConsumerGroupForSortTask.addListener(createTubeConsumeGroupTaskEventListener);
+ process.addTask(createConsumerGroupForSortTask);
ServiceTask createHiveTablesTask = new ServiceTask();
createHiveTablesTask.setSkipResolver(c -> {
@@ -161,8 +161,8 @@ public class CreateResourceWorkflowDefinition implements WorkflowDefinition {
process.addTask(pushSortConfig);
startEvent.addNext(createTubeTopicTask);
- createTubeTopicTask.addNext(createTubeConsumerGroupTask);
- createTubeConsumerGroupTask.addNext(createHiveTablesTask);
+ createTubeTopicTask.addNext(createConsumerGroupForSortTask);
+ createConsumerGroupForSortTask.addNext(createHiveTablesTask);
createHiveTablesTask.addNext(pushSortConfig);
pushSortConfig.addNext(endEvent);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessApproveForm.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessApproveForm.java
index b3a0efe..f9c68b0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessApproveForm.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessApproveForm.java
@@ -17,22 +17,18 @@
package org.apache.inlong.manager.service.workflow.newbusiness;
-import org.apache.inlong.manager.common.pojo.business.BusinessApproveInfo;
-import org.apache.inlong.manager.common.pojo.datastream.DataStreamApproveInfo;
-import org.apache.inlong.manager.service.workflow.BaseWorkflowTaskFormType;
-import org.apache.inlong.manager.workflow.exception.FormValidateException;
-import org.apache.inlong.manager.common.util.Preconditions;
-
import io.swagger.annotations.ApiModelProperty;
-
import java.util.List;
-
import lombok.Data;
import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.pojo.business.BusinessApproveInfo;
+import org.apache.inlong.manager.common.pojo.datastream.DataStreamApproveInfo;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.service.workflow.BaseWorkflowTaskFormType;
+import org.apache.inlong.manager.workflow.exception.FormValidateException;
/**
* The system administrator approves and fills in the form
- *
*/
@Data
@EqualsAndHashCode(callSuper = false)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowDefinition.java
index 97fcb8c..c403c67 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowDefinition.java
@@ -22,10 +22,9 @@ import org.apache.inlong.manager.common.pojo.workflow.WorkflowApproverFilterCont
import org.apache.inlong.manager.service.core.WorkflowApproverService;
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.ApproveTaskListener;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.CancelProcessListener;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.CompleteProcessListener;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.RejectProcessListener;
+import org.apache.inlong.manager.service.workflow.newbusiness.listener.ApproveCancelProcessListener;
+import org.apache.inlong.manager.service.workflow.newbusiness.listener.ApprovePassTaskListener;
+import org.apache.inlong.manager.service.workflow.newbusiness.listener.ApproveRejectProcessListener;
import org.apache.inlong.manager.service.workflow.newbusiness.listener.StartCreateResourceProcessListener;
import org.apache.inlong.manager.workflow.model.definition.EndEvent;
import org.apache.inlong.manager.workflow.model.definition.Process;
@@ -41,13 +40,11 @@ import org.springframework.stereotype.Component;
public class NewBusinessWorkflowDefinition implements WorkflowDefinition {
@Autowired
- private ApproveTaskListener approveTaskListener;
+ private ApprovePassTaskListener approvePassTaskListener;
@Autowired
- private CancelProcessListener cancelProcessListener;
+ private ApproveCancelProcessListener approveCancelProcessListener;
@Autowired
- private RejectProcessListener rejectProcessListener;
- @Autowired
- private CompleteProcessListener completeProcessListener;
+ private ApproveRejectProcessListener approveRejectProcessListener;
@Autowired
private StartCreateResourceProcessListener startCreateResourceProcessListener;
@Autowired
@@ -64,9 +61,8 @@ public class NewBusinessWorkflowDefinition implements WorkflowDefinition {
process.setVersion(1);
// Set up the listener
- process.addListener(cancelProcessListener);
- process.addListener(rejectProcessListener);
- process.addListener(completeProcessListener);
+ process.addListener(approveCancelProcessListener);
+ process.addListener(approveRejectProcessListener);
// Initiate the process of creating business resources,
// and set the business status to [Configuration Successful]/[Configuration Failed] according to its completion
process.addListener(startCreateResourceProcessListener);
@@ -85,7 +81,7 @@ public class NewBusinessWorkflowDefinition implements WorkflowDefinition {
adminUserTask.setDisplayName("System Administrator");
adminUserTask.setFormClass(NewBusinessApproveForm.class);
adminUserTask.setApproverAssign(context -> getTaskApprovers(adminUserTask.getName()));
- adminUserTask.addListener(approveTaskListener);
+ adminUserTask.addListener(approvePassTaskListener);
process.addTask(adminUserTask);
// Configuration order relationship
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CancelProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveCancelProcessListener.java
similarity index 95%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CancelProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveCancelProcessListener.java
index 4199ff2..6ad50b2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CancelProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveCancelProcessListener.java
@@ -34,7 +34,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
-public class CancelProcessListener implements ProcessEventListener {
+public class ApproveCancelProcessListener implements ProcessEventListener {
@Autowired
private BusinessService businessService;
@@ -50,7 +50,7 @@ public class CancelProcessListener implements ProcessEventListener {
// After canceling the approval, the status becomes [Waiting to submit]
String bid = form.getBusinessId();
String username = context.getApplicant();
- businessService.updateStatus(bid, EntityStatus.BIZ_WAIT_APPLYING.getCode(), username);
+ businessService.updateStatus(bid, EntityStatus.BIZ_WAIT_SUBMIT.getCode(), username);
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApprovePassTaskListener.java
similarity index 97%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveTaskListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApprovePassTaskListener.java
index 83f09dc..4bef36a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApprovePassTaskListener.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.service.workflow.newbusiness.listener;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.pojo.business.BusinessApproveInfo;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamApproveInfo;
import org.apache.inlong.manager.service.core.BusinessService;
@@ -27,21 +29,15 @@ import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
import org.apache.inlong.manager.workflow.model.WorkflowContext;
-
-import java.util.List;
-
-import lombok.extern.slf4j.Slf4j;
-
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* New Service Access-System Administrator Approval Task Event Listener
- *
*/
@Slf4j
@Component
-public class ApproveTaskListener implements TaskEventListener {
+public class ApprovePassTaskListener implements TaskEventListener {
@Autowired
private BusinessService businessService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/RejectProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveRejectProcessListener.java
similarity index 95%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/RejectProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveRejectProcessListener.java
index 50914d4..baa8fdd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/RejectProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveRejectProcessListener.java
@@ -34,7 +34,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
-public class RejectProcessListener implements ProcessEventListener {
+public class ApproveRejectProcessListener implements ProcessEventListener {
@Autowired
private BusinessService businessService;
@@ -50,7 +50,7 @@ public class RejectProcessListener implements ProcessEventListener {
// after reject, update business status to [BIZ_APPROVE_REJECT]
String bid = form.getBusinessId();
String username = context.getApplicant();
- businessService.updateStatus(bid, EntityStatus.BIZ_APPROVE_REJECT.getCode(), username);
+ businessService.updateStatus(bid, EntityStatus.BIZ_APPROVE_REJECTED.getCode(), username);
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CompleteProcessListener.java
deleted file mode 100644
index a909b30..0000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CompleteProcessListener.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.newbusiness.listener;
-
-import org.apache.inlong.manager.common.enums.EntityStatus;
-import org.apache.inlong.manager.service.core.BusinessService;
-import org.apache.inlong.manager.service.workflow.newbusiness.NewBusinessWorkflowForm;
-import org.apache.inlong.manager.workflow.core.event.ListenerResult;
-import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
-import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
-import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
-import org.apache.inlong.manager.workflow.model.WorkflowContext;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Listener after new business approval [process completed]
- *
- */
-@Slf4j
-@Component
-public class CompleteProcessListener implements ProcessEventListener {
-
- @Autowired
- private BusinessService businessService;
-
- @Override
- public ProcessEvent event() {
- return ProcessEvent.COMPLETE;
- }
-
- /**
- * After the business approval process is completed, modify the business status to [Configuring]
- */
- @Override
- public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- NewBusinessWorkflowForm form = (NewBusinessWorkflowForm) context.getProcessForm();
- String bid = form.getBusinessId();
- String username = context.getApplicant();
- businessService.updateStatus(bid, EntityStatus.BIZ_CONFIG_ING.getCode(), username);
- return ListenerResult.success();
- }
-
- @Override
- public boolean async() {
- return false;
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceCompleteProcessListener.java
index 261b8df..0ce2509 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceCompleteProcessListener.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.workflow.newbusiness.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.core.DataStreamService;
import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
@@ -38,6 +39,8 @@ public class CreateResourceCompleteProcessListener implements ProcessEventListen
@Autowired
private BusinessService businessService;
+ @Autowired
+ private DataStreamService dataStreamService;
@Override
public ProcessEvent event() {
@@ -45,16 +48,21 @@ public class CreateResourceCompleteProcessListener implements ProcessEventListen
}
/**
- * After the process of creating business resources is completed, modify the business status to
- * [Configuration Successful] [Configuration Failed]
+ * After the process of creating business resources is completed, modify the status of the business and of all
+ * data streams belonging to this business to [Configuration Successful] [Configuration Failed]
* <p/>{@link CreateResourceFailedProcessListener#listen}
*/
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+
String bid = form.getBusinessId();
String username = context.getApplicant();
- businessService.updateStatus(bid, EntityStatus.BIZ_CONFIG_SUCCESS.getCode(), username);
+ // update business status
+ businessService.updateStatus(bid, EntityStatus.BIZ_CONFIG_SUCCESSFUL.getCode(), username);
+ // update data stream status
+ dataStreamService.updateStatus(bid, null, EntityStatus.DATA_STREAM_CONFIG_SUCCESSFUL.getCode(), username);
+
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceFailedProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceFailedProcessListener.java
index 06f503e..d81f545 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceFailedProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceFailedProcessListener.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.workflow.newbusiness.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.core.DataStreamService;
import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
@@ -38,6 +39,8 @@ public class CreateResourceFailedProcessListener implements ProcessEventListener
@Autowired
private BusinessService businessService;
+ @Autowired
+ private DataStreamService dataStreamService;
@Override
public ProcessEvent event() {
@@ -54,7 +57,11 @@ public class CreateResourceFailedProcessListener implements ProcessEventListener
CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
String bid = form.getBusinessId();
String username = context.getApplicant();
- businessService.updateStatus(bid, EntityStatus.BIZ_CONFIG_FAILURE.getCode(), username);
+
+ // update business status
+ businessService.updateStatus(bid, EntityStatus.BIZ_CONFIG_FAILED.getCode(), username);
+ // update data stream status
+ dataStreamService.updateStatus(bid, null, EntityStatus.DATA_STREAM_CONFIG_FAILED.getCode(), username);
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
index c5da39e..fe809ae 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
@@ -28,10 +28,10 @@ import org.apache.inlong.manager.service.core.BusinessService;
import org.apache.inlong.manager.service.core.WorkflowApproverService;
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.newconsumption.listener.ApproveTaskEventListener;
-import org.apache.inlong.manager.service.workflow.newconsumption.listener.CancelProcessEventListener;
-import org.apache.inlong.manager.service.workflow.newconsumption.listener.CompleteProcessEventListener;
-import org.apache.inlong.manager.service.workflow.newconsumption.listener.RejectProcessEventListener;
+import org.apache.inlong.manager.service.workflow.newconsumption.listener.ConsumptionApproveTaskListener;
+import org.apache.inlong.manager.service.workflow.newconsumption.listener.ConsumptionCancelProcessListener;
+import org.apache.inlong.manager.service.workflow.newconsumption.listener.ConsumptionCompleteProcessListener;
+import org.apache.inlong.manager.service.workflow.newconsumption.listener.ConsumptionRejectProcessListener;
import org.apache.inlong.manager.workflow.model.WorkflowContext;
import org.apache.inlong.manager.workflow.model.definition.EndEvent;
import org.apache.inlong.manager.workflow.model.definition.Process;
@@ -50,16 +50,16 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
public static final String UT_BIZ_OWNER_NAME = "ut_biz_owner";
@Autowired
- private CompleteProcessEventListener completeProcessEventListener;
+ private ConsumptionCompleteProcessListener consumptionCompleteProcessListener;
@Autowired
- private ApproveTaskEventListener approveTaskEventListener;
+ private ConsumptionApproveTaskListener consumptionApproveTaskListener;
@Autowired
- private RejectProcessEventListener rejectProcessEventListener;
+ private ConsumptionRejectProcessListener consumptionRejectProcessListener;
@Autowired
- private CancelProcessEventListener cancelProcessEventListener;
+ private ConsumptionCancelProcessListener consumptionCancelProcessListener;
@Autowired
private WorkflowApproverService workflowApproverService;
@@ -103,7 +103,7 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
adminUserTask.setDisplayName("System Administrator");
adminUserTask.setFormClass(NewConsumptionApproveForm.class);
adminUserTask.setApproverAssign(this::adminUserTaskApprover);
- adminUserTask.addListener(approveTaskEventListener);
+ adminUserTask.addListener(consumptionApproveTaskListener);
process.addTask(adminUserTask);
// Set order relationship
@@ -112,9 +112,9 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
adminUserTask.addNext(endEvent);
// Set up the listener
- process.addListener(completeProcessEventListener);
- process.addListener(rejectProcessEventListener);
- process.addListener(cancelProcessEventListener);
+ process.addListener(consumptionCompleteProcessListener);
+ process.addListener(consumptionRejectProcessListener);
+ process.addListener(consumptionCancelProcessListener);
return process;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ApproveTaskEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java
similarity index 97%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ApproveTaskEventListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java
index ab15916..3417151 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ApproveTaskEventListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java
@@ -37,7 +37,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
-public class ApproveTaskEventListener implements TaskEventListener {
+public class ConsumptionApproveTaskListener implements TaskEventListener {
@Autowired
private ConsumptionService consumptionService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/CancelProcessEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCancelProcessListener.java
similarity index 93%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/CancelProcessEventListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCancelProcessListener.java
index 0d71d75..e6cd836 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/CancelProcessEventListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCancelProcessListener.java
@@ -40,12 +40,12 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
-public class CancelProcessEventListener implements ProcessEventListener {
+public class ConsumptionCancelProcessListener implements ProcessEventListener {
private ConsumptionEntityMapper consumptionEntityMapper;
@Autowired
- public CancelProcessEventListener(ConsumptionEntityMapper consumptionEntityMapper) {
+ public ConsumptionCancelProcessListener(ConsumptionEntityMapper consumptionEntityMapper) {
this.consumptionEntityMapper = consumptionEntityMapper;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/CompleteProcessEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
similarity index 98%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/CompleteProcessEventListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
index 5647ae0..619e5cf 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/CompleteProcessEventListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
@@ -52,7 +52,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
-public class CompleteProcessEventListener implements ProcessEventListener {
+public class ConsumptionCompleteProcessListener implements ProcessEventListener {
@Autowired
private QueryService queryService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/RejectProcessEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionRejectProcessListener.java
similarity index 92%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/RejectProcessEventListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionRejectProcessListener.java
index 744f160..e02ce91 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/RejectProcessEventListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionRejectProcessListener.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.workflow.newconsumption.listener;
+import java.util.Date;
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
@@ -26,26 +27,19 @@ import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
import org.apache.inlong.manager.workflow.model.WorkflowContext;
-
-import java.util.Date;
-
-import lombok.extern.slf4j.Slf4j;
-
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* Added data consumption process rejection event listener
- *
*/
-@Slf4j
@Component
-public class RejectProcessEventListener implements ProcessEventListener {
+public class ConsumptionRejectProcessListener implements ProcessEventListener {
private ConsumptionEntityMapper consumptionEntityMapper;
@Autowired
- public RejectProcessEventListener(ConsumptionEntityMapper consumptionEntityMapper) {
+ public ConsumptionRejectProcessListener(ConsumptionEntityMapper consumptionEntityMapper) {
this.consumptionEntityMapper = consumptionEntityMapper;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamCompleteProcessListener.java
index ab9beed..e31524b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamCompleteProcessListener.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.workflow.newstream;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.core.DataStreamService;
import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
@@ -38,6 +39,8 @@ public class SingleStreamCompleteProcessListener implements ProcessEventListener
@Autowired
private BusinessService businessService;
+ @Autowired
+ private DataStreamService dataStreamService;
@Override
public ProcessEvent event() {
@@ -45,14 +48,20 @@ public class SingleStreamCompleteProcessListener implements ProcessEventListener
}
/**
- * The creation process ends normally, and the business status is modified to [BIZ_CONFIG_SUCCESS]
+ * The creation process ends normally; modify the status of the business and of all data streams
+ * belonging to the business to [CONFIG_SUCCESSFUL]
*/
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
String bid = form.getBusinessId();
+ String dsid = form.getDataStreamIdentifier();
String username = context.getApplicant();
- businessService.updateStatus(bid, EntityStatus.BIZ_CONFIG_SUCCESS.getCode(), username);
+
+ // update business status
+ businessService.updateStatus(bid, EntityStatus.BIZ_CONFIG_SUCCESSFUL.getCode(), username);
+ // update data stream status
+ dataStreamService.updateStatus(bid, dsid, EntityStatus.DATA_STREAM_CONFIG_SUCCESSFUL.getCode(), username);
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamFailedProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamFailedProcessListener.java
index aba59ee..7305500 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamFailedProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamFailedProcessListener.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.workflow.newstream;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.core.DataStreamService;
import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
@@ -38,6 +39,8 @@ public class SingleStreamFailedProcessListener implements ProcessEventListener {
@Autowired
private BusinessService businessService;
+ @Autowired
+ private DataStreamService dataStreamService;
@Override
public ProcessEvent event() {
@@ -45,14 +48,21 @@ public class SingleStreamFailedProcessListener implements ProcessEventListener {
}
/**
- * An exception occurred in the creation process, and the business status was changed to [BIZ_CONFIG_FAILURE]
+ * The creation process ends abnormally; modify the status of the business and of all data streams
+ * belonging to the business to [CONFIG_FAILED]
*/
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
String bid = form.getBusinessId();
+ String dsid = form.getDataStreamIdentifier();
String username = context.getApplicant();
- businessService.updateStatus(bid, EntityStatus.BIZ_CONFIG_FAILURE.getCode(), username);
+
+ // update business status
+ businessService.updateStatus(bid, EntityStatus.BIZ_CONFIG_FAILED.getCode(), username);
+ // update data stream status
+ dataStreamService.updateStatus(bid, dsid, EntityStatus.DATA_STREAM_CONFIG_FAILED.getCode(), username);
+
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
index 0ea30d0..1ad90dc 100644
--- a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
+++ b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
@@ -17,13 +17,10 @@
package org.apache.inlong.manager.workflow.core;
-import org.apache.inlong.manager.workflow.exception.WorkflowNoRollbackException;
-import org.apache.inlong.manager.workflow.exception.WorkflowRollbackOnceException;
-
import java.lang.reflect.UndeclaredThrowableException;
-
import lombok.extern.slf4j.Slf4j;
-
+import org.apache.inlong.manager.workflow.exception.WorkflowNoRollbackException;
+import org.apache.inlong.manager.workflow.exception.WorkflowRollbackOnceException;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;
@@ -33,7 +30,7 @@ import org.springframework.transaction.support.TransactionCallback;
import org.springframework.util.Assert;
/**
- * Trasaction to Help
+ * Transaction Helper
*/
@Slf4j
public class TransactionHelper {
@@ -47,7 +44,7 @@ public class TransactionHelper {
/**
* Execute in transaction
*
- * @param action Execution logic
+ * @param action Execution logic
* @param propagationBehavior Dissemination mechanism
* @param <T>
* @return result
diff --git a/inlong-manager/pom.xml b/inlong-manager/pom.xml
index 1c25709..dab6e65 100644
--- a/inlong-manager/pom.xml
+++ b/inlong-manager/pom.xml
@@ -242,6 +242,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${hive.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0</version>