You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/25 03:02:23 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-475] add the
offset clone api of the consume group
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new 23cff5b [TUBEMQ-475] add the offset clone api of the consume group
23cff5b is described below
commit 23cff5b60137c262de774fa3dee8f7dcea18647a
Author: gosonzhang <go...@tencent.com>
AuthorDate: Fri Dec 25 10:40:51 2020 +0800
[TUBEMQ-475] add the offset clone api of the consume group
---
.../broker/metadata/BrokerMetadataManager.java | 3 +-
.../server/broker/metadata/MetadataManager.java | 2 +
.../server/broker/metadata/TopicMetadata.java | 16 ++
.../server/broker/offset/DefaultOffsetManager.java | 199 ++++++++++++++++-
.../tubemq/server/broker/offset/OffsetService.java | 18 ++
.../server/broker/web/BrokerAdminServlet.java | 246 +++++++++++++++++++++
.../tubemq/server/common/fielddef/WebFieldDef.java | 10 +-
.../server/common/offsetstorage/OffsetStorage.java | 11 +-
.../common/offsetstorage/ZkOffsetStorage.java | 139 +++++++++++-
.../common/offsetstorage/zookeeper/ZKUtil.java | 19 ++
.../org/apache/tubemq/server/master/TMaster.java | 3 +-
11 files changed, 647 insertions(+), 19 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java
index 1e2f21e..8568372 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java
@@ -143,7 +143,8 @@ public class BrokerMetadataManager implements MetadataManager {
return topicConfigMap.get(topic);
}
- public ConcurrentHashMap<String, TopicMetadata> getTopicConfigMap() {
+ @Override
+ public Map<String, TopicMetadata> getTopicConfigMap() {
return topicConfigMap;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/MetadataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/MetadataManager.java
index d638e5a..9ee9936 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/MetadataManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/MetadataManager.java
@@ -80,4 +80,6 @@ public interface MetadataManager {
String getDefDeletePolicy();
String getTopicDeletePolicy(String topic);
+
+ Map<String, TopicMetadata> getTopicConfigMap();
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
index 4ebfa4d..c582606 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
@@ -17,10 +17,15 @@
package org.apache.tubemq.server.broker.metadata;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.TStatusConstants;
+
/***
* Topic's metadata. Contains topic name, partitions count, etc.
*/
@@ -235,6 +240,17 @@ public class TopicMetadata {
this.unflushInterval = unflushInterval;
}
+ // builder the partitionId set for each store
+ public Set<Integer> getAllPartitionIds() {
+ Set<Integer> partIds = new HashSet<>();
+ for (int i = 0; i < numTopicStores; i++) {
+ for (int j = 0; j < numPartitions; j++) {
+ partIds.add(i * TBaseConstants.META_STORE_INS_BASE + j);
+ }
+ }
+ return partIds;
+ }
+
public int getStatusId() {
return statusId;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
index 82a758c..bdd85b3 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -17,13 +17,17 @@
package org.apache.tubemq.server.broker.offset;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.daemon.AbstractDaemonService;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.broker.BrokerConfig;
import org.apache.tubemq.server.broker.msgstore.MessageStore;
+import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
import org.apache.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.tubemq.server.common.offsetstorage.OffsetStorage;
import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo;
@@ -49,7 +53,8 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
public DefaultOffsetManager(final BrokerConfig brokerConfig) {
super("[Offset Manager]", brokerConfig.getZkConfig().getZkCommitPeriodMs());
this.brokerConfig = brokerConfig;
- zkOffsetStorage = new ZkOffsetStorage(brokerConfig.getZkConfig());
+ zkOffsetStorage = new ZkOffsetStorage(brokerConfig.getZkConfig(),
+ true, brokerConfig.getBrokerId());
super.start();
}
@@ -323,6 +328,187 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
}
/***
+ * Get in-memory and in zk group set
+ *
+ * @return booked group in memory and in zk
+ */
+ @Override
+ public Set<String> getBookedGroups() {
+ Set<String> groupSet = new HashSet<>();
+ groupSet.addAll(cfmOffsetMap.keySet());
+ Map<String, Set<String>> localGroups =
+ zkOffsetStorage.getZkLocalGroupTopicInfos();
+ groupSet.addAll(localGroups.keySet());
+ return groupSet;
+ }
+
+ /***
+ * Get in-memory group set
+ *
+ * @return booked group in memory
+ */
+ public Set<String> getInMemoryGroups() {
+ Set<String> cacheGroup = new HashSet<>();
+ cacheGroup.addAll(cfmOffsetMap.keySet());
+ return cacheGroup;
+ }
+
+ /***
+ * Get in-zookeeper but not in memory's group set
+ *
+ * @return booked group in zookeeper
+ */
+ @Override
+ public Set<String> getUnusedGroupInfo() {
+ Set<String> unUsedGroups = new HashSet<>();
+ Map<String, Set<String>> localGroups =
+ zkOffsetStorage.getZkLocalGroupTopicInfos();
+ for (String groupName : localGroups.keySet()) {
+ if (!cfmOffsetMap.containsKey(groupName)) {
+ unUsedGroups.add(groupName);
+ }
+ }
+ return unUsedGroups;
+ }
+
+ /***
+ * Get the topic set subscribed by the consumer group
+ * @param group
+ * @return topic set subscribed
+ */
+ @Override
+ public Set<String> getGroupSubInfo(String group) {
+ Set<String> result = new HashSet<>();
+ Map<String, OffsetStorageInfo> topicPartOffsetMap = cfmOffsetMap.get(group);
+ if (topicPartOffsetMap == null) {
+ Map<String, Set<String>> localGroups =
+ zkOffsetStorage.getZkLocalGroupTopicInfos();
+ result = localGroups.get(group);
+ } else {
+ for (OffsetStorageInfo storageInfo : topicPartOffsetMap.values()) {
+ result.add(storageInfo.getTopic());
+ }
+ }
+ return result;
+ }
+
+ /***
+ * Get group's offset by Specified topic-partitions
+ * @param group
+ * @param topicPartMap
+ * @return group offset info in memory or zk
+ */
+ @Override
+ public Map<String, Map<Integer, Long>> queryGroupOffset(
+ String group, Map<String, Set<Integer>> topicPartMap) {
+ Map<String, Map<Integer, Long>> result = new HashMap<>();
+ // search group from memory
+ Map<String, OffsetStorageInfo> topicPartOffsetMap = cfmOffsetMap.get(group);
+ if (topicPartOffsetMap == null) {
+ // query from zookeeper
+ for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+ Map<Integer, Long> qryResult =
+ zkOffsetStorage.queryGroupOffsetInfo(
+ group, entry.getKey(), entry.getValue());
+ Map<Integer, Long> offsetMap = new HashMap<>();
+ for (Map.Entry<Integer, Long> item : qryResult.entrySet()) {
+ if (item.getValue() != null) {
+ offsetMap.put(item.getKey(), item.getValue());
+ }
+ }
+ if (!offsetMap.isEmpty()) {
+ result.put(entry.getKey(), offsetMap);
+ }
+ }
+ } else {
+ // found in memory, get offset values
+ for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+ Map<Integer, Long> offsetMap = new HashMap<>();
+ for (Integer partitionId : entry.getValue()) {
+ String offsetCacheKey =
+ getOffsetCacheKey(entry.getKey(), partitionId);
+ OffsetStorageInfo offsetInfo = topicPartOffsetMap.get(offsetCacheKey);
+ if (offsetInfo != null) {
+ offsetMap.put(partitionId, offsetInfo.getOffset());
+ }
+ }
+ if (!offsetMap.isEmpty()) {
+ result.put(entry.getKey(), offsetMap);
+ }
+ }
+ }
+ return result;
+ }
+
+
+ /***
+ * Reset offset.
+ *
+ * @param storeManager
+ * @param groups
+ * @param topicPartOffsetMap
+ * @param modifier
+ * @return at least one record modified
+ */
+ @Override
+ public boolean modifyGroupOffset(MessageStoreManager storeManager, Set<String> groups,
+ Map<String, Map<Integer, Long>> topicPartOffsetMap,
+ String modifier) {
+ long oldOffset = -1;
+ long reSetOffset = -1;
+ boolean changed = false;
+ MessageStore store = null;
+ StringBuilder strBuidler = new StringBuilder(512);
+ // set offset by group
+ for (String group : groups) {
+ for (Map.Entry<String, Map<Integer, Long>> entry : topicPartOffsetMap.entrySet()) {
+ Map<Integer, Long> partOffsetMap = entry.getValue();
+ if (partOffsetMap == null) {
+ continue;
+ }
+ // set offset
+ for (Map.Entry<Integer, Long> entry1 : partOffsetMap.entrySet()) {
+ if (entry1.getValue() == null) {
+ continue;
+ }
+ reSetOffset = entry1.getValue();
+ // get topic store
+ try {
+ store = storeManager.getOrCreateMessageStore(
+ entry.getKey(), entry1.getKey());
+ } catch (Throwable e) {
+ //
+ }
+ if (store == null) {
+ continue;
+ }
+ long firstOffset = store.getIndexMinOffset();
+ long lastOffset = store.getIndexMaxOffset();
+ // adjust reseted offset value
+ reSetOffset = reSetOffset < firstOffset
+ ? firstOffset : Math.min(reSetOffset, lastOffset);
+ String offsetCacheKey =
+ getOffsetCacheKey(entry.getKey(), entry1.getKey());
+ getAndResetTmpOffset(group, offsetCacheKey);
+ OffsetStorageInfo regInfo = loadOrCreateOffset(group,
+ entry.getKey(), entry1.getKey(), offsetCacheKey, 0);
+ oldOffset = regInfo.getAndSetOffset(reSetOffset);
+ changed = true;
+ logger.info(strBuidler
+ .append("[Offset Manager] Update offset by modifier=")
+ .append(modifier).append(",reset offset=").append(reSetOffset)
+ .append(",old offset=").append(oldOffset)
+ .append(",updated offset=").append(regInfo.getOffset())
+ .append(",group=").append(group)
+ .append(",topic-partId=").append(offsetCacheKey).toString());
+ strBuidler.delete(0, strBuidler.length());
+ }
+ }
+ }
+ return changed;
+ }
+
+ /***
* Set temp offset.
*
* @param group
@@ -425,10 +611,10 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
OffsetStorageInfo regInfo = regInfoMap.get(offsetCacheKey);
if (regInfo == null) {
OffsetStorageInfo tmpRegInfo =
- zkOffsetStorage.loadOffset(group, topic, brokerConfig.getBrokerId(), partitionId);
+ zkOffsetStorage.loadOffset(group, topic, partitionId);
if (tmpRegInfo == null) {
- tmpRegInfo =
- new OffsetStorageInfo(topic, brokerConfig.getBrokerId(), partitionId, defOffset, 0);
+ tmpRegInfo = new OffsetStorageInfo(topic,
+ brokerConfig.getBrokerId(), partitionId, defOffset, 0);
}
regInfo = regInfoMap.putIfAbsent(offsetCacheKey, tmpRegInfo);
if (regInfo == null) {
@@ -443,4 +629,9 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
.append("-").append(partitionId).toString();
}
+ private String getOffsetCacheKey(String topic, String partitionId) {
+ return new StringBuilder(256).append(topic)
+ .append("-").append(partitionId).toString();
+ }
+
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
index 066fb3b..05f0724 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
@@ -17,9 +17,13 @@
package org.apache.tubemq.server.broker.offset;
+import java.util.Map;
+import java.util.Set;
import org.apache.tubemq.server.broker.msgstore.MessageStore;
+import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo;
+
/***
* Offset manager service interface.
*/
@@ -51,4 +55,18 @@ public interface OffsetService {
long getTmpOffset(final String group, final String topic, int partitionId);
+ Set<String> getBookedGroups();
+
+ Set<String> getInMemoryGroups();
+
+ Set<String> getUnusedGroupInfo();
+
+ Set<String> getGroupSubInfo(String group);
+
+ Map<String, Map<Integer, Long>> queryGroupOffset(
+ String group, Map<String, Set<Integer>> topicPartMap);
+
+ boolean modifyGroupOffset(MessageStoreManager storeManager, Set<String> groups,
+ Map<String, Map<Integer, Long>> topicPartOffsetMap,
+ String modifier);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index 1b3f524..91bfd23 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -17,6 +17,8 @@
package org.apache.tubemq.server.broker.web;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -26,6 +28,7 @@ import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.broker.TubeBroker;
+import org.apache.tubemq.server.broker.metadata.TopicMetadata;
import org.apache.tubemq.server.broker.msgstore.MessageStore;
import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
@@ -71,6 +74,15 @@ public class BrokerAdminServlet extends AbstractWebHandler {
// get all registered methods
innRegisterWebMethod("admin_get_methods",
"adminQueryAllMethods");
+ // Query all consumer groups booked on the Broker.
+ innRegisterWebMethod("admin_query_group",
+ "adminQueryBookedGroup");
+ // query consumer group's offset
+ innRegisterWebMethod("admin_query_offset",
+ "adminQueryGroupOffSet");
+ // clone consumer group's offset from source to target
+ innRegisterWebMethod("admin_clone_offset",
+ "adminCloneGroupOffSet");
}
public void adminQueryAllMethods(HttpServletRequest req,
@@ -560,5 +572,239 @@ public class BrokerAdminServlet extends AbstractWebHandler {
sBuilder.append("],\"totalCnt\":").append(totalCnt).append("}");
}
+ /***
+ * Query all consumer groups booked on the Broker.
+ *
+ * @param req
+ * @param sBuilder process result
+ */
+ public void adminQueryBookedGroup(HttpServletRequest req,
+ StringBuilder sBuilder) {
+ // get group list
+ ProcessResult result = WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.WITHDIVIDE, false, false);
+ if (!result.success) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return;
+ }
+ boolean withDivide = (boolean) result.retData1;
+ // get offset service
+ int itemCnt = 0;
+ int totalCnt = 0;
+ OffsetService offsetService = broker.getOffsetManager();
+ sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
+ if (withDivide) {
+ // query in-memory group name set
+ Set<String> onlineGroups = offsetService.getInMemoryGroups();
+ sBuilder.append("{\"type\":\"in-cache\",\"groupName\":[");
+ for (String group : onlineGroups) {
+ if (itemCnt++ > 0) {
+ sBuilder.append(",");
+ }
+ sBuilder.append("\"").append(group).append("\"");
+ }
+ sBuilder.append("],\"groupCount\":").append(itemCnt).append("}");
+ totalCnt++;
+ sBuilder.append(",");
+ // query in-zk group name set
+ itemCnt = 0;
+ Set<String> onZKGroup = offsetService.getUnusedGroupInfo();
+ sBuilder.append("{\"type\":\"in-zk\",\"groupName\":[");
+ for (String group : onZKGroup) {
+ if (itemCnt++ > 0) {
+ sBuilder.append(",");
+ }
+ sBuilder.append("\"").append(group).append("\"");
+ }
+ sBuilder.append("],\"groupCount\":").append(itemCnt).append("}");
+ totalCnt++;
+ } else {
+ Set<String> allGroups = offsetService.getBookedGroups();
+ sBuilder.append("{\"type\":\"all\",\"groupName\":[");
+ for (String group : allGroups) {
+ if (itemCnt++ > 0) {
+ sBuilder.append(",");
+ }
+ sBuilder.append("\"").append(group).append("\"");
+ }
+ sBuilder.append("],\"groupCount\":").append(itemCnt).append("}");
+ totalCnt++;
+ }
+ sBuilder.append("],\"dataCount\":").append(totalCnt).append("}");
+ }
+
+ /***
+ * Query consumer group offset.
+ *
+ * @param req
+ * @param sBuilder process result
+ */
+ public void adminQueryGroupOffSet(HttpServletRequest req,
+ StringBuilder sBuilder) {
+ // get group list
+ ProcessResult result = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, false, null);
+ if (!result.success) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return;
+ }
+ // filter invalid groups
+ Set<String> qryGroupNameSet = new HashSet<>();
+ Set<String> inGroupNameSet = (Set<String>) result.retData1;
+ Set<String> bookedGroupSet = broker.getOffsetManager().getBookedGroups();
+ if (inGroupNameSet.isEmpty()) {
+ qryGroupNameSet = bookedGroupSet;
+ } else {
+ for (String group : inGroupNameSet) {
+ if (bookedGroupSet.contains(group)) {
+ qryGroupNameSet.add(group);
+ }
+ }
+ }
+ // get the topic set to be queried
+ result = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null);
+ if (!result.success) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return;
+ }
+ // get target consume group name
+ Set<String> topicSet = (Set<String>) result.retData1;
+ // verify the acquired Topic set and
+ // query the corresponding offset information
+ Map<String, Map<String, Map<Integer, Long>>> groupOffsetMaps = new HashMap<>();
+ for (String group : qryGroupNameSet) {
+ Map<String, Set<Integer>> topicPartMap =
+ validAndGetPartitions(group, topicSet);
+ Map<String, Map<Integer, Long>> groupOffsetMap =
+ broker.getOffsetManager().queryGroupOffset(group, topicPartMap);
+ groupOffsetMaps.put(group, groupOffsetMap);
+ }
+ // builder result
+ int totalCnt = 0;
+ sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
+ for (Map.Entry<String, Map<String, Map<Integer, Long>>> entry
+ : groupOffsetMaps.entrySet()) {
+ if (totalCnt++ > 0) {
+ sBuilder.append(",");
+ }
+ Map<String, Map<Integer, Long>> topicPartMap = entry.getValue();
+ sBuilder.append("{\"groupName\":\"").append(entry.getKey())
+ .append("\",\"subInfo\":[");
+ int topicCnt = 0;
+ for (Map.Entry<String, Map<Integer, Long>> entry1 : topicPartMap.entrySet()) {
+ if (topicCnt++ > 0) {
+ sBuilder.append(",");
+ }
+ Map<Integer, Long> partOffMap = entry1.getValue();
+ sBuilder.append("{\"topicName\":\"").append(entry1.getKey())
+ .append("\",\"offsets\":[");
+ int partCnt = 0;
+ for (Map.Entry<Integer, Long> entry2 : partOffMap.entrySet()) {
+ if (partCnt++ > 0) {
+ sBuilder.append(",");
+ }
+ sBuilder.append("{\"").append(this.broker.getTubeConfig().getBrokerId())
+ .append(TokenConstants.ATTR_SEP).append(entry1.getKey())
+ .append(TokenConstants.ATTR_SEP).append(entry2.getKey())
+ .append("\":").append(entry2.getValue()).append("}");
+ }
+ sBuilder.append("],\"partCount\":").append(partCnt).append("}");
+ }
+ sBuilder.append("],\"topicCount\":").append(topicCnt).append("}");
+ }
+ sBuilder.append("],\"totalCnt\":").append(totalCnt).append("}");
+ }
+
+ /***
+ * Clone consume group offset, clone A group's offset to other group.
+ *
+ * @param req
+ * @param sBuilder process result
+ */
+ public void adminCloneGroupOffSet(HttpServletRequest req,
+ StringBuilder sBuilder) {
+ // get source consume group name
+ ProcessResult result = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.SRCGROUPNAME, true, null);
+ if (!result.success) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return;
+ }
+ final String srcGroupName = (String) result.retData1;
+ // get modify user
+ result = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.MODIFYUSER, true, null);
+ if (!result.success) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return;
+ }
+ final String modifier = (String) result.retData1;
+ // get source consume group's topic set cloned to target group
+ result = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null);
+ if (!result.success) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return;
+ }
+ // get target consume group name
+ Set<String> srcTopicNameSet = (Set<String>) result.retData1;
+ result = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.TGTCOMPSGROUPNAME, true, null);
+ if (!result.success) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return;
+ }
+ Set<String> tgtGroupNameSet = (Set<String>) result.retData1;
+ // check sourceGroup if existed
+ Set<String> bookedGroups = broker.getOffsetManager().getBookedGroups();
+ if (!bookedGroups.contains(srcGroupName)) {
+ WebParameterUtils.buildFailResult(sBuilder,
+ new StringBuilder(512).append("Parameter ")
+ .append(WebFieldDef.SRCGROUPNAME.name).append(": ")
+ .append(srcGroupName)
+ .append(" has not been registered on this Broker!").toString());
+ return;
+ }
+ // valid topic and get topic's partitionIds
+ Map<String, Set<Integer>> topicPartMap =
+ validAndGetPartitions(srcGroupName, srcTopicNameSet);
+ if (topicPartMap.isEmpty()) {
+ WebParameterUtils.buildFailResult(sBuilder,
+ new StringBuilder(512).append("Parameter ")
+ .append(WebFieldDef.SRCGROUPNAME.name).append(": not found ")
+ .append(srcGroupName).append(" subscribed topic set!").toString());
+ return;
+ }
+ // query offset from source group
+ Map<String, Map<Integer, Long>> srcGroupOffsets =
+ broker.getOffsetManager().queryGroupOffset(srcGroupName, topicPartMap);
+ boolean changed = broker.getOffsetManager().modifyGroupOffset(
+ broker.getStoreManager(), tgtGroupNameSet, srcGroupOffsets, modifier);
+ // builder return result
+ sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
+ }
+
+ private Map<String, Set<Integer>> validAndGetPartitions(String group, Set<String> topicSet) {
+ Map<String, Set<Integer>> topicPartMap = new HashMap<>();
+ // query stored topic set stored in memory or zk
+ if (topicSet.isEmpty()) {
+ topicSet = broker.getOffsetManager().getGroupSubInfo(group);
+ }
+ // get topic's partitionIds
+ if (topicSet != null) {
+ Map<String, TopicMetadata> topicConfigMap =
+ broker.getMetadataManager().getTopicConfigMap();
+ if (topicConfigMap != null) {
+ for (String topic : topicSet) {
+ TopicMetadata topicMetadata = topicConfigMap.get(topic);
+ if (topicMetadata != null) {
+ topicPartMap.put(topic, topicMetadata.getAllPartitionIds());
+ }
+ }
+ }
+ }
+ return topicPartMap;
+ }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index f73959e..ec97421 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -70,7 +70,15 @@ public enum WebFieldDef {
COMPSBROKERID(15, "brokerId", "brokerId", WebFieldType.COMPINT,
"Broker ID", RegexDef.TMP_NUMBER),
WITHIP(16, "withIp", "ip", WebFieldType.BOOLEAN,
- "Require return ip information, default is false");
+ "Require return ip information, default is false"),
+ WITHDIVIDE(17, "divide", "div", WebFieldType.BOOLEAN,
+ "Need to divide the returned result, default is false"),
+ SRCGROUPNAME(18, "sourceGroupName", "srcGroup", WebFieldType.STRING,
+ "Offset clone source group name", TBaseConstants.META_MAX_GROUPNAME_LENGTH,
+ RegexDef.TMP_GROUP),
+ TGTCOMPSGROUPNAME(19, "targetGroupName", "tgtGroup",
+ WebFieldType.COMPSTRING, "Offset clone target group name",
+ TBaseConstants.META_MAX_GROUPNAME_LENGTH, RegexDef.TMP_GROUP);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java
index 470aafd..dca7ca8 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java
@@ -18,6 +18,8 @@
package org.apache.tubemq.server.common.offsetstorage;
import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
public interface OffsetStorage {
@@ -25,9 +27,16 @@ public interface OffsetStorage {
void close();
OffsetStorageInfo loadOffset(final String group,
- final String topic, int brokerId, int partitionId);
+ final String topic, int partitionId);
void commitOffset(final String group,
final Collection<OffsetStorageInfo> offsetInfoList,
boolean isFailRetry);
+
+ Map<String, Map<String, Set<String>>> getZkGroupTopicBrokerInfos();
+
+ Map<String, Set<String>> getZkLocalGroupTopicInfos();
+
+ Map<Integer, Long> queryGroupOffsetInfo(String group, String topic,
+ Set<Integer> partitionIds);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
index 7c39bfd..8094151 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
@@ -19,6 +19,12 @@ package org.apache.tubemq.server.common.offsetstorage;
import java.net.BindException;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.server.broker.exception.OffsetStoreException;
import org.apache.tubemq.server.common.TServerConstants;
@@ -42,7 +48,7 @@ public class ZkOffsetStorage implements OffsetStorage {
public void uncaughtException(Thread t, Throwable e) {
if (e instanceof BindException) {
logger.error("Bind failed.", e);
- System.exit(1);
+ // System.exit(1);
}
if (e instanceof IllegalStateException
&& e.getMessage().contains("Shutdown in progress")) {
@@ -56,27 +62,33 @@ public class ZkOffsetStorage implements OffsetStorage {
private final String tubeZkRoot;
private final String consumerZkDir;
+ private final boolean isBroker;
+ private final int brokerId;
private ZKConfig zkConfig;
private ZooKeeperWatcher zkw;
+ // group-topic-brokerid
+ private final Map<String, Map<String, Set<String>>> zkGroupTopicBrokerInfos = new HashMap<>();
+ // group-topic
+ private final Map<String, Set<String>> zkLocalGroupTopicInfos = new HashMap<>();
- /**
- * Constructor of ZkOffsetStorage
- *
- * @param zkConfig the zookeeper configuration
- */
- public ZkOffsetStorage(final ZKConfig zkConfig) {
+
+ public ZkOffsetStorage(final ZKConfig zkConfig, boolean isBroker, int brokerId) {
this.zkConfig = zkConfig;
+ this.brokerId = brokerId;
+ this.isBroker = isBroker;
this.tubeZkRoot = normalize(this.zkConfig.getZkNodeRoot());
this.consumerZkDir = this.tubeZkRoot + "/consumers-v3";
try {
this.zkw = new ZooKeeperWatcher(zkConfig);
} catch (Throwable e) {
logger.error(new StringBuilder(256)
- .append("Failed to connect ZooKeeper server (")
+ .append("[ZkOffsetStorage] Failed to connect ZooKeeper server (")
.append(this.zkConfig.getZkServerAddr()).append(") !").toString(), e);
System.exit(1);
}
- logger.info("ZooKeeper Offset Storage initiated!");
+ logger.info("[ZkOffsetStorage] Get group-topic-broker info from ZooKeeper");
+ queryAllZKGroupTopicInfo();
+ logger.info("[ZkOffsetStorage] ZooKeeper Offset Storage initiated!");
}
@Override
@@ -90,6 +102,16 @@ public class ZkOffsetStorage implements OffsetStorage {
}
@Override
+ public Map<String, Map<String, Set<String>>> getZkGroupTopicBrokerInfos() {
+ return zkGroupTopicBrokerInfos;
+ }
+
+ @Override
+ public Map<String, Set<String>> getZkLocalGroupTopicInfos() {
+ return zkLocalGroupTopicInfos;
+ }
+
+ @Override
public void commitOffset(final String group,
final Collection<OffsetStorageInfo> offsetInfoList,
boolean isFailRetry) {
@@ -125,10 +147,11 @@ public class ZkOffsetStorage implements OffsetStorage {
}
@Override
- public OffsetStorageInfo loadOffset(final String group, final String topic, int brokerId, int partitionId) {
+ public OffsetStorageInfo loadOffset(final String group, final String topic, int partitionId) {
String znode = new StringBuilder(512).append(this.consumerZkDir).append("/")
.append(group).append("/offsets/").append(topic).append("/")
- .append(brokerId).append(TokenConstants.HYPHEN).append(partitionId).toString();
+ .append(brokerId).append(TokenConstants.HYPHEN)
+ .append(partitionId).toString();
String offsetZkInfo;
try {
offsetZkInfo = ZKUtil.readDataMaybeNull(this.zkw, znode);
@@ -183,6 +206,100 @@ public class ZkOffsetStorage implements OffsetStorage {
}
}
+ /**
+ * Get offset stored in zookeeper, if not found or error, set null
+ * <p/>
+ *
+ * @return partitionId--offset map info
+ */
+ @Override
+ public Map<Integer, Long> queryGroupOffsetInfo(String group, String topic,
+ Set<Integer> partitionIds) {
+ StringBuilder sBuider = new StringBuilder(512);
+ String basePath = sBuider.append(this.consumerZkDir).append("/")
+ .append(group).append("/offsets/").append(topic).append("/")
+ .append(brokerId).append(TokenConstants.HYPHEN).toString();
+ sBuider.delete(0, sBuider.length());
+ String offsetZkInfo = null;
+ Map<Integer, Long> offsetMap = new HashMap<>(partitionIds.size());
+ for (Integer partitionId : partitionIds) {
+ String offsetNode = sBuider.append(basePath).append(partitionId).toString();
+ sBuider.delete(0, sBuider.length());
+ try {
+ offsetZkInfo = ZKUtil.readDataMaybeNull(this.zkw, offsetNode);
+ if (offsetZkInfo == null) {
+ offsetMap.put(partitionId, null);
+ } else {
+ String[] offsetInfoStrs =
+ offsetZkInfo.split(TokenConstants.HYPHEN);
+ offsetMap.put(partitionId, Long.parseLong(offsetInfoStrs[1]));
+ }
+ } catch (Throwable e) {
+ offsetMap.put(partitionId, null);
+ }
+ }
+ return offsetMap;
+ }
+
+ /**
+ * Get group-topic-brokerid map info stored in zookeeper.
+ * <p/>
+ * The broker only cares about the content of its own node,
+ * so this part only queries when the node starts, and
+ * caches relevant data in the memory for finding
+ *
+ */
+ private void queryAllZKGroupTopicInfo() {
+ StringBuilder sBuider = new StringBuilder(512);
+ // get all booked groups name
+ String groupNode = sBuider.append(this.consumerZkDir).toString();
+ List<String> bookedGroups = ZKUtil.getChildren(this.zkw, groupNode);
+ sBuider.delete(0, sBuider.length());
+ if (bookedGroups != null) {
+ // get topic info by group
+ for (String group : bookedGroups) {
+ String topicNode = sBuider.append(groupNode)
+ .append("/").append(group).append("/offsets").toString();
+ List<String> consumeTopics = ZKUtil.getChildren(this.zkw, topicNode);
+ sBuider.delete(0, sBuider.length());
+ Set<String> topicSet = new HashSet<>();
+ Map<String, Set<String>> topicBrokerSet = new HashMap<>();
+ if (consumeTopics != null) {
+ // get broker info by topic
+ for (String topic : consumeTopics) {
+ String brokerNode = sBuider.append(topicNode)
+ .append("/").append(topic).toString();
+ List<String> brokerIds = ZKUtil.getChildren(this.zkw, brokerNode);
+ sBuider.delete(0, sBuider.length());
+ Set<String> brokerIdSet = new HashSet<>();
+ if (brokerIds != null) {
+ for (String idStr : brokerIds) {
+ if (idStr != null) {
+ String[] brokerPartIdStrs =
+ idStr.split(TokenConstants.HYPHEN);
+ brokerIdSet.add(brokerPartIdStrs[0]);
+ }
+ }
+ if (isBroker && brokerIdSet.contains(String.valueOf(brokerId))) {
+ topicSet.add(topic);
+ }
+ }
+ topicBrokerSet.put(topic, brokerIdSet);
+ }
+ }
+ if (!topicSet.isEmpty()) {
+ zkLocalGroupTopicInfos.put(group, topicSet);
+ }
+ zkGroupTopicBrokerInfos.put(group, topicBrokerSet);
+ }
+ }
+ logger.info(new StringBuilder(256)
+ .append("[ZkOffsetStorage] query from zookeeper, total group size = ")
+ .append(zkGroupTopicBrokerInfos.size()).append(", local group size = ")
+ .append(zkLocalGroupTopicInfos.size()).toString());
+ }
+
+
private String normalize(final String root) {
if (root.startsWith("/")) {
return this.removeLastSlash(root);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java
index 77d5436..da86191 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java
@@ -19,6 +19,8 @@ package org.apache.tubemq.server.common.offsetstorage.zookeeper;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
+
import org.apache.commons.codec.binary.StringUtils;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.server.common.fileconfig.ZKConfig;
@@ -147,6 +149,23 @@ public class ZKUtil {
return getDataInternal(zkw, znode, null, true);
}
+ /**
+ * Get the children data at the specified znode.
+ * <p/>
+ * Returns the children data. Returns null if
+ * the node does not exist or there is an exception.
+ *
+ * @param zkw zk reference
+ * @param znode path of node
+ * @return children data of the specified znode, or null
+ */
+ public static List<String> getChildren(ZooKeeperWatcher zkw, String znode) {
+ try {
+ return zkw.getRecoverableZooKeeper().getChildren(znode, false);
+ } catch (Throwable e) {
+ return null;
+ }
+ }
private static byte[] getDataInternal(ZooKeeperWatcher zkw, String znode, Stat stat,
boolean watcherSet) throws KeeperException {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
index 34570de..cc6f681 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
@@ -176,7 +176,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
this.consumerEventManager = new ConsumerEventManager(consumerHolder);
this.topicPSInfoManager = new TopicPSInfoManager();
this.loadBalancer = new DefaultLoadBalancer();
- this.zkOffsetStorage = new ZkOffsetStorage(this.masterConfig.getZkConfig());
+ this.zkOffsetStorage = new ZkOffsetStorage(this.masterConfig.getZkConfig(),
+ false, TBaseConstants.META_VALUE_UNDEFINED);
this.heartbeatManager = new HeartbeatManager();
heartbeatManager.regConsumerCheckBusiness(masterConfig.getConsumerHeartbeatTimeoutMs(),
new TimeoutListener() {