You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/29 10:07:20 UTC

[incubator-tubemq] 22/49: [TUBEMQ-482] Add offset query api

This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit d8580f21b44e4866b5f63352afd7664b7e67bfe4
Author: gosonzhang <go...@tencent.com>
AuthorDate: Fri Dec 25 16:37:14 2020 +0800

    [TUBEMQ-482] Add offset query api
---
 .../server/broker/metadata/TopicMetadata.java      | 29 ++++++++
 .../server/broker/msgstore/MessageStore.java       | 11 ++-
 .../broker/msgstore/MessageStoreManager.java       | 62 +++++++++++++++-
 .../server/broker/msgstore/StoreService.java       |  6 ++
 .../server/broker/offset/DefaultOffsetManager.java | 46 +++++++-----
 .../tubemq/server/broker/offset/OffsetService.java |  6 +-
 .../server/broker/utils/GroupOffsetInfo.java       | 85 ++++++++++++++++++++++
 .../server/broker/utils/TopicPubStoreInfo.java     | 45 ++++++++++++
 .../server/broker/web/BrokerAdminServlet.java      | 79 +++++++++++++++-----
 9 files changed, 323 insertions(+), 46 deletions(-)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
index c582606..800254b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
@@ -17,7 +17,9 @@
 
 package org.apache.tubemq.server.broker.metadata;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.tubemq.corebase.TBaseConstants;
@@ -251,6 +253,33 @@ public class TopicMetadata {
         return partIds;
     }
 
+    /**
+     * Build the partition id set of each store.
+     *
+     * @return map of store id to the ids of the partitions held by that store
+     */
+    public Map<Integer, Set<Integer>> getStorePartIdMap() {
+        Map<Integer, Set<Integer>> result = new HashMap<>();
+        for (int storeId = 0; storeId < numTopicStores; storeId++) {
+            result.put(storeId, getPartIdsByStoreId(storeId));
+        }
+        return result;
+    }
+
+    public int getStoreIdByPartitionId(int partitionId) {
+        return partitionId / TBaseConstants.META_STORE_INS_BASE;  // id = storeId * BASE + index
+    }
+
+    public Set<Integer> getPartIdsByStoreId(int storeId) {
+        // a store id outside [0, numTopicStores) holds no partition: the result stays empty
+        int partCnt = (storeId >= 0 && storeId < numTopicStores) ? numPartitions : 0;
+        Set<Integer> idSet = new HashSet<>();
+        for (int index = 0; index < partCnt; index++) {
+            idSet.add(storeId * TBaseConstants.META_STORE_INS_BASE + index);
+        }
+        return idSet;
+    }
+
     public int getStatusId() {
         return statusId;
     }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
index a827723..e610a8b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
@@ -133,9 +133,9 @@ public class MessageStore implements Closeable {
         this.writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl();
         int tmpIndexReadCnt = tubeConfig.getIndexTransCount() * partitionNum;
         memMaxIndexReadCnt.set(tmpIndexReadCnt <= 6000
-                ? 6000 : (tmpIndexReadCnt >= 10000 ? 10000 : tmpIndexReadCnt));
+                ? 6000 : (Math.min(tmpIndexReadCnt, 10000)));
         fileMaxIndexReadCnt.set(tmpIndexReadCnt < 8000
-                ? 8000 : (tmpIndexReadCnt >= 13500 ? 13500 : tmpIndexReadCnt));
+                ? 8000 : (Math.min(tmpIndexReadCnt, 13500)));
         memMaxFilterIndexReadCnt.set(memMaxIndexReadCnt.get() * 2);
         fileMaxFilterIndexReadCnt.set(fileMaxIndexReadCnt.get() * 3);
         fileLowReqMaxFilterIndexReadCnt.set(fileMaxFilterIndexReadCnt.get() * 10);
@@ -250,8 +250,7 @@ public class MessageStore implements Closeable {
             }
         }
         // before read from file, adjust request's offset.
-        long reqNewOffset = requestOffset < this.msgFileStore.getIndexMinOffset()
-                ? this.msgFileStore.getIndexMinOffset() : requestOffset;
+        long reqNewOffset = Math.max(requestOffset, this.msgFileStore.getIndexMinOffset());
         if (reqSwitch <= 1 && reqNewOffset >= getFileIndexMaxOffset()) {
             return new GetMessageResult(false, TErrCodeConstants.NOT_FOUND,
                     reqNewOffset, 0, "current offset is exceed max file offset");
@@ -409,9 +408,9 @@ public class MessageStore implements Closeable {
         maxFileValidDurMs.set(parseDeletePolicy(topicMetadata.getDeletePolicy()));
         int tmpIndexReadCnt = tubeConfig.getIndexTransCount() * partitionNum;
         memMaxIndexReadCnt.set(tmpIndexReadCnt <= 6000
-                ? 6000 : (tmpIndexReadCnt >= 10000 ? 10000 : tmpIndexReadCnt));
+                ? 6000 : (Math.min(tmpIndexReadCnt, 10000)));
         fileMaxIndexReadCnt.set(tmpIndexReadCnt < 8000
-                ? 8000 : (tmpIndexReadCnt >= 13500 ? 13500 : tmpIndexReadCnt));
+                ? 8000 : (Math.min(tmpIndexReadCnt, 13500)));
         memMaxFilterIndexReadCnt.set(memMaxIndexReadCnt.get() * 2);
         fileMaxFilterIndexReadCnt.set(fileMaxIndexReadCnt.get() * 3);
         fileLowReqMaxFilterIndexReadCnt.set(fileMaxFilterIndexReadCnt.get() * 10);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
index 6275f3a..2d55ba1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -51,6 +52,7 @@ import org.apache.tubemq.server.broker.metadata.TopicMetadata;
 import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult;
 import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
 import org.apache.tubemq.server.broker.utils.DataStoreUtils;
+import org.apache.tubemq.server.broker.utils.TopicPubStoreInfo;
 import org.apache.tubemq.server.common.TStatusConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,8 +92,7 @@ public class MessageStoreManager implements StoreService {
         this.metadataManager = this.tubeBroker.getMetadataManager();
         this.isRemovingTopic.set(false);
         this.maxMsgTransferSize =
-                tubeConfig.getTransferSize() > DataStoreUtils.MAX_MSG_TRANSFER_SIZE
-                        ? DataStoreUtils.MAX_MSG_TRANSFER_SIZE : tubeConfig.getTransferSize();
+                Math.min(tubeConfig.getTransferSize(), DataStoreUtils.MAX_MSG_TRANSFER_SIZE);
         this.metadataManager.addPropertyChangeListener("topicConfigMap", new PropertyChangeListener() {
             @Override
             public void propertyChange(final PropertyChangeEvent evt) {
@@ -385,6 +386,63 @@ public class MessageStoreManager implements StoreService {
         return Collections.unmodifiableMap(this.dataStores);
     }
 
+    /***
+     * Query the publish info of the stores behind the given topics.
+     *
+     * @param topicSet  topics to query; null or empty selects every configured topic
+     *
+     * @return  topic name -> (partition id -> publish info of the store holding it);
+     *          topics that are not configured or have no message store are omitted
+     */
+    @Override
+    public Map<String, Map<Integer, TopicPubStoreInfo>> getTopicPublishInfos(
+            Set<String> topicSet) {
+        Map<String, Map<Integer, TopicPubStoreInfo>> result = new HashMap<>();
+        Map<String, TopicMetadata> topicConfMap = metadataManager.getTopicConfigMap();
+        // select the topics to query, keeping only the configured ones
+        Set<String> targetTopics = new HashSet<>();
+        if (topicSet != null && !topicSet.isEmpty()) {
+            for (String reqTopic : topicSet) {
+                if (topicConfMap.containsKey(reqTopic)) {
+                    targetTopics.add(reqTopic);
+                }
+            }
+        } else {
+            targetTopics.addAll(topicConfMap.keySet());
+        }
+        // an empty selection simply skips the loop and yields an empty result
+        for (String topicName : targetTopics) {
+            TopicMetadata metadata = topicConfMap.get(topicName);
+            if (metadata == null) {
+                continue;
+            }
+            Map<Integer, MessageStore> topicStores = dataStores.get(topicName);
+            if (topicStores == null) {
+                continue;
+            }
+            Map<Integer, TopicPubStoreInfo> partInfoMap = new HashMap<>();
+            for (Map.Entry<Integer, MessageStore> storeEntry : topicStores.entrySet()) {
+                // defensive: ignore entries lacking a store id or a store
+                if (storeEntry == null) {
+                    continue;
+                }
+                Integer storeId = storeEntry.getKey();
+                MessageStore msgStore = storeEntry.getValue();
+                if (storeId == null || msgStore == null) {
+                    continue;
+                }
+                // every partition of a store reports that store's offsets
+                for (Integer partId : metadata.getPartIdsByStoreId(storeId)) {
+                    partInfoMap.put(partId, new TopicPubStoreInfo(topicName, storeId, partId,
+                            msgStore.getIndexMinOffset(), msgStore.getIndexMaxOffset(),
+                            msgStore.getDataMinOffset(), msgStore.getDataMaxOffset()));
+                }
+            }
+            result.put(topicName, partInfoMap);
+        }
+        return result;
+    }
+
     private Set<File> getLogDirSet(final BrokerConfig tubeConfig) throws IOException {
         TopicMetadata topicMetadata = null;
         final Set<String> paths = new HashSet<>();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/StoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/StoreService.java
index d5f7f32..184fb4d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/StoreService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/StoreService.java
@@ -20,6 +20,10 @@ package org.apache.tubemq.server.broker.msgstore;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.tubemq.server.broker.utils.TopicPubStoreInfo;
+
 
 /***
  * Store service interface.
@@ -35,4 +39,6 @@ public interface StoreService {
 
     MessageStore getOrCreateMessageStore(final String topic,
                                          final int partition) throws Throwable;
+
+    Map<String, Map<Integer, TopicPubStoreInfo>> getTopicPublishInfos(Set<String> topicSet);
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
index bdd85b3..df3afc4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.daemon.AbstractDaemonService;
 import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
 import org.apache.tubemq.server.broker.BrokerConfig;
 import org.apache.tubemq.server.broker.msgstore.MessageStore;
 import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
@@ -399,22 +400,26 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
      * @return group offset info in memory or zk
      */
     @Override
-    public Map<String, Map<Integer, Long>> queryGroupOffset(
+    public Map<String, Map<Integer, Tuple2<Long, Long>>> queryGroupOffset(
             String group, Map<String, Set<Integer>> topicPartMap) {
-        Map<String, Map<Integer, Long>> result = new HashMap<>();
+        Map<String, Map<Integer, Tuple2<Long, Long>>> result = new HashMap<>();
         // search group from memory
         Map<String, OffsetStorageInfo> topicPartOffsetMap = cfmOffsetMap.get(group);
         if (topicPartOffsetMap == null) {
             // query from zookeeper
             for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+                if (entry == null || entry.getKey() == null || entry.getValue() == null) {
+                    continue;
+                }
                 Map<Integer, Long> qryResult =
-                        zkOffsetStorage.queryGroupOffsetInfo(
-                                group, entry.getKey(), entry.getValue());
-                Map<Integer, Long> offsetMap = new HashMap<>();
+                        zkOffsetStorage.queryGroupOffsetInfo(group,
+                                entry.getKey(), entry.getValue());
+                Map<Integer, Tuple2<Long, Long>> offsetMap = new HashMap<>();
                 for (Map.Entry<Integer, Long> item : qryResult.entrySet()) {
-                    if (item.getValue() != null) {
-                        offsetMap.put(item.getKey(), item.getValue());
+                    if (item == null || item.getKey() == null || item.getValue() == null)  {
+                        continue;
                     }
+                    offsetMap.put(item.getKey(), new Tuple2<>(item.getValue(), 0L));
                 }
                 if (!offsetMap.isEmpty()) {
                     result.put(entry.getKey(), offsetMap);
@@ -422,14 +427,18 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
             }
         } else {
             // found in memory, get offset values
+            Map<String, Long> tmpPartOffsetMap = tmpOffsetMap.get(group);
             for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
-                Map<Integer, Long> offsetMap = new HashMap<>();
+                Map<Integer, Tuple2<Long, Long>> offsetMap = new HashMap<>();
                 for (Integer partitionId : entry.getValue()) {
                     String offsetCacheKey =
                             getOffsetCacheKey(entry.getKey(), partitionId);
                     OffsetStorageInfo offsetInfo = topicPartOffsetMap.get(offsetCacheKey);
+                    Long tmpOffset = tmpPartOffsetMap.get(offsetCacheKey);
                     if (offsetInfo != null) {
-                        offsetMap.put(partitionId, offsetInfo.getOffset());
+                        offsetMap.put(partitionId,
+                                new Tuple2<>(offsetInfo.getOffset(),
+                                        (tmpOffset == null ? 0 : tmpOffset)));
                     }
                 }
                 if (!offsetMap.isEmpty()) {
@@ -451,9 +460,9 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
      * @return at least one record modified
      */
     @Override
-    public boolean modifyGroupOffset(MessageStoreManager storeManager, Set<String> groups,
-                                     Map<String, Map<Integer, Long>> topicPartOffsetMap,
-                                     String modifier) {
+    public boolean modifyGroupOffset(
+            MessageStoreManager storeManager, Set<String> groups,
+            Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap, String modifier) {
         long oldOffset = -1;
         long reSetOffset = -1;
         boolean changed = false;
@@ -461,17 +470,18 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
         StringBuilder strBuidler = new StringBuilder(512);
         // set offset by group
         for (String group : groups) {
-            for (Map.Entry<String, Map<Integer, Long>> entry : topicPartOffsetMap.entrySet()) {
-                Map<Integer, Long> partOffsetMap = entry.getValue();
+            for (Map.Entry<String, Map<Integer, Tuple2<Long, Long>>> entry
+                    : topicPartOffsetMap.entrySet()) {
+                Map<Integer, Tuple2<Long, Long>> partOffsetMap = entry.getValue();
                 if (partOffsetMap  == null) {
                     continue;
                 }
                 // set offset
-                for (Map.Entry<Integer, Long> entry1 : partOffsetMap.entrySet()) {
+                for (Map.Entry<Integer, Tuple2<Long, Long>> entry1 : partOffsetMap.entrySet()) {
                     if (entry1.getValue() == null) {
                         continue;
                     }
-                    reSetOffset = entry1.getValue();
+                    Tuple2<Long, Long> offsetTuple = entry1.getValue();
                     // get topic store
                     try {
                         store = storeManager.getOrCreateMessageStore(
@@ -485,8 +495,8 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
                     long firstOffset = store.getIndexMinOffset();
                     long lastOffset = store.getIndexMaxOffset();
                     // adjust reseted offset value
-                    reSetOffset = reSetOffset < firstOffset
-                            ? firstOffset : Math.min(reSetOffset, lastOffset);
+                    reSetOffset = offsetTuple.f0 < firstOffset
+                            ? firstOffset : Math.min(offsetTuple.f0, lastOffset);
                     String offsetCacheKey =
                             getOffsetCacheKey(entry.getKey(), entry1.getKey());
                     getAndResetTmpOffset(group, offsetCacheKey);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
index 05f0724..fcebdfc 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
@@ -19,6 +19,8 @@ package org.apache.tubemq.server.broker.offset;
 
 import java.util.Map;
 import java.util.Set;
+
+import org.apache.tubemq.corebase.utils.Tuple2;
 import org.apache.tubemq.server.broker.msgstore.MessageStore;
 import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
 import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo;
@@ -63,10 +65,10 @@ public interface OffsetService {
 
     Set<String> getGroupSubInfo(String group);
 
-    Map<String, Map<Integer, Long>> queryGroupOffset(
+    Map<String, Map<Integer, Tuple2<Long, Long>>> queryGroupOffset(
             String group, Map<String, Set<Integer>> topicPartMap);
 
     boolean modifyGroupOffset(MessageStoreManager storeManager, Set<String> groups,
-                              Map<String, Map<Integer, Long>> topicPartOffsetMap,
+                              Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap,
                               String modifier);
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java
new file mode 100644
index 0000000..9a4abe3
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.broker.utils;
+
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.Tuple2;
+
+
+// Offset view of one partition for a consumer group: published range, consumed position, lag.
+public class GroupOffsetInfo {
+    public int partitionId = TBaseConstants.META_VALUE_UNDEFINED;
+    public long offsetMin = TBaseConstants.META_VALUE_UNDEFINED;      // store index start
+    public long offsetMax = TBaseConstants.META_VALUE_UNDEFINED;      // store index end
+    public long dataMin = TBaseConstants.META_VALUE_UNDEFINED;        // store data start
+    public long dataMax = TBaseConstants.META_VALUE_UNDEFINED;        // store data end
+    public long curOffset = TBaseConstants.META_VALUE_UNDEFINED;      // f0 of the consume tuple
+    public long flightOffset = TBaseConstants.META_VALUE_UNDEFINED;   // f1 of the consume tuple
+    public long offsetLag = TBaseConstants.META_VALUE_UNDEFINED;      // offsetMax - curOffset
+    public long curDataOffset = TBaseConstants.META_VALUE_UNDEFINED;  // set only when >= 0
+    public long dataLag = TBaseConstants.META_VALUE_UNDEFINED;        // dataMax - curDataOffset
+
+    public GroupOffsetInfo(int partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    public void setPartPubStoreInfo(TopicPubStoreInfo pubStoreInfo) {
+        if (pubStoreInfo == null) {
+            return;
+        }
+        offsetMin = pubStoreInfo.indexStart;
+        offsetMax = pubStoreInfo.indexEnd;
+        dataMin = pubStoreInfo.dataStart;
+        dataMax = pubStoreInfo.dataEnd;
+    }
+
+    public void setConsumeOffsetInfo(Tuple2<Long, Long> offsetInfo) {
+        if (offsetInfo == null) {
+            return;
+        }
+        curOffset = offsetInfo.f0;
+        flightOffset = offsetInfo.f1;
+    }
+
+    public void setConsumeDataOffsetInfo(long curDataOffset) {
+        if (curDataOffset >= 0) {
+            this.curDataOffset = curDataOffset;
+        }
+    }
+
+    public void calculateLag() {
+        final long undefined = TBaseConstants.META_VALUE_UNDEFINED;
+        if (offsetMax != undefined && curOffset != undefined) {
+            offsetLag = offsetMax - curOffset;
+        }
+        if (dataMax != undefined && curDataOffset != undefined) {
+            dataLag = dataMax - curDataOffset;
+        }
+    }
+
+    public StringBuilder buildOffsetInfo(StringBuilder sBuilder) {
+        return sBuilder.append("{\"partitionId\":").append(partitionId)
+                .append(",\"curOffset\":").append(curOffset)
+                .append(",\"flightOffset\":").append(flightOffset)
+                .append(",\"curDataOffset\":").append(curDataOffset)
+                .append(",\"offsetLag\":").append(offsetLag)
+                .append(",\"dataLag\":").append(dataLag)
+                .append(",\"offsetMax\":").append(offsetMax)
+                .append(",\"dataMax\":").append(dataMax).append("}");
+    }
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java
new file mode 100644
index 0000000..b2257dd
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.broker.utils;
+
+import org.apache.tubemq.corebase.TBaseConstants;
+
+
+
+// Index and data offset range of the store holding one partition of a topic.
+public class TopicPubStoreInfo {
+    public String topicName;
+    public int storeId = TBaseConstants.META_VALUE_UNDEFINED;
+    public int partitionId = TBaseConstants.META_VALUE_UNDEFINED;
+    public long indexStart;
+    public long indexEnd;
+    public long dataStart;
+    public long dataEnd;
+
+    // Every field is assigned here; this is the only constructor.
+    public TopicPubStoreInfo(String topicName, int storeId, int partitionId,
+                             long indexStart, long indexEnd, long dataStart, long dataEnd) {
+        this.topicName = topicName;
+        this.storeId = storeId;
+        this.partitionId = partitionId;
+        this.indexStart = indexStart;
+        this.indexEnd = indexEnd;
+        this.dataStart = dataStart;
+        this.dataEnd = dataEnd;
+    }
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index 91bfd23..9a0506e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -27,12 +27,15 @@ import java.util.concurrent.ConcurrentHashMap;
 import javax.servlet.http.HttpServletRequest;
 import org.apache.tubemq.corebase.TokenConstants;
 import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
 import org.apache.tubemq.server.broker.TubeBroker;
 import org.apache.tubemq.server.broker.metadata.TopicMetadata;
 import org.apache.tubemq.server.broker.msgstore.MessageStore;
 import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
 import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
 import org.apache.tubemq.server.broker.offset.OffsetService;
+import org.apache.tubemq.server.broker.utils.GroupOffsetInfo;
+import org.apache.tubemq.server.broker.utils.TopicPubStoreInfo;
 import org.apache.tubemq.server.common.fielddef.WebFieldDef;
 import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.common.utils.WebParameterUtils;
@@ -672,42 +675,34 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         Set<String> topicSet = (Set<String>) result.retData1;
         // verify the acquired Topic set and
         //   query the corresponding offset information
-        Map<String, Map<String, Map<Integer, Long>>> groupOffsetMaps = new HashMap<>();
-        for (String group : qryGroupNameSet) {
-            Map<String, Set<Integer>> topicPartMap =
-                    validAndGetPartitions(group, topicSet);
-            Map<String, Map<Integer, Long>> groupOffsetMap =
-                    broker.getOffsetManager().queryGroupOffset(group, topicPartMap);
-            groupOffsetMaps.put(group, groupOffsetMap);
-        }
+        Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> groupOffsetMaps =
+                getGroupOffsetInfo(qryGroupNameSet, topicSet);
         // builder result
         int totalCnt = 0;
         sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
-        for (Map.Entry<String, Map<String, Map<Integer, Long>>> entry
+        for (Map.Entry<String, Map<String, Map<Integer, GroupOffsetInfo>>> entry
                 : groupOffsetMaps.entrySet()) {
             if (totalCnt++ > 0) {
                 sBuilder.append(",");
             }
-            Map<String, Map<Integer, Long>> topicPartMap = entry.getValue();
+            Map<String, Map<Integer, GroupOffsetInfo>> topicPartMap = entry.getValue();
             sBuilder.append("{\"groupName\":\"").append(entry.getKey())
                     .append("\",\"subInfo\":[");
             int topicCnt = 0;
-            for (Map.Entry<String, Map<Integer, Long>> entry1 : topicPartMap.entrySet()) {
+            for (Map.Entry<String, Map<Integer, GroupOffsetInfo>> entry1 : topicPartMap.entrySet()) {
                 if (topicCnt++ > 0) {
                     sBuilder.append(",");
                 }
-                Map<Integer, Long> partOffMap = entry1.getValue();
+                Map<Integer, GroupOffsetInfo> partOffMap = entry1.getValue();
                 sBuilder.append("{\"topicName\":\"").append(entry1.getKey())
                         .append("\",\"offsets\":[");
                 int partCnt = 0;
-                for (Map.Entry<Integer, Long> entry2 : partOffMap.entrySet()) {
+                for (Map.Entry<Integer, GroupOffsetInfo> entry2 : partOffMap.entrySet()) {
                     if (partCnt++ > 0) {
                         sBuilder.append(",");
                     }
-                    sBuilder.append("{\"").append(this.broker.getTubeConfig().getBrokerId())
-                            .append(TokenConstants.ATTR_SEP).append(entry1.getKey())
-                            .append(TokenConstants.ATTR_SEP).append(entry2.getKey())
-                            .append("\":").append(entry2.getValue()).append("}");
+                    GroupOffsetInfo offsetInfo = entry2.getValue();
+                    offsetInfo.buildOffsetInfo(sBuilder);
                 }
                 sBuilder.append("],\"partCount\":").append(partCnt).append("}");
             }
@@ -777,7 +772,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
             return;
         }
         // query offset from source group
-        Map<String, Map<Integer, Long>> srcGroupOffsets =
+        Map<String, Map<Integer, Tuple2<Long, Long>>> srcGroupOffsets =
                 broker.getOffsetManager().queryGroupOffset(srcGroupName, topicPartMap);
         boolean changed = broker.getOffsetManager().modifyGroupOffset(
                 broker.getStoreManager(), tgtGroupNameSet, srcGroupOffsets, modifier);
@@ -785,6 +780,48 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
     }
 
+    // Build group -> topic -> partitionId -> GroupOffsetInfo for the given groups and topics.
+    private Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> getGroupOffsetInfo(
+            Set<String> groupSet, Set<String> topicSet) {
+        Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> groupOffsetMaps = new HashMap<>();
+        for (String group : groupSet) {
+            Map<String, Map<Integer, GroupOffsetInfo>> topicOffsetRet = new HashMap<>();
+            // validate and get topic's partitionIds
+            Map<String, Set<Integer>> topicPartMap = validAndGetPartitions(group, topicSet);
+            // get topic's publish info
+            Map<String, Map<Integer, TopicPubStoreInfo>> topicStorePubInfoMap =
+                    broker.getStoreManager().getTopicPublishInfos(topicPartMap.keySet());
+            // get group's booked offset info
+            Map<String, Map<Integer, Tuple2<Long, Long>>> groupOffsetMap =
+                    broker.getOffsetManager().queryGroupOffset(group, topicPartMap);
+            // a topic may be absent from either map, so the lookups below can return null
+            for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+                String topic = entry.getKey();
+                Map<Integer, GroupOffsetInfo> partOffsetRet = new HashMap<>();
+                Map<Integer, TopicPubStoreInfo> storeInfoMap = topicStorePubInfoMap.get(topic);
+                Map<Integer, Tuple2<Long, Long>> partBookedMap = groupOffsetMap.get(topic);
+                for (Integer partitionId : entry.getValue()) {
+                    GroupOffsetInfo offsetInfo = new GroupOffsetInfo(partitionId);
+                    offsetInfo.setPartPubStoreInfo(
+                            storeInfoMap == null ? null : storeInfoMap.get(partitionId));
+                    offsetInfo.setConsumeOffsetInfo(
+                            partBookedMap == null ? null : partBookedMap.get(partitionId));
+                    String queryKey = buildQueryID(group, topic, partitionId);
+                    ConsumerNodeInfo nodeInfo = broker.getConsumerNodeInfo(queryKey);
+                    if (nodeInfo != null) {
+                        offsetInfo.setConsumeDataOffsetInfo(nodeInfo.getLastDataRdOffset());
+                    }
+                    offsetInfo.calculateLag();
+                    partOffsetRet.put(partitionId, offsetInfo);
+                }
+                topicOffsetRet.put(topic, partOffsetRet);
+            }
+            groupOffsetMaps.put(group, topicOffsetRet);
+        }
+        return groupOffsetMaps;
+    }
+
+
     private Map<String, Set<Integer>> validAndGetPartitions(String group, Set<String> topicSet) {
         Map<String, Set<Integer>> topicPartMap = new HashMap<>();
         // query stored topic set stored in memory or zk
@@ -807,4 +844,10 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         return topicPartMap;
     }
 
+    // Build the consumer node lookup key: group, topic and partitionId joined by ATTR_SEP.
+    private String buildQueryID(String group, String topic, int partitionId) {
+        return group + TokenConstants.ATTR_SEP + topic
+                + TokenConstants.ATTR_SEP + partitionId;
+    }
+
 }