You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/02 08:54:13 UTC
[incubator-inlong] branch master updated: [INLONG-4486][TubeMQ] Adjust the parameter requirements of group consume delete APIs (#4495)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 102df00e7 [INLONG-4486][TubeMQ] Adjust the parameter requirements of group consume delete APIs (#4495)
102df00e7 is described below
commit 102df00e75ff040b944847aefcdd7f91b4b64000
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Jun 2 16:54:08 2022 +0800
[INLONG-4486][TubeMQ] Adjust the parameter requirements of group consume delete APIs (#4495)
---
.../master/metamanage/DefaultMetaDataService.java | 23 +-
.../server/master/metamanage/MetaDataService.java | 34 ++-
.../metastore/dao/mapper/ConsumeCtrlMapper.java | 4 +
.../metastore/dao/mapper/MetaConfigMapper.java | 26 ++-
.../metastore/impl/AbsConsumeCtrlMapperImpl.java | 62 +++++
.../metastore/impl/AbsMetaConfigMapperImpl.java | 21 +-
.../web/handler/WebAdminGroupCtrlHandler.java | 251 +++++++++++++++++----
.../web/handler/WebGroupConsumeCtrlHandler.java | 38 +++-
8 files changed, 399 insertions(+), 60 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
index 4347f77bc..f2c707280 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
@@ -29,7 +29,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-
import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
import org.apache.inlong.tubemq.corebase.TokenConstants;
@@ -1065,16 +1064,38 @@ public class DefaultMetaDataService implements MetaDataService {
return result.isSuccess();
}
+ @Override
+ public GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(String groupName,
+ String topicName) {
+ return metaConfigMapper.getConsumeCtrlByGroupAndTopic(groupName, topicName);
+ }
+
@Override
public List<GroupConsumeCtrlEntity> getConsumeCtrlByTopic(String topicName) {
return metaConfigMapper.getConsumeCtrlByTopic(topicName);
}
+ @Override
+ public Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByTopic(Set<String> topicSet) {
+ return metaConfigMapper.getConsumeCtrlByTopic(topicSet);
+ }
+
@Override
public Set<String> getDisableTopicByGroupName(String groupName) {
return metaConfigMapper.getDisableTopicByGroupName(groupName);
}
+ @Override
+ public List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName) {
+ return metaConfigMapper.getConsumeCtrlByGroupName(groupName);
+ }
+
+ @Override
+ public Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByGroupName(
+ Set<String> groupSet) {
+ return metaConfigMapper.getConsumeCtrlByGroupName(groupSet);
+ }
+
@Override
public Map<String, List<GroupConsumeCtrlEntity>> getGroupConsumeCtrlConf(
Set<String> groupSet, Set<String> topicSet, GroupConsumeCtrlEntity qryEntry) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
index a38bda3ce..e6e382a5b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
@@ -21,7 +21,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
-
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.server.Server;
import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus;
@@ -686,6 +685,15 @@ public interface MetaDataService extends Server {
String groupName, String topicName,
StringBuilder strBuff, ProcessResult result);
+ /**
+ * Get the consume control record by group name and topic name
+ *
+ * @param groupName the queried group name
+ * @param topicName the queried topic name
+ * @return the consume control record
+ */
+ GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(String groupName, String topicName);
+
/**
* Get all group consume control record for a specific topic
*
@@ -694,6 +702,14 @@ public interface MetaDataService extends Server {
*/
List<GroupConsumeCtrlEntity> getConsumeCtrlByTopic(String topicName);
+ /**
+ * Get all group consume control records for the specific topic set
+ *
+ * @param topicSet the queried topic name set
+ * @return the consume control records, keyed by topic name
+ */
+ Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByTopic(Set<String> topicSet);
+
/**
* Get all disable consumed topic for a specific group
*
@@ -702,6 +718,22 @@ public interface MetaDataService extends Server {
*/
Set<String> getDisableTopicByGroupName(String groupName);
+ /**
+ * Get consume control records by group name
+ *
+ * @param groupName the queried group name
+ * @return the consume control record list
+ */
+ List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName);
+
+ /**
+ * Get consume control records by group name set
+ *
+ * @param groupSet the queried group name set
+ * @return the consume control record map
+ */
+ Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByGroupName(Set<String> groupSet);
+
/**
* Get group consume control configure for topic & group set
*
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ConsumeCtrlMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ConsumeCtrlMapper.java
index 971e0885a..299d02b58 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ConsumeCtrlMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ConsumeCtrlMapper.java
@@ -44,8 +44,12 @@ public interface ConsumeCtrlMapper extends AbstractMapper {
List<GroupConsumeCtrlEntity> getConsumeCtrlByTopicName(String topicName);
+ Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByTopicName(Set<String> topicSet);
+
List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName);
+ Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByGroupName(Set<String> groupSet);
+
GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(String groupName, String topicName);
Map<String/* group */, List<GroupConsumeCtrlEntity>> getConsumeCtrlInfoMap(
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
index 0b3b4cc83..0ce5456b7 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
@@ -159,7 +159,7 @@ public interface MetaConfigMapper extends KeepAliveService {
* Add or Update topic control configure info
*
* @param isAddOp whether add operation
- * @param entity the topic control info entity will be add
+ * @param entity the topic control info entity will be added
* @param strBuff the print info string buffer
* @param result the process result return
* @return true if success otherwise false
@@ -517,6 +517,14 @@ public interface MetaConfigMapper extends KeepAliveService {
*/
List<GroupConsumeCtrlEntity> getConsumeCtrlByTopic(String topicName);
+ /**
+ * Get all group consume control records for the specific topic set
+ *
+ * @param topicSet the queried topic name set
+ * @return the consume control records, keyed by topic name
+ */
+ Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByTopic(Set<String> topicSet);
+
/**
* Get consume control record
*
@@ -534,6 +542,22 @@ public interface MetaConfigMapper extends KeepAliveService {
*/
Set<String> getDisableTopicByGroupName(String groupName);
+ /**
+ * Get consume control records by group name
+ *
+ * @param groupName the queried group name
+ * @return the consume control record list
+ */
+ List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName);
+
+ /**
+ * Get consume control records by group name set
+ *
+ * @param groupSet the queried group name set
+ * @return the consume control records, keyed by group name
+ */
+ Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByGroupName(Set<String> groupSet);
+
/**
* Get group consume control configure for topic & group set
*
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
index dc8703b82..1dff57566 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
@@ -188,6 +188,37 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
return result;
}
+ @Override
+ public Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByTopicName(
+ Set<String> topicSet) {
+ if (topicSet == null || topicSet.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ GroupConsumeCtrlEntity entity;
+ ConcurrentHashSet<String> keySet;
+ List<GroupConsumeCtrlEntity> itemRet;
+ Map<String, List<GroupConsumeCtrlEntity>> result = new HashMap<>();
+ for (String topicName : topicSet) {
+ keySet = topic2RecordCache.get(topicName);
+ if (keySet == null || keySet.isEmpty()) {
+ result.put(topicName, Collections.emptyList());
+ continue;
+ }
+ itemRet = new ArrayList<>();
+ for (String recordKey : keySet) {
+ if (recordKey == null) {
+ continue;
+ }
+ entity = consumeCtrlCache.get(recordKey);
+ if (entity != null) {
+ itemRet.add(entity);
+ }
+ }
+ result.put(topicName, itemRet);
+ }
+ return result;
+ }
+
@Override
public List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName) {
ConcurrentHashSet<String> keySet =
@@ -206,6 +237,37 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
return result;
}
+ @Override
+ public Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByGroupName(
+ Set<String> groupSet) {
+ if (groupSet == null || groupSet.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ GroupConsumeCtrlEntity entity;
+ ConcurrentHashSet<String> keySet;
+ List<GroupConsumeCtrlEntity> itemRet;
+ Map<String, List<GroupConsumeCtrlEntity>> result = new HashMap<>();
+ for (String groupName : groupSet) {
+ keySet = group2RecordCache.get(groupName);
+ if (keySet == null || keySet.isEmpty()) {
+ result.put(groupName, Collections.emptyList());
+ continue;
+ }
+ itemRet = new ArrayList<>();
+ for (String recordKey : keySet) {
+ if (recordKey == null) {
+ continue;
+ }
+ entity = consumeCtrlCache.get(recordKey);
+ if (entity != null) {
+ itemRet.add(entity);
+ }
+ }
+ result.put(groupName, itemRet);
+ }
+ return result;
+ }
+
@Override
public GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(
String groupName, String topicName) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
index 90e14b39b..d2d39a033 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
@@ -26,12 +26,12 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.commons.codec.binary.StringUtils;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.server.common.TServerConstants;
import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.inlong.tubemq.server.common.statusdef.EnableStatus;
import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus;
import org.apache.inlong.tubemq.server.common.statusdef.TopicStatus;
import org.apache.inlong.tubemq.server.common.utils.RowLock;
@@ -1230,6 +1230,11 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
return consumeCtrlMapper.getConsumeCtrlByTopicName(topicName);
}
+ @Override
+ public Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByTopic(Set<String> topicSet) {
+ return consumeCtrlMapper.getConsumeCtrlByTopicName(topicSet);
+ }
+
@Override
public GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(String groupName, String topicName) {
return consumeCtrlMapper.getConsumeCtrlByGroupAndTopic(groupName, topicName);
@@ -1244,13 +1249,25 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
return disTopicSet;
}
for (GroupConsumeCtrlEntity ctrlEntity : qryResult) {
- if (ctrlEntity != null && !ctrlEntity.isEnableConsume()) {
+ if (ctrlEntity != null
+ && ctrlEntity.getConsumeEnable() == EnableStatus.STATUS_DISABLE) {
disTopicSet.add(ctrlEntity.getTopicName());
}
}
return disTopicSet;
}
+ @Override
+ public List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName) {
+ return consumeCtrlMapper.getConsumeCtrlByGroupName(groupName);
+ }
+
+ @Override
+ public Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByGroupName(
+ Set<String> groupSet) {
+ return consumeCtrlMapper.getConsumeCtrlByGroupName(groupSet);
+ }
+
@Override
public Map<String, List<GroupConsumeCtrlEntity>> getGroupConsumeCtrlConf(
Set<String> groupSet, Set<String> topicSet, GroupConsumeCtrlEntity qryEntry) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
index 6187f9af9..e67688c3b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
@@ -27,8 +27,10 @@ import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.server.common.TServerConstants;
import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.inlong.tubemq.server.common.statusdef.EnableStatus;
import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
import org.apache.inlong.tubemq.server.master.TMaster;
+import org.apache.inlong.tubemq.server.master.metamanage.DataOpErrCode;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
@@ -392,11 +394,21 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
}
Set<String> topicNameSet = (Set<String>) result.getRetData();
// add black list records
+ GroupConsumeCtrlEntity ctrlEntity;
List<GroupProcessResult> retInfoList = new ArrayList<>();
for (String groupName : groupNameSet) {
for (String topicName : topicNameSet) {
+ ctrlEntity = defMetaDataService.getConsumeCtrlByGroupAndTopic(
+ groupName, topicName);
+ if (ctrlEntity != null
+ && ctrlEntity.getConsumeEnable() == EnableStatus.STATUS_DISABLE) {
+ result.setFailResult(DataOpErrCode.DERR_SUCCESS.getCode(), "Ok!");
+ retInfoList.add(new GroupProcessResult(groupName, topicName, result));
+ continue;
+ }
retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity, groupName,
- topicName, Boolean.FALSE, "Old API Set", null, null, sBuffer, result));
+ topicName, Boolean.FALSE, "Old API add blacklist, disable consume",
+ null, null, sBuffer, result));
}
}
return buildRetInfo(retInfoList, sBuffer);
@@ -420,15 +432,26 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
}
BaseEntity opEntity = (BaseEntity) result.getRetData();
// check and get groupNameJsonSet info
- if (!getGroupCsmJsonSetInfo(req, opEntity, Boolean.FALSE, sBuffer, result)) {
+ if (!getGroupCsmJsonSetInfo(req, opEntity, Boolean.FALSE,
+ "Old API batch set BlackList", sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
return sBuffer;
}
Map<String, GroupConsumeCtrlEntity> addRecordMap =
(Map<String, GroupConsumeCtrlEntity>) result.getRetData();
// add or update and build result
+ GroupConsumeCtrlEntity ctrlEntity;
List<GroupProcessResult> retInfoList = new ArrayList<>();
for (GroupConsumeCtrlEntity entry : addRecordMap.values()) {
+ ctrlEntity = defMetaDataService.getConsumeCtrlByGroupAndTopic(
+ entry.getGroupName(), entry.getTopicName());
+ if (ctrlEntity != null
+ && ctrlEntity.getConsumeEnable() == EnableStatus.STATUS_DISABLE) {
+ result.setFailResult(DataOpErrCode.DERR_SUCCESS.getCode(), "Ok!");
+ retInfoList.add(new GroupProcessResult(entry.getGroupName(),
+ entry.getTopicName(), result));
+ continue;
+ }
retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(entry, sBuffer, result));
}
return buildRetInfo(retInfoList, sBuffer);
@@ -451,27 +474,58 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
return sBuffer;
}
BaseEntity opEntity = (BaseEntity) result.getRetData();
- // get group list
+ // check and get topicName field
if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) {
+ WebFieldDef.COMPSTOPICNAME, true, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
return sBuffer;
}
- Set<String> groupNameSet = (Set<String>) result.getRetData();
- // check and get topicName field
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
+ // get group list
if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
+ WebFieldDef.COMPSGROUPNAME, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.getRetData();
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
// add allowed consume records
List<GroupProcessResult> retInfoList = new ArrayList<>();
- for (String groupName : groupNameSet) {
- for (String topicName : topicNameSet) {
- retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity, groupName,
- topicName, Boolean.TRUE, "enable consume", null, null, sBuffer, result));
-
+ if (groupNameSet.isEmpty()) {
+ Map<String, List<GroupConsumeCtrlEntity>> topicConsumeCtrlMap =
+ defMetaDataService.getConsumeCtrlByGroupName(topicNameSet);
+ for (Map.Entry<String, List<GroupConsumeCtrlEntity>> entry :
+ topicConsumeCtrlMap.entrySet()) {
+ if (!entry.getValue().isEmpty()) {
+ for (GroupConsumeCtrlEntity ctrlEntity : entry.getValue()) {
+ if (ctrlEntity != null
+ && ctrlEntity.getConsumeEnable() != EnableStatus.STATUS_ENABLE) {
+ defMetaDataService.insertConsumeCtrlInfo(opEntity,
+ ctrlEntity.getGroupName(), ctrlEntity.getTopicName(),
+ Boolean.TRUE, "Old API delete blacklist, enable consume",
+ null, null, sBuffer, result);
+ }
+ }
+ }
+ result.setFullInfo(true, DataOpErrCode.DERR_SUCCESS.getCode(), "Ok");
+ retInfoList.add(new GroupProcessResult("", entry.getKey(), result));
+ }
+ } else {
+ GroupConsumeCtrlEntity ctrlEntity;
+ for (String groupName : groupNameSet) {
+ for (String topicName : topicNameSet) {
+ ctrlEntity = defMetaDataService.getConsumeCtrlByGroupAndTopic(
+ groupName, topicName);
+ if (ctrlEntity != null
+ && ctrlEntity.getConsumeEnable() != EnableStatus.STATUS_ENABLE) {
+ retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity,
+ groupName, topicName, Boolean.TRUE,
+ "Old API delete blacklist, enable consume",
+ null, null, sBuffer, result));
+ } else {
+ result.setFullInfo(true, DataOpErrCode.DERR_SUCCESS.getCode(), "Ok");
+ retInfoList.add(new GroupProcessResult(groupName, topicName, result));
+ }
+ }
}
}
return buildRetInfo(retInfoList, sBuffer);
@@ -510,10 +564,21 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
Set<String> topicNameSet = (Set<String>) result.getRetData();
List<GroupProcessResult> retInfoList = new ArrayList<>();
// add allowed consume records
+ GroupConsumeCtrlEntity ctrlEntity;
for (String groupName : groupNameSet) {
for (String topicName : topicNameSet) {
+ ctrlEntity = defMetaDataService.getConsumeCtrlByGroupAndTopic(
+ groupName, topicName);
+ if (ctrlEntity != null) {
+ if (ctrlEntity.getConsumeEnable() == EnableStatus.STATUS_ENABLE) {
+ result.setFailResult(DataOpErrCode.DERR_SUCCESS.getCode(), "Ok!");
+ retInfoList.add(new GroupProcessResult(groupName, topicName, result));
+ continue;
+ }
+ }
retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity, groupName,
- topicName, Boolean.TRUE, "enable consume", null, null, sBuffer, result));
+ topicName, Boolean.TRUE, "Old API add, enable consume",
+ null, null, sBuffer, result));
}
}
return buildRetInfo(retInfoList, sBuffer);
@@ -537,15 +602,27 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
}
BaseEntity opEntity = (BaseEntity) result.getRetData();
// check and get groupNameJsonSet info
- if (!getGroupCsmJsonSetInfo(req, opEntity, Boolean.TRUE, sBuffer, result)) {
+ if (!getGroupCsmJsonSetInfo(req, opEntity, Boolean.TRUE,
+ "Old API batch set Enable Consume", sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
return sBuffer;
}
Map<String, GroupConsumeCtrlEntity> addRecordMap =
(Map<String, GroupConsumeCtrlEntity>) result.getRetData();
// add or update and build result
+ GroupConsumeCtrlEntity ctrlEntity;
List<GroupProcessResult> retInfoList = new ArrayList<>();
for (GroupConsumeCtrlEntity entry : addRecordMap.values()) {
+ ctrlEntity = defMetaDataService.getConsumeCtrlByGroupAndTopic(
+ entry.getGroupName(), entry.getTopicName());
+ if (ctrlEntity != null) {
+ if (ctrlEntity.getConsumeEnable() == EnableStatus.STATUS_ENABLE) {
+ result.setFailResult(DataOpErrCode.DERR_SUCCESS.getCode(), "Ok!");
+ retInfoList.add(new GroupProcessResult(entry.getGroupName(),
+ entry.getTopicName(), result));
+ continue;
+ }
+ }
retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(entry, sBuffer, result));
}
return buildRetInfo(retInfoList, sBuffer);
@@ -568,26 +645,57 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
return sBuffer;
}
BaseEntity opEntity = (BaseEntity) result.getRetData();
- // get group list
+ // check and get topicName field
if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) {
+ WebFieldDef.COMPSTOPICNAME, true, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
return sBuffer;
}
- Set<String> groupNameSet = (Set<String>) result.getRetData();
- // check and get topicName field
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
+ // get group list
if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSTOPICNAME, true, null, sBuffer, result)) {
+ WebFieldDef.COMPSGROUPNAME, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.getRetData();
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
List<GroupProcessResult> retInfoList = new ArrayList<>();
- for (String groupName : groupNameSet) {
- for (String topicName : topicNameSet) {
- defMetaDataService.delConsumeCtrlConf(opEntity.getModifyUser(),
- groupName, topicName, sBuffer, result);
- retInfoList.add(new GroupProcessResult(groupName, topicName, result));
+ if (groupNameSet.isEmpty()) {
+ Map<String, List<GroupConsumeCtrlEntity>> topicConsumeCtrlMap =
+ defMetaDataService.getConsumeCtrlByGroupName(topicNameSet);
+ for (Map.Entry<String, List<GroupConsumeCtrlEntity>> entry :
+ topicConsumeCtrlMap.entrySet()) {
+ if (!entry.getValue().isEmpty()) {
+ for (GroupConsumeCtrlEntity ctrlEntity : entry.getValue()) {
+ if (ctrlEntity != null
+ && ctrlEntity.getConsumeEnable() == EnableStatus.STATUS_ENABLE) {
+ defMetaDataService.insertConsumeCtrlInfo(opEntity,
+ ctrlEntity.getGroupName(), ctrlEntity.getTopicName(),
+ Boolean.FALSE, "Old API delete, disable consume",
+ null, null, sBuffer, result);
+ }
+ }
+ }
+ result.setFullInfo(true, DataOpErrCode.DERR_SUCCESS.getCode(), "Ok");
+ retInfoList.add(new GroupProcessResult("", entry.getKey(), result));
+ }
+ } else {
+ GroupConsumeCtrlEntity ctrlEntity;
+ for (String groupName : groupNameSet) {
+ for (String topicName : topicNameSet) {
+ ctrlEntity = defMetaDataService.getConsumeCtrlByGroupAndTopic(
+ groupName, topicName);
+ if (ctrlEntity != null
+ && ctrlEntity.getConsumeEnable() == EnableStatus.STATUS_ENABLE) {
+ retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity,
+ groupName, topicName, Boolean.FALSE,
+ "Old API delete, disable consume",
+ null, null, sBuffer, result));
+ } else {
+ result.setFullInfo(true, DataOpErrCode.DERR_SUCCESS.getCode(), "Ok");
+ retInfoList.add(new GroupProcessResult(groupName, topicName, result));
+ }
+ }
}
}
return buildRetInfo(retInfoList, sBuffer);
@@ -666,26 +774,60 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
return sBuffer;
}
BaseEntity opEntity = (BaseEntity) result.getRetData();
- // get group list
+ // check and get topicName field
if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) {
+ WebFieldDef.COMPSTOPICNAME, true, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
return sBuffer;
}
- Set<String> groupNameSet = (Set<String>) result.getRetData();
- // check and get topicName field
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
+ // get group list
if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSTOPICNAME, true, null, sBuffer, result)) {
+ WebFieldDef.COMPSGROUPNAME, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.getRetData();
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
List<GroupProcessResult> retInfoList = new ArrayList<>();
- for (String groupName : groupNameSet) {
- for (String topicName : topicNameSet) {
- retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity,
- groupName, topicName, Boolean.TRUE, "enable consume",
- false, TServerConstants.BLANK_FILTER_ITEM_STR, sBuffer, result));
+ if (groupNameSet.isEmpty()) {
+ Map<String, List<GroupConsumeCtrlEntity>> topicConsumeCtrlMap =
+ defMetaDataService.getConsumeCtrlByGroupName(topicNameSet);
+ for (Map.Entry<String, List<GroupConsumeCtrlEntity>> entry :
+ topicConsumeCtrlMap.entrySet()) {
+ if (entry.getValue().isEmpty()) {
+ result.setFullInfo(true, DataOpErrCode.DERR_SUCCESS.getCode(), "Ok");
+ retInfoList.add(new GroupProcessResult("", entry.getKey(), result));
+ continue;
+ }
+ for (GroupConsumeCtrlEntity ctrlEntity : entry.getValue()) {
+ if (ctrlEntity != null
+ && ctrlEntity.getFilterEnable() == EnableStatus.STATUS_ENABLE) {
+ defMetaDataService.insertConsumeCtrlInfo(opEntity,
+ ctrlEntity.getGroupName(), ctrlEntity.getTopicName(), null,
+ "Old API delete, disable filter", false,
+ TServerConstants.BLANK_FILTER_ITEM_STR, sBuffer, result);
+ }
+ }
+ result.setFullInfo(true, DataOpErrCode.DERR_SUCCESS.getCode(), "Ok");
+ retInfoList.add(new GroupProcessResult("", entry.getKey(), result));
+ }
+ } else {
+ GroupConsumeCtrlEntity ctrlEntity;
+ for (String groupName : groupNameSet) {
+ for (String topicName : topicNameSet) {
+ ctrlEntity = defMetaDataService.getConsumeCtrlByGroupAndTopic(
+ groupName, topicName);
+ if (ctrlEntity != null
+ && ctrlEntity.getFilterEnable() == EnableStatus.STATUS_ENABLE) {
+ retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity,
+ groupName, topicName, null,
+ "Old API delete, disable filter", false,
+ TServerConstants.BLANK_FILTER_ITEM_STR, sBuffer, result));
+ } else {
+ result.setFullInfo(true, DataOpErrCode.DERR_SUCCESS.getCode(), "Ok");
+ retInfoList.add(new GroupProcessResult(groupName, topicName, result));
+ }
+ }
}
}
return buildRetInfo(retInfoList, sBuffer);
@@ -971,13 +1113,22 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
return sBuffer;
}
String filterCondStr = (String) result.getRetData();
+ // add or modify filter consume records
+ GroupConsumeCtrlEntity ctrlEntity;
List<GroupProcessResult> retInfoList = new ArrayList<>();
- // modify filter consume records
for (String groupName : groupNameSet) {
for (String topicName : topicNameSet) {
- retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity,
- groupName, topicName, Boolean.TRUE, "enable consume",
- filterEnable, filterCondStr, sBuffer, result));
+ ctrlEntity =
+ defMetaDataService.getConsumeCtrlByGroupAndTopic(groupName, topicName);
+ if (ctrlEntity == null) {
+ retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity, groupName,
+ topicName, Boolean.TRUE, "Old API set filter conditions",
+ filterEnable, filterCondStr, sBuffer, result));
+ } else {
+ retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity, groupName,
+ topicName, null, "Old API set filter conditions",
+ filterEnable, filterCondStr, sBuffer, result));
+ }
}
}
return buildRetInfo(retInfoList, sBuffer);
@@ -1010,8 +1161,14 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
Map<String, GroupConsumeCtrlEntity> addRecordMap =
(Map<String, GroupConsumeCtrlEntity>) result.getRetData();
// add or update and build result
+ GroupConsumeCtrlEntity curEntity;
List<GroupProcessResult> retInfoList = new ArrayList<>();
for (GroupConsumeCtrlEntity entry : addRecordMap.values()) {
+ curEntity = defMetaDataService.getConsumeCtrlByGroupAndTopic(
+ entry.getGroupName(), entry.getTopicName());
+ if (curEntity == null) {
+ entry.setConsumeEnable(true);
+ }
retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(entry, sBuffer, result));
}
return buildRetInfo(retInfoList, sBuffer);
@@ -1071,8 +1228,8 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
itemEntity =
new GroupConsumeCtrlEntity(itemOpEntity, groupName, topicName);
itemEntity.updModifyInfo(itemOpEntity.getDataVerId(),
- true, "enable consume", filterEnable, filterCondStr);
- addRecordMap.put(itemEntity.getGroupName(), itemEntity);
+ null, null, filterEnable, filterCondStr);
+ addRecordMap.put(itemEntity.getRecordKey(), itemEntity);
}
// check result
if (addRecordMap.isEmpty()) {
@@ -1148,8 +1305,8 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
}
private boolean getGroupCsmJsonSetInfo(HttpServletRequest req, BaseEntity defOpEntity,
- Boolean enableCsm, StringBuilder sBuffer,
- ProcessResult result) {
+ Boolean enableCsm, String opReason,
+ StringBuilder sBuffer, ProcessResult result) {
if (!WebParameterUtils.getJsonArrayParamValue(req,
WebFieldDef.GROUPJSONSET, true, null, result)) {
return result.isSuccess();
@@ -1189,8 +1346,8 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
itemEntity =
new GroupConsumeCtrlEntity(itemOpEntity, groupName, topicName);
itemEntity.updModifyInfo(itemOpEntity.getDataVerId(),
- enableCsm, "Old API batch set", null, null);
- addRecordMap.put(itemEntity.getGroupName(), itemEntity);
+ enableCsm, opReason, null, null);
+ addRecordMap.put(itemEntity.getRecordKey(), itemEntity);
}
// check result
if (addRecordMap.isEmpty()) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
index cffc25460..5432697c4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
@@ -27,6 +27,7 @@ import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
import org.apache.inlong.tubemq.server.master.TMaster;
+import org.apache.inlong.tubemq.server.master.metamanage.DataOpErrCode;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
@@ -92,7 +93,7 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
return sBuffer;
}
- Boolean consumeEnable = (Boolean) result.getRetData();
+ final Boolean consumeEnable = (Boolean) result.getRetData();
// get filterEnable info
if (!WebParameterUtils.getBooleanParamValue(req,
WebFieldDef.FILTERENABLE, false, null, sBuffer, result)) {
@@ -222,14 +223,36 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
Set<String> topicNameSet = (Set<String>) result.getRetData();
// execute delete operation
List<GroupProcessResult> retInfo = new ArrayList<>();
- for (String groupName : groupNameSet) {
- for (String topicName : topicNameSet) {
- defMetaDataService.delConsumeCtrlConf(opEntity.getModifyUser(),
- groupName, topicName, sBuffer, result);
- retInfo.add(new GroupProcessResult(groupName, topicName, result));
+ if (topicNameSet.isEmpty()) {
+ Map<String, List<GroupConsumeCtrlEntity>> groupCtrlConsumeMap =
+ defMetaDataService.getConsumeCtrlByGroupName(groupNameSet);
+ for (Map.Entry<String, List<GroupConsumeCtrlEntity>> entry :
+ groupCtrlConsumeMap.entrySet()) {
+ if (entry.getValue().isEmpty()) {
+ result.setFullInfo(true,
+ DataOpErrCode.DERR_SUCCESS.getCode(), "Ok!");
+ retInfo.add(new GroupProcessResult(entry.getKey(), "", result));
+ continue;
+ }
+ for (GroupConsumeCtrlEntity ctrlEntity : entry.getValue()) {
+ if (ctrlEntity != null) {
+ defMetaDataService.delConsumeCtrlConf(opEntity.getModifyUser(),
+ ctrlEntity.getGroupName(), ctrlEntity.getTopicName(), sBuffer, result);
+ }
+ }
+ result.setFullInfo(true,
+ DataOpErrCode.DERR_SUCCESS.getCode(), "Ok!");
+ retInfo.add(new GroupProcessResult(entry.getKey(), "", result));
+ }
+ } else {
+ for (String groupName : groupNameSet) {
+ for (String topicName : topicNameSet) {
+ defMetaDataService.delConsumeCtrlConf(opEntity.getModifyUser(),
+ groupName, topicName, sBuffer, result);
+ retInfo.add(new GroupProcessResult(groupName, topicName, result));
+ }
}
}
-
buildRetInfo(retInfo, sBuffer);
return sBuffer;
}
@@ -430,5 +453,4 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
result.setSuccResult(addRecordMap);
return result.isSuccess();
}
-
}