You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/29 10:07:22 UTC

[incubator-tubemq] 24/49: [TUBEMQ-485]Add the batch setting API of consume group offset

This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit 1b854fb413d1226572343627cf52a8e40d0410f4
Author: gosonzhang <go...@tencent.com>
AuthorDate: Mon Jan 4 19:39:55 2021 +0800

    [TUBEMQ-485]Add the batch setting API of consume group offset
---
 .../org/apache/tubemq/corebase/utils/Tuple3.java   |  48 +++++
 .../server/broker/offset/DefaultOffsetManager.java |  83 +++-----
 .../tubemq/server/broker/offset/OffsetService.java |   7 +-
 .../server/broker/web/BrokerAdminServlet.java      | 228 ++++++++++++++++++++-
 .../tubemq/server/common/fielddef/WebFieldDef.java |   8 +-
 .../server/common/utils/WebParameterUtils.java     |  71 +++++++
 .../tubemq/server/common/webbase/WebFieldType.java |   3 +-
 7 files changed, 388 insertions(+), 60 deletions(-)

diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple3.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple3.java
new file mode 100644
index 0000000..a2d98c3
--- /dev/null
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple3.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.corebase.utils;
+
+public class Tuple3<T0, T1, T2> {
+
+    /** Field 0 of the tuple. */
+    public T0 f0 = null;
+    /** Field 1 of the tuple. */
+    public T1 f1 = null;
+    /** Field 2 of the tuple. */
+    public T2 f2 = null;
+
+    /**
+     * Creates a new tuple where all fields are null.
+     */
+    public Tuple3() {
+
+    }
+
+    /**
+     * Creates a new tuple and assigns the given values to the tuple's fields.
+     *
+     * @param value0 The value for field 0
+     * @param value1 The value for field 1
+     * @param value2 The value for field 2
+     */
+    public Tuple3(T0 value0, T1 value1, T2 value2) {
+        this.f0 = value0;
+        this.f1 = value1;
+        this.f2 = value2;
+    }
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
index df3afc4..84dabb2 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -19,6 +19,7 @@ package org.apache.tubemq.server.broker.offset;
 
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -26,9 +27,9 @@ import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.daemon.AbstractDaemonService;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.corebase.utils.Tuple3;
 import org.apache.tubemq.server.broker.BrokerConfig;
 import org.apache.tubemq.server.broker.msgstore.MessageStore;
-import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
 import org.apache.tubemq.server.broker.utils.DataStoreUtils;
 import org.apache.tubemq.server.common.offsetstorage.OffsetStorage;
 import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo;
@@ -119,8 +120,8 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
                 || (readStatus == TBaseConstants.CONSUME_MODEL_READ_FROM_MAX_ALWAYS)) {
             long adjOffset = indexMaxOffset;
             if (readStatus != TBaseConstants.CONSUME_MODEL_READ_FROM_MAX_ALWAYS) {
-                adjOffset = reqOffset > indexMaxOffset ? indexMaxOffset : reqOffset;
-                adjOffset = adjOffset < indexMinOffset ? indexMinOffset : adjOffset;
+                adjOffset = Math.min(reqOffset, indexMaxOffset);
+                adjOffset = Math.max(adjOffset, indexMinOffset);
             }
             regInfo.getAndSetOffset(adjOffset);
         }
@@ -288,7 +289,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
             long firstOffset = store.getIndexMinOffset();
             long lastOffset = store.getIndexMaxOffset();
             reSetOffset = reSetOffset < firstOffset
-                    ? firstOffset : reSetOffset > lastOffset ? lastOffset : reSetOffset;
+                    ? firstOffset : Math.min(reSetOffset, lastOffset);
             String offsetCacheKey = getOffsetCacheKey(topic, partitionId);
             getAndResetTmpOffset(group, offsetCacheKey);
             OffsetStorageInfo regInfo =
@@ -449,70 +450,46 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
         return result;
     }
 
-
     /***
      * Reset offset.
      *
-     * @param storeManager
      * @param groups
-     * @param topicPartOffsetMap
+     * @param topicPartOffsets
      * @param modifier
      * @return at least one record modified
      */
     @Override
-    public boolean modifyGroupOffset(
-            MessageStoreManager storeManager, Set<String> groups,
-            Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap, String modifier) {
+    public boolean modifyGroupOffset(Set<String> groups,
+                                     List<Tuple3<String, Integer, Long>> topicPartOffsets,
+                                     String modifier) {
         long oldOffset = -1;
-        long reSetOffset = -1;
         boolean changed = false;
-        MessageStore store = null;
+        String offsetCacheKey = null;
         StringBuilder strBuidler = new StringBuilder(512);
         // set offset by group
         for (String group : groups) {
-            for (Map.Entry<String, Map<Integer, Tuple2<Long, Long>>> entry
-                    : topicPartOffsetMap.entrySet()) {
-                Map<Integer, Tuple2<Long, Long>> partOffsetMap = entry.getValue();
-                if (partOffsetMap  == null) {
+            for (Tuple3<String, Integer, Long> tuple3 : topicPartOffsets) {
+                if (tuple3 == null
+                        || tuple3.f0 == null
+                        || tuple3.f1 == null
+                        || tuple3.f2 == null) {
                     continue;
                 }
-                // set offset
-                for (Map.Entry<Integer, Tuple2<Long, Long>> entry1 : partOffsetMap.entrySet()) {
-                    if (entry1.getValue() == null) {
-                        continue;
-                    }
-                    Tuple2<Long, Long> offsetTuple = entry1.getValue();
-                    // get topic store
-                    try {
-                        store = storeManager.getOrCreateMessageStore(
-                                entry.getKey(), entry1.getKey());
-                    } catch (Throwable e) {
-                        //
-                    }
-                    if (store == null) {
-                        continue;
-                    }
-                    long firstOffset = store.getIndexMinOffset();
-                    long lastOffset = store.getIndexMaxOffset();
-                    // adjust reseted offset value
-                    reSetOffset = offsetTuple.f0 < firstOffset
-                            ? firstOffset : Math.min(offsetTuple.f0, lastOffset);
-                    String offsetCacheKey =
-                            getOffsetCacheKey(entry.getKey(), entry1.getKey());
-                    getAndResetTmpOffset(group, offsetCacheKey);
-                    OffsetStorageInfo regInfo = loadOrCreateOffset(group,
-                            entry.getKey(), entry1.getKey(), offsetCacheKey, 0);
-                    oldOffset = regInfo.getAndSetOffset(reSetOffset);
-                    changed = true;
-                    logger.info(strBuidler
-                            .append("[Offset Manager] Update offset by modifier=")
-                            .append(modifier).append(",reset offset=").append(reSetOffset)
-                            .append(",old offset=").append(oldOffset)
-                            .append(",updated offset=").append(regInfo.getOffset())
-                            .append(",group=").append(group)
-                            .append(",topic-partId=").append(offsetCacheKey).toString());
-                    strBuidler.delete(0, strBuidler.length());
-                }
+                // set offset value
+                offsetCacheKey = getOffsetCacheKey(tuple3.f0, tuple3.f1);
+                getAndResetTmpOffset(group, offsetCacheKey);
+                OffsetStorageInfo regInfo = loadOrCreateOffset(group,
+                        tuple3.f0, tuple3.f1, offsetCacheKey, 0);
+                oldOffset = regInfo.getAndSetOffset(tuple3.f2);
+                changed = true;
+                logger.info(strBuidler
+                        .append("[Offset Manager] Update offset by modifier=")
+                        .append(modifier).append(",reset offset=").append(tuple3.f2)
+                        .append(",old offset=").append(oldOffset)
+                        .append(",updated offset=").append(regInfo.getOffset())
+                        .append(",group=").append(group)
+                        .append(",topic-partId=").append(offsetCacheKey).toString());
+                strBuidler.delete(0, strBuidler.length());
             }
         }
         return changed;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
index fcebdfc..9dcd29a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
@@ -17,12 +17,13 @@
 
 package org.apache.tubemq.server.broker.offset;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.corebase.utils.Tuple3;
 import org.apache.tubemq.server.broker.msgstore.MessageStore;
-import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
 import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo;
 
 
@@ -68,7 +69,7 @@ public interface OffsetService {
     Map<String, Map<Integer, Tuple2<Long, Long>>> queryGroupOffset(
             String group, Map<String, Set<Integer>> topicPartMap);
 
-    boolean modifyGroupOffset(MessageStoreManager storeManager, Set<String> groups,
-                              Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap,
+    boolean modifyGroupOffset(Set<String> groups,
+                              List<Tuple3<String, Integer, Long>> topicPartOffsets,
                               String modifier);
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index d8f85d4..c76a6b7 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -17,6 +17,7 @@
 
 package org.apache.tubemq.server.broker.web;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -25,9 +26,11 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.servlet.http.HttpServletRequest;
+
 import org.apache.tubemq.corebase.TokenConstants;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.corebase.utils.Tuple3;
 import org.apache.tubemq.server.broker.TubeBroker;
 import org.apache.tubemq.server.broker.metadata.TopicMetadata;
 import org.apache.tubemq.server.broker.msgstore.MessageStore;
@@ -89,6 +92,9 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         // clone consumer group's offset from source to target
         innRegisterWebMethod("admin_clone_offset",
                 "adminCloneGroupOffSet");
+        // set or update group's offset info
+        innRegisterWebMethod("admin_set_offset",
+                "adminSetGroupOffSet");
     }
 
     public void adminQueryAllMethods(HttpServletRequest req,
@@ -759,6 +765,74 @@ public class BrokerAdminServlet extends AbstractWebHandler {
     }
 
     /***
+     * Add or Modify consumer group offset.
+     *
+     * @param req
+     * @param sBuilder process result
+     */
+    public void adminSetGroupOffSet(HttpServletRequest req,
+                                    StringBuilder sBuilder) {
+        // get group list
+        ProcessResult result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSGROUPNAME, true, null);
+        if (!result.success) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        Set<String> groupNameSet = (Set<String>) result.retData1;
+        // get set mode
+        result = WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.MANUALSET, true, false);
+        if (!result.success) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        boolean manualSet = (Boolean) result.retData1;
+        // get modify user
+        result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.MODIFYUSER, true, null);
+        if (!result.success) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        List<Tuple3<String, Integer, Long>> resetOffsets;
+        final String modifier = (String) result.retData1;
+        if (manualSet) {
+            // get offset json info
+            result = WebParameterUtils.getJsonDictParamValue(req,
+                    WebFieldDef.OFFSETJSON, true, null);
+            if (!result.success) {
+                WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+                return;
+            }
+            Map<String, Long> manOffsets =
+                    (Map<String, Long>) result.retData1;
+            // valid and transfer offset format
+            result = validManOffsetResetInfo(WebFieldDef.OFFSETJSON, manOffsets);
+            if (!result.success) {
+                WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+                return;
+            }
+            resetOffsets =
+                    (List<Tuple3<String, Integer, Long>>) result.retData1;
+        } else {
+            // get the topic set to be set
+            result = WebParameterUtils.getStringParamValue(req,
+                    WebFieldDef.COMPSTOPICNAME, true, null);
+            if (!result.success) {
+                WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+                return;
+            }
+            Set<String> topicSet = (Set<String>) result.retData1;
+            // transfer offset format
+            resetOffsets = buildOffsetResetInfo(topicSet);
+        }
+        boolean changed = broker.getOffsetManager().modifyGroupOffset(
+                groupNameSet, resetOffsets, modifier);
+        sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
+    }
+
+    /***
      * Clone consume group offset, clone A group's offset to other group.
      *
      * @param req
@@ -821,12 +895,162 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         // query offset from source group
         Map<String, Map<Integer, Tuple2<Long, Long>>> srcGroupOffsets =
                 broker.getOffsetManager().queryGroupOffset(srcGroupName, topicPartMap);
+        // transfer offset format
+        List<Tuple3<String, Integer, Long>> resetOffsets =
+                buildOffsetResetInfo(srcGroupOffsets);
         boolean changed = broker.getOffsetManager().modifyGroupOffset(
-                broker.getStoreManager(), tgtGroupNameSet, srcGroupOffsets, modifier);
+                tgtGroupNameSet, resetOffsets, modifier);
         // builder return result
         sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
     }
 
+    // build reset offset info
+    private List<Tuple3<String, Integer, Long>> buildOffsetResetInfo(
+            Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap) {
+        long adjOffset = -1;
+        MessageStore store = null;
+        List<Tuple3<String, Integer, Long>> result = new ArrayList<>();
+        MessageStoreManager storeManager = broker.getStoreManager();
+        for (Map.Entry<String, Map<Integer, Tuple2<Long, Long>>> entry
+                : topicPartOffsetMap.entrySet()) {
+            Map<Integer, Tuple2<Long, Long>> partOffsetMap = entry.getValue();
+            if (partOffsetMap  == null) {
+                continue;
+            }
+            // process offset value
+            for (Map.Entry<Integer, Tuple2<Long, Long>> entry1 : partOffsetMap.entrySet()) {
+                if (entry1.getValue() == null) {
+                    continue;
+                }
+                Tuple2<Long, Long> offsetTuple = entry1.getValue();
+                // get topic store
+                try {
+                    store = storeManager.getOrCreateMessageStore(
+                            entry.getKey(), entry1.getKey());
+                } catch (Throwable e) {
+                    //
+                }
+                if (store == null) {
+                    continue;
+                }
+                long firstOffset = store.getIndexMinOffset();
+                long lastOffset = store.getIndexMaxOffset();
+                // adjust reset offset value
+                adjOffset = offsetTuple.f0 < firstOffset
+                        ? firstOffset : Math.min(offsetTuple.f0, lastOffset);
+                result.add(new Tuple3<>(entry.getKey(), entry1.getKey(), adjOffset));
+            }
+        }
+        return result;
+    }
+
+    // build reset offset info
+    private List<Tuple3<String, Integer, Long>> buildOffsetResetInfo(Set<String> topicSet) {
+        MessageStore store = null;
+        List<Tuple3<String, Integer, Long>> result = new ArrayList<>();
+        MessageStoreManager storeManager = broker.getStoreManager();
+        // get topic's partition set
+        Map<String, Set<Integer>> topicPartMap =
+                validAndGetPartitions(null, topicSet);
+        // fill current topic's max offset value
+        for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+            if (entry.getKey() == null
+                    || entry.getValue() == null
+                    || entry.getValue().isEmpty()) {
+                continue;
+            }
+            Set<Integer> partitionSet = entry.getValue();
+            for (Integer partId : partitionSet) {
+                // get topic store
+                try {
+                    store = storeManager.getOrCreateMessageStore(
+                            entry.getKey(), partId);
+                } catch (Throwable e) {
+                    //
+                }
+                if (store == null) {
+                    continue;
+                }
+                result.add(new Tuple3<>(entry.getKey(),
+                        partId, store.getIndexMaxOffset()));
+            }
+        }
+        return result;
+    }
+
+    // build reset offset info
+    private ProcessResult validManOffsetResetInfo(WebFieldDef fieldDef,
+                                                  Map<String, Long> manOffsetInfoMap) {
+        String brokerId;
+        String topicName;
+        String strPartId;
+        int partitionId;
+        long adjOffset;
+        MessageStore store = null;
+        ProcessResult procResult = new ProcessResult();
+        MessageStoreManager storeManager = broker.getStoreManager();
+        List<Tuple3<String, Integer, Long>> offsetVals = new ArrayList<>();
+        String localBrokerId = String.valueOf(broker.getTubeConfig().getBrokerId());
+        // get topic configure infos
+        Map<String, TopicMetadata> topicConfigMap =
+                broker.getMetadataManager().getTopicConfigMap();
+        for (Map.Entry<String, Long> entry : manOffsetInfoMap.entrySet()) {
+            if (entry.getKey() == null || entry.getValue() == null) {
+                continue;
+            }
+            // parse and check partitionKey value
+            String[] keyItems = entry.getKey().split(TokenConstants.ATTR_SEP);
+            if (keyItems.length != 3) {
+                procResult.setFailResult(fieldDef.id,
+                        new StringBuilder(512).append("Parameter ")
+                                .append(fieldDef.name).append("'s key invalid:")
+                                .append(entry.getKey())
+                                .append(" must be brokerId:topicName:partitionId !").toString());
+                return procResult;
+            }
+            brokerId = keyItems[0].trim();
+            topicName = keyItems[1].trim();
+            strPartId = keyItems[2].trim();
+            if (!localBrokerId.equals(brokerId)
+                    || !topicConfigMap.containsKey(topicName)) {
+                continue;
+            }
+            try {
+                partitionId = Integer.parseInt(strPartId);
+            } catch (NumberFormatException e) {
+                procResult.setFailResult(fieldDef.id,
+                        new StringBuilder(512).append("Parameter ")
+                                .append(fieldDef.name).append("'s key invalid:")
+                                .append(entry.getKey())
+                                .append("'s partitionId value not number!").toString());
+                return procResult;
+            }
+            // check and adjust offset value
+            try {
+                store = storeManager.getOrCreateMessageStore(topicName, partitionId);
+            } catch (Throwable e) {
+                //
+            }
+            if (store == null) {
+                continue;
+            }
+            long firstOffset = store.getIndexMinOffset();
+            long lastOffset = store.getIndexMaxOffset();
+            adjOffset = entry.getValue() < firstOffset
+                    ? firstOffset : Math.min(entry.getValue(), lastOffset);
+            offsetVals.add(new Tuple3<>(topicName, partitionId, adjOffset));
+        }
+        if (offsetVals.isEmpty()) {
+            procResult.setFailResult(fieldDef.id,
+                    new StringBuilder(512).append("Parameter ")
+                            .append(fieldDef.name)
+                            .append("'s value is invalid!").toString());
+        } else {
+            procResult.setSuccResult(offsetVals);
+        }
+        return procResult;
+    }
+
     // builder group's offset info
     private Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> getGroupOffsetInfo(
             Set<String> groupSet, Set<String> topicSet) {
@@ -872,7 +1096,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
     private Map<String, Set<Integer>> validAndGetPartitions(String group, Set<String> topicSet) {
         Map<String, Set<Integer>> topicPartMap = new HashMap<>();
         // query stored topic set stored in memory or zk
-        if (topicSet.isEmpty()) {
+        if (topicSet.isEmpty() && group != null) {
             topicSet = broker.getOffsetManager().getGroupSubInfo(group);
         }
         // get topic's partitionIds
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index ec97421..45b862d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -78,7 +78,13 @@ public enum WebFieldDef {
             RegexDef.TMP_GROUP),
     TGTCOMPSGROUPNAME(19, "targetGroupName", "tgtGroup",
             WebFieldType.COMPSTRING, "Offset clone target group name",
-            TBaseConstants.META_MAX_GROUPNAME_LENGTH, RegexDef.TMP_GROUP);
+            TBaseConstants.META_MAX_GROUPNAME_LENGTH, RegexDef.TMP_GROUP),
+    MANUALSET(20, "manualSet", "manSet",
+            WebFieldType.BOOLEAN, "Whether manual offset setting mode"),
+    OFFSETJSON(21, "offsetJsonSet", "offsetSet",
+            WebFieldType.JSONTYPE, "The offset set that needs to be added or modified");
+
+
 
 
 
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index 1202d33..fddd5de 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -478,6 +478,77 @@ public class WebParameterUtils {
     }
 
     /**
+     * Parse the parameter value from an json dict
+     *
+     * @param req         Http Servlet Request
+     * @param fieldDef    the parameter field definition
+     * @param required    a boolean value represent whether the parameter is must required
+     * @param defValue    a default value returned if failed to parse value from the given object
+     * @return valid result for the parameter value
+     */
+    public static ProcessResult getJsonDictParamValue(HttpServletRequest req,
+                                                      WebFieldDef fieldDef,
+                                                      boolean required,
+                                                      Map<String, Long> defValue) {
+        ProcessResult procResult = new ProcessResult();
+        // get parameter value
+        String paramValue = req.getParameter(fieldDef.name);
+        if (paramValue == null) {
+            paramValue = req.getParameter(fieldDef.shortName);
+        }
+        if (TStringUtils.isNotBlank(paramValue)) {
+            // Cleanup value extra characters
+            paramValue = escDoubleQuotes(paramValue.trim());
+        }
+        // Check if the parameter exists
+        if (TStringUtils.isBlank(paramValue)) {
+            if (required) {
+                procResult.setFailResult(fieldDef.id,
+                        new StringBuilder(512).append("Parameter ")
+                                .append(fieldDef.name)
+                                .append(" is missing or value is null or blank!").toString());
+            } else {
+                procResult.setSuccResult(defValue);
+            }
+            return procResult;
+        }
+        try {
+            paramValue = URLDecoder.decode(paramValue,
+                    TBaseConstants.META_DEFAULT_CHARSET_NAME);
+        } catch (UnsupportedEncodingException e) {
+            procResult.setFailResult(fieldDef.id,
+                    new StringBuilder(512).append("Parameter ")
+                            .append(fieldDef.name)
+                            .append(" decode error, exception is ")
+                            .append(e.toString()).toString());
+        }
+        if (TStringUtils.isBlank(paramValue)) {
+            if (required) {
+                procResult.setFailResult(fieldDef.id,
+                        new StringBuilder(512).append("Parameter ")
+                                .append(fieldDef.name)
+                                .append("'s value is blank!").toString());
+            } else {
+                procResult.setSuccResult(defValue);
+            }
+            return procResult;
+        }
+        if (fieldDef.valMaxLen != TBaseConstants.META_VALUE_UNDEFINED) {
+            if (paramValue.length() > fieldDef.valMaxLen) {
+                procResult.setFailResult(fieldDef.id,
+                        new StringBuilder(512).append("Parameter ")
+                                .append(fieldDef.name)
+                                .append("'s length over max allowed length (")
+                                .append(fieldDef.valMaxLen).append(")!").toString());
+                return procResult;
+            }
+        }
+        procResult.setSuccResult(new Gson().fromJson(paramValue,
+                new TypeToken<Map<String, Long>>(){}.getType()));
+        return procResult;
+    }
+
+    /**
      * process string default value
      *
      * @param procResult process result
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java
index b83a966..2f32cb1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java
@@ -28,7 +28,8 @@ public enum WebFieldType {
     BOOLEAN(4, "Boolean"),
     DATE(5, "Date"),
     COMPSTRING(6, "Compound string"),
-    COMPINT(7, "Compound integer");
+    COMPINT(7, "Compound integer"),
+    JSONTYPE(8, "Json");
 
 
     public int value;