You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/25 03:02:23 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-475] add the offset clone api of the consume group

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 23cff5b  [TUBEMQ-475] add the offset clone api of the consume group
23cff5b is described below

commit 23cff5b60137c262de774fa3dee8f7dcea18647a
Author: gosonzhang <go...@tencent.com>
AuthorDate: Fri Dec 25 10:40:51 2020 +0800

    [TUBEMQ-475] add the offset clone api of the consume group
---
 .../broker/metadata/BrokerMetadataManager.java     |   3 +-
 .../server/broker/metadata/MetadataManager.java    |   2 +
 .../server/broker/metadata/TopicMetadata.java      |  16 ++
 .../server/broker/offset/DefaultOffsetManager.java | 199 ++++++++++++++++-
 .../tubemq/server/broker/offset/OffsetService.java |  18 ++
 .../server/broker/web/BrokerAdminServlet.java      | 246 +++++++++++++++++++++
 .../tubemq/server/common/fielddef/WebFieldDef.java |  10 +-
 .../server/common/offsetstorage/OffsetStorage.java |  11 +-
 .../common/offsetstorage/ZkOffsetStorage.java      | 139 +++++++++++-
 .../common/offsetstorage/zookeeper/ZKUtil.java     |  19 ++
 .../org/apache/tubemq/server/master/TMaster.java   |   3 +-
 11 files changed, 647 insertions(+), 19 deletions(-)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java
index 1e2f21e..8568372 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java
@@ -143,7 +143,8 @@ public class BrokerMetadataManager implements MetadataManager {
         return topicConfigMap.get(topic);
     }
 
-    public ConcurrentHashMap<String, TopicMetadata> getTopicConfigMap() {
+    @Override
+    public Map<String, TopicMetadata> getTopicConfigMap() {
         return topicConfigMap;
     }
 
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/MetadataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/MetadataManager.java
index d638e5a..9ee9936 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/MetadataManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/MetadataManager.java
@@ -80,4 +80,6 @@ public interface MetadataManager {
     String getDefDeletePolicy();
 
     String getTopicDeletePolicy(String topic);
+
+    Map<String, TopicMetadata> getTopicConfigMap();
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
index 4ebfa4d..c582606 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
@@ -17,10 +17,15 @@
 
 package org.apache.tubemq.server.broker.metadata;
 
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.TokenConstants;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.common.TStatusConstants;
 
+
 /***
  * Topic's metadata. Contains topic name, partitions count, etc.
  */
@@ -235,6 +240,17 @@ public class TopicMetadata {
         this.unflushInterval = unflushInterval;
     }
 
+    // builder the partitionId set for each store
+    public Set<Integer> getAllPartitionIds() {
+        Set<Integer> partIds = new HashSet<>();
+        for (int i = 0; i < numTopicStores; i++) {
+            for (int j = 0; j < numPartitions; j++) {
+                partIds.add(i * TBaseConstants.META_STORE_INS_BASE + j);
+            }
+        }
+        return partIds;
+    }
+
     public int getStatusId() {
         return statusId;
     }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
index 82a758c..bdd85b3 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -17,13 +17,17 @@
 
 package org.apache.tubemq.server.broker.offset;
 
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.daemon.AbstractDaemonService;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.broker.BrokerConfig;
 import org.apache.tubemq.server.broker.msgstore.MessageStore;
+import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
 import org.apache.tubemq.server.broker.utils.DataStoreUtils;
 import org.apache.tubemq.server.common.offsetstorage.OffsetStorage;
 import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo;
@@ -49,7 +53,8 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
     public DefaultOffsetManager(final BrokerConfig brokerConfig) {
         super("[Offset Manager]", brokerConfig.getZkConfig().getZkCommitPeriodMs());
         this.brokerConfig = brokerConfig;
-        zkOffsetStorage = new ZkOffsetStorage(brokerConfig.getZkConfig());
+        zkOffsetStorage = new ZkOffsetStorage(brokerConfig.getZkConfig(),
+                true, brokerConfig.getBrokerId());
         super.start();
     }
 
@@ -323,6 +328,187 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
     }
 
     /***
+     * Get in-memory and in zk group set
+     *
+     * @return booked group in memory and in zk
+     */
+    @Override
+    public Set<String> getBookedGroups() {
+        Set<String> groupSet = new HashSet<>();
+        groupSet.addAll(cfmOffsetMap.keySet());
+        Map<String, Set<String>> localGroups =
+                zkOffsetStorage.getZkLocalGroupTopicInfos();
+        groupSet.addAll(localGroups.keySet());
+        return groupSet;
+    }
+
+    /***
+     * Get in-memory group set
+     *
+     * @return booked group in memory
+     */
+    public Set<String> getInMemoryGroups() {
+        Set<String> cacheGroup = new HashSet<>();
+        cacheGroup.addAll(cfmOffsetMap.keySet());
+        return cacheGroup;
+    }
+
+    /***
+     * Get in-zookeeper but not in memory's group set
+     *
+     * @return booked group in zookeeper
+     */
+    @Override
+    public Set<String> getUnusedGroupInfo() {
+        Set<String> unUsedGroups = new HashSet<>();
+        Map<String, Set<String>> localGroups =
+                zkOffsetStorage.getZkLocalGroupTopicInfos();
+        for (String groupName : localGroups.keySet()) {
+            if (!cfmOffsetMap.containsKey(groupName)) {
+                unUsedGroups.add(groupName);
+            }
+        }
+        return unUsedGroups;
+    }
+
+    /***
+     * Get the topic set subscribed by the consumer group
+     * @param group
+     * @return topic set subscribed
+     */
+    @Override
+    public Set<String> getGroupSubInfo(String group) {
+        Set<String> result = new HashSet<>();
+        Map<String, OffsetStorageInfo> topicPartOffsetMap = cfmOffsetMap.get(group);
+        if (topicPartOffsetMap == null) {
+            Map<String, Set<String>> localGroups =
+                    zkOffsetStorage.getZkLocalGroupTopicInfos();
+            result = localGroups.get(group);
+        } else {
+            for (OffsetStorageInfo storageInfo : topicPartOffsetMap.values()) {
+                result.add(storageInfo.getTopic());
+            }
+        }
+        return result;
+    }
+
+    /***
+     * Get group's offset by Specified topic-partitions
+     * @param group
+     * @param topicPartMap
+     * @return group offset info in memory or zk
+     */
+    @Override
+    public Map<String, Map<Integer, Long>> queryGroupOffset(
+            String group, Map<String, Set<Integer>> topicPartMap) {
+        Map<String, Map<Integer, Long>> result = new HashMap<>();
+        // search group from memory
+        Map<String, OffsetStorageInfo> topicPartOffsetMap = cfmOffsetMap.get(group);
+        if (topicPartOffsetMap == null) {
+            // query from zookeeper
+            for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+                Map<Integer, Long> qryResult =
+                        zkOffsetStorage.queryGroupOffsetInfo(
+                                group, entry.getKey(), entry.getValue());
+                Map<Integer, Long> offsetMap = new HashMap<>();
+                for (Map.Entry<Integer, Long> item : qryResult.entrySet()) {
+                    if (item.getValue() != null) {
+                        offsetMap.put(item.getKey(), item.getValue());
+                    }
+                }
+                if (!offsetMap.isEmpty()) {
+                    result.put(entry.getKey(), offsetMap);
+                }
+            }
+        } else {
+            // found in memory, get offset values
+            for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+                Map<Integer, Long> offsetMap = new HashMap<>();
+                for (Integer partitionId : entry.getValue()) {
+                    String offsetCacheKey =
+                            getOffsetCacheKey(entry.getKey(), partitionId);
+                    OffsetStorageInfo offsetInfo = topicPartOffsetMap.get(offsetCacheKey);
+                    if (offsetInfo != null) {
+                        offsetMap.put(partitionId, offsetInfo.getOffset());
+                    }
+                }
+                if (!offsetMap.isEmpty()) {
+                    result.put(entry.getKey(), offsetMap);
+                }
+            }
+        }
+        return result;
+    }
+
+
+    /***
+     * Reset offset.
+     *
+     * @param storeManager
+     * @param groups
+     * @param topicPartOffsetMap
+     * @param modifier
+     * @return at least one record modified
+     */
+    @Override
+    public boolean modifyGroupOffset(MessageStoreManager storeManager, Set<String> groups,
+                                     Map<String, Map<Integer, Long>> topicPartOffsetMap,
+                                     String modifier) {
+        long oldOffset = -1;
+        long reSetOffset = -1;
+        boolean changed = false;
+        MessageStore store = null;
+        StringBuilder strBuidler = new StringBuilder(512);
+        // set offset by group
+        for (String group : groups) {
+            for (Map.Entry<String, Map<Integer, Long>> entry : topicPartOffsetMap.entrySet()) {
+                Map<Integer, Long> partOffsetMap = entry.getValue();
+                if (partOffsetMap  == null) {
+                    continue;
+                }
+                // set offset
+                for (Map.Entry<Integer, Long> entry1 : partOffsetMap.entrySet()) {
+                    if (entry1.getValue() == null) {
+                        continue;
+                    }
+                    reSetOffset = entry1.getValue();
+                    // get topic store
+                    try {
+                        store = storeManager.getOrCreateMessageStore(
+                                entry.getKey(), entry1.getKey());
+                    } catch (Throwable e) {
+                        //
+                    }
+                    if (store == null) {
+                        continue;
+                    }
+                    long firstOffset = store.getIndexMinOffset();
+                    long lastOffset = store.getIndexMaxOffset();
+                    // adjust reseted offset value
+                    reSetOffset = reSetOffset < firstOffset
+                            ? firstOffset : Math.min(reSetOffset, lastOffset);
+                    String offsetCacheKey =
+                            getOffsetCacheKey(entry.getKey(), entry1.getKey());
+                    getAndResetTmpOffset(group, offsetCacheKey);
+                    OffsetStorageInfo regInfo = loadOrCreateOffset(group,
+                            entry.getKey(), entry1.getKey(), offsetCacheKey, 0);
+                    oldOffset = regInfo.getAndSetOffset(reSetOffset);
+                    changed = true;
+                    logger.info(strBuidler
+                            .append("[Offset Manager] Update offset by modifier=")
+                            .append(modifier).append(",reset offset=").append(reSetOffset)
+                            .append(",old offset=").append(oldOffset)
+                            .append(",updated offset=").append(regInfo.getOffset())
+                            .append(",group=").append(group)
+                            .append(",topic-partId=").append(offsetCacheKey).toString());
+                    strBuidler.delete(0, strBuidler.length());
+                }
+            }
+        }
+        return changed;
+    }
+
+    /***
      * Set temp offset.
      *
      * @param group
@@ -425,10 +611,10 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
         OffsetStorageInfo regInfo = regInfoMap.get(offsetCacheKey);
         if (regInfo == null) {
             OffsetStorageInfo tmpRegInfo =
-                    zkOffsetStorage.loadOffset(group, topic, brokerConfig.getBrokerId(), partitionId);
+                    zkOffsetStorage.loadOffset(group, topic, partitionId);
             if (tmpRegInfo == null) {
-                tmpRegInfo =
-                        new OffsetStorageInfo(topic, brokerConfig.getBrokerId(), partitionId, defOffset, 0);
+                tmpRegInfo = new OffsetStorageInfo(topic,
+                        brokerConfig.getBrokerId(), partitionId, defOffset, 0);
             }
             regInfo = regInfoMap.putIfAbsent(offsetCacheKey, tmpRegInfo);
             if (regInfo == null) {
@@ -443,4 +629,9 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
                 .append("-").append(partitionId).toString();
     }
 
+    private String getOffsetCacheKey(String topic, String partitionId) {
+        return new StringBuilder(256).append(topic)
+                .append("-").append(partitionId).toString();
+    }
+
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
index 066fb3b..05f0724 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
@@ -17,9 +17,13 @@
 
 package org.apache.tubemq.server.broker.offset;
 
+import java.util.Map;
+import java.util.Set;
 import org.apache.tubemq.server.broker.msgstore.MessageStore;
+import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
 import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo;
 
+
 /***
  * Offset manager service interface.
  */
@@ -51,4 +55,18 @@ public interface OffsetService {
 
     long getTmpOffset(final String group, final String topic, int partitionId);
 
+    Set<String> getBookedGroups();
+
+    Set<String> getInMemoryGroups();
+
+    Set<String> getUnusedGroupInfo();
+
+    Set<String> getGroupSubInfo(String group);
+
+    Map<String, Map<Integer, Long>> queryGroupOffset(
+            String group, Map<String, Set<Integer>> topicPartMap);
+
+    boolean modifyGroupOffset(MessageStoreManager storeManager, Set<String> groups,
+                              Map<String, Map<Integer, Long>> topicPartOffsetMap,
+                              String modifier);
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index 1b3f524..91bfd23 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -17,6 +17,8 @@
 
 package org.apache.tubemq.server.broker.web;
 
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -26,6 +28,7 @@ import javax.servlet.http.HttpServletRequest;
 import org.apache.tubemq.corebase.TokenConstants;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.broker.TubeBroker;
+import org.apache.tubemq.server.broker.metadata.TopicMetadata;
 import org.apache.tubemq.server.broker.msgstore.MessageStore;
 import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
 import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
@@ -71,6 +74,15 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         // get all registered methods
         innRegisterWebMethod("admin_get_methods",
                 "adminQueryAllMethods");
+        // Query all consumer groups booked on the Broker.
+        innRegisterWebMethod("admin_query_group",
+                "adminQueryBookedGroup");
+        // query consumer group's offset
+        innRegisterWebMethod("admin_query_offset",
+                "adminQueryGroupOffSet");
+        // clone consumer group's offset from source to target
+        innRegisterWebMethod("admin_clone_offset",
+                "adminCloneGroupOffSet");
     }
 
     public void adminQueryAllMethods(HttpServletRequest req,
@@ -560,5 +572,239 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         sBuilder.append("],\"totalCnt\":").append(totalCnt).append("}");
     }
 
+    /***
+     * Query all consumer groups booked on the Broker.
+     *
+     * @param req
+     * @param sBuilder process result
+     */
+    public void adminQueryBookedGroup(HttpServletRequest req,
+                                      StringBuilder sBuilder) {
+        // get group list
+        ProcessResult result = WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.WITHDIVIDE, false, false);
+        if (!result.success) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        boolean withDivide = (boolean) result.retData1;
+        // get offset service
+        int itemCnt = 0;
+        int totalCnt = 0;
+        OffsetService offsetService = broker.getOffsetManager();
+        sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
+        if (withDivide) {
+            // query in-memory group name set
+            Set<String> onlineGroups = offsetService.getInMemoryGroups();
+            sBuilder.append("{\"type\":\"in-cache\",\"groupName\":[");
+            for (String group : onlineGroups) {
+                if (itemCnt++ > 0) {
+                    sBuilder.append(",");
+                }
+                sBuilder.append("\"").append(group).append("\"");
+            }
+            sBuilder.append("],\"groupCount\":").append(itemCnt).append("}");
+            totalCnt++;
+            sBuilder.append(",");
+            // query in-zk group name set
+            itemCnt = 0;
+            Set<String> onZKGroup = offsetService.getUnusedGroupInfo();
+            sBuilder.append("{\"type\":\"in-zk\",\"groupName\":[");
+            for (String group : onZKGroup) {
+                if (itemCnt++ > 0) {
+                    sBuilder.append(",");
+                }
+                sBuilder.append("\"").append(group).append("\"");
+            }
+            sBuilder.append("],\"groupCount\":").append(itemCnt).append("}");
+            totalCnt++;
+        } else {
+            Set<String> allGroups = offsetService.getBookedGroups();
+            sBuilder.append("{\"type\":\"all\",\"groupName\":[");
+            for (String group : allGroups) {
+                if (itemCnt++ > 0) {
+                    sBuilder.append(",");
+                }
+                sBuilder.append("\"").append(group).append("\"");
+            }
+            sBuilder.append("],\"groupCount\":").append(itemCnt).append("}");
+            totalCnt++;
+        }
+        sBuilder.append("],\"dataCount\":").append(totalCnt).append("}");
+    }
+
+    /***
+     * Query consumer group offset.
+     *
+     * @param req
+     * @param sBuilder process result
+     */
+    public void adminQueryGroupOffSet(HttpServletRequest req,
+                                      StringBuilder sBuilder) {
+        // get group list
+        ProcessResult result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSGROUPNAME, false, null);
+        if (!result.success) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        // filter invalid groups
+        Set<String> qryGroupNameSet = new HashSet<>();
+        Set<String> inGroupNameSet = (Set<String>) result.retData1;
+        Set<String> bookedGroupSet = broker.getOffsetManager().getBookedGroups();
+        if (inGroupNameSet.isEmpty()) {
+            qryGroupNameSet = bookedGroupSet;
+        } else {
+            for (String group : inGroupNameSet) {
+                if (bookedGroupSet.contains(group)) {
+                    qryGroupNameSet.add(group);
+                }
+            }
+        }
+        // get the topic set to be queried
+        result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null);
+        if (!result.success) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        // get target consume group name
+        Set<String> topicSet = (Set<String>) result.retData1;
+        // verify the acquired Topic set and
+        //   query the corresponding offset information
+        Map<String, Map<String, Map<Integer, Long>>> groupOffsetMaps = new HashMap<>();
+        for (String group : qryGroupNameSet) {
+            Map<String, Set<Integer>> topicPartMap =
+                    validAndGetPartitions(group, topicSet);
+            Map<String, Map<Integer, Long>> groupOffsetMap =
+                    broker.getOffsetManager().queryGroupOffset(group, topicPartMap);
+            groupOffsetMaps.put(group, groupOffsetMap);
+        }
+        // builder result
+        int totalCnt = 0;
+        sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
+        for (Map.Entry<String, Map<String, Map<Integer, Long>>> entry
+                : groupOffsetMaps.entrySet()) {
+            if (totalCnt++ > 0) {
+                sBuilder.append(",");
+            }
+            Map<String, Map<Integer, Long>> topicPartMap = entry.getValue();
+            sBuilder.append("{\"groupName\":\"").append(entry.getKey())
+                    .append("\",\"subInfo\":[");
+            int topicCnt = 0;
+            for (Map.Entry<String, Map<Integer, Long>> entry1 : topicPartMap.entrySet()) {
+                if (topicCnt++ > 0) {
+                    sBuilder.append(",");
+                }
+                Map<Integer, Long> partOffMap = entry1.getValue();
+                sBuilder.append("{\"topicName\":\"").append(entry1.getKey())
+                        .append("\",\"offsets\":[");
+                int partCnt = 0;
+                for (Map.Entry<Integer, Long> entry2 : partOffMap.entrySet()) {
+                    if (partCnt++ > 0) {
+                        sBuilder.append(",");
+                    }
+                    sBuilder.append("{\"").append(this.broker.getTubeConfig().getBrokerId())
+                            .append(TokenConstants.ATTR_SEP).append(entry1.getKey())
+                            .append(TokenConstants.ATTR_SEP).append(entry2.getKey())
+                            .append("\":").append(entry2.getValue()).append("}");
+                }
+                sBuilder.append("],\"partCount\":").append(partCnt).append("}");
+            }
+            sBuilder.append("],\"topicCount\":").append(topicCnt).append("}");
+        }
+        sBuilder.append("],\"totalCnt\":").append(totalCnt).append("}");
+    }
+
+    /***
+     * Clone consume group offset, clone A group's offset to other group.
+     *
+     * @param req
+     * @param sBuilder process result
+     */
+    public void adminCloneGroupOffSet(HttpServletRequest req,
+                                      StringBuilder sBuilder) {
+        // get source consume group name
+        ProcessResult result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.SRCGROUPNAME, true, null);
+        if (!result.success) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        final String srcGroupName = (String) result.retData1;
+        // get modify user
+        result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.MODIFYUSER, true, null);
+        if (!result.success) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        final String modifier = (String) result.retData1;
+        // get source consume group's topic set cloned to target group
+        result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null);
+        if (!result.success) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        // get target consume group name
+        Set<String> srcTopicNameSet = (Set<String>) result.retData1;
+        result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.TGTCOMPSGROUPNAME, true, null);
+        if (!result.success) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        Set<String> tgtGroupNameSet = (Set<String>) result.retData1;
+        // check sourceGroup if existed
+        Set<String> bookedGroups = broker.getOffsetManager().getBookedGroups();
+        if (!bookedGroups.contains(srcGroupName)) {
+            WebParameterUtils.buildFailResult(sBuilder,
+                    new StringBuilder(512).append("Parameter ")
+                            .append(WebFieldDef.SRCGROUPNAME.name).append(": ")
+                            .append(srcGroupName)
+                            .append(" has not been registered on this Broker!").toString());
+            return;
+        }
+        // valid topic and get topic's partitionIds
+        Map<String, Set<Integer>> topicPartMap =
+                validAndGetPartitions(srcGroupName, srcTopicNameSet);
+        if (topicPartMap.isEmpty()) {
+            WebParameterUtils.buildFailResult(sBuilder,
+                    new StringBuilder(512).append("Parameter ")
+                            .append(WebFieldDef.SRCGROUPNAME.name).append(": not found ")
+                            .append(srcGroupName).append(" subscribed topic set!").toString());
+            return;
+        }
+        // query offset from source group
+        Map<String, Map<Integer, Long>> srcGroupOffsets =
+                broker.getOffsetManager().queryGroupOffset(srcGroupName, topicPartMap);
+        boolean changed = broker.getOffsetManager().modifyGroupOffset(
+                broker.getStoreManager(), tgtGroupNameSet, srcGroupOffsets, modifier);
+        // builder return result
+        sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
+    }
+
+    private Map<String, Set<Integer>> validAndGetPartitions(String group, Set<String> topicSet) {
+        Map<String, Set<Integer>> topicPartMap = new HashMap<>();
+        // query stored topic set stored in memory or zk
+        if (topicSet.isEmpty()) {
+            topicSet = broker.getOffsetManager().getGroupSubInfo(group);
+        }
+        // get topic's partitionIds
+        if (topicSet != null) {
+            Map<String, TopicMetadata> topicConfigMap =
+                    broker.getMetadataManager().getTopicConfigMap();
+            if (topicConfigMap != null) {
+                for (String topic : topicSet) {
+                    TopicMetadata topicMetadata = topicConfigMap.get(topic);
+                    if (topicMetadata != null) {
+                        topicPartMap.put(topic, topicMetadata.getAllPartitionIds());
+                    }
+                }
+            }
+        }
+        return topicPartMap;
+    }
 
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index f73959e..ec97421 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -70,7 +70,15 @@ public enum WebFieldDef {
     COMPSBROKERID(15, "brokerId", "brokerId", WebFieldType.COMPINT,
             "Broker ID", RegexDef.TMP_NUMBER),
     WITHIP(16, "withIp", "ip", WebFieldType.BOOLEAN,
-            "Require return ip information, default is false");
+            "Require return ip information, default is false"),
+    WITHDIVIDE(17, "divide", "div", WebFieldType.BOOLEAN,
+            "Need to divide the returned result, default is false"),
+    SRCGROUPNAME(18, "sourceGroupName", "srcGroup", WebFieldType.STRING,
+            "Offset clone source group name", TBaseConstants.META_MAX_GROUPNAME_LENGTH,
+            RegexDef.TMP_GROUP),
+    TGTCOMPSGROUPNAME(19, "targetGroupName", "tgtGroup",
+            WebFieldType.COMPSTRING, "Offset clone target group name",
+            TBaseConstants.META_MAX_GROUPNAME_LENGTH, RegexDef.TMP_GROUP);
 
 
 
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java
index 470aafd..dca7ca8 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java
@@ -18,6 +18,8 @@
 package org.apache.tubemq.server.common.offsetstorage;
 
 import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
 
 
 public interface OffsetStorage {
@@ -25,9 +27,16 @@ public interface OffsetStorage {
     void close();
 
     OffsetStorageInfo loadOffset(final String group,
-                                 final String topic, int brokerId, int partitionId);
+                                 final String topic, int partitionId);
 
     void commitOffset(final String group,
                       final Collection<OffsetStorageInfo> offsetInfoList,
                       boolean isFailRetry);
+
+    Map<String, Map<String, Set<String>>> getZkGroupTopicBrokerInfos();
+
+    Map<String, Set<String>> getZkLocalGroupTopicInfos();
+
+    Map<Integer, Long> queryGroupOffsetInfo(String group, String topic,
+                                            Set<Integer> partitionIds);
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
index 7c39bfd..8094151 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
@@ -19,6 +19,12 @@ package org.apache.tubemq.server.common.offsetstorage;
 
 import java.net.BindException;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.tubemq.corebase.TokenConstants;
 import org.apache.tubemq.server.broker.exception.OffsetStoreException;
 import org.apache.tubemq.server.common.TServerConstants;
@@ -42,7 +48,7 @@ public class ZkOffsetStorage implements OffsetStorage {
                 public void uncaughtException(Thread t, Throwable e) {
                     if (e instanceof BindException) {
                         logger.error("Bind failed.", e);
-                        System.exit(1);
+                        // System.exit(1);
                     }
                     if (e instanceof IllegalStateException
                             && e.getMessage().contains("Shutdown in progress")) {
@@ -56,27 +62,33 @@ public class ZkOffsetStorage implements OffsetStorage {
 
     private final String tubeZkRoot;
     private final String consumerZkDir;
+    private final boolean isBroker;
+    private final int brokerId;
     private ZKConfig zkConfig;
     private ZooKeeperWatcher zkw;
+    // group-topic-brokerid
+    private final Map<String, Map<String, Set<String>>> zkGroupTopicBrokerInfos = new HashMap<>();
+    // group-topic
+    private final Map<String, Set<String>> zkLocalGroupTopicInfos = new HashMap<>();
 
-    /**
-     * Constructor of ZkOffsetStorage
-     *
-     * @param zkConfig the zookeeper configuration
-     */
-    public ZkOffsetStorage(final ZKConfig zkConfig) {
+
+    public ZkOffsetStorage(final ZKConfig zkConfig, boolean isBroker, int brokerId) {
         this.zkConfig = zkConfig;
+        this.brokerId = brokerId;
+        this.isBroker = isBroker;
         this.tubeZkRoot = normalize(this.zkConfig.getZkNodeRoot());
         this.consumerZkDir = this.tubeZkRoot + "/consumers-v3";
         try {
             this.zkw = new ZooKeeperWatcher(zkConfig);
         } catch (Throwable e) {
             logger.error(new StringBuilder(256)
-                    .append("Failed to connect ZooKeeper server (")
+                    .append("[ZkOffsetStorage] Failed to connect ZooKeeper server (")
                     .append(this.zkConfig.getZkServerAddr()).append(") !").toString(), e);
             System.exit(1);
         }
-        logger.info("ZooKeeper Offset Storage initiated!");
+        logger.info("[ZkOffsetStorage] Get group-topic-broker info from ZooKeeper");
+        queryAllZKGroupTopicInfo();
+        logger.info("[ZkOffsetStorage] ZooKeeper Offset Storage initiated!");
     }
 
     @Override
@@ -90,6 +102,16 @@ public class ZkOffsetStorage implements OffsetStorage {
     }
 
     @Override
+    public Map<String, Map<String, Set<String>>> getZkGroupTopicBrokerInfos() {
+        return zkGroupTopicBrokerInfos;
+    }
+
+    @Override
+    public Map<String, Set<String>> getZkLocalGroupTopicInfos() {
+        return zkLocalGroupTopicInfos;
+    }
+
+    @Override
     public void commitOffset(final String group,
                              final Collection<OffsetStorageInfo> offsetInfoList,
                              boolean isFailRetry) {
@@ -125,10 +147,11 @@ public class ZkOffsetStorage implements OffsetStorage {
     }
 
     @Override
-    public OffsetStorageInfo loadOffset(final String group, final String topic, int brokerId, int partitionId) {
+    public OffsetStorageInfo loadOffset(final String group, final String topic, int partitionId) {
         String znode = new StringBuilder(512).append(this.consumerZkDir).append("/")
                 .append(group).append("/offsets/").append(topic).append("/")
-                .append(brokerId).append(TokenConstants.HYPHEN).append(partitionId).toString();
+                .append(brokerId).append(TokenConstants.HYPHEN)
+                .append(partitionId).toString();
         String offsetZkInfo;
         try {
             offsetZkInfo = ZKUtil.readDataMaybeNull(this.zkw, znode);
@@ -183,6 +206,100 @@ public class ZkOffsetStorage implements OffsetStorage {
         }
     }
 
+    /**
+     * Get offset stored in zookeeper, if not found or error, set null
+     * <p/>
+     *
+     * @return partitionId--offset map info
+     */
+    @Override
+    public Map<Integer, Long> queryGroupOffsetInfo(String group, String topic,
+                                                  Set<Integer> partitionIds) {
+        StringBuilder sBuider = new StringBuilder(512);
+        String basePath = sBuider.append(this.consumerZkDir).append("/")
+                .append(group).append("/offsets/").append(topic).append("/")
+                .append(brokerId).append(TokenConstants.HYPHEN).toString();
+        sBuider.delete(0, sBuider.length());
+        String offsetZkInfo = null;
+        Map<Integer, Long> offsetMap = new HashMap<>(partitionIds.size());
+        for (Integer partitionId : partitionIds) {
+            String offsetNode = sBuider.append(basePath).append(partitionId).toString();
+            sBuider.delete(0, sBuider.length());
+            try {
+                offsetZkInfo = ZKUtil.readDataMaybeNull(this.zkw, offsetNode);
+                if (offsetZkInfo == null) {
+                    offsetMap.put(partitionId, null);
+                } else {
+                    String[] offsetInfoStrs =
+                            offsetZkInfo.split(TokenConstants.HYPHEN);
+                    offsetMap.put(partitionId, Long.parseLong(offsetInfoStrs[1]));
+                }
+            } catch (Throwable e) {
+                offsetMap.put(partitionId, null);
+            }
+        }
+        return offsetMap;
+    }
+
+    /**
+     * Get group-topic-brokerid map info stored in zookeeper.
+     * <p/>
+     * The broker only cares about the content of its own node,
+     * so this part only queries when the node starts, and
+     * caches relevant data in the memory for finding
+     *
+     */
+    private void queryAllZKGroupTopicInfo() {
+        StringBuilder sBuider = new StringBuilder(512);
+        // get all booked groups name
+        String groupNode = sBuider.append(this.consumerZkDir).toString();
+        List<String> bookedGroups = ZKUtil.getChildren(this.zkw, groupNode);
+        sBuider.delete(0, sBuider.length());
+        if (bookedGroups != null) {
+            // get topic info by group
+            for (String group : bookedGroups) {
+                String topicNode = sBuider.append(groupNode)
+                        .append("/").append(group).append("/offsets").toString();
+                List<String> consumeTopics = ZKUtil.getChildren(this.zkw, topicNode);
+                sBuider.delete(0, sBuider.length());
+                Set<String> topicSet = new HashSet<>();
+                Map<String, Set<String>> topicBrokerSet = new HashMap<>();
+                if (consumeTopics != null) {
+                    // get broker info by topic
+                    for (String topic : consumeTopics) {
+                        String brokerNode = sBuider.append(topicNode)
+                                .append("/").append(topic).toString();
+                        List<String> brokerIds = ZKUtil.getChildren(this.zkw, brokerNode);
+                        sBuider.delete(0, sBuider.length());
+                        Set<String> brokerIdSet = new HashSet<>();
+                        if (brokerIds != null) {
+                            for (String idStr : brokerIds) {
+                                if (idStr != null) {
+                                    String[] brokerPartIdStrs =
+                                            idStr.split(TokenConstants.HYPHEN);
+                                    brokerIdSet.add(brokerPartIdStrs[0]);
+                                }
+                            }
+                            if (isBroker && brokerIdSet.contains(String.valueOf(brokerId))) {
+                                topicSet.add(topic);
+                            }
+                        }
+                        topicBrokerSet.put(topic, brokerIdSet);
+                    }
+                }
+                if (!topicSet.isEmpty()) {
+                    zkLocalGroupTopicInfos.put(group, topicSet);
+                }
+                zkGroupTopicBrokerInfos.put(group, topicBrokerSet);
+            }
+        }
+        logger.info(new StringBuilder(256)
+                .append("[ZkOffsetStorage] query from zookeeper, total group size = ")
+                .append(zkGroupTopicBrokerInfos.size()).append(", local group size = ")
+                .append(zkLocalGroupTopicInfos.size()).toString());
+    }
+
+
     private String normalize(final String root) {
         if (root.startsWith("/")) {
             return this.removeLastSlash(root);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java
index 77d5436..da86191 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java
@@ -19,6 +19,8 @@ package org.apache.tubemq.server.common.offsetstorage.zookeeper;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.commons.codec.binary.StringUtils;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.server.common.fileconfig.ZKConfig;
@@ -147,6 +149,23 @@ public class ZKUtil {
         return getDataInternal(zkw, znode, null, true);
     }
 
+    /**
+     * Get the children data at the specified znode.
+     * <p/>
+     * Returns the children data. Returns null if
+     * the node does not exist or there is an exception.
+     *
+     * @param zkw   zk reference
+     * @param znode path of node
+     * @return children data of the specified znode, or null
+     */
+    public static List<String> getChildren(ZooKeeperWatcher zkw, String znode) {
+        try {
+            return zkw.getRecoverableZooKeeper().getChildren(znode, false);
+        } catch (Throwable e) {
+            return null;
+        }
+    }
 
     private static byte[] getDataInternal(ZooKeeperWatcher zkw, String znode, Stat stat,
                                           boolean watcherSet) throws KeeperException {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
index 34570de..cc6f681 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
@@ -176,7 +176,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         this.consumerEventManager = new ConsumerEventManager(consumerHolder);
         this.topicPSInfoManager = new TopicPSInfoManager();
         this.loadBalancer = new DefaultLoadBalancer();
-        this.zkOffsetStorage = new ZkOffsetStorage(this.masterConfig.getZkConfig());
+        this.zkOffsetStorage = new ZkOffsetStorage(this.masterConfig.getZkConfig(),
+                false, TBaseConstants.META_VALUE_UNDEFINED);
         this.heartbeatManager = new HeartbeatManager();
         heartbeatManager.regConsumerCheckBusiness(masterConfig.getConsumerHeartbeatTimeoutMs(),
                 new TimeoutListener() {