You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/08/19 08:14:17 UTC
[incubator-inlong] branch INLONG-570 updated: [INLONG-1444] Fix Web API multiple field search logic bug (#1445)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch INLONG-570
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-570 by this push:
new 9e66f54 [INLONG-1444] Fix Web API multiple field search logic bug (#1445)
9e66f54 is described below
commit 9e66f5458cd50f1544677d36873618de7d9a75b2
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Aug 19 16:13:38 2021 +0800
[INLONG-1444] Fix Web API multiple field search logic bug (#1445)
---
.../server/master/metamanage/MetaDataManager.java | 30 ++--
.../metastore/BdbMetaStoreServiceImpl.java | 20 ++-
.../metamanage/metastore/MetaStoreService.java | 8 +-
.../metastore/dao/entity/TopicDeployEntity.java | 2 +
.../dao/mapper/GroupConsumeCtrlMapper.java | 7 +-
.../metastore/dao/mapper/TopicDeployMapper.java | 4 +-
.../impl/bdbimpl/BdbBrokerConfigMapperImpl.java | 44 ++++--
.../bdbimpl/BdbGroupConsumeCtrlMapperImpl.java | 116 ++++++++-------
.../impl/bdbimpl/BdbTopicDeployMapperImpl.java | 159 ++++++++++-----------
.../master/web/handler/WebTopicDeployHandler.java | 2 +-
10 files changed, 202 insertions(+), 190 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
index 61e7927..98878d5 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
@@ -19,6 +19,7 @@ package org.apache.inlong.tubemq.server.master.metamanage;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -1184,7 +1185,9 @@ public class MetaDataManager implements Server {
/**
* Get broker topic entity, if query entity is null, return all topic entity
*
- * @param qryEntity query conditions
+ * @param topicNameSet query by topicNameSet
+ * @param brokerIdSet query by brokerIdSet
+ * @param qryEntity query conditions
* @return topic entity map
*/
public Map<String, List<TopicDeployEntity>> getTopicConfEntityMap(Set<String> topicNameSet,
@@ -1202,11 +1205,14 @@ public class MetaDataManager implements Server {
*
* @return topic entity map
*/
- public Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<Integer> brokerIdSet,
- Set<String> topicNameSet) {
+ public Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
+ Set<Integer> brokerIdSet) {
Map<Integer, BrokerConfEntity> qryBrokerInfoMap =
metaStoreService.getBrokerConfInfo(brokerIdSet, null, null);
- return metaStoreService.getTopicDeployInfoMap(qryBrokerInfoMap.keySet(), topicNameSet);
+ if (qryBrokerInfoMap.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ return metaStoreService.getTopicDeployInfoMap(topicNameSet, qryBrokerInfoMap.keySet());
}
public Map<String, List<TopicDeployEntity>> getTopicConfMapByTopicAndBrokerIds(
@@ -1914,18 +1920,12 @@ public class MetaDataManager implements Server {
StringBuilder sBuffer,
ProcessResult result) {
List<GroupProcessResult> retInfo = new ArrayList<>();
- if ((groupNameSet == null || groupNameSet.isEmpty())
- && (topicNameSet == null || topicNameSet.isEmpty())) {
+ Set<String> rmvRecordSet =
+ metaStoreService.getMatchedKeysByGroupAndTopicSet(groupNameSet, topicNameSet);
+ if (rmvRecordSet == null || rmvRecordSet.isEmpty()) {
return retInfo;
}
- Set<String> rmvRecords = new HashSet<>();
- if (groupNameSet != null && !groupNameSet.isEmpty()) {
- rmvRecords.addAll(metaStoreService.getConsumeCtrlKeyByGroupName(groupNameSet));
- }
- if (topicNameSet != null && !topicNameSet.isEmpty()) {
- rmvRecords.addAll(metaStoreService.getConsumeCtrlKeyByTopicName(topicNameSet));
- }
- for (String recKey : rmvRecords) {
+ for (String recKey : rmvRecordSet) {
Tuple2<String, String> groupTopicTuple =
KeyBuilderUtils.splitRecKey2GroupTopic(recKey);
metaStoreService.delGroupConsumeCtrlConf(operator, recKey, sBuffer, result);
@@ -1978,7 +1978,7 @@ public class MetaDataManager implements Server {
return disTopicSet;
}
for (GroupConsumeCtrlEntity ctrlEntity : qryResult) {
- if (!ctrlEntity.isEnableConsume()) {
+ if (ctrlEntity != null && !ctrlEntity.isEnableConsume()) {
disTopicSet.add(ctrlEntity.getTopicName());
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
index 1c75ddc..756550e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
@@ -565,8 +565,8 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
@Override
public Map<Integer/* brokerId */, List<TopicDeployEntity>> getTopicDeployInfoMap(
- Set<Integer> brokerIdSet, Set<String> topicNameSet) {
- return topicDeployMapper.getTopicDeployInfoMap(brokerIdSet, topicNameSet);
+ Set<String> topicNameSet, Set<Integer> brokerIdSet) {
+ return topicDeployMapper.getTopicDeployInfoMap(topicNameSet, brokerIdSet);
}
@@ -901,16 +901,6 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
}
@Override
- public Set<String> getConsumeCtrlKeyByTopicName(Set<String> topicSet) {
- return groupConsumeCtrlMapper.getConsumeCtrlKeyByTopicName(topicSet);
- }
-
- @Override
- public Set<String> getConsumeCtrlKeyByGroupName(Set<String> groupSet) {
- return groupConsumeCtrlMapper.getConsumeCtrlKeyByGroupName(groupSet);
- }
-
- @Override
public GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(
String groupName, String topicName) {
return groupConsumeCtrlMapper.getConsumeCtrlByGroupAndTopic(groupName, topicName);
@@ -928,6 +918,12 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
}
@Override
+ public Set<String> getMatchedKeysByGroupAndTopicSet(Set<String> groupSet,
+ Set<String> topicSet) {
+ return groupConsumeCtrlMapper.getMatchedRecords(groupSet, topicSet);
+ }
+
+ @Override
public void registerObserver(AliveObserver eventObserver) {
if (eventObserver != null) {
eventObservers.add(eventObserver);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
index 2e12188..052d014 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
@@ -168,7 +168,7 @@ public interface MetaStoreService extends KeepAlive, Server {
Set<String> topicNameSet, Set<Integer> brokerIdSet, TopicDeployEntity qryEntity);
Map<Integer/* brokerId */, List<TopicDeployEntity>> getTopicDeployInfoMap(
- Set<Integer> brokerIdSet, Set<String> topicNameSet);
+ Set<String> topicNameSet, Set<Integer> brokerIdSet);
Map<String/* topicName */, List<TopicDeployEntity>> getTopicDepInfoByTopicBrokerId(
Set<String> topicSet, Set<Integer> brokerIdSet);
@@ -334,9 +334,7 @@ public interface MetaStoreService extends KeepAlive, Server {
List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName);
- Set<String> getConsumeCtrlKeyByTopicName(Set<String> topicSet);
-
- Set<String> getConsumeCtrlKeyByGroupName(Set<String> groupSet);
-
List<GroupConsumeCtrlEntity> getGroupConsumeCtrlConf(GroupConsumeCtrlEntity qryEntity);
+
+ Set<String> getMatchedKeysByGroupAndTopicSet(Set<String> groupSet, Set<String> topicSet);
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
index 4a65743..2d1df26 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
@@ -270,6 +270,8 @@ public class TopicDeployEntity extends BaseEntity implements Cloneable {
}
return (target.getBrokerId() == TBaseConstants.META_VALUE_UNDEFINED
|| target.getBrokerId() == this.brokerId)
+ && (target.getBrokerPort() == TBaseConstants.META_VALUE_UNDEFINED
+ || target.getBrokerPort() == this.brokerPort)
&& (target.getTopicId() == TBaseConstants.META_VALUE_UNDEFINED
|| target.getTopicId() == this.topicNameId)
&& (TStringUtils.isBlank(target.getTopicName())
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java
index fd04fd9..52d54a9 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java
@@ -20,7 +20,6 @@ package org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.apache.inlong.tubemq.server.common.utils.ProcessResult;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
@@ -48,10 +47,6 @@ public interface GroupConsumeCtrlMapper extends AbstractMapper {
List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName);
- Set<String> getConsumeCtrlKeyByTopicName(Set<String> topicSet);
-
- Set<String> getConsumeCtrlKeyByGroupName(Set<String> groupSet);
-
GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(String groupName, String topicName);
Map<String/* group */, List<GroupConsumeCtrlEntity>> getConsumeCtrlInfoMap(
@@ -59,4 +54,6 @@ public interface GroupConsumeCtrlMapper extends AbstractMapper {
List<GroupConsumeCtrlEntity> getGroupConsumeCtrlConf(GroupConsumeCtrlEntity qryEntity);
+ Set<String> getMatchedRecords(Set<String> groupSet, Set<String> topicSet);
+
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
index 3b69c5d..7896f57 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
@@ -59,8 +59,8 @@ public interface TopicDeployMapper extends AbstractMapper {
Set<Integer> brokerIdSet,
TopicDeployEntity qryEntity);
- Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<Integer> brokerIdSet,
- Set<String> topicNameSet);
+ Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
+ Set<Integer> brokerIdSet);
Map<String/* topicName */, List<TopicDeployEntity>> getTopicConfMapByTopicAndBrokerIds(
Set<String> topicSet, Set<Integer> brokerIdSet);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
index f89c07d..4bc5c5b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
@@ -23,7 +23,6 @@ import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;
-
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -202,30 +201,51 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
/**
* get broker configure info from bdb store
+ *
+ * @param brokerIdSet broker ids the records must match
+ * @param brokerIpSet broker ips the records must match
+ * @param qryEntity properties the records must match
* @return result, only read
*/
@Override
public Map<Integer, BrokerConfEntity> getBrokerConfInfo(Set<Integer> brokerIdSet,
Set<String> brokerIpSet,
BrokerConfEntity qryEntity) {
- Set<Integer> qryBrokerKeySet = null;
+ Set<Integer> ipHitSet = null;
+ Set<Integer> totalMatchedSet = null;
Map<Integer, BrokerConfEntity> retMap = new HashMap<>();
- if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
- qryBrokerKeySet = new HashSet<>(brokerIdSet);
- }
+ // get records set by brokerIpSet
if (brokerIpSet != null && !brokerIpSet.isEmpty()) {
- if (qryBrokerKeySet == null) {
- qryBrokerKeySet = new HashSet<>();
- }
+ ipHitSet = new HashSet<>();
for (String brokerIp : brokerIpSet) {
Integer brokerId = brokerIpIndexCache.get(brokerIp);
if (brokerId != null) {
- qryBrokerKeySet.add(brokerId);
+ ipHitSet.add(brokerId);
+ }
+ }
+ if (ipHitSet.isEmpty()) {
+ return retMap;
+ }
+ }
+ // get intersection from brokerIdSet and brokerIpSet
+ if (brokerIdSet != null || ipHitSet != null) {
+ if (brokerIdSet == null) {
+ totalMatchedSet = new HashSet<>(ipHitSet);
+ } else {
+ if (ipHitSet == null) {
+ totalMatchedSet = new HashSet<>(brokerIdSet);
+ } else {
+ totalMatchedSet = new HashSet<>();
+ for (Integer record : brokerIdSet) {
+ if (ipHitSet.contains(record)) {
+ totalMatchedSet.add(record);
+ }
+ }
}
}
}
// get broker configures
- if (qryBrokerKeySet == null) {
+ if (totalMatchedSet == null) {
for (BrokerConfEntity entity : brokerConfCache.values()) {
if (entity == null
|| (qryEntity != null && !entity.isMatched(qryEntity))) {
@@ -234,7 +254,7 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
retMap.put(entity.getBrokerId(), entity);
}
} else {
- for (Integer brokerId : qryBrokerKeySet) {
+ for (Integer brokerId : totalMatchedSet) {
BrokerConfEntity entity = brokerConfCache.get(brokerId);
if (entity == null
|| (qryEntity != null && !entity.isMatched(qryEntity))) {
@@ -246,8 +266,6 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
return retMap;
}
-
-
/**
* get broker configure info from bdb store
* @return result, only read
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
index 2791809..87c6fbb 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
import org.apache.inlong.tubemq.corebase.utils.KeyBuilderUtils;
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.server.common.exception.LoadMetaException;
import org.apache.inlong.tubemq.server.common.utils.ProcessResult;
import org.apache.inlong.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
@@ -235,6 +236,9 @@ public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
GroupConsumeCtrlEntity entity;
List<GroupConsumeCtrlEntity> result = new ArrayList<>();
for (String recordKey : keySet) {
+ if (recordKey == null) {
+ continue;
+ }
entity = grpConsumeCtrlCache.get(recordKey);
if (entity != null) {
result.add(entity);
@@ -262,34 +266,6 @@ public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
}
@Override
- public Set<String> getConsumeCtrlKeyByTopicName(Set<String> topicNameSet) {
- ConcurrentHashSet<String> qrySet;
- Set<String> retResult = new HashSet<>();
- for (String topicName : topicNameSet) {
- qrySet = grpConsumeCtrlTopicCache.get(topicName);
- if (qrySet == null || qrySet.isEmpty()) {
- continue;
- }
- retResult.addAll(qrySet);
- }
- return retResult;
- }
-
- @Override
- public Set<String> getConsumeCtrlKeyByGroupName(Set<String> groupNameSet) {
- ConcurrentHashSet<String> qrySet;
- Set<String> retResult = new HashSet<>();
- for (String groupName : groupNameSet) {
- qrySet = grpConsumeCtrlGroupCache.get(groupName);
- if (qrySet == null || qrySet.isEmpty()) {
- continue;
- }
- retResult.addAll(qrySet);
- }
- return retResult;
- }
-
- @Override
public GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(
String groupName, String topicName) {
String recKey = KeyBuilderUtils.buildGroupTopicRecKey(groupName, topicName);
@@ -299,35 +275,13 @@ public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
@Override
public Map<String/* group */, List<GroupConsumeCtrlEntity>> getConsumeCtrlInfoMap(
Set<String> groupSet, Set<String> topicSet, GroupConsumeCtrlEntity qryEntry) {
- ConcurrentHashSet<String> recSet;
- Set<String> qryHitRecSet = null;
Map<String, List<GroupConsumeCtrlEntity>> retEntityMap = new HashMap<>();
- // filter group items
- if (groupSet != null && !groupSet.isEmpty()) {
- qryHitRecSet = new HashSet<>();
- for (String group : groupSet) {
- recSet = grpConsumeCtrlGroupCache.get(group);
- if (recSet != null && !recSet.isEmpty()) {
- qryHitRecSet.addAll(recSet);
- }
- }
- }
- // filter topic items
- if (topicSet != null && !topicSet.isEmpty()) {
- if (qryHitRecSet == null) {
- qryHitRecSet = new HashSet<>();
- }
- for (String topic : topicSet) {
- recSet = grpConsumeCtrlTopicCache.get(topic);
- if (recSet != null && !recSet.isEmpty()) {
- qryHitRecSet.addAll(recSet);
- }
- }
- }
+ // filter matched keys by groupSet and topicSet
+ Set<String> totalMatchedSet = getMatchedRecords(groupSet, topicSet);
// get matched records
GroupConsumeCtrlEntity tmpEntity;
List<GroupConsumeCtrlEntity> itemLst;
- if (qryHitRecSet == null) {
+ if (totalMatchedSet == null) {
for (GroupConsumeCtrlEntity entity : grpConsumeCtrlCache.values()) {
if (entity == null || (qryEntry != null && !entity.isMatched(qryEntry))) {
continue;
@@ -337,7 +291,7 @@ public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
itemLst.add(entity);
}
} else {
- for (String recKey : qryHitRecSet) {
+ for (String recKey : totalMatchedSet) {
tmpEntity = grpConsumeCtrlCache.get(recKey);
if (tmpEntity == null || (qryEntry != null && !tmpEntity.isMatched(qryEntry))) {
continue;
@@ -365,12 +319,64 @@ public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
return retEntitys;
}
+ @Override
+ public Set<String> getMatchedRecords(Set<String> groupSet, Set<String> topicSet) {
+ Set<String> groupKeySet = null;
+ Set<String> topicKeySet = null;
+ Set<String> totalMatchedSet = null;
+ ConcurrentHashSet<String> recSet;
+ // filter group items
+ if (groupSet != null && !groupSet.isEmpty()) {
+ groupKeySet = new HashSet<>();
+ for (String group : groupSet) {
+ recSet = grpConsumeCtrlGroupCache.get(group);
+ if (recSet != null && !recSet.isEmpty()) {
+ groupKeySet.addAll(recSet);
+ }
+ }
+ if (groupKeySet.isEmpty()) {
+ return Collections.emptySet();
+ }
+ }
+ // filter topic items
+ if (topicSet != null && !topicSet.isEmpty()) {
+ topicKeySet = new HashSet<>();
+ for (String topic : topicSet) {
+ recSet = grpConsumeCtrlTopicCache.get(topic);
+ if (recSet != null && !recSet.isEmpty()) {
+ topicKeySet.addAll(recSet);
+ }
+ }
+ if (topicKeySet.isEmpty()) {
+ return Collections.emptySet();
+ }
+ }
+ // get intersection from groupKeySet and topicKeySet
+ if (groupKeySet != null || topicKeySet != null) {
+ if (groupKeySet == null) {
+ totalMatchedSet = new HashSet<>(topicKeySet);
+ } else {
+ if (topicKeySet == null) {
+ totalMatchedSet = new HashSet<>(groupKeySet);
+ } else {
+ totalMatchedSet = new HashSet<>();
+ for (String record : groupKeySet) {
+ if (topicKeySet.contains(record)) {
+ totalMatchedSet.add(record);
+ }
+ }
+ }
+ }
+ }
+ return totalMatchedSet;
+ }
+
/**
* Put Group consume configure info into bdb store
*
* @param memEntity need add record
* @param result process result with old value
- * @return
+ * @return true on success, false on failure
*/
private boolean putGroupConsumeCtrlConfig2Bdb(
GroupConsumeCtrlEntity memEntity, ProcessResult result) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
index 32bb156..2070b97 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
@@ -24,6 +24,7 @@ import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -233,33 +234,11 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
Set<Integer> brokerIdSet,
TopicDeployEntity qryEntity) {
List<TopicDeployEntity> items;
- Set<String> qryTopicKeySet = null;
- ConcurrentHashSet<String> keySet;
Map<String, List<TopicDeployEntity>> retEntityMap = new HashMap<>();
- // get deploy records set by topicName
- if (topicNameSet != null && !topicNameSet.isEmpty()) {
- qryTopicKeySet = new HashSet<>();
- for (String topicName : topicNameSet) {
- keySet = topicNameCacheIndex.get(topicName);
- if (keySet != null && !keySet.isEmpty()) {
- qryTopicKeySet.addAll(keySet);
- }
- }
- }
- // get deploy records set by brokerId
- if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
- if (qryTopicKeySet == null) {
- qryTopicKeySet = new HashSet<>();
- }
- for (Integer brokerId : brokerIdSet) {
- keySet = brokerIdCacheIndex.get(brokerId);
- if (keySet != null && !keySet.isEmpty()) {
- qryTopicKeySet.addAll(keySet);
- }
- }
- }
+ // get matched keys by topicNameSet and brokerIdSet
+ Set<String> matchedKeySet = getMatchedRecords(topicNameSet, brokerIdSet);
// filter record by qryEntity
- if (qryTopicKeySet == null) {
+ if (matchedKeySet == null) {
for (TopicDeployEntity entry : topicConfCache.values()) {
if (entry == null || (qryEntity != null && !entry.isMatched(qryEntity))) {
continue;
@@ -270,7 +249,7 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
}
} else {
TopicDeployEntity entry;
- for (String recKey : qryTopicKeySet) {
+ for (String recKey : matchedKeySet) {
entry = topicConfCache.get(recKey);
if (entry == null || (qryEntity != null && !entry.isMatched(qryEntity))) {
continue;
@@ -285,43 +264,21 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
@Override
public Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(
- Set<Integer> brokerIdSet, Set<String> topicNameSet) {
+ Set<String> topicNameSet, Set<Integer> brokerIdSet) {
List<TopicDeployEntity> items;
- Set<String> qryTopicKey = null;
- ConcurrentHashSet<String> keySet;
Map<Integer, List<TopicDeployEntity>> retEntityMap = new HashMap<>();
if (brokerIdSet != null) {
for (Integer brokerId : brokerIdSet) {
retEntityMap.put(brokerId, new ArrayList<>());
}
}
- if (topicNameSet != null && !topicNameSet.isEmpty()) {
- qryTopicKey = new HashSet<>();
- for (String topicName : topicNameSet) {
- keySet = topicNameCacheIndex.get(topicName);
- if (keySet != null && !keySet.isEmpty()) {
- qryTopicKey.addAll(keySet);
- }
- }
+ // get matched keys by topicNameSet and brokerIdSet
+ Set<String> matchedKeySet = getMatchedRecords(topicNameSet, brokerIdSet);
+ // get record by keys
+ if (matchedKeySet == null) {
+ matchedKeySet = new HashSet<>(topicConfCache.keySet());
}
- if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
- if (qryTopicKey == null) {
- qryTopicKey = new HashSet<>();
- }
- for (Integer brokerId : brokerIdSet) {
- keySet = brokerIdCacheIndex.get(brokerId);
- if (keySet != null && !keySet.isEmpty()) {
- qryTopicKey.addAll(keySet);
- }
- }
- }
- if (qryTopicKey == null) {
- qryTopicKey = new HashSet<>(topicConfCache.keySet());
- }
- if (qryTopicKey.isEmpty()) {
- return retEntityMap;
- }
- for (String recordKey: qryTopicKey) {
+ for (String recordKey: matchedKeySet) {
TopicDeployEntity entity = topicConfCache.get(recordKey);
if (entity == null) {
continue;
@@ -338,43 +295,29 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
Set<String> topicSet, Set<Integer> brokerIdSet) {
TopicDeployEntity tmpEntity;
List<TopicDeployEntity> itemLst;
- ConcurrentHashSet<String> recSet;
- Set<String> hitKeys = new HashSet<>();
Map<String, List<TopicDeployEntity>> retEntityMap = new HashMap<>();
- if (((topicSet == null) || (topicSet.isEmpty()))
- && ((brokerIdSet == null) || (brokerIdSet.isEmpty()))) {
+ // get matched keys by topicNameSet and brokerIdSet
+ Set<String> matchedKeySet = getMatchedRecords(topicSet, brokerIdSet);
+ // get records by matched keys
+ if (matchedKeySet == null) {
for (TopicDeployEntity entity : topicConfCache.values()) {
+ if (entity == null) {
+ continue;
+ }
itemLst = retEntityMap.computeIfAbsent(
entity.getTopicName(), k -> new ArrayList<>());
itemLst.add(entity);
}
- return retEntityMap;
- }
- if ((topicSet == null) || (topicSet.isEmpty())) {
- for (Integer brokerId : brokerIdSet) {
- recSet = brokerIdCacheIndex.get(brokerId);
- if (recSet == null || recSet.isEmpty()) {
- continue;
- }
- hitKeys.addAll(recSet);
- }
} else {
- for (String topic : topicSet) {
- recSet = topicNameCacheIndex.get(topic);
- if (recSet == null || recSet.isEmpty()) {
+ for (String key : matchedKeySet) {
+ tmpEntity = topicConfCache.get(key);
+ if (tmpEntity == null) {
continue;
}
- hitKeys.addAll(recSet);
- }
- }
- for (String key : hitKeys) {
- tmpEntity = topicConfCache.get(key);
- if (tmpEntity == null) {
- continue;
+ itemLst = retEntityMap.computeIfAbsent(
+ tmpEntity.getTopicName(), k -> new ArrayList<>());
+ itemLst.add(tmpEntity);
}
- itemLst = retEntityMap.computeIfAbsent(
- tmpEntity.getTopicName(), k -> new ArrayList<>());
- itemLst.add(tmpEntity);
}
return retEntityMap;
}
@@ -537,6 +480,58 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
}
}
+ private Set<String> getMatchedRecords(Set<String> topicNameSet,
+ Set<Integer> brokerIdSet) {
+ ConcurrentHashSet<String> keySet;
+ Set<String> topicKeySet = null;
+ Set<String> brokerKeySet = null;
+ Set<String> matchedKeySet = null;
+ // get deploy records set by topicName
+ if (topicNameSet != null && !topicNameSet.isEmpty()) {
+ topicKeySet = new HashSet<>();
+ for (String topicName : topicNameSet) {
+ keySet = topicNameCacheIndex.get(topicName);
+ if (keySet != null && !keySet.isEmpty()) {
+ topicKeySet.addAll(keySet);
+ }
+ }
+ if (topicKeySet.isEmpty()) {
+ return Collections.emptySet();
+ }
+ }
+ // get deploy records set by brokerId
+ if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
+ brokerKeySet = new HashSet<>();
+ for (Integer brokerId : brokerIdSet) {
+ keySet = brokerIdCacheIndex.get(brokerId);
+ if (keySet != null && !keySet.isEmpty()) {
+ brokerKeySet.addAll(keySet);
+ }
+ }
+ if (brokerKeySet.isEmpty()) {
+ return Collections.emptySet();
+ }
+ }
+ // get intersection from topicKeySet and brokerKeySet
+ if (topicKeySet != null || brokerKeySet != null) {
+ if (topicKeySet == null) {
+ matchedKeySet = new HashSet<>(brokerKeySet);
+ } else {
+ if (brokerKeySet == null) {
+ matchedKeySet = new HashSet<>(topicKeySet);
+ } else {
+ matchedKeySet = new HashSet<>();
+ for (String record : topicKeySet) {
+ if (brokerKeySet.contains(record)) {
+ matchedKeySet.add(record);
+ }
+ }
+ }
+ }
+ }
+ return matchedKeySet;
+ }
+
private void addOrUpdCacheRecord(TopicDeployEntity entity) {
topicConfCache.put(entity.getRecordKey(), entity);
// add topic index map
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index a9d8205..7f1ab75 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -264,7 +264,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
}
Set<Integer> brokerIdSet = (Set<Integer>) result.getRetData();
Map<Integer, List<TopicDeployEntity>> queryResult =
- metaDataManager.getTopicDeployInfoMap(brokerIdSet, topicNameSet);
+ metaDataManager.getTopicDeployInfoMap(topicNameSet, brokerIdSet);
// build query result
int dataCount = 0;
int totalStoreNum = 0;