You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2021/03/25 01:38:33 UTC
[incubator-tubemq] branch TUBEMQ-570 updated: [TUBEMQ-581] Add data
cache in BDB metadata Mapper implementations
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
new 51d8138 [TUBEMQ-581] Add data cache in BDB metadata Mapper implementations
51d8138 is described below
commit 51d81383365359559a0dfde8fe8b869b2ba742b1
Author: gosonzhang <go...@tencent.com>
AuthorDate: Tue Mar 23 20:37:48 2021 +0800
[TUBEMQ-581] Add data cache in BDB metadata Mapper implementations
---
.../server/common/statusdef/TopicStatus.java | 2 +-
.../metastore/DataOpErrCode.java} | 23 ++-
.../master/metastore/dao/entity/BaseEntity.java | 46 ++++-
.../metastore/dao/entity/BrokerConfEntity.java | 74 +++++++
.../metastore/dao/entity/ClusterSettingEntity.java | 32 +++
.../metastore/dao/entity/GroupBlackListEntity.java | 46 +++++
.../metastore/dao/entity/GroupConfigEntity.java | 74 +++++++
.../dao/entity/GroupFilterCtrlEntity.java | 54 +++++
.../metastore/dao/entity/TopicConfEntity.java | 56 +++++-
.../metastore/dao/entity/TopicCtrlEntity.java | 60 ++++++
.../metastore/dao/entity/TopicPropGroup.java | 68 ++++++-
.../metastore/dao/mapper/AbstractMapper.java | 3 +-
.../metastore/dao/mapper/BrokerConfigMapper.java | 10 +-
.../metastore/dao/mapper/ClusterConfigMapper.java | 8 +-
.../metastore/dao/mapper/GroupBlackListMapper.java | 18 +-
.../metastore/dao/mapper/GroupConfigMapper.java | 11 +-
.../dao/mapper/GroupFilterCtrlMapper.java | 11 +-
.../metastore/dao/mapper/TopicConfigMapper.java | 10 +-
.../metastore/dao/mapper/TopicCtrlMapper.java | 13 +-
.../impl/bdbimpl/BdbBrokerConfigMapperImpl.java | 117 +++++++++--
.../impl/bdbimpl/BdbClusterConfigMapperImpl.java | 87 ++++++--
.../impl/bdbimpl/BdbGroupBlackListMapperImpl.java | 223 +++++++++++++++++++--
.../impl/bdbimpl/BdbGroupConfigMapperImpl.java | 107 ++++++++--
.../impl/bdbimpl/BdbGroupFilterCtrlMapperImpl.java | 183 +++++++++++++++--
.../impl/bdbimpl/BdbTopicConfigMapperImpl.java | 164 +++++++++++++--
.../impl/bdbimpl/BdbTopicCtrlMapperImpl.java | 106 ++++++++--
.../AliveObserver.java} | 11 +-
.../KeepAlive.java} | 20 +-
28 files changed, 1488 insertions(+), 149 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStatus.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStatus.java
index f54ab7e..13384a4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStatus.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStatus.java
@@ -18,7 +18,7 @@
package org.apache.tubemq.server.common.statusdef;
public enum TopicStatus {
-
+ STATUS_TOPIC_UNDEFINED(-2, "Undefined."),
STATUS_TOPIC_OK(0, "Normal"),
STATUS_TOPIC_SOFT_DELETE(1, "Soft deleted"),
STATUS_TOPIC_SOFT_REMOVE(2, "Soft removed"),
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStatus.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/DataOpErrCode.java
similarity index 66%
copy from tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStatus.java
copy to tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/DataOpErrCode.java
index f54ab7e..7f1d6c3 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStatus.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/DataOpErrCode.java
@@ -15,20 +15,23 @@
* limitations under the License.
*/
-package org.apache.tubemq.server.common.statusdef;
+package org.apache.tubemq.server.master.metastore;
-public enum TopicStatus {
- STATUS_TOPIC_OK(0, "Normal"),
- STATUS_TOPIC_SOFT_DELETE(1, "Soft deleted"),
- STATUS_TOPIC_SOFT_REMOVE(2, "Soft removed"),
- STATUS_TOPIC_HARD_REMOVE(3, "Hard removed");
+public enum DataOpErrCode {
+ DERR_SUCCESS(200, "Success."),
+ DERR_NOT_EXIST(401, "Record not exist."),
+ DERR_EXISTED(402, "Record has existed."),
+ DERR_UNCHANGED(403, "Record not changed."),
+ DERR_STORE_ABNORMAL(501, "Store layer throw exception."),
+
+ STATUS_DISABLE(0, "Disable.");
private int code;
private String description;
- TopicStatus(int code, String description) {
+ DataOpErrCode(int code, String description) {
this.code = code;
this.description = description;
}
@@ -41,13 +44,13 @@ public enum TopicStatus {
return description;
}
- public static TopicStatus valueOf(int code) {
- for (TopicStatus status : TopicStatus.values()) {
+ public static DataOpErrCode valueOf(int code) {
+ for (DataOpErrCode status : DataOpErrCode.values()) {
if (status.getCode() == code) {
return status;
}
}
- throw new IllegalArgumentException(String.format("unknown topic status code %s", code));
+ throw new IllegalArgumentException(String.format("unknown data operate error code %s", code));
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BaseEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BaseEntity.java
index 2ffebe4..9aacb03 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BaseEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BaseEntity.java
@@ -20,11 +20,12 @@ package org.apache.tubemq.server.master.metastore.dao.entity;
import com.google.gson.Gson;
import java.io.Serializable;
import java.util.Date;
+import java.util.Objects;
+import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.TServerConstants;
-
// AbstractEntity: entity's abstract class
public class BaseEntity implements Serializable {
@@ -131,4 +132,47 @@ public class BaseEntity implements Serializable {
return gson.toJson(this);
}
+ /**
+ * Check whether this entity matches the query items set in target.
+ * Compared items: dataVersionId, createUser, modifyUser. An item left as
+ * META_VALUE_UNDEFINED or blank in target is skipped; a null target matches.
+ * @return true: matched, false: not matched
+ */
+ public boolean isMatched(BaseEntity target) {
+ if (target == null) {
+ return true;
+ }
+ if ((target.getDataVersionId() != TBaseConstants.META_VALUE_UNDEFINED
+ && this.getDataVersionId() != target.getDataVersionId())
+ || (TStringUtils.isNotBlank(target.getCreateUser())
+ && !target.getCreateUser().equals(createUser))
+ || (TStringUtils.isNotBlank(target.getModifyUser())
+ && !target.getModifyUser().equals(modifyUser))) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof BaseEntity)) {
+ return false;
+ }
+ BaseEntity that = (BaseEntity) o;
+ return dataVersionId == that.dataVersionId &&
+ Objects.equals(createUser, that.createUser) &&
+ Objects.equals(createDate, that.createDate) &&
+ Objects.equals(modifyUser, that.modifyUser) &&
+ Objects.equals(modifyDate, that.modifyDate) &&
+ Objects.equals(attributes, that.attributes);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dataVersionId, createUser,
+ createDate, modifyUser, modifyDate, attributes);
+ }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BrokerConfEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BrokerConfEntity.java
index 3a0feb0..49ae565 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BrokerConfEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BrokerConfEntity.java
@@ -18,8 +18,10 @@
package org.apache.tubemq.server.master.metastore.dao.entity;
import java.util.Date;
+import java.util.Objects;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.statusdef.ManageStatus;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
@@ -264,4 +266,76 @@ public class BrokerConfEntity extends BaseEntity {
.append(TokenConstants.ATTR_SEP).append(brokerTLSPort).toString();
}
+ /**
+ * Check whether this entity matches the query items set in target.
+ * Compared items (after the BaseEntity items): brokerId, brokerIp, brokerPort,
+ * brokerTLSPort, regionId, groupId, manageStatus, brokerWebPort. An item left
+ * undefined or blank in target is skipped; a null target matches.
+ * @return true: matched, false: not matched
+ */
+ public boolean isMatched(BrokerConfEntity target) {
+ if (target == null) {
+ return true;
+ }
+ if (!super.isMatched(target)) {
+ return false;
+ }
+ if ((target.getBrokerId() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getBrokerId() != this.brokerId)
+ || (TStringUtils.isNotBlank(target.getBrokerIp())
+ && !target.getBrokerIp().equals(this.brokerIp))
+ || (target.getBrokerPort() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getBrokerPort() != this.brokerPort)
+ || (target.getBrokerTLSPort() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getBrokerTLSPort() != this.brokerTLSPort)
+ || (target.getRegionId() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getRegionId() != this.regionId)
+ || (target.getGroupId() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getGroupId() != this.groupId)
+ || (target.getManageStatus() != ManageStatus.STATUS_MANAGE_UNDEFINED
+ && target.getManageStatus() != this.manageStatus)
+ || (target.getBrokerWebPort() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getBrokerWebPort() != this.brokerWebPort)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof BrokerConfEntity)) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ BrokerConfEntity entity = (BrokerConfEntity) o;
+ return brokerId == entity.brokerId &&
+ brokerPort == entity.brokerPort &&
+ brokerTLSPort == entity.brokerTLSPort &&
+ brokerWebPort == entity.brokerWebPort &&
+ isConfDataUpdated == entity.isConfDataUpdated &&
+ isBrokerLoaded == entity.isBrokerLoaded &&
+ regionId == entity.regionId &&
+ groupId == entity.groupId &&
+ Objects.equals(brokerIp, entity.brokerIp) &&
+ manageStatus == entity.manageStatus &&
+ Objects.equals(topicProps, entity.topicProps) &&
+ Objects.equals(brokerAddress, entity.brokerAddress) &&
+ Objects.equals(brokerFullInfo, entity.brokerFullInfo) &&
+ Objects.equals(brokerSimpleInfo, entity.brokerSimpleInfo) &&
+ Objects.equals(brokerTLSSimpleInfo, entity.brokerTLSSimpleInfo) &&
+ Objects.equals(brokerTLSFullInfo, entity.brokerTLSFullInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), brokerId, brokerIp, brokerPort, brokerTLSPort,
+ brokerWebPort, manageStatus, isConfDataUpdated, isBrokerLoaded, regionId,
+ groupId, topicProps, brokerAddress, brokerFullInfo, brokerSimpleInfo,
+ brokerTLSSimpleInfo, brokerTLSFullInfo);
+ }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/ClusterSettingEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/ClusterSettingEntity.java
index 3f54faa..a64f350 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/ClusterSettingEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/ClusterSettingEntity.java
@@ -17,12 +17,14 @@
package org.apache.tubemq.server.master.metastore.dao.entity;
+import java.util.Objects;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.server.common.statusdef.EnableStatus;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
import org.apache.tubemq.server.master.metastore.TStoreConstants;
+
/*
* store the cluster default setting
*
@@ -179,4 +181,34 @@ public class ClusterSettingEntity extends BaseEntity {
this.clsDefTopicProps = clsDefTopicProps;
}
+ @Override
+ public boolean equals(Object o) { // field-by-field equality on top of BaseEntity.equals
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ClusterSettingEntity)) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ ClusterSettingEntity entity = (ClusterSettingEntity) o;
+ return brokerPort == entity.brokerPort &&
+ brokerTLSPort == entity.brokerTLSPort &&
+ brokerWebPort == entity.brokerWebPort &&
+ maxMsgSizeInB == entity.maxMsgSizeInB &&
+ qryPriorityId == entity.qryPriorityId &&
+ gloFlowCtrlRuleCnt == entity.gloFlowCtrlRuleCnt &&
+ Objects.equals(recordKey, entity.recordKey) && // null-safe, consistent with hashCode()
+ Objects.equals(clsDefTopicProps, entity.clsDefTopicProps) &&
+ gloFlowCtrlStatus == entity.gloFlowCtrlStatus &&
+ Objects.equals(gloFlowCtrlRuleInfo, entity.gloFlowCtrlRuleInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), recordKey, brokerPort, brokerTLSPort,
+ brokerWebPort, clsDefTopicProps, maxMsgSizeInB, qryPriorityId,
+ gloFlowCtrlStatus, gloFlowCtrlRuleCnt, gloFlowCtrlRuleInfo);
+ }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupBlackListEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupBlackListEntity.java
index b0ebd8c..3b680bf 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupBlackListEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupBlackListEntity.java
@@ -18,8 +18,10 @@
package org.apache.tubemq.server.master.metastore.dao.entity;
import java.util.Date;
+import java.util.Objects;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
@@ -87,4 +89,48 @@ public class GroupBlackListEntity extends BaseEntity {
return reason;
}
+ /**
+ * Check whether this entity matches the query items set in target.
+ * Compared items (after the BaseEntity items): groupName, topicName.
+ * An item left blank in target is skipped; a null target matches.
+ * @return true: matched, false: not matched
+ */
+ public boolean isMatched(GroupBlackListEntity target) {
+ if (target == null) {
+ return true;
+ }
+ if (!super.isMatched(target)) {
+ return false;
+ }
+ if ((TStringUtils.isNotBlank(target.getTopicName())
+ && !target.getTopicName().equals(this.topicName))
+ || (TStringUtils.isNotBlank(target.getGroupName())
+ && !target.getGroupName().equals(this.groupName))) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof GroupBlackListEntity)) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ GroupBlackListEntity that = (GroupBlackListEntity) o;
+ return Objects.equals(recordKey, that.recordKey) &&
+ Objects.equals(topicName, that.topicName) &&
+ Objects.equals(groupName, that.groupName) &&
+ Objects.equals(reason, that.reason);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), recordKey, topicName, groupName, reason);
+ }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupConfigEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupConfigEntity.java
index e24b511..f335ace 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupConfigEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupConfigEntity.java
@@ -18,7 +18,9 @@
package org.apache.tubemq.server.master.metastore.dao.entity;
import java.util.Date;
+import java.util.Objects;
import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.statusdef.EnableStatus;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntity;
@@ -149,4 +151,76 @@ public class GroupConfigEntity extends BaseEntity {
this.allowedBrokerClientRate = allowedBrokerClientRate;
}
+ public EnableStatus getResCheckStatus() {
+ return resCheckStatus;
+ }
+
+ public void setResCheckStatus(EnableStatus resCheckStatus) {
+ this.resCheckStatus = resCheckStatus;
+ }
+
+ public EnableStatus getFlowCtrlStatus() {
+ return flowCtrlStatus;
+ }
+
+ public void setFlowCtrlStatus(EnableStatus flowCtrlStatus) {
+ this.flowCtrlStatus = flowCtrlStatus;
+ }
+
+ /**
+ * Check whether this entity matches the query items set in target.
+ * Compared items (after the BaseEntity items): groupName, qryPriorityId,
+ * resCheckStatus, flowCtrlStatus, allowedBrokerClientRate. An item left
+ * undefined or blank in target is skipped; a null target matches.
+ * @return true: matched, false: not matched
+ */
+ public boolean isMatched(GroupConfigEntity target) {
+ if (target == null) {
+ return true;
+ }
+ if (!super.isMatched(target)) {
+ return false;
+ }
+ if ((target.getQryPriorityId() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getQryPriorityId() != this.qryPriorityId)
+ || (TStringUtils.isNotBlank(target.getGroupName())
+ && !target.getGroupName().equals(this.groupName))
+ || (target.getResCheckStatus() != EnableStatus.STATUS_UNDEFINE
+ && target.getResCheckStatus() != this.resCheckStatus)
+ || (target.getFlowCtrlStatus() != EnableStatus.STATUS_UNDEFINE
+ && target.getFlowCtrlStatus() != this.flowCtrlStatus)
+ || (target.getAllowedBrokerClientRate() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getAllowedBrokerClientRate() != this.allowedBrokerClientRate)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean equals(Object o) { // field-by-field equality on top of BaseEntity.equals
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof GroupConfigEntity)) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ GroupConfigEntity that = (GroupConfigEntity) o;
+ return allowedBrokerClientRate == that.allowedBrokerClientRate &&
+ qryPriorityId == that.qryPriorityId &&
+ ruleCnt == that.ruleCnt &&
+ Objects.equals(groupName, that.groupName) && // null-safe, consistent with hashCode()
+ resCheckStatus == that.resCheckStatus &&
+ flowCtrlStatus == that.flowCtrlStatus &&
+ Objects.equals(flowCtrlInfo, that.flowCtrlInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), groupName, resCheckStatus,
+ allowedBrokerClientRate, qryPriorityId, flowCtrlStatus,
+ ruleCnt, flowCtrlInfo);
+ }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java
index f81c7c4..ab18c8f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java
@@ -18,8 +18,10 @@
package org.apache.tubemq.server.master.metastore.dao.entity;
import java.util.Date;
+import java.util.Objects;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.statusdef.EnableStatus;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
@@ -125,4 +127,56 @@ public class GroupFilterCtrlEntity extends BaseEntity {
this.filterCondStr = filterCondStr;
}
+ public EnableStatus getFilterConsumeStatus() {
+ return filterConsumeStatus;
+ }
+
+ /**
+ * Check whether this entity matches the query items set in target.
+ * Compared items (after the BaseEntity items): topicName, groupName, filterConsumeStatus.
+ * An item left blank or as STATUS_UNDEFINE in target is skipped; a null target matches.
+ * @return true: matched, false: not matched
+ */
+ public boolean isMatched(GroupFilterCtrlEntity target) {
+ if (target == null) {
+ return true;
+ }
+ if (!super.isMatched(target)) {
+ return false;
+ }
+ if ((TStringUtils.isNotBlank(target.getTopicName())
+ && !target.getTopicName().equals(this.topicName))
+ || (TStringUtils.isNotBlank(target.getGroupName())
+ && !target.getGroupName().equals(this.groupName))
+ || (target.getFilterConsumeStatus() != EnableStatus.STATUS_UNDEFINE
+ && target.getFilterConsumeStatus() != this.filterConsumeStatus)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean equals(Object o) { // field-by-field equality on top of BaseEntity.equals
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof GroupFilterCtrlEntity)) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ GroupFilterCtrlEntity that = (GroupFilterCtrlEntity) o;
+ return Objects.equals(recordKey, that.recordKey) && // null-safe, consistent with hashCode()
+ Objects.equals(topicName, that.topicName) &&
+ Objects.equals(groupName, that.groupName) &&
+ filterConsumeStatus == that.filterConsumeStatus &&
+ Objects.equals(filterCondStr, that.filterCondStr);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), recordKey,
+ topicName, groupName, filterConsumeStatus, filterCondStr);
+ }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicConfEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicConfEntity.java
index 52cfc61..b090329 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicConfEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicConfEntity.java
@@ -18,8 +18,10 @@
package org.apache.tubemq.server.master.metastore.dao.entity;
import java.util.Date;
+import java.util.Objects;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.statusdef.TopicStatus;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
@@ -35,7 +37,7 @@ public class TopicConfEntity extends BaseEntity {
private int brokerId = TBaseConstants.META_VALUE_UNDEFINED;
// topic id, require globally unique
private int topicId = TBaseConstants.META_VALUE_UNDEFINED;
- private TopicStatus deployStatus = TopicStatus.STATUS_TOPIC_OK; // topic status
+ private TopicStatus deployStatus = TopicStatus.STATUS_TOPIC_UNDEFINED; // topic status
private TopicPropGroup topicProps = null;
@@ -140,4 +142,56 @@ public class TopicConfEntity extends BaseEntity {
public boolean isValidTopicStatus() {
return this.deployStatus == TopicStatus.STATUS_TOPIC_OK;
}
+
+ /**
+ * Check whether this entity matches the query items set in target.
+ * Compared items (after the BaseEntity items): brokerId, topicId, topicName,
+ * topicStatus (checked against deployStatus). Undefined or blank items are skipped.
+ * @return true: matched (also for a null target), false: not matched
+ */
+ public boolean isMatched(TopicConfEntity target) {
+ if (target == null) {
+ return true;
+ }
+ if (!super.isMatched(target)) {
+ return false;
+ }
+ if ((target.getBrokerId() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getBrokerId() != this.brokerId)
+ || (target.getTopicId() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getTopicId() != this.topicId)
+ || (TStringUtils.isNotBlank(target.getTopicName())
+ && !target.getTopicName().equals(this.topicName))
+ || (target.getTopicStatus() != TopicStatus.STATUS_TOPIC_UNDEFINED
+ && target.getTopicStatus() != this.deployStatus)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof TopicConfEntity)) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ TopicConfEntity that = (TopicConfEntity) o;
+ return brokerId == that.brokerId &&
+ topicId == that.topicId &&
+ Objects.equals(recordKey, that.recordKey) &&
+ Objects.equals(topicName, that.topicName) &&
+ deployStatus == that.deployStatus &&
+ Objects.equals(topicProps, that.topicProps);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), recordKey,
+ topicName, brokerId, topicId, deployStatus, topicProps);
+ }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicCtrlEntity.java
index 406da37..582672a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicCtrlEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicCtrlEntity.java
@@ -18,7 +18,9 @@
package org.apache.tubemq.server.master.metastore.dao.entity;
import java.util.Date;
+import java.util.Objects;
import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.statusdef.EnableStatus;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
@@ -92,4 +94,62 @@ public class TopicCtrlEntity extends BaseEntity {
this.authCtrlStatus = EnableStatus.STATUS_DISABLE;
}
}
+
+ public EnableStatus getAuthCtrlStatus() {
+ return authCtrlStatus;
+ }
+
+ public int getMaxMsgSizeInB() {
+ return maxMsgSizeInB;
+ }
+
+ public void setMaxMsgSizeInB(int maxMsgSizeInB) {
+ this.maxMsgSizeInB = maxMsgSizeInB;
+ }
+
+ /**
+ * Check whether this entity matches the query items set in target.
+ * Compared items (after the BaseEntity items): topicName, maxMsgSizeInB, authCtrlStatus.
+ * An item left undefined or blank in target is skipped; a null target matches.
+ * @return true: matched, false: not matched
+ */
+ public boolean isMatched(TopicCtrlEntity target) {
+ if (target == null) {
+ return true;
+ }
+ if (!super.isMatched(target)) {
+ return false;
+ }
+ if ((target.getMaxMsgSizeInB() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getMaxMsgSizeInB() != this.maxMsgSizeInB)
+ || (TStringUtils.isNotBlank(target.getTopicName())
+ && !target.getTopicName().equals(this.topicName))
+ || (target.getAuthCtrlStatus() != EnableStatus.STATUS_UNDEFINE
+ && target.getAuthCtrlStatus() != this.authCtrlStatus)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean equals(Object o) { // field-by-field equality on top of BaseEntity.equals
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof TopicCtrlEntity)) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ TopicCtrlEntity that = (TopicCtrlEntity) o;
+ return maxMsgSizeInB == that.maxMsgSizeInB &&
+ Objects.equals(topicName, that.topicName) && // null-safe, consistent with hashCode()
+ authCtrlStatus == that.authCtrlStatus;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), topicName, authCtrlStatus, maxMsgSizeInB);
+ }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicPropGroup.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicPropGroup.java
index 2567f96..6c2e088 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicPropGroup.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicPropGroup.java
@@ -18,8 +18,9 @@
package org.apache.tubemq.server.master.metastore.dao.entity;
import java.io.Serializable;
+import java.util.Objects;
import org.apache.tubemq.corebase.TBaseConstants;
-
+import org.apache.tubemq.corebase.utils.TStringUtils;
/*
@@ -167,4 +168,69 @@ public class TopicPropGroup implements Serializable {
public int getDataStoreType() {
return dataStoreType;
}
+
+ /**
+ * Check whether this group matches the query items set in target. Compared items:
+ * numTopicStores, numPartitions, unflushThreshold, unflushInterval, unflushDataHold,
+ * memCacheMsgSizeInMB, memCacheMsgCntInK, memCacheFlushIntvl, deletePolicy.
+ * An item left as META_VALUE_UNDEFINED or blank in target is skipped; null target matches.
+ * @return true: matched, false: not matched
+ */
+ public boolean isMatched(TopicPropGroup target) {
+ if (target == null) {
+ return true;
+ }
+ if ((target.getNumTopicStores() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getNumTopicStores() != this.numTopicStores)
+ || (target.getNumPartitions() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getNumPartitions() != this.numPartitions)
+ || (target.getUnflushThreshold() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getUnflushThreshold() != this.unflushThreshold)
+ || (target.getUnflushInterval() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getUnflushInterval() != this.unflushInterval)
+ || (target.getUnflushDataHold() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getUnflushDataHold() != this.unflushDataHold)
+ || (target.getMemCacheMsgSizeInMB() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getMemCacheMsgSizeInMB() != this.memCacheMsgSizeInMB)
+ || (target.getMemCacheMsgCntInK() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getMemCacheMsgCntInK() != this.memCacheMsgCntInK)
+ || (target.getMemCacheFlushIntvl() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getMemCacheFlushIntvl() != this.memCacheFlushIntvl)
+ || (TStringUtils.isNotBlank(target.getDeletePolicy())
+ && !target.getDeletePolicy().equals(this.deletePolicy))) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof TopicPropGroup)) {
+ return false;
+ }
+ TopicPropGroup that = (TopicPropGroup) o;
+ return numTopicStores == that.numTopicStores &&
+ numPartitions == that.numPartitions &&
+ unflushThreshold == that.unflushThreshold &&
+ unflushInterval == that.unflushInterval &&
+ unflushDataHold == that.unflushDataHold &&
+ memCacheMsgSizeInMB == that.memCacheMsgSizeInMB &&
+ memCacheMsgCntInK == that.memCacheMsgCntInK &&
+ memCacheFlushIntvl == that.memCacheFlushIntvl &&
+ acceptPublish == that.acceptPublish &&
+ acceptSubscribe == that.acceptSubscribe &&
+ dataStoreType == that.dataStoreType &&
+ Objects.equals(deletePolicy, that.deletePolicy) &&
+ Objects.equals(dataPath, that.dataPath);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(numTopicStores, numPartitions, unflushThreshold, unflushInterval,
+ unflushDataHold, memCacheMsgSizeInMB, memCacheMsgCntInK, memCacheFlushIntvl,
+ acceptPublish, acceptSubscribe, deletePolicy, dataStoreType, dataPath);
+ }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/AbstractMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/AbstractMapper.java
index 7deadef..dbed1c9 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/AbstractMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/AbstractMapper.java
@@ -18,14 +18,13 @@
package org.apache.tubemq.server.master.metastore.dao.mapper;
import org.apache.tubemq.server.common.exception.LoadMetaException;
-import org.apache.tubemq.server.common.utils.ProcessResult;
public interface AbstractMapper {
void close();
- void loadConfig(ProcessResult result) throws LoadMetaException;
+ void loadConfig() throws LoadMetaException;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/BrokerConfigMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/BrokerConfigMapper.java
index ec5bd46..7155298 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/BrokerConfigMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/BrokerConfigMapper.java
@@ -17,15 +17,21 @@
package org.apache.tubemq.server.master.metastore.dao.mapper;
+import java.util.Map;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.metastore.dao.entity.BrokerConfEntity;
+
public interface BrokerConfigMapper extends AbstractMapper {
- boolean putBrokerConfig(BrokerConfEntity memEntity, ProcessResult result);
+ boolean addBrokerConf(BrokerConfEntity memEntity, ProcessResult result);
+
+ boolean updBrokerConf(BrokerConfEntity memEntity, ProcessResult result);
- boolean delBrokerConfig(int brokerId);
+ boolean delBrokerConf(int brokerId);
+ Map<Integer, BrokerConfEntity> getBrokerConfByBrokerId(BrokerConfEntity qryEntity);
+ BrokerConfEntity getBrokerConfByBrokerId(int brokerId);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
index 6fcb5e4..e9b7827 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
@@ -23,7 +23,11 @@ import org.apache.tubemq.server.master.metastore.dao.entity.ClusterSettingEntity
public interface ClusterConfigMapper extends AbstractMapper {
- boolean putClusterConfig(ClusterSettingEntity memEntity, ProcessResult result);
+ // The former putClusterConfig is split into separate add and update calls;
+ // the failure reason, if any, is reported through the result argument.
+ boolean addClusterConfig(ClusterSettingEntity memEntity, ProcessResult result);
- boolean delClusterConfig(String key);
+ boolean updClusterConfig(ClusterSettingEntity memEntity, ProcessResult result);
+
+ // Query the current cluster setting; no key is needed.
+ ClusterSettingEntity getClusterConfig();
+
+ // Delete the current cluster setting; the key argument has been dropped.
+ boolean delClusterConfig();
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
index e9f49c2..645f139 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
@@ -17,15 +17,29 @@
package org.apache.tubemq.server.master.metastore.dao.mapper;
+import java.util.List;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.metastore.dao.entity.GroupBlackListEntity;
+
+
public interface GroupBlackListMapper extends AbstractMapper {
- boolean putGroupBlackListConfig(GroupBlackListEntity memEntity, ProcessResult result);
+ // about group blacklist api
+ // The former putGroupBlackListConfig is split into separate add and update calls;
+ // the failure reason, if any, is reported through the result argument.
+ boolean addGroupBlackListConf(GroupBlackListEntity entity, ProcessResult result);
+
+ boolean updGroupBlackListConf(GroupBlackListEntity entity, ProcessResult result);
+
+ // Delete the single record identified by its record key.
+ boolean delGroupBlackListConf(String recordKey);
+
+ // Delete every record that belongs to the given consumer group.
+ boolean delGroupBlackListConfByGroupName(String groupName);
+
+ // Query the records of the given consumer group.
+ List<GroupBlackListEntity> getGrpBlkLstConfByGroupName(String groupName);
+
+ // Query the records of the given topic.
+ List<GroupBlackListEntity> getGrpBlkLstConfByTopicName(String topicName);
- boolean delGroupBlackListConfig(String key);
+ // Query records with a filter entity.
+ List<GroupBlackListEntity> getGroupBlackListConf(GroupBlackListEntity qryEntity);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java
index e01775c..123c39f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java
@@ -17,6 +17,7 @@
package org.apache.tubemq.server.master.metastore.dao.mapper;
+import java.util.Map;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.metastore.dao.entity.GroupConfigEntity;
@@ -24,9 +25,15 @@ import org.apache.tubemq.server.master.metastore.dao.entity.GroupConfigEntity;
public interface GroupConfigMapper extends AbstractMapper {
- boolean putGroupConfigConfig(GroupConfigEntity memEntity, ProcessResult result);
+ // The former putGroupConfigConfig is split into separate add and update calls;
+ // the failure reason, if any, is reported through the result argument.
+ boolean addGroupConf(GroupConfigEntity entity, ProcessResult result);
- boolean delGroupConfigConfig(String key);
+ boolean updGroupConf(GroupConfigEntity entity, ProcessResult result);
+
+ // Delete the record of the given consumer group.
+ boolean delGroupConf(String groupName);
+
+ // Query the single record of the given consumer group.
+ GroupConfigEntity getGroupConf(String groupName);
+
+ // Query records with a filter entity.
+ // NOTE(review): the map is presumably keyed by group name - confirm against the implementation.
+ Map<String, GroupConfigEntity> getGroupConf(GroupConfigEntity qryEntity);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java
index 1925b41..a155f7a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java
@@ -17,15 +17,22 @@
package org.apache.tubemq.server.master.metastore.dao.mapper;
+import java.util.List;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.metastore.dao.entity.GroupFilterCtrlEntity;
+
public interface GroupFilterCtrlMapper extends AbstractMapper {
- boolean putGroupFilterCtrlConfig(GroupFilterCtrlEntity memEntity, ProcessResult result);
+ // The former putGroupFilterCtrlConfig is split into separate add and update calls;
+ // the failure reason, if any, is reported through the result argument.
+ boolean addGroupFilterCtrlConf(GroupFilterCtrlEntity entity, ProcessResult result);
+
+ boolean updGroupFilterCtrlConf(GroupFilterCtrlEntity entity, ProcessResult result);
+
+ // Delete the single record identified by its record key.
+ boolean delGroupFilterCtrlConf(String recordKey);
- boolean delGroupFilterCtrlConfig(String key);
+ // Query the records of the given consumer group.
+ List<GroupFilterCtrlEntity> getGroupFilterCtrlConf(String groupName);
+ // Query records with a filter entity.
+ List<GroupFilterCtrlEntity> getGroupFilterCtrlConf(GroupFilterCtrlEntity qryEntity);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java
index fda4f6c..25a127c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java
@@ -17,15 +17,21 @@
package org.apache.tubemq.server.master.metastore.dao.mapper;
+import java.util.List;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.metastore.dao.entity.TopicConfEntity;
+
public interface TopicConfigMapper extends AbstractMapper {
- boolean putTopicConfig(TopicConfEntity memEntity, ProcessResult result);
+ // The former putTopicConfig is split into separate add and update calls;
+ // the failure reason, if any, is reported through the result argument.
+ boolean addTopicConf(TopicConfEntity entity, ProcessResult result);
+
+ boolean updTopicConf(TopicConfEntity entity, ProcessResult result);
+
+ // Delete the single record identified by its record key.
+ boolean delTopicConf(String recordKey);
- boolean delTopicConfig(String key);
+ // Query records with a filter entity.
+ List<TopicConfEntity> getTopicConf(TopicConfEntity qryEntity);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
index c42ea45..5b66618 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
@@ -17,16 +17,25 @@
package org.apache.tubemq.server.master.metastore.dao.mapper;
+import java.util.List;
import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.master.metastore.dao.entity.TopicConfEntity;
import org.apache.tubemq.server.master.metastore.dao.entity.TopicCtrlEntity;
+
public interface TopicCtrlMapper extends AbstractMapper {
- boolean putTopicCtrlConfig(TopicCtrlEntity memEntity, ProcessResult result);
+ // The former putTopicCtrlConfig is split into separate add and update calls;
+ // the failure reason, if any, is reported through the result argument.
+ boolean addTopicCtrlConf(TopicCtrlEntity entity, ProcessResult result);
+
+ boolean updTopicCtrlConf(TopicCtrlEntity entity, ProcessResult result);
+
+ // Delete the single record identified by its record key.
+ boolean delTopicCtrlConf(String recordKey);
+
+ // Query the single record of the given topic.
+ TopicCtrlEntity getTopicCtrlConf(String topicName);
- boolean delTopicCtrlConfig(String key);
+ // Query records with a filter entity.
+ // NOTE(review): the filter is a TopicConfEntity although the records are
+ // TopicCtrlEntity; every other mapper filters with its own entity type.
+ // Confirm whether TopicCtrlEntity was intended here.
+ List<TopicCtrlEntity> getTopicCtrlConf(TopicConfEntity qryEntity);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
index 8d1c693..5c11f4e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
@@ -25,10 +25,12 @@ import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.server.common.exception.LoadMetaException;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
+import org.apache.tubemq.server.master.metastore.DataOpErrCode;
import org.apache.tubemq.server.master.metastore.dao.entity.BrokerConfEntity;
import org.apache.tubemq.server.master.metastore.dao.mapper.BrokerConfigMapper;
import org.slf4j.Logger;
@@ -41,11 +43,11 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
private static final Logger logger =
LoggerFactory.getLogger(BdbBrokerConfigMapperImpl.class);
-
// broker config store
private EntityStore brokerConfStore;
private PrimaryIndex<Integer/* brokerId */, BdbBrokerConfEntity> brokerConfIndex;
-
+ private ConcurrentHashMap<Integer/* brokerId */, BrokerConfEntity> metaDataCache =
+ new ConcurrentHashMap<>();
public BdbBrokerConfigMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
brokerConfStore = new EntityStore(repEnv,
@@ -56,6 +58,7 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
@Override
public void close() {
+ metaDataCache.clear();
if (brokerConfStore != null) {
try {
brokerConfStore.close();
@@ -67,12 +70,12 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
}
@Override
- public void loadConfig(ProcessResult result) throws LoadMetaException {
+ public void loadConfig() throws LoadMetaException {
long count = 0L;
- Map<Integer, BrokerConfEntity> metaDataMap = new HashMap<>();
EntityCursor<BdbBrokerConfEntity> cursor = null;
logger.info("[BDB Impl] load broker configure start...");
try {
+ metaDataCache.clear();
cursor = brokerConfIndex.entities();
for (BdbBrokerConfEntity bdbEntity : cursor) {
if (bdbEntity == null) {
@@ -81,11 +84,10 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
}
BrokerConfEntity memEntity =
new BrokerConfEntity(bdbEntity);
- metaDataMap.put(memEntity.getBrokerId(), memEntity);
+ metaDataCache.put(memEntity.getBrokerId(), memEntity);
count++;
}
logger.info("[BDB Impl] total broker configure records are {}", count);
- result.setSuccResult(metaDataMap);
} catch (Exception e) {
logger.error("[BDB Impl] load broker configure failure ", e);
throw new LoadMetaException(e.getMessage());
@@ -97,6 +99,96 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
logger.info("[BDB Impl] load broker configure successfully...");
}
+ /**
+  * Add a broker configure record that is not yet held in the cache.
+  *
+  * @param memEntity  the record to add
+  * @param result     receives DERR_EXISTED when a record with the same broker id
+  *                   is already cached, otherwise the outcome of the store write
+  * @return the value of result.isSuccess() after processing
+  */
+ @Override
+ public boolean addBrokerConf(BrokerConfEntity memEntity, ProcessResult result) {
+     if (metaDataCache.get(memEntity.getBrokerId()) == null) {
+         // write to the store first; the cache is filled only after a successful write
+         if (putBrokerConfig2Bdb(memEntity, result)) {
+             metaDataCache.put(memEntity.getBrokerId(), memEntity);
+         }
+         return result.isSuccess();
+     }
+     StringBuilder errInfo =
+             new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
+     errInfo.append("The broker ").append(memEntity.getBrokerIp())
+             .append("'s configure already exists, please delete it first!");
+     result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(), errInfo.toString());
+     return result.isSuccess();
+ }
+
+ /**
+  * Update the configure record of a broker that is already held in the cache.
+  *
+  * @param memEntity  the new content of the record
+  * @param result     receives DERR_NOT_EXIST when no record is cached for the
+  *                   broker id, DERR_UNCHANGED when the cached record equals
+  *                   memEntity, otherwise the outcome of the store write
+  * @return the value of result.isSuccess() after processing
+  */
+ @Override
+ public boolean updBrokerConf(BrokerConfEntity memEntity, ProcessResult result) {
+     BrokerConfEntity curEntity =
+             metaDataCache.get(memEntity.getBrokerId());
+     if (curEntity == null) {
+         result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                 new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                         .append("The broker ").append(memEntity.getBrokerIp())
+                         .append("'s configure is not exists, please add record first!")
+                         .toString());
+         return result.isSuccess();
+     }
+     if (curEntity.equals(memEntity)) {
+         // nothing to write; the message no longer advises deleting the record,
+         // which was copied from the add path and does not apply to an update
+         result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                 new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                         .append("The broker ").append(memEntity.getBrokerIp())
+                         .append("'s configure has not changed!")
+                         .toString());
+         return result.isSuccess();
+     }
+     // write to the store first; the cache is refreshed only after a successful write
+     if (putBrokerConfig2Bdb(memEntity, result)) {
+         metaDataCache.put(memEntity.getBrokerId(), memEntity);
+     }
+     return result.isSuccess();
+ }
+
+ /**
+  * Delete the configure record of the given broker.
+  *
+  * The bdb store and the cache are touched only when the cache currently
+  * holds a record for the broker id.
+  *
+  * @param brokerId  the id of the broker whose record is deleted
+  * @return always true
+  */
+ @Override
+ public boolean delBrokerConf(int brokerId) {
+     if (metaDataCache.get(brokerId) != null) {
+         // NOTE(review): the outcome of the store delete is not checked, so the
+         // cache entry is dropped even if the store delete did not succeed -
+         // confirm this best-effort behavior is intended.
+         delBrokerConfigFromBdb(brokerId);
+         metaDataCache.remove(brokerId);
+     }
+     return true;
+ }
+
+ /**
+  * Query broker configure records from the cache.
+  *
+  * @param qryEntity  the filter; null selects every cached record, otherwise
+  *                   only records whose isMatched(qryEntity) is true are selected
+  * @return a new map keyed by broker id; the contained entities are the cached
+  *         objects themselves and are meant to be read only
+  */
+ @Override
+ public Map<Integer, BrokerConfEntity> getBrokerConfByBrokerId(BrokerConfEntity qryEntity) {
+     final boolean selectAll = (qryEntity == null);
+     Map<Integer, BrokerConfEntity> selected = new HashMap<>();
+     for (BrokerConfEntity cached : metaDataCache.values()) {
+         // the filter is consulted only when one was supplied
+         if (selectAll || cached.isMatched(qryEntity)) {
+             selected.put(cached.getBrokerId(), cached);
+         }
+     }
+     return selected;
+ }
+
+ /**
+  * Query the broker configure record of the given broker id from the cache.
+  *
+  * @param brokerId  the id of the broker to look up
+  * @return the cached record, or null when none is cached; meant to be read only
+  */
+ @Override
+ public BrokerConfEntity getBrokerConfByBrokerId(int brokerId) {
+     final BrokerConfEntity cached = metaDataCache.get(brokerId);
+     return cached;
+ }
+
/**
* Put cluster setting info into bdb store
*
@@ -104,8 +196,7 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
* @param result process result with old value
* @return
*/
- @Override
- public boolean putBrokerConfig(BrokerConfEntity memEntity, ProcessResult result) {
+ private boolean putBrokerConfig2Bdb(BrokerConfEntity memEntity, ProcessResult result) {
BdbBrokerConfEntity retData = null;
BdbBrokerConfEntity bdbEntity =
memEntity.buildBdbBrokerConfEntity();
@@ -113,17 +204,17 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
retData = brokerConfIndex.put(bdbEntity);
} catch (Throwable e) {
logger.error("[BDB Impl] put broker configure failure ", e);
- result.setFailResult(new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
- .append("Put broker configure failure: ")
- .append(e.getMessage()).toString());
+ result.setFailResult(DataOpErrCode.DERR_STORE_ABNORMAL.getCode(),
+ new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+ .append("Put broker configure failure: ")
+ .append(e.getMessage()).toString());
return result.isSuccess();
}
result.setSuccResult(retData == null);
return result.isSuccess();
}
- @Override
- public boolean delBrokerConfig(int brokerId) {
+ private boolean delBrokerConfigFromBdb(int brokerId) {
try {
brokerConfIndex.delete(brokerId);
} catch (Throwable e) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java
index b9fb67e..07413fd 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java
@@ -22,12 +22,14 @@ import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.server.common.exception.LoadMetaException;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
+import org.apache.tubemq.server.master.metastore.DataOpErrCode;
+import org.apache.tubemq.server.master.metastore.TStoreConstants;
import org.apache.tubemq.server.master.metastore.dao.entity.ClusterSettingEntity;
import org.apache.tubemq.server.master.metastore.dao.mapper.ClusterConfigMapper;
import org.slf4j.Logger;
@@ -41,7 +43,7 @@ public class BdbClusterConfigMapperImpl implements ClusterConfigMapper {
private EntityStore clsDefSettingStore;
private PrimaryIndex<String, BdbClusterSettingEntity> clsDefSettingIndex;
-
+ Map<String, ClusterSettingEntity> metaDataCache = new ConcurrentHashMap<>();
public BdbClusterConfigMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
clsDefSettingStore = new EntityStore(repEnv,
@@ -52,6 +54,7 @@ public class BdbClusterConfigMapperImpl implements ClusterConfigMapper {
@Override
public void close() {
+ metaDataCache.clear();
if (clsDefSettingStore != null) {
try {
clsDefSettingStore.close();
@@ -63,12 +66,12 @@ public class BdbClusterConfigMapperImpl implements ClusterConfigMapper {
}
@Override
- public void loadConfig(ProcessResult result) throws LoadMetaException {
+ public void loadConfig() throws LoadMetaException {
long count = 0L;
- Map<String, ClusterSettingEntity> metaDataMap = new HashMap<>();
EntityCursor<BdbClusterSettingEntity> cursor = null;
logger.info("[BDB Impl] load cluster configure start...");
try {
+ metaDataCache.clear();
cursor = clsDefSettingIndex.entities();
for (BdbClusterSettingEntity bdbEntity : cursor) {
if (bdbEntity == null) {
@@ -77,11 +80,10 @@ public class BdbClusterConfigMapperImpl implements ClusterConfigMapper {
}
ClusterSettingEntity memEntity =
new ClusterSettingEntity(bdbEntity);
- metaDataMap.put(memEntity.getRecordKey(), memEntity);
+ metaDataCache.put(memEntity.getRecordKey(), memEntity);
count++;
}
logger.info("[BDB Impl] total cluster configure records are {}", count);
- result.setSuccResult(metaDataMap);
} catch (Exception e) {
logger.error("[BDB Impl] load cluster configure failure ", e);
throw new LoadMetaException(e.getMessage());
@@ -101,7 +103,68 @@ public class BdbClusterConfigMapperImpl implements ClusterConfigMapper {
* @return
*/
@Override
- public boolean putClusterConfig(ClusterSettingEntity memEntity, ProcessResult result) {
+ public boolean addClusterConfig(ClusterSettingEntity memEntity, ProcessResult result) {
+     // the add is refused whenever the cache already holds any record
+     if (metaDataCache.isEmpty()) {
+         // write to the store first; the cache is filled only after a successful write
+         if (putClusterConfig2Bdb(memEntity, result)) {
+             metaDataCache.put(memEntity.getRecordKey(), memEntity);
+         }
+     } else {
+         result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+                 "The cluster setting already exists, please delete or update!");
+     }
+     return result.isSuccess();
+ }
+
+ /**
+  * Update cluster setting info in bdb store
+  *
+  * @param memEntity  the new content of the cluster setting record
+  * @param result     receives DERR_NOT_EXIST when no record is cached under the
+  *                   entity's record key, DERR_UNCHANGED when the cached record
+  *                   equals memEntity, otherwise the outcome of the store write
+  * @return the value of result.isSuccess() after processing
+  */
+ @Override
+ public boolean updClusterConfig(ClusterSettingEntity memEntity, ProcessResult result) {
+     if (metaDataCache.isEmpty()) {
+         result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                 "The cluster setting is null, please add record first!");
+         return result.isSuccess();
+     }
+     ClusterSettingEntity curEntity = metaDataCache.get(memEntity.getRecordKey());
+     if (curEntity == null) {
+         // the cache is not empty but holds nothing under this record key;
+         // report it as missing instead of failing with a NullPointerException
+         result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                 "The cluster setting is null, please add record first!");
+         return result.isSuccess();
+     }
+     if (curEntity.equals(memEntity)) {
+         result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                 "The cluster settings have not changed!");
+         return result.isSuccess();
+     }
+     // write to the store first; the cache is refreshed only after a successful write
+     if (putClusterConfig2Bdb(memEntity, result)) {
+         metaDataCache.put(memEntity.getRecordKey(), memEntity);
+     }
+     return result.isSuccess();
+ }
+
+ /**
+  * Query the current cluster setting from the cache.
+  *
+  * @return the record cached under TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING,
+  *         or null when none is cached; meant to be read only
+  */
+ @Override
+ public ClusterSettingEntity getClusterConfig() {
+     final String defSettingKey = TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING;
+     return metaDataCache.get(defSettingKey);
+ }
+
+ /**
+  * Delete the current cluster setting.
+  *
+  * The bdb store and the cache are touched only when the cache is not empty.
+  *
+  * @return always true
+  */
+ @Override
+ public boolean delClusterConfig() {
+     if (!metaDataCache.isEmpty()) {
+         // NOTE(review): the outcome of the store delete is not checked before
+         // the cache is cleared - confirm this best-effort behavior is intended.
+         delClusterConfigFromBdb(TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
+         metaDataCache.clear();
+     }
+     return true;
+ }
+
+ private boolean putClusterConfig2Bdb(ClusterSettingEntity memEntity, ProcessResult result) {
BdbClusterSettingEntity retData = null;
BdbClusterSettingEntity bdbEntity =
memEntity.buildBdbClsDefSettingEntity();
@@ -109,17 +172,17 @@ public class BdbClusterConfigMapperImpl implements ClusterConfigMapper {
retData = clsDefSettingIndex.put(bdbEntity);
} catch (Throwable e) {
logger.error("[BDB Impl] put cluster configure failure ", e);
- result.setFailResult(new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
- .append("Put cluster configure failure: ")
- .append(e.getMessage()).toString());
+ result.setFailResult(DataOpErrCode.DERR_STORE_ABNORMAL.getCode(),
+ new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+ .append("Put cluster configure failure: ")
+ .append(e.getMessage()).toString());
return result.isSuccess();
}
result.setSuccResult(retData == null);
return result.isSuccess();
}
- @Override
- public boolean delClusterConfig(String key) {
+ private boolean delClusterConfigFromBdb(String key) {
try {
clsDefSettingIndex.delete(key);
} catch (Throwable e) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupBlackListMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupBlackListMapperImpl.java
index 7dcb9b0..f4c9586 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupBlackListMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupBlackListMapperImpl.java
@@ -23,12 +23,16 @@ import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
import org.apache.tubemq.server.common.exception.LoadMetaException;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
+import org.apache.tubemq.server.master.metastore.DataOpErrCode;
import org.apache.tubemq.server.master.metastore.dao.entity.GroupBlackListEntity;
import org.apache.tubemq.server.master.metastore.dao.mapper.GroupBlackListMapper;
import org.slf4j.Logger;
@@ -38,14 +42,18 @@ import org.slf4j.LoggerFactory;
public class BdbGroupBlackListMapperImpl implements GroupBlackListMapper {
-
private static final Logger logger =
LoggerFactory.getLogger(BdbGroupBlackListMapperImpl.class);
-
-
// consumer group black list store
private EntityStore blackGroupStore;
private PrimaryIndex<String/* recordKey */, BdbBlackGroupEntity> blackGroupIndex;
+ private ConcurrentHashMap<String/* recordKey */, GroupBlackListEntity>
+ groupBlackListCache = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<String/* topicName */, ConcurrentHashSet<String>>
+ groupBlackListTopicCache = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<String/* groupName */, ConcurrentHashSet<String> >
+ groupBlackListGroupCache = new ConcurrentHashMap<>();
+
public BdbGroupBlackListMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
@@ -57,6 +65,9 @@ public class BdbGroupBlackListMapperImpl implements GroupBlackListMapper {
@Override
public void close() {
+ // clear cache data
+ clearCacheData();
+ // close store object
if (blackGroupStore != null) {
try {
blackGroupStore.close();
@@ -68,12 +79,14 @@ public class BdbGroupBlackListMapperImpl implements GroupBlackListMapper {
}
@Override
- public void loadConfig(ProcessResult result) throws LoadMetaException {
+ public void loadConfig() throws LoadMetaException {
long count = 0L;
- Map<String, GroupBlackListEntity> metaDataMap = new HashMap<>();
EntityCursor<BdbBlackGroupEntity> cursor = null;
logger.info("[BDB Impl] load blacklist configure start...");
try {
+ // clear cache data
+ clearCacheData();
+ // read data from bdb
cursor = blackGroupIndex.entities();
for (BdbBlackGroupEntity bdbEntity : cursor) {
if (bdbEntity == null) {
@@ -82,11 +95,10 @@ public class BdbGroupBlackListMapperImpl implements GroupBlackListMapper {
}
GroupBlackListEntity memEntity =
new GroupBlackListEntity(bdbEntity);
- metaDataMap.put(memEntity.getRecordKey(), memEntity);
+ addOrUpdCacheRecord(memEntity);
count++;
}
logger.info("[BDB Impl] total blacklist configure records are {}", count);
- result.setSuccResult(metaDataMap);
} catch (Exception e) {
logger.error("[BDB Impl] load blacklist configure failure ", e);
throw new LoadMetaException(e.getMessage());
@@ -98,6 +110,126 @@ public class BdbGroupBlackListMapperImpl implements GroupBlackListMapper {
logger.info("[BDB Impl] load blacklist configure successfully...");
}
+ /**
+  * Add a blacklist record that is not yet held in the cache.
+  *
+  * @param memEntity  the record to add
+  * @param result     receives DERR_EXISTED when a record with the same record key
+  *                   is already cached, otherwise the outcome of the store write
+  * @return the value of result.isSuccess() after processing
+  */
+ @Override
+ public boolean addGroupBlackListConf(GroupBlackListEntity memEntity, ProcessResult result) {
+     if (groupBlackListCache.get(memEntity.getRecordKey()) == null) {
+         // write to the store first; the caches are filled only after a successful write
+         if (putGroupBlackListConfig2Bdb(memEntity, result)) {
+             addOrUpdCacheRecord(memEntity);
+         }
+         return result.isSuccess();
+     }
+     StringBuilder errInfo =
+             new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
+     errInfo.append("The blacklist ").append(memEntity.getRecordKey())
+             .append("'s configure already exists, please delete it first!");
+     result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(), errInfo.toString());
+     return result.isSuccess();
+ }
+
+ /**
+  * Update a blacklist record that is already held in the cache.
+  *
+  * @param memEntity  the new content of the record
+  * @param result     receives DERR_NOT_EXIST when no record is cached under the
+  *                   record key, DERR_UNCHANGED when the cached record equals
+  *                   memEntity, otherwise the outcome of the store write
+  * @return the value of result.isSuccess() after processing
+  */
+ @Override
+ public boolean updGroupBlackListConf(GroupBlackListEntity memEntity, ProcessResult result) {
+     GroupBlackListEntity curEntity =
+             groupBlackListCache.get(memEntity.getRecordKey());
+     if (curEntity == null) {
+         result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                 new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                         .append("The blacklist ").append(memEntity.getRecordKey())
+                         .append("'s configure is not exists, please add record first!")
+                         .toString());
+         return result.isSuccess();
+     }
+     if (curEntity.equals(memEntity)) {
+         // nothing to write; the message no longer advises deleting the record,
+         // which was copied from the add path and does not apply to an update
+         result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                 new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                         .append("The blacklist ").append(memEntity.getRecordKey())
+                         .append("'s configure has not changed!")
+                         .toString());
+         return result.isSuccess();
+     }
+     // write to the store first; the caches are refreshed only after a successful write
+     if (putGroupBlackListConfig2Bdb(memEntity, result)) {
+         addOrUpdCacheRecord(memEntity);
+     }
+     return result.isSuccess();
+ }
+
+ /**
+  * Delete the blacklist record identified by the given record key.
+  *
+  * The bdb store and the caches are touched only when the record cache
+  * currently holds the key.
+  *
+  * @param recordKey  the key of the record to delete
+  * @return always true
+  */
+ @Override
+ public boolean delGroupBlackListConf(String recordKey) {
+     if (groupBlackListCache.get(recordKey) != null) {
+         // NOTE(review): the outcome of the store delete is not checked before
+         // the cache record is dropped - confirm this best-effort behavior is intended.
+         delGroupBlackListConfigFromBdb(recordKey);
+         delCacheRecord(recordKey);
+     }
+     return true;
+ }
+
+ /**
+  * Query the cached blacklist records that are indexed under a group name.
+  *
+  * @param groupName  the consumer group name to look up
+  * @return the matching cached records; an empty list when the group index
+  *         has no entry, or only an empty one, for the name
+  */
+ @Override
+ public List<GroupBlackListEntity> getGrpBlkLstConfByGroupName(String groupName) {
+     ConcurrentHashSet<String> recordKeys =
+             groupBlackListGroupCache.get(groupName);
+     if (recordKeys != null && !recordKeys.isEmpty()) {
+         List<GroupBlackListEntity> found = new ArrayList<>();
+         for (String key : recordKeys) {
+             GroupBlackListEntity cached = groupBlackListCache.get(key);
+             // an index key without a cached record is skipped
+             if (cached == null) {
+                 continue;
+             }
+             found.add(cached);
+         }
+         return found;
+     }
+     return Collections.emptyList();
+ }
+
+ /**
+  * Delete every blacklist record that is indexed under the given group name.
+  *
+  * @param groupName  the consumer group name whose records are deleted
+  * @return always true
+  */
+ @Override
+ public boolean delGroupBlackListConfByGroupName(String groupName) {
+     // look the records up through the group index; the topic index lookup used
+     // before treated the group name as a topic name and selected the wrong records
+     List<GroupBlackListEntity> curEntitys =
+             getGrpBlkLstConfByGroupName(groupName);
+     if (curEntitys.isEmpty()) {
+         return true;
+     }
+     for (GroupBlackListEntity entity : curEntitys) {
+         delGroupBlackListConf(entity.getRecordKey());
+     }
+     return true;
+ }
+
+ /**
+  * Query the cached blacklist records that are indexed under a topic name.
+  *
+  * @param topicName  the topic name to look up
+  * @return the matching cached records; an empty list when the topic index
+  *         has no entry, or only an empty one, for the name
+  */
+ @Override
+ public List<GroupBlackListEntity> getGrpBlkLstConfByTopicName(String topicName) {
+     ConcurrentHashSet<String> recordKeys =
+             groupBlackListTopicCache.get(topicName);
+     if (recordKeys != null && !recordKeys.isEmpty()) {
+         List<GroupBlackListEntity> found = new ArrayList<>();
+         for (String key : recordKeys) {
+             GroupBlackListEntity cached = groupBlackListCache.get(key);
+             // an index key without a cached record is skipped
+             if (cached == null) {
+                 continue;
+             }
+             found.add(cached);
+         }
+         return found;
+     }
+     return Collections.emptyList();
+ }
+
+ /**
+  * Query blacklist records from the cache.
+  *
+  * @param qryEntity  the filter; null selects every cached record, otherwise
+  *                   only records whose isMatched(qryEntity) is true are selected
+  * @return a new list holding the selected cached records
+  */
+ @Override
+ public List<GroupBlackListEntity> getGroupBlackListConf(GroupBlackListEntity qryEntity) {
+     if (qryEntity == null) {
+         // no filter: copy every cached record
+         return new ArrayList<>(groupBlackListCache.values());
+     }
+     List<GroupBlackListEntity> selected = new ArrayList<>();
+     for (GroupBlackListEntity cached : groupBlackListCache.values()) {
+         if (!cached.isMatched(qryEntity)) {
+             continue;
+         }
+         selected.add(cached);
+     }
+     return selected;
+ }
+
/**
* Put blacklist configure info into bdb store
*
@@ -105,26 +237,25 @@ public class BdbGroupBlackListMapperImpl implements GroupBlackListMapper {
* @param result process result with old value
* @return
*/
- @Override
- public boolean putGroupBlackListConfig(GroupBlackListEntity memEntity, ProcessResult result) {
+ private boolean putGroupBlackListConfig2Bdb(GroupBlackListEntity memEntity,
+ ProcessResult result) {
BdbBlackGroupEntity retData = null;
- BdbBlackGroupEntity bdbEntity =
- memEntity.buildBdbBlackListEntity();
+ BdbBlackGroupEntity bdbEntity = memEntity.buildBdbBlackListEntity();
try {
retData = blackGroupIndex.put(bdbEntity);
} catch (Throwable e) {
logger.error("[BDB Impl] put blacklist configure failure ", e);
- result.setFailResult(new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
- .append("Put blacklist configure failure: ")
- .append(e.getMessage()).toString());
+ result.setFailResult(DataOpErrCode.DERR_STORE_ABNORMAL.getCode(),
+ new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+ .append("Put blacklist configure failure: ")
+ .append(e.getMessage()).toString());
return result.isSuccess();
}
result.setSuccResult(retData == null);
return result.isSuccess();
}
- @Override
- public boolean delGroupBlackListConfig(String recordKey) {
+ private boolean delGroupBlackListConfigFromBdb(String recordKey) {
try {
blackGroupIndex.delete(recordKey);
} catch (Throwable e) {
@@ -134,4 +265,60 @@ public class BdbGroupBlackListMapperImpl implements GroupBlackListMapper {
return true;
}
+
+ /**
+  * Remove a record from the record cache and from the topic and group indexes.
+  *
+  * @param recordKey  the key of the record to remove; nothing happens when the
+  *                   record cache does not hold it
+  */
+ private void delCacheRecord(String recordKey) {
+     // remove the record itself; otherwise a deleted record would stay visible
+     // to the queries and would block a later add of the same key
+     GroupBlackListEntity curEntity = groupBlackListCache.remove(recordKey);
+     if (curEntity == null) {
+         return;
+     }
+     // delete topic index
+     ConcurrentHashSet<String> keySet =
+             groupBlackListTopicCache.get(curEntity.getTopicName());
+     if (keySet != null) {
+         keySet.remove(recordKey);
+         if (keySet.isEmpty()) {
+             groupBlackListTopicCache.remove(curEntity.getTopicName());
+         }
+     }
+     // delete group index; look the set up with get so that the other record
+     // keys of the same group stay indexed
+     keySet = groupBlackListGroupCache.get(curEntity.getGroupName());
+     if (keySet != null) {
+         keySet.remove(recordKey);
+         if (keySet.isEmpty()) {
+             groupBlackListGroupCache.remove(curEntity.getGroupName());
+         }
+     }
+ }
+
+ /**
+  * Store a record in the record cache and register its key in the topic and
+  * group indexes.
+  *
+  * @param entity  the record to cache
+  */
+ private void addOrUpdCacheRecord(GroupBlackListEntity entity) {
+     groupBlackListCache.put(entity.getRecordKey(), entity);
+     // add topic index map
+     indexRecordKey(groupBlackListTopicCache,
+             entity.getTopicName(), entity.getRecordKey());
+     // add group index map
+     indexRecordKey(groupBlackListGroupCache,
+             entity.getGroupName(), entity.getRecordKey());
+ }
+
+ /**
+  * Add a record key to the key set stored under indexKey, creating the set
+  * when the index has none yet.
+  */
+ private static void indexRecordKey(
+         ConcurrentHashMap<String, ConcurrentHashSet<String>> indexCache,
+         String indexKey, String recordKey) {
+     ConcurrentHashSet<String> keySet = indexCache.get(indexKey);
+     if (keySet == null) {
+         ConcurrentHashSet<String> newSet = new ConcurrentHashSet<>();
+         // keep the set that is already registered if another one got in first
+         ConcurrentHashSet<String> prevSet = indexCache.putIfAbsent(indexKey, newSet);
+         keySet = (prevSet == null) ? newSet : prevSet;
+     }
+     keySet.add(recordKey);
+ }
+
+ // Empty the topic index, the group index and the record cache, in that order.
+ private void clearCacheData() {
+ groupBlackListTopicCache.clear();
+ groupBlackListGroupCache.clear();
+ groupBlackListCache.clear();
+ }
+
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupConfigMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupConfigMapperImpl.java
index bb76154..b483822 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupConfigMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupConfigMapperImpl.java
@@ -25,10 +25,12 @@ import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.server.common.exception.LoadMetaException;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntity;
+import org.apache.tubemq.server.master.metastore.DataOpErrCode;
import org.apache.tubemq.server.master.metastore.dao.entity.GroupConfigEntity;
import org.apache.tubemq.server.master.metastore.dao.mapper.GroupConfigMapper;
import org.slf4j.Logger;
@@ -38,15 +40,13 @@ import org.slf4j.LoggerFactory;
public class BdbGroupConfigMapperImpl implements GroupConfigMapper {
-
private static final Logger logger =
LoggerFactory.getLogger(BdbGroupConfigMapperImpl.class);
-
-
// consumer group configure store
private EntityStore groupConfStore;
private PrimaryIndex<String/* groupName */, BdbGroupFlowCtrlEntity> groupConfIndex;
-
+ private ConcurrentHashMap<String/* groupName */, GroupConfigEntity> groupConfCache =
+ new ConcurrentHashMap<>();
public BdbGroupConfigMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
groupConfStore = new EntityStore(repEnv,
@@ -57,6 +57,7 @@ public class BdbGroupConfigMapperImpl implements GroupConfigMapper {
@Override
public void close() {
+ groupConfCache.clear();
if (groupConfStore != null) {
try {
groupConfStore.close();
@@ -68,12 +69,12 @@ public class BdbGroupConfigMapperImpl implements GroupConfigMapper {
}
@Override
- public void loadConfig(ProcessResult result) throws LoadMetaException {
+ public void loadConfig() throws LoadMetaException {
long count = 0L;
- Map<String, GroupConfigEntity> metaDataMap = new HashMap<>();
EntityCursor<BdbGroupFlowCtrlEntity> cursor = null;
logger.info("[BDB Impl] load group configure start...");
try {
+ groupConfCache.clear();
cursor = groupConfIndex.entities();
for (BdbGroupFlowCtrlEntity bdbEntity : cursor) {
if (bdbEntity == null) {
@@ -82,11 +83,10 @@ public class BdbGroupConfigMapperImpl implements GroupConfigMapper {
}
GroupConfigEntity memEntity =
new GroupConfigEntity(bdbEntity);
- metaDataMap.put(memEntity.getGroupName(), memEntity);
+ groupConfCache.put(memEntity.getGroupName(), memEntity);
count++;
}
logger.info("[BDB Impl] total group configure records are {}", count);
- result.setSuccResult(metaDataMap);
} catch (Exception e) {
logger.error("[BDB Impl] load group configure failure ", e);
throw new LoadMetaException(e.getMessage());
@@ -98,6 +98,84 @@ public class BdbGroupConfigMapperImpl implements GroupConfigMapper {
logger.info("[BDB Impl] load group configure successfully...");
}
+ /**
+  * Add a group configure that is not cached yet: the record is written to
+  * the store and, when that write returns true, put into the cache.
+  *
+  * @param memEntity the group configure to add
+  * @param result    receives the failure code and message, or the store result
+  * @return {@code result.isSuccess()} after processing
+  */
+ @Override
+ public boolean addGroupConf(GroupConfigEntity memEntity, ProcessResult result) {
+     final String groupName = memEntity.getGroupName();
+     if (groupConfCache.get(groupName) == null) {
+         // unknown group: persist first, cache only on a successful write
+         if (putGroupConfigConfig2Bdb(memEntity, result)) {
+             groupConfCache.put(groupName, memEntity);
+         }
+     } else {
+         String errInfo = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                 .append("The group ").append(groupName)
+                 .append("'s configure already exists, please delete it first!")
+                 .toString();
+         result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(), errInfo);
+     }
+     return result.isSuccess();
+ }
+
+ /**
+  * Update the configure of a group that is already cached.
+  * Fails with DERR_NOT_EXIST when the group is not cached and with
+  * DERR_UNCHANGED when the new entity equals the cached one.
+  *
+  * @param memEntity the new group configure
+  * @param result    receives the failure code and message, or the store result
+  * @return {@code result.isSuccess()} after processing
+  */
+ @Override
+ public boolean updGroupConf(GroupConfigEntity memEntity, ProcessResult result) {
+     GroupConfigEntity curEntity =
+             groupConfCache.get(memEntity.getGroupName());
+     if (curEntity == null) {
+         result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                 new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                         .append("The group ").append(memEntity.getGroupName())
+                         .append("'s configure is not exists, please add record first!")
+                         .toString());
+         return result.isSuccess();
+     }
+     if (curEntity.equals(memEntity)) {
+         // an unchanged record needs no delete; the message must not suggest one
+         result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                 new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                         .append("The group ").append(memEntity.getGroupName())
+                         .append("'s configure have not changed, please confirm it first!")
+                         .toString());
+         return result.isSuccess();
+     }
+     // the cache is refreshed only when the store write returns true
+     if (putGroupConfigConfig2Bdb(memEntity, result)) {
+         groupConfCache.put(memEntity.getGroupName(), memEntity);
+     }
+     return result.isSuccess();
+ }
+
+ /**
+  * Delete a group's configure from the store and then from the cache.
+  *
+  * @param groupName the group whose configure is removed
+  * @return true when the group is not cached or was deleted;
+  *         false when the store delete reports failure
+  */
+ @Override
+ public boolean delGroupConf(String groupName) {
+     if (groupConfCache.get(groupName) == null) {
+         return true;
+     }
+     // keep the cache entry when the store delete fails, so the cache does
+     // not drop a record the store may still hold
+     if (!delGroupConfigConfigFromBdb(groupName)) {
+         return false;
+     }
+     groupConfCache.remove(groupName);
+     return true;
+ }
+
+ /**
+ * Look up the cached configure of a group.
+ *
+ * @param groupName the group name used as cache key
+ * @return the cached entity, or null when the group is not in the cache
+ */
+ @Override
+ public GroupConfigEntity getGroupConf(String groupName) {
+ return groupConfCache.get(groupName);
+ }
+
+ /**
+  * Collect cached group configures into a new map keyed by group name.
+  *
+  * @param qryEntity the query condition; null selects every cached record
+  * @return a new map holding the records accepted by the query
+  */
+ @Override
+ public Map<String, GroupConfigEntity> getGroupConf(GroupConfigEntity qryEntity) {
+     Map<String, GroupConfigEntity> matched = new HashMap<>();
+     for (GroupConfigEntity cached : groupConfCache.values()) {
+         // without a query every record qualifies, otherwise the record decides
+         if (qryEntity == null || cached.isMatched(qryEntity)) {
+             matched.put(cached.getGroupName(), cached);
+         }
+     }
+     return matched;
+ }
+
/**
* Put Group configure info into bdb store
*
@@ -105,8 +183,7 @@ public class BdbGroupConfigMapperImpl implements GroupConfigMapper {
* @param result process result with old value
* @return
*/
- @Override
- public boolean putGroupConfigConfig(GroupConfigEntity memEntity, ProcessResult result) {
+ private boolean putGroupConfigConfig2Bdb(GroupConfigEntity memEntity, ProcessResult result) {
BdbGroupFlowCtrlEntity retData = null;
BdbGroupFlowCtrlEntity bdbEntity =
memEntity.buildBdbGroupFlowCtrlEntity();
@@ -114,17 +191,17 @@ public class BdbGroupConfigMapperImpl implements GroupConfigMapper {
retData = groupConfIndex.put(bdbEntity);
} catch (Throwable e) {
logger.error("[BDB Impl] put group configure failure ", e);
- result.setFailResult(new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
- .append("Put group configure failure: ")
- .append(e.getMessage()).toString());
+ result.setFailResult(DataOpErrCode.DERR_STORE_ABNORMAL.getCode(),
+ new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+ .append("Put group configure failure: ")
+ .append(e.getMessage()).toString());
return result.isSuccess();
}
result.setSuccResult(retData == null);
return result.isSuccess();
}
- @Override
- public boolean delGroupConfigConfig(String recordKey) {
+ private boolean delGroupConfigConfigFromBdb(String recordKey) {
try {
groupConfIndex.delete(recordKey);
} catch (Throwable e) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupFilterCtrlMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupFilterCtrlMapperImpl.java
index 5280053..80ab930 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupFilterCtrlMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupFilterCtrlMapperImpl.java
@@ -23,12 +23,16 @@ import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
import org.apache.tubemq.server.common.exception.LoadMetaException;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
+import org.apache.tubemq.server.master.metastore.DataOpErrCode;
import org.apache.tubemq.server.master.metastore.dao.entity.GroupFilterCtrlEntity;
import org.apache.tubemq.server.master.metastore.dao.mapper.GroupFilterCtrlMapper;
import org.slf4j.Logger;
@@ -38,14 +42,19 @@ import org.slf4j.LoggerFactory;
public class BdbGroupFilterCtrlMapperImpl implements GroupFilterCtrlMapper {
-
private static final Logger logger =
LoggerFactory.getLogger(BdbGroupFilterCtrlMapperImpl.class);
-
-
// consumer group filter control store
private EntityStore groupFilterStore;
private PrimaryIndex<String/* recordKey */, BdbGroupFilterCondEntity> groupFilterIndex;
+ // configure cache
+ private ConcurrentHashMap<String/* recordKey */, GroupFilterCtrlEntity>
+ groupFilterCtrlCache = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<String/* topicName */, ConcurrentHashSet<String>>
+ groupFilterCtrlTopicCache = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<String/* groupName */, ConcurrentHashSet<String> >
+ groupFilterCtrlGroupCache = new ConcurrentHashMap<>();
+
public BdbGroupFilterCtrlMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
@@ -57,6 +66,7 @@ public class BdbGroupFilterCtrlMapperImpl implements GroupFilterCtrlMapper {
@Override
public void close() {
+ clearCacheData();
if (groupFilterStore != null) {
try {
groupFilterStore.close();
@@ -68,12 +78,12 @@ public class BdbGroupFilterCtrlMapperImpl implements GroupFilterCtrlMapper {
}
@Override
- public void loadConfig(ProcessResult result) throws LoadMetaException {
+ public void loadConfig() throws LoadMetaException {
long count = 0L;
- Map<String, GroupFilterCtrlEntity> metaDataMap = new HashMap<>();
EntityCursor<BdbGroupFilterCondEntity> cursor = null;
logger.info("[BDB Impl] load filter configure start...");
try {
+ clearCacheData();
cursor = groupFilterIndex.entities();
for (BdbGroupFilterCondEntity bdbEntity : cursor) {
if (bdbEntity == null) {
@@ -82,11 +92,10 @@ public class BdbGroupFilterCtrlMapperImpl implements GroupFilterCtrlMapper {
}
GroupFilterCtrlEntity memEntity =
new GroupFilterCtrlEntity(bdbEntity);
- metaDataMap.put(memEntity.getRecordKey(), memEntity);
+ addOrUpdCacheRecord(memEntity);
count++;
}
logger.info("[BDB Impl] total filter configure records are {}", count);
- result.setSuccResult(metaDataMap);
} catch (Exception e) {
logger.error("[BDB Impl] load filter configure failure ", e);
throw new LoadMetaException(e.getMessage());
@@ -98,6 +107,95 @@ public class BdbGroupFilterCtrlMapperImpl implements GroupFilterCtrlMapper {
logger.info("[BDB Impl] load filter configure successfully...");
}
+ /**
+  * Add a group filter record that is not cached yet: the record is written
+  * to the store and, when that write returns true, added to the cache and
+  * its indexes.
+  *
+  * @param memEntity the filter control record to add
+  * @param result    receives the failure code and message, or the store result
+  * @return {@code result.isSuccess()} after processing
+  */
+ @Override
+ public boolean addGroupFilterCtrlConf(GroupFilterCtrlEntity memEntity, ProcessResult result) {
+     final String recordKey = memEntity.getRecordKey();
+     if (groupFilterCtrlCache.get(recordKey) == null) {
+         // unknown record: persist first, cache only on a successful write
+         if (putGroupFilterCtrlConfig2Bdb(memEntity, result)) {
+             addOrUpdCacheRecord(memEntity);
+         }
+     } else {
+         String errInfo = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                 .append("The group filter ").append(recordKey)
+                 .append("'s configure already exists, please delete it first!")
+                 .toString();
+         result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(), errInfo);
+     }
+     return result.isSuccess();
+ }
+
+ /**
+  * Update a group filter record that is already cached.
+  * Fails with DERR_NOT_EXIST when the record is not cached and with
+  * DERR_UNCHANGED when the new entity equals the cached one.
+  *
+  * @param memEntity the new filter control record
+  * @param result    receives the failure code and message, or the store result
+  * @return {@code result.isSuccess()} after processing
+  */
+ @Override
+ public boolean updGroupFilterCtrlConf(GroupFilterCtrlEntity memEntity, ProcessResult result) {
+     GroupFilterCtrlEntity curEntity =
+             groupFilterCtrlCache.get(memEntity.getRecordKey());
+     if (curEntity == null) {
+         result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                 new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                         .append("The group filter ").append(memEntity.getRecordKey())
+                         .append("'s configure is not exists, please add record first!")
+                         .toString());
+         return result.isSuccess();
+     }
+     if (curEntity.equals(memEntity)) {
+         // an unchanged record needs no delete; the message must not suggest one
+         result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                 new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                         .append("The group filter ").append(memEntity.getRecordKey())
+                         .append("'s configure have not changed, please confirm it first!")
+                         .toString());
+         return result.isSuccess();
+     }
+     // the cache is refreshed only when the store write returns true
+     if (putGroupFilterCtrlConfig2Bdb(memEntity, result)) {
+         addOrUpdCacheRecord(memEntity);
+     }
+     return result.isSuccess();
+ }
+
+ /**
+  * Delete a group filter record from the store and then from the cache.
+  *
+  * @param recordKey the key of the record to delete
+  * @return true when the record is not cached or was deleted;
+  *         false when the store delete reports failure
+  */
+ @Override
+ public boolean delGroupFilterCtrlConf(String recordKey) {
+     if (groupFilterCtrlCache.get(recordKey) == null) {
+         return true;
+     }
+     // keep the cache entry when the store delete fails, so the cache does
+     // not drop a record the store may still hold
+     if (!delGroupFilterCtrlConfigFromBdb(recordKey)) {
+         return false;
+     }
+     delCacheRecord(recordKey);
+     return true;
+ }
+
+ /**
+  * List the cached filter records of one group, resolved through the
+  * group-name index.
+  *
+  * @param groupName the group whose records are wanted
+  * @return an empty list when the group has no index entries, otherwise a
+  *         new list of the indexed records still present in the cache
+  */
+ @Override
+ public List<GroupFilterCtrlEntity> getGroupFilterCtrlConf(String groupName) {
+     ConcurrentHashSet<String> recordKeys = groupFilterCtrlGroupCache.get(groupName);
+     if (recordKeys != null && !recordKeys.isEmpty()) {
+         List<GroupFilterCtrlEntity> found = new ArrayList<>();
+         for (String key : recordKeys) {
+             GroupFilterCtrlEntity cached = groupFilterCtrlCache.get(key);
+             // an indexed key may have no cached record; skip it
+             if (cached != null) {
+                 found.add(cached);
+             }
+         }
+         return found;
+     }
+     return Collections.emptyList();
+ }
+
+ /**
+  * Collect cached filter records into a new list.
+  *
+  * @param qryEntity the query condition; null selects every cached record
+  * @return a new list holding the records accepted by the query
+  */
+ @Override
+ public List<GroupFilterCtrlEntity> getGroupFilterCtrlConf(GroupFilterCtrlEntity qryEntity) {
+     List<GroupFilterCtrlEntity> matched = new ArrayList<>();
+     for (GroupFilterCtrlEntity cached : groupFilterCtrlCache.values()) {
+         // without a query every record qualifies, otherwise the record decides
+         if (qryEntity == null || cached.isMatched(qryEntity)) {
+             matched.add(cached);
+         }
+     }
+     return matched;
+ }
+
/**
* Put Group filter configure info into bdb store
*
@@ -105,8 +203,7 @@ public class BdbGroupFilterCtrlMapperImpl implements GroupFilterCtrlMapper {
* @param result process result with old value
* @return
*/
- @Override
- public boolean putGroupFilterCtrlConfig(GroupFilterCtrlEntity memEntity, ProcessResult result) {
+ private boolean putGroupFilterCtrlConfig2Bdb(GroupFilterCtrlEntity memEntity, ProcessResult result) {
BdbGroupFilterCondEntity retData = null;
BdbGroupFilterCondEntity bdbEntity =
memEntity.buildBdbGroupFilterCondEntity();
@@ -114,17 +211,17 @@ public class BdbGroupFilterCtrlMapperImpl implements GroupFilterCtrlMapper {
retData = groupFilterIndex.put(bdbEntity);
} catch (Throwable e) {
logger.error("[BDB Impl] put filter configure failure ", e);
- result.setFailResult(new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
- .append("Put filter configure failure: ")
- .append(e.getMessage()).toString());
+ result.setFailResult(DataOpErrCode.DERR_STORE_ABNORMAL.getCode(),
+ new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+ .append("Put filter configure failure: ")
+ .append(e.getMessage()).toString());
return result.isSuccess();
}
result.setSuccResult(retData == null);
return result.isSuccess();
}
- @Override
- public boolean delGroupFilterCtrlConfig(String recordKey) {
+ private boolean delGroupFilterCtrlConfigFromBdb(String recordKey) {
try {
groupFilterIndex.delete(recordKey);
} catch (Throwable e) {
@@ -134,4 +231,58 @@ public class BdbGroupFilterCtrlMapperImpl implements GroupFilterCtrlMapper {
return true;
}
+ /**
+  * Evict a record from the record cache and unregister its key from the
+  * topic-name index and the group-name index. An index set that becomes
+  * empty is removed from its index map.
+  *
+  * @param recordKey the key of the record to evict; unknown keys are ignored
+  */
+ private void delCacheRecord(String recordKey) {
+     // remove(), not get(): the entity itself has to leave the record cache
+     GroupFilterCtrlEntity curEntity = groupFilterCtrlCache.remove(recordKey);
+     if (curEntity == null) {
+         return;
+     }
+     // delete topic index
+     ConcurrentHashSet<String> keySet =
+             groupFilterCtrlTopicCache.get(curEntity.getTopicName());
+     if (keySet != null) {
+         keySet.remove(recordKey);
+         if (keySet.isEmpty()) {
+             groupFilterCtrlTopicCache.remove(curEntity.getTopicName());
+         }
+     }
+     // delete group index; get() keeps the other records of the group indexed
+     keySet = groupFilterCtrlGroupCache.get(curEntity.getGroupName());
+     if (keySet != null) {
+         keySet.remove(recordKey);
+         if (keySet.isEmpty()) {
+             groupFilterCtrlGroupCache.remove(curEntity.getGroupName());
+         }
+     }
+ }
+
+ /**
+  * Put the entity into the record cache and register its record key in the
+  * topic-name index and the group-name index, creating an index set the
+  * first time a topic or group is seen.
+  *
+  * @param entity the filter control record to add or replace
+  */
+ private void addOrUpdCacheRecord(GroupFilterCtrlEntity entity) {
+     final String recordKey = entity.getRecordKey();
+     groupFilterCtrlCache.put(recordKey, entity);
+     // topic index: the record key is filed under the record's topic name
+     final String topicName = entity.getTopicName();
+     ConcurrentHashSet<String> topicKeys = groupFilterCtrlTopicCache.get(topicName);
+     if (topicKeys == null) {
+         ConcurrentHashSet<String> created = new ConcurrentHashSet<>();
+         ConcurrentHashSet<String> existing =
+                 groupFilterCtrlTopicCache.putIfAbsent(topicName, created);
+         // a non-null return means another set was mapped first; use that one
+         topicKeys = (existing == null) ? created : existing;
+     }
+     topicKeys.add(recordKey);
+     // group index: the record key is filed under the record's group name
+     final String groupName = entity.getGroupName();
+     ConcurrentHashSet<String> groupKeys = groupFilterCtrlGroupCache.get(groupName);
+     if (groupKeys == null) {
+         ConcurrentHashSet<String> created = new ConcurrentHashSet<>();
+         ConcurrentHashSet<String> existing =
+                 groupFilterCtrlGroupCache.putIfAbsent(groupName, created);
+         groupKeys = (existing == null) ? created : existing;
+     }
+     groupKeys.add(recordKey);
+ }
+
+ /**
+ * Empty the topic index, the group index and the record cache of the
+ * group filter control data.
+ */
+ private void clearCacheData() {
+ groupFilterCtrlTopicCache.clear();
+ groupFilterCtrlGroupCache.clear();
+ groupFilterCtrlCache.clear();
+ }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicConfigMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicConfigMapperImpl.java
index 334c423..e96cf7e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicConfigMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicConfigMapperImpl.java
@@ -23,12 +23,15 @@ import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
import org.apache.tubemq.server.common.exception.LoadMetaException;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
+import org.apache.tubemq.server.master.metastore.DataOpErrCode;
import org.apache.tubemq.server.master.metastore.dao.entity.TopicConfEntity;
import org.apache.tubemq.server.master.metastore.dao.mapper.TopicConfigMapper;
import org.slf4j.Logger;
@@ -38,14 +41,20 @@ import org.slf4j.LoggerFactory;
public class BdbTopicConfigMapperImpl implements TopicConfigMapper {
-
private static final Logger logger =
LoggerFactory.getLogger(BdbTopicConfigMapperImpl.class);
-
// Topic configure store
private EntityStore topicConfStore;
private PrimaryIndex<String/* recordKey */, BdbTopicConfEntity> topicConfIndex;
+ // data cache
+ private ConcurrentHashMap<String/* recordKey */, TopicConfEntity> topicConfCache =
+ new ConcurrentHashMap<>();
+ private ConcurrentHashMap<Integer/* brokerId */, ConcurrentHashSet<String>>
+ topicConfBrokerCacheIndex = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<String/* topicName */, ConcurrentHashSet<String> >
+ topicConfTopicNameCacheIndex = new ConcurrentHashMap<>();
+
public BdbTopicConfigMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
@@ -68,9 +77,8 @@ public class BdbTopicConfigMapperImpl implements TopicConfigMapper {
}
@Override
- public void loadConfig(ProcessResult result) throws LoadMetaException {
+ public void loadConfig() throws LoadMetaException {
long count = 0L;
- Map<String, TopicConfEntity> metaDataMap = new HashMap<>();
EntityCursor<BdbTopicConfEntity> cursor = null;
logger.info("[BDB Impl] load topic configure start...");
try {
@@ -80,13 +88,11 @@ public class BdbTopicConfigMapperImpl implements TopicConfigMapper {
logger.warn("[BDB Impl] found Null data while loading topic configure!");
continue;
}
- TopicConfEntity memEntity =
- new TopicConfEntity(bdbEntity);
- metaDataMap.put(memEntity.getRecordKey(), memEntity);
+ TopicConfEntity memEntity = new TopicConfEntity(bdbEntity);
+ addOrUpdCacheRecord(memEntity);
count++;
}
logger.info("[BDB Impl] total topic configure records are {}", count);
- result.setSuccResult(metaDataMap);
} catch (Exception e) {
logger.error("[BDB Impl] load topic configure failure ", e);
throw new LoadMetaException(e.getMessage());
@@ -98,6 +104,77 @@ public class BdbTopicConfigMapperImpl implements TopicConfigMapper {
logger.info("[BDB Impl] load topic configure successfully...");
}
+ /**
+  * Add a topic configure that is not cached yet: the record is written to
+  * the store and, when that write returns true, added to the cache and its
+  * indexes.
+  *
+  * @param memEntity the topic configure to add
+  * @param result    receives the failure code and message, or the store result
+  * @return {@code result.isSuccess()} after processing
+  */
+ @Override
+ public boolean addTopicConf(TopicConfEntity memEntity, ProcessResult result) {
+     final String recordKey = memEntity.getRecordKey();
+     if (topicConfCache.get(recordKey) == null) {
+         // unknown record: persist first, cache only on a successful write
+         if (putTopicConfig2Bdb(memEntity, result)) {
+             addOrUpdCacheRecord(memEntity);
+         }
+     } else {
+         String errInfo = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                 .append("The topic configure ").append(recordKey)
+                 .append("'s configure already exists, please delete it first!")
+                 .toString();
+         result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(), errInfo);
+     }
+     return result.isSuccess();
+ }
+
+ /**
+  * Update a topic configure that is already cached.
+  * Fails with DERR_NOT_EXIST when the record is not cached and with
+  * DERR_UNCHANGED when the new entity equals the cached one.
+  *
+  * @param memEntity the new topic configure
+  * @param result    receives the failure code and message, or the store result
+  * @return {@code result.isSuccess()} after processing
+  */
+ @Override
+ public boolean updTopicConf(TopicConfEntity memEntity, ProcessResult result) {
+     TopicConfEntity curEntity =
+             topicConfCache.get(memEntity.getRecordKey());
+     if (curEntity == null) {
+         result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                 new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                         .append("The topic configure ").append(memEntity.getRecordKey())
+                         .append("'s configure is not exists, please add record first!")
+                         .toString());
+         return result.isSuccess();
+     }
+     if (curEntity.equals(memEntity)) {
+         // an unchanged record needs no delete; the message must not suggest one
+         result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                 new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                         .append("The topic configure ").append(memEntity.getRecordKey())
+                         .append("'s configure have not changed, please confirm it first!")
+                         .toString());
+         return result.isSuccess();
+     }
+     // the cache is refreshed only when the store write returns true
+     if (putTopicConfig2Bdb(memEntity, result)) {
+         addOrUpdCacheRecord(memEntity);
+     }
+     return result.isSuccess();
+ }
+
+ /**
+  * Delete a topic configure from the store and then from the cache.
+  *
+  * @param recordKey the key of the record to delete
+  * @return true when the record is not cached or was deleted;
+  *         false when the store delete reports failure
+  */
+ @Override
+ public boolean delTopicConf(String recordKey) {
+     if (topicConfCache.get(recordKey) == null) {
+         return true;
+     }
+     // keep the cache entry when the store delete fails, so the cache does
+     // not drop a record the store may still hold
+     if (!delTopicConfigFromBdb(recordKey)) {
+         return false;
+     }
+     delCacheRecord(recordKey);
+     return true;
+ }
+
+ /**
+  * Collect cached topic configures into a new list.
+  *
+  * @param qryEntity the query condition; null selects every cached record
+  * @return a new list holding the records accepted by the query
+  */
+ @Override
+ public List<TopicConfEntity> getTopicConf(TopicConfEntity qryEntity) {
+     List<TopicConfEntity> matched = new ArrayList<>();
+     for (TopicConfEntity cached : topicConfCache.values()) {
+         // without a query every record qualifies, otherwise the record decides
+         if (qryEntity == null || cached.isMatched(qryEntity)) {
+             matched.add(cached);
+         }
+     }
+     return matched;
+ }
+
/**
* Put topic configure info into bdb store
*
@@ -105,8 +182,7 @@ public class BdbTopicConfigMapperImpl implements TopicConfigMapper {
* @param result process result with old value
* @return
*/
- @Override
- public boolean putTopicConfig(TopicConfEntity memEntity, ProcessResult result) {
+ private boolean putTopicConfig2Bdb(TopicConfEntity memEntity, ProcessResult result) {
BdbTopicConfEntity retData = null;
BdbTopicConfEntity bdbEntity =
memEntity.buildBdbTopicConfEntity();
@@ -114,17 +190,17 @@ public class BdbTopicConfigMapperImpl implements TopicConfigMapper {
retData = topicConfIndex.put(bdbEntity);
} catch (Throwable e) {
logger.error("[BDB Impl] put topic configure failure ", e);
- result.setFailResult(new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
- .append("Put topic configure failure: ")
- .append(e.getMessage()).toString());
+ result.setFailResult(DataOpErrCode.DERR_STORE_ABNORMAL.getCode(),
+ new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+ .append("Put topic configure failure: ")
+ .append(e.getMessage()).toString());
return result.isSuccess();
}
result.setSuccResult(retData == null);
return result.isSuccess();
}
- @Override
- public boolean delTopicConfig(String recordKey) {
+ private boolean delTopicConfigFromBdb(String recordKey) {
try {
topicConfIndex.delete(recordKey);
} catch (Throwable e) {
@@ -134,4 +210,58 @@ public class BdbTopicConfigMapperImpl implements TopicConfigMapper {
return true;
}
+ /**
+  * Evict a record from the record cache and unregister its key from the
+  * topic-name index and the brokerId index. An index set that becomes
+  * empty is removed from its index map.
+  *
+  * @param recordKey the key of the record to evict; unknown keys are ignored
+  */
+ private void delCacheRecord(String recordKey) {
+     // remove(), not get(): the entity itself has to leave the record cache
+     TopicConfEntity curEntity = topicConfCache.remove(recordKey);
+     if (curEntity == null) {
+         return;
+     }
+     // delete topic index
+     ConcurrentHashSet<String> keySet =
+             topicConfTopicNameCacheIndex.get(curEntity.getTopicName());
+     if (keySet != null) {
+         keySet.remove(recordKey);
+         if (keySet.isEmpty()) {
+             topicConfTopicNameCacheIndex.remove(curEntity.getTopicName());
+         }
+     }
+     // delete brokerId index; get() keeps the broker's other topics indexed
+     keySet = topicConfBrokerCacheIndex.get(curEntity.getBrokerId());
+     if (keySet != null) {
+         keySet.remove(recordKey);
+         if (keySet.isEmpty()) {
+             topicConfBrokerCacheIndex.remove(curEntity.getBrokerId());
+         }
+     }
+ }
+
+ /**
+  * Put the entity into the record cache and register its record key in the
+  * topic-name index and the brokerId index, creating an index set the first
+  * time a topic or broker is seen.
+  *
+  * @param entity the topic configure to add or replace
+  */
+ private void addOrUpdCacheRecord(TopicConfEntity entity) {
+     final String recordKey = entity.getRecordKey();
+     topicConfCache.put(recordKey, entity);
+     // topic index: the record key is filed under the record's topic name
+     final String topicName = entity.getTopicName();
+     ConcurrentHashSet<String> topicKeys = topicConfTopicNameCacheIndex.get(topicName);
+     if (topicKeys == null) {
+         ConcurrentHashSet<String> created = new ConcurrentHashSet<>();
+         ConcurrentHashSet<String> existing =
+                 topicConfTopicNameCacheIndex.putIfAbsent(topicName, created);
+         // a non-null return means another set was mapped first; use that one
+         topicKeys = (existing == null) ? created : existing;
+     }
+     topicKeys.add(recordKey);
+     // brokerId index: the record key is filed under the record's broker id
+     ConcurrentHashSet<String> brokerKeys =
+             topicConfBrokerCacheIndex.get(entity.getBrokerId());
+     if (brokerKeys == null) {
+         ConcurrentHashSet<String> created = new ConcurrentHashSet<>();
+         ConcurrentHashSet<String> existing =
+                 topicConfBrokerCacheIndex.putIfAbsent(entity.getBrokerId(), created);
+         brokerKeys = (existing == null) ? created : existing;
+     }
+     brokerKeys.add(recordKey);
+ }
+
+ /**
+ * Empty the topic-name index, the brokerId index and the record cache of
+ * the topic configure data.
+ */
+ private void clearCacheData() {
+ topicConfTopicNameCacheIndex.clear();
+ topicConfBrokerCacheIndex.clear();
+ topicConfCache.clear();
+ }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
index 9050938..f535dac 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
@@ -23,12 +23,15 @@ import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.server.common.exception.LoadMetaException;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
+import org.apache.tubemq.server.master.metastore.DataOpErrCode;
+import org.apache.tubemq.server.master.metastore.dao.entity.TopicConfEntity;
import org.apache.tubemq.server.master.metastore.dao.entity.TopicCtrlEntity;
import org.apache.tubemq.server.master.metastore.dao.mapper.TopicCtrlMapper;
import org.slf4j.Logger;
@@ -45,8 +48,10 @@ public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
// Topic control store
private EntityStore topicCtrlStore;
- private PrimaryIndex<String/* recordKey */, BdbTopicAuthControlEntity> topicCtrlIndex;
-
+ private PrimaryIndex<String/* topicName */, BdbTopicAuthControlEntity> topicCtrlIndex;
+ // data cache
+ private ConcurrentHashMap<String/* topicName */, TopicCtrlEntity> topicCtrlCache =
+ new ConcurrentHashMap<>();
public BdbTopicCtrlMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
topicCtrlStore = new EntityStore(repEnv,
@@ -57,6 +62,7 @@ public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
@Override
public void close() {
+ topicCtrlCache.clear();
if (topicCtrlStore != null) {
try {
topicCtrlStore.close();
@@ -68,25 +74,23 @@ public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
}
@Override
- public void loadConfig(ProcessResult result) throws LoadMetaException {
+ public void loadConfig() throws LoadMetaException {
long count = 0L;
- Map<String, TopicCtrlEntity> metaDataMap = new HashMap<>();
EntityCursor<BdbTopicAuthControlEntity> cursor = null;
logger.info("[BDB Impl] load topic configure start...");
try {
+ topicCtrlCache.clear();
cursor = topicCtrlIndex.entities();
for (BdbTopicAuthControlEntity bdbEntity : cursor) {
if (bdbEntity == null) {
logger.warn("[BDB Impl] found Null data while loading topic control!");
continue;
}
- TopicCtrlEntity memEntity =
- new TopicCtrlEntity(bdbEntity);
- metaDataMap.put(memEntity.getTopicName(), memEntity);
+ TopicCtrlEntity memEntity = new TopicCtrlEntity(bdbEntity);
+ topicCtrlCache.put(memEntity.getTopicName(), memEntity);
count++;
}
logger.info("[BDB Impl] total topic control records are {}", count);
- result.setSuccResult(metaDataMap);
} catch (Exception e) {
logger.error("[BDB Impl] load topic control failure ", e);
throw new LoadMetaException(e.getMessage());
@@ -98,6 +102,82 @@ public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
logger.info("[BDB Impl] load topic control successfully...");
}
+ /**
+  * Add a topic control record that is not cached yet: the record is written
+  * to the store and, when that write returns true, put into the cache.
+  *
+  * @param memEntity the topic control record to add
+  * @param result    receives the failure code and message, or the store result
+  * @return {@code result.isSuccess()} after processing
+  */
+ @Override
+ public boolean addTopicCtrlConf(TopicCtrlEntity memEntity, ProcessResult result) {
+     final String topicName = memEntity.getTopicName();
+     if (topicCtrlCache.get(topicName) == null) {
+         // unknown topic: persist first, cache only on a successful write
+         if (putTopicCtrlConfig2Bdb(memEntity, result)) {
+             topicCtrlCache.put(topicName, memEntity);
+         }
+     } else {
+         String errInfo = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                 .append("The topic control ").append(topicName)
+                 .append("'s configure already exists, please delete it first!")
+                 .toString();
+         result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(), errInfo);
+     }
+     return result.isSuccess();
+ }
+
+ /**
+  * Update a topic control record that is already cached.
+  * Fails with DERR_NOT_EXIST when the topic is not cached and with
+  * DERR_UNCHANGED when the new entity equals the cached one.
+  *
+  * @param memEntity the new topic control record
+  * @param result    receives the failure code and message, or the store result
+  * @return {@code result.isSuccess()} after processing
+  */
+ @Override
+ public boolean updTopicCtrlConf(TopicCtrlEntity memEntity, ProcessResult result) {
+     TopicCtrlEntity curEntity =
+             topicCtrlCache.get(memEntity.getTopicName());
+     if (curEntity == null) {
+         result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                 new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                         .append("The topic control ").append(memEntity.getTopicName())
+                         .append("'s configure is not exists, please add record first!")
+                         .toString());
+         return result.isSuccess();
+     }
+     if (curEntity.equals(memEntity)) {
+         // an unchanged record needs no delete; the message must not suggest one
+         result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                 new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                         .append("The topic control ").append(memEntity.getTopicName())
+                         .append("'s configure have not changed, please confirm it first!")
+                         .toString());
+         return result.isSuccess();
+     }
+     // the cache is refreshed only when the store write returns true
+     if (putTopicCtrlConfig2Bdb(memEntity, result)) {
+         topicCtrlCache.put(memEntity.getTopicName(), memEntity);
+     }
+     return result.isSuccess();
+ }
+
+ /**
+  * Delete a topic control record from the store and then from the cache.
+  *
+  * @param topicName the topic whose control record is removed
+  * @return true when the topic is not cached or was deleted;
+  *         false when the store delete reports failure
+  */
+ @Override
+ public boolean delTopicCtrlConf(String topicName) {
+     if (topicCtrlCache.get(topicName) == null) {
+         return true;
+     }
+     // keep the cache entry when the store delete fails, so the cache does
+     // not drop a record the store may still hold
+     if (!delTopicCtrlConfigFromBdb(topicName)) {
+         return false;
+     }
+     topicCtrlCache.remove(topicName);
+     return true;
+ }
+
+ /**
+ * Look up the cached control record of a topic.
+ *
+ * @param topicName the topic name used as cache key
+ * @return the cached entity, or null when the topic is not in the cache
+ */
+ @Override
+ public TopicCtrlEntity getTopicCtrlConf(String topicName) {
+ return topicCtrlCache.get(topicName);
+ }
+
+ /**
+  * Collect cached topic control records into a new list.
+  * NOTE(review): the query parameter is a TopicConfEntity while the cache
+  * holds TopicCtrlEntity records - confirm this is the intended query type.
+  *
+  * @param qryEntity the query condition; null selects every cached record
+  * @return a new list holding the records accepted by the query
+  */
+ @Override
+ public List<TopicCtrlEntity> getTopicCtrlConf(TopicConfEntity qryEntity) {
+     List<TopicCtrlEntity> matched = new ArrayList<>();
+     for (TopicCtrlEntity cached : topicCtrlCache.values()) {
+         // without a query every record qualifies, otherwise the record decides
+         if (qryEntity == null || cached.isMatched(qryEntity)) {
+             matched.add(cached);
+         }
+     }
+     return matched;
+ }
+
/**
* Put topic control configure info into bdb store
*
@@ -105,8 +185,7 @@ public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
* @param result process result with old value
* @return
*/
- @Override
- public boolean putTopicCtrlConfig(TopicCtrlEntity memEntity, ProcessResult result) {
+ private boolean putTopicCtrlConfig2Bdb(TopicCtrlEntity memEntity, ProcessResult result) {
BdbTopicAuthControlEntity retData = null;
BdbTopicAuthControlEntity bdbEntity =
memEntity.buildBdbTopicAuthControlEntity();
@@ -123,8 +202,7 @@ public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
return result.isSuccess();
}
- @Override
- public boolean delTopicCtrlConfig(String recordKey) {
+ private boolean delTopicCtrlConfigFromBdb(String recordKey) {
try {
topicCtrlIndex.delete(recordKey);
} catch (Throwable e) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/AbstractMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/keepalive/AliveObserver.java
similarity index 70%
copy from tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/AbstractMapper.java
copy to tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/keepalive/AliveObserver.java
index 7deadef..4c2e2ea 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/AbstractMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/keepalive/AliveObserver.java
@@ -15,17 +15,12 @@
* limitations under the License.
*/
-package org.apache.tubemq.server.master.metastore.dao.mapper;
-import org.apache.tubemq.server.common.exception.LoadMetaException;
-import org.apache.tubemq.server.common.utils.ProcessResult;
+package org.apache.tubemq.server.master.metastore.keepalive;
+/**
+ * Observer type accepted by KeepAlive#registerObserver.
+ */
+public interface AliveObserver {
-public interface AbstractMapper {
-
- void close();
-
- void loadConfig(ProcessResult result) throws LoadMetaException;
+ /**
+ * Clear the data cached by this observer.
+ * NOTE(review): presumably called by the KeepAlive implementation when the
+ * master role changes - confirm against the implementation.
+ */
+ void clearCacheData();
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/keepalive/KeepAlive.java
similarity index 60%
copy from tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
copy to tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/keepalive/KeepAlive.java
index c42ea45..718c4dd 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/keepalive/KeepAlive.java
@@ -15,18 +15,26 @@
* limitations under the License.
*/
-package org.apache.tubemq.server.master.metastore.dao.mapper;
+package org.apache.tubemq.server.master.metastore.keepalive;
-import org.apache.tubemq.server.common.utils.ProcessResult;
-import org.apache.tubemq.server.master.metastore.dao.entity.TopicCtrlEntity;
+import java.net.InetSocketAddress;
+import org.apache.tubemq.server.master.bdbstore.MasterGroupStatus;
-public interface TopicCtrlMapper extends AbstractMapper {
+/**
+ * Keep-alive contract of the master metastore. It declares queries about the
+ * master role (isMasterNow, getMasterSinceTime, getMasterAddress,
+ * isPrimaryNodeActive, getMasterGroupStatus), a transferMaster operation that
+ * is declared to throw Exception, and registration of AliveObserver instances.
+ *
+ * NOTE(review): the exact semantics of each method are defined by the
+ * implementations, which are not shown here; presumably registered observers
+ * are called back when the master role changes - confirm.
+ */
+public interface KeepAlive {
- boolean putTopicCtrlConfig(TopicCtrlEntity memEntity, ProcessResult result);
+ boolean isMasterNow();
- boolean delTopicCtrlConfig(String key);
+ long getMasterSinceTime();
+ InetSocketAddress getMasterAddress();
+ boolean isPrimaryNodeActive();
+
+ void transferMaster() throws Exception;
+
+ void registerObserver(AliveObserver eventObserver);
+
+ MasterGroupStatus getMasterGroupStatus(boolean isFromHeartbeat);
}