You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/29 10:07:22 UTC
[incubator-tubemq] 24/49: [TUBEMQ-485]Add the batch setting API of
consume group offset
This is an automated email from the ASF dual-hosted git repository.
yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 1b854fb413d1226572343627cf52a8e40d0410f4
Author: gosonzhang <go...@tencent.com>
AuthorDate: Mon Jan 4 19:39:55 2021 +0800
[TUBEMQ-485]Add the batch setting API of consume group offset
---
.../org/apache/tubemq/corebase/utils/Tuple3.java | 48 +++++
.../server/broker/offset/DefaultOffsetManager.java | 83 +++-----
.../tubemq/server/broker/offset/OffsetService.java | 7 +-
.../server/broker/web/BrokerAdminServlet.java | 228 ++++++++++++++++++++-
.../tubemq/server/common/fielddef/WebFieldDef.java | 8 +-
.../server/common/utils/WebParameterUtils.java | 71 +++++++
.../tubemq/server/common/webbase/WebFieldType.java | 3 +-
7 files changed, 388 insertions(+), 60 deletions(-)
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple3.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple3.java
new file mode 100644
index 0000000..a2d98c3
--- /dev/null
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple3.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.corebase.utils;
+
+/**
+ * A plain, mutable holder for three values of independent types.
+ * The three fields are public and are read and written directly; the class
+ * defines no accessors and no equals/hashCode/toString overrides.
+ *
+ * @param <T0> The type of field 0
+ * @param <T1> The type of field 1
+ * @param <T2> The type of field 2
+ */
+public class Tuple3<T0, T1, T2> {
+
+ /** Field 0 of the tuple. */
+ public T0 f0 = null;
+ /** Field 1 of the tuple. */
+ public T1 f1 = null;
+ /** Field 2 of the tuple. */
+ public T2 f2 = null;
+
+ /**
+ * Creates a new tuple where all fields are null.
+ */
+ public Tuple3() {
+
+ }
+
+ /**
+ * Creates a new tuple and assigns the given values to the tuple's fields.
+ *
+ * @param value0 The value for field 0
+ * @param value1 The value for field 1
+ * @param value2 The value for field 2
+ */
+ public Tuple3(T0 value0, T1 value1, T2 value2) {
+ this.f0 = value0;
+ this.f1 = value1;
+ this.f2 = value2;
+ }
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
index df3afc4..84dabb2 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -19,6 +19,7 @@ package org.apache.tubemq.server.broker.offset;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -26,9 +27,9 @@ import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.daemon.AbstractDaemonService;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.corebase.utils.Tuple3;
import org.apache.tubemq.server.broker.BrokerConfig;
import org.apache.tubemq.server.broker.msgstore.MessageStore;
-import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
import org.apache.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.tubemq.server.common.offsetstorage.OffsetStorage;
import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo;
@@ -119,8 +120,8 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
|| (readStatus == TBaseConstants.CONSUME_MODEL_READ_FROM_MAX_ALWAYS)) {
long adjOffset = indexMaxOffset;
if (readStatus != TBaseConstants.CONSUME_MODEL_READ_FROM_MAX_ALWAYS) {
- adjOffset = reqOffset > indexMaxOffset ? indexMaxOffset : reqOffset;
- adjOffset = adjOffset < indexMinOffset ? indexMinOffset : adjOffset;
+ adjOffset = Math.min(reqOffset, indexMaxOffset);
+ adjOffset = Math.max(adjOffset, indexMinOffset);
}
regInfo.getAndSetOffset(adjOffset);
}
@@ -288,7 +289,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
long firstOffset = store.getIndexMinOffset();
long lastOffset = store.getIndexMaxOffset();
reSetOffset = reSetOffset < firstOffset
- ? firstOffset : reSetOffset > lastOffset ? lastOffset : reSetOffset;
+ ? firstOffset : Math.min(reSetOffset, lastOffset);
String offsetCacheKey = getOffsetCacheKey(topic, partitionId);
getAndResetTmpOffset(group, offsetCacheKey);
OffsetStorageInfo regInfo =
@@ -449,70 +450,46 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
return result;
}
-
/***
* Reset offset.
*
- * @param storeManager
* @param groups
- * @param topicPartOffsetMap
+ * @param topicPartOffsets
* @param modifier
* @return at least one record modified
*/
@Override
- public boolean modifyGroupOffset(
- MessageStoreManager storeManager, Set<String> groups,
- Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap, String modifier) {
+ public boolean modifyGroupOffset(Set<String> groups,
+ List<Tuple3<String, Integer, Long>> topicPartOffsets,
+ String modifier) {
long oldOffset = -1;
- long reSetOffset = -1;
boolean changed = false;
- MessageStore store = null;
+ String offsetCacheKey = null;
StringBuilder strBuidler = new StringBuilder(512);
// set offset by group
for (String group : groups) {
- for (Map.Entry<String, Map<Integer, Tuple2<Long, Long>>> entry
- : topicPartOffsetMap.entrySet()) {
- Map<Integer, Tuple2<Long, Long>> partOffsetMap = entry.getValue();
- if (partOffsetMap == null) {
+ for (Tuple3<String, Integer, Long> tuple3 : topicPartOffsets) {
+ if (tuple3 == null
+ || tuple3.f0 == null
+ || tuple3.f1 == null
+ || tuple3.f2 == null) {
continue;
}
- // set offset
- for (Map.Entry<Integer, Tuple2<Long, Long>> entry1 : partOffsetMap.entrySet()) {
- if (entry1.getValue() == null) {
- continue;
- }
- Tuple2<Long, Long> offsetTuple = entry1.getValue();
- // get topic store
- try {
- store = storeManager.getOrCreateMessageStore(
- entry.getKey(), entry1.getKey());
- } catch (Throwable e) {
- //
- }
- if (store == null) {
- continue;
- }
- long firstOffset = store.getIndexMinOffset();
- long lastOffset = store.getIndexMaxOffset();
- // adjust reseted offset value
- reSetOffset = offsetTuple.f0 < firstOffset
- ? firstOffset : Math.min(offsetTuple.f0, lastOffset);
- String offsetCacheKey =
- getOffsetCacheKey(entry.getKey(), entry1.getKey());
- getAndResetTmpOffset(group, offsetCacheKey);
- OffsetStorageInfo regInfo = loadOrCreateOffset(group,
- entry.getKey(), entry1.getKey(), offsetCacheKey, 0);
- oldOffset = regInfo.getAndSetOffset(reSetOffset);
- changed = true;
- logger.info(strBuidler
- .append("[Offset Manager] Update offset by modifier=")
- .append(modifier).append(",reset offset=").append(reSetOffset)
- .append(",old offset=").append(oldOffset)
- .append(",updated offset=").append(regInfo.getOffset())
- .append(",group=").append(group)
- .append(",topic-partId=").append(offsetCacheKey).toString());
- strBuidler.delete(0, strBuidler.length());
- }
+ // set offset value
+ offsetCacheKey = getOffsetCacheKey(tuple3.f0, tuple3.f1);
+ getAndResetTmpOffset(group, offsetCacheKey);
+ OffsetStorageInfo regInfo = loadOrCreateOffset(group,
+ tuple3.f0, tuple3.f1, offsetCacheKey, 0);
+ oldOffset = regInfo.getAndSetOffset(tuple3.f2);
+ changed = true;
+ logger.info(strBuidler
+ .append("[Offset Manager] Update offset by modifier=")
+ .append(modifier).append(",reset offset=").append(tuple3.f2)
+ .append(",old offset=").append(oldOffset)
+ .append(",updated offset=").append(regInfo.getOffset())
+ .append(",group=").append(group)
+ .append(",topic-partId=").append(offsetCacheKey).toString());
+ strBuidler.delete(0, strBuidler.length());
}
}
return changed;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
index fcebdfc..9dcd29a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
@@ -17,12 +17,13 @@
package org.apache.tubemq.server.broker.offset;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.corebase.utils.Tuple3;
import org.apache.tubemq.server.broker.msgstore.MessageStore;
-import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo;
@@ -68,7 +69,7 @@ public interface OffsetService {
Map<String, Map<Integer, Tuple2<Long, Long>>> queryGroupOffset(
String group, Map<String, Set<Integer>> topicPartMap);
- boolean modifyGroupOffset(MessageStoreManager storeManager, Set<String> groups,
- Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap,
+ boolean modifyGroupOffset(Set<String> groups,
+ List<Tuple3<String, Integer, Long>> topicPartOffsets,
String modifier);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index d8f85d4..c76a6b7 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -17,6 +17,7 @@
package org.apache.tubemq.server.broker.web;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -25,9 +26,11 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpServletRequest;
+
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.corebase.utils.Tuple3;
import org.apache.tubemq.server.broker.TubeBroker;
import org.apache.tubemq.server.broker.metadata.TopicMetadata;
import org.apache.tubemq.server.broker.msgstore.MessageStore;
@@ -89,6 +92,9 @@ public class BrokerAdminServlet extends AbstractWebHandler {
// clone consumer group's offset from source to target
innRegisterWebMethod("admin_clone_offset",
"adminCloneGroupOffSet");
+ // set or update group's offset info
+ innRegisterWebMethod("admin_set_offset",
+ "adminSetGroupOffSet");
}
public void adminQueryAllMethods(HttpServletRequest req,
@@ -759,6 +765,74 @@ public class BrokerAdminServlet extends AbstractWebHandler {
}
/***
+ * Add or Modify consumer group offset.
+ *
+ * @param req
+ * @param sBuilder process result
+ */
+    public void adminSetGroupOffSet(HttpServletRequest req,
+                                    StringBuilder sBuilder) {
+        // 1. consumer groups whose offsets are to be written
+        ProcessResult procRet = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSGROUPNAME, true, null);
+        if (!procRet.success) {
+            WebParameterUtils.buildFailResult(sBuilder, procRet.errInfo);
+            return;
+        }
+        final Set<String> groups = (Set<String>) procRet.retData1;
+        // 2. setting mode: manual (caller supplies offsets) or not
+        procRet = WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.MANUALSET, true, false);
+        if (!procRet.success) {
+            WebParameterUtils.buildFailResult(sBuilder, procRet.errInfo);
+            return;
+        }
+        final boolean isManual = (Boolean) procRet.retData1;
+        // 3. the user recorded as the modifier of the offsets
+        procRet = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.MODIFYUSER, true, null);
+        if (!procRet.success) {
+            WebParameterUtils.buildFailResult(sBuilder, procRet.errInfo);
+            return;
+        }
+        final String modifier = (String) procRet.retData1;
+        // 4. build the (topic, partitionId, offset) items to apply
+        final List<Tuple3<String, Integer, Long>> offsetItems;
+        if (!isManual) {
+            // non-manual mode: offsets are derived from the required topic set
+            procRet = WebParameterUtils.getStringParamValue(req,
+                    WebFieldDef.COMPSTOPICNAME, true, null);
+            if (!procRet.success) {
+                WebParameterUtils.buildFailResult(sBuilder, procRet.errInfo);
+                return;
+            }
+            offsetItems = buildOffsetResetInfo((Set<String>) procRet.retData1);
+        } else {
+            // manual mode: offsets come from the required json dictionary
+            procRet = WebParameterUtils.getJsonDictParamValue(req,
+                    WebFieldDef.OFFSETJSON, true, null);
+            if (!procRet.success) {
+                WebParameterUtils.buildFailResult(sBuilder, procRet.errInfo);
+                return;
+            }
+            Map<String, Long> suppliedOffsets =
+                    (Map<String, Long>) procRet.retData1;
+            // validate the supplied entries and convert them to the item format
+            procRet = validManOffsetResetInfo(WebFieldDef.OFFSETJSON, suppliedOffsets);
+            if (!procRet.success) {
+                WebParameterUtils.buildFailResult(sBuilder, procRet.errInfo);
+                return;
+            }
+            offsetItems = (List<Tuple3<String, Integer, Long>>) procRet.retData1;
+        }
+        // 5. apply; the response is OK whether or not any record was modified
+        broker.getOffsetManager().modifyGroupOffset(groups, offsetItems, modifier);
+        sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
+    }
+
+ /***
* Clone consume group offset, clone A group's offset to other group.
*
* @param req
@@ -821,12 +895,162 @@ public class BrokerAdminServlet extends AbstractWebHandler {
// query offset from source group
Map<String, Map<Integer, Tuple2<Long, Long>>> srcGroupOffsets =
broker.getOffsetManager().queryGroupOffset(srcGroupName, topicPartMap);
+ // transfer offset format
+ List<Tuple3<String, Integer, Long>> resetOffsets =
+ buildOffsetResetInfo(srcGroupOffsets);
boolean changed = broker.getOffsetManager().modifyGroupOffset(
- broker.getStoreManager(), tgtGroupNameSet, srcGroupOffsets, modifier);
+ tgtGroupNameSet, resetOffsets, modifier);
// builder return result
sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
}
+    /**
+     * Convert queried group offsets into the offset reset list format.
+     *
+     * For every topic/partition entry, the first field of the offset tuple is
+     * adjusted into the range reported by that partition's message store
+     * (getIndexMinOffset() .. getIndexMaxOffset()). Entries with a null
+     * partition map, a null offset tuple, or whose message store cannot be
+     * obtained are skipped.
+     *
+     * @param topicPartOffsetMap  topic name -> (partition id -> offset tuple)
+     * @return the (topic, partitionId, adjusted offset) items, possibly empty
+     */
+    private List<Tuple3<String, Integer, Long>> buildOffsetResetInfo(
+            Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap) {
+        long adjOffset;
+        List<Tuple3<String, Integer, Long>> result = new ArrayList<>();
+        MessageStoreManager storeManager = broker.getStoreManager();
+        for (Map.Entry<String, Map<Integer, Tuple2<Long, Long>>> entry
+                : topicPartOffsetMap.entrySet()) {
+            Map<Integer, Tuple2<Long, Long>> partOffsetMap = entry.getValue();
+            if (partOffsetMap == null) {
+                continue;
+            }
+            // process offset value
+            for (Map.Entry<Integer, Tuple2<Long, Long>> entry1 : partOffsetMap.entrySet()) {
+                Tuple2<Long, Long> offsetTuple = entry1.getValue();
+                if (offsetTuple == null) {
+                    continue;
+                }
+                // The store is looked up into a per-iteration variable: when the
+                // lookup fails it must stay null, not keep pointing at the store
+                // of a previously processed partition.
+                MessageStore store = null;
+                try {
+                    store = storeManager.getOrCreateMessageStore(
+                            entry.getKey(), entry1.getKey());
+                } catch (Throwable e) {
+                    // best effort: a partition whose store is unavailable is skipped
+                }
+                if (store == null) {
+                    continue;
+                }
+                long firstOffset = store.getIndexMinOffset();
+                long lastOffset = store.getIndexMaxOffset();
+                // adjust reset offset value
+                adjOffset = offsetTuple.f0 < firstOffset
+                        ? firstOffset : Math.min(offsetTuple.f0, lastOffset);
+                result.add(new Tuple3<>(entry.getKey(), entry1.getKey(), adjOffset));
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Build the offset reset list for a set of topics, using each partition's
+     * current max index offset as the value to set.
+     *
+     * The partitions of each topic are resolved through
+     * validAndGetPartitions(null, topicSet). Topics without partitions and
+     * partitions whose message store cannot be obtained are skipped.
+     *
+     * @param topicSet  the topic names to build reset items for
+     * @return the (topic, partitionId, max index offset) items, possibly empty
+     */
+    private List<Tuple3<String, Integer, Long>> buildOffsetResetInfo(Set<String> topicSet) {
+        List<Tuple3<String, Integer, Long>> result = new ArrayList<>();
+        MessageStoreManager storeManager = broker.getStoreManager();
+        // get topic's partition set
+        Map<String, Set<Integer>> topicPartMap =
+                validAndGetPartitions(null, topicSet);
+        // fill current topic's max offset value
+        for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+            if (entry.getKey() == null
+                    || entry.getValue() == null
+                    || entry.getValue().isEmpty()) {
+                continue;
+            }
+            for (Integer partId : entry.getValue()) {
+                // The store is looked up into a per-iteration variable: when the
+                // lookup fails it must stay null, otherwise the max offset of a
+                // previously processed partition would be reported for this one.
+                MessageStore store = null;
+                try {
+                    store = storeManager.getOrCreateMessageStore(
+                            entry.getKey(), partId);
+                } catch (Throwable e) {
+                    // best effort: a partition whose store is unavailable is skipped
+                }
+                if (store == null) {
+                    continue;
+                }
+                result.add(new Tuple3<>(entry.getKey(),
+                        partId, store.getIndexMaxOffset()));
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Validate manually supplied offsets and convert them to the reset list format.
+     *
+     * Each key is split on TokenConstants.ATTR_SEP and must yield exactly three
+     * items (brokerId, topicName, partitionId); otherwise a failure result is
+     * returned immediately, as it is when the partitionId is not an integer.
+     * Entries with a null key or value, entries for another broker id, entries
+     * for a topic not present in the local topic configuration, and entries
+     * whose message store cannot be obtained are skipped. Accepted offsets are
+     * adjusted into the store's getIndexMinOffset() .. getIndexMaxOffset() range.
+     *
+     * @param fieldDef          the field definition used to build failure messages
+     * @param manOffsetInfoMap  the supplied key -> offset entries
+     * @return a success result carrying the (topic, partitionId, adjusted offset)
+     *         list, or a failure result when a key is malformed or no entry
+     *         was accepted
+     */
+    private ProcessResult validManOffsetResetInfo(WebFieldDef fieldDef,
+                                                  Map<String, Long> manOffsetInfoMap) {
+        String brokerId;
+        String topicName;
+        String strPartId;
+        int partitionId;
+        long adjOffset;
+        ProcessResult procResult = new ProcessResult();
+        MessageStoreManager storeManager = broker.getStoreManager();
+        List<Tuple3<String, Integer, Long>> offsetVals = new ArrayList<>();
+        String localBrokerId = String.valueOf(broker.getTubeConfig().getBrokerId());
+        // get topic configure infos
+        Map<String, TopicMetadata> topicConfigMap =
+                broker.getMetadataManager().getTopicConfigMap();
+        for (Map.Entry<String, Long> entry : manOffsetInfoMap.entrySet()) {
+            if (entry.getKey() == null || entry.getValue() == null) {
+                continue;
+            }
+            // parse and check partitionKey value
+            String[] keyItems = entry.getKey().split(TokenConstants.ATTR_SEP);
+            if (keyItems.length != 3) {
+                procResult.setFailResult(fieldDef.id,
+                        new StringBuilder(512).append("Parameter ")
+                                .append(fieldDef.name).append("'s key invalid:")
+                                .append(entry.getKey())
+                                .append(" must be brokerId:topicName:partitionId !").toString());
+                return procResult;
+            }
+            brokerId = keyItems[0].trim();
+            topicName = keyItems[1].trim();
+            strPartId = keyItems[2].trim();
+            // entries addressed to another broker or to an unknown topic are ignored
+            if (!localBrokerId.equals(brokerId)
+                    || !topicConfigMap.containsKey(topicName)) {
+                continue;
+            }
+            try {
+                partitionId = Integer.parseInt(strPartId);
+            } catch (NumberFormatException e) {
+                procResult.setFailResult(fieldDef.id,
+                        new StringBuilder(512).append("Parameter ")
+                                .append(fieldDef.name).append("'s key invalid:")
+                                .append(entry.getKey())
+                                .append("'s partitionId value not number!").toString());
+                return procResult;
+            }
+            // check and adjust offset value
+            // The store is looked up into a per-iteration variable: when the
+            // lookup fails it must stay null, not keep pointing at the store
+            // of a previously processed entry.
+            MessageStore store = null;
+            try {
+                store = storeManager.getOrCreateMessageStore(topicName, partitionId);
+            } catch (Throwable e) {
+                // best effort: an entry whose store is unavailable is skipped
+            }
+            if (store == null) {
+                continue;
+            }
+            long firstOffset = store.getIndexMinOffset();
+            long lastOffset = store.getIndexMaxOffset();
+            adjOffset = entry.getValue() < firstOffset
+                    ? firstOffset : Math.min(entry.getValue(), lastOffset);
+            offsetVals.add(new Tuple3<>(topicName, partitionId, adjOffset));
+        }
+        if (offsetVals.isEmpty()) {
+            procResult.setFailResult(fieldDef.id,
+                    new StringBuilder(512).append("Parameter ")
+                            .append(fieldDef.name)
+                            .append("'s value is invalid!").toString());
+        } else {
+            procResult.setSuccResult(offsetVals);
+        }
+        return procResult;
+    }
+
// builder group's offset info
private Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> getGroupOffsetInfo(
Set<String> groupSet, Set<String> topicSet) {
@@ -872,7 +1096,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
private Map<String, Set<Integer>> validAndGetPartitions(String group, Set<String> topicSet) {
Map<String, Set<Integer>> topicPartMap = new HashMap<>();
// query stored topic set stored in memory or zk
- if (topicSet.isEmpty()) {
+ if (topicSet.isEmpty() && group != null) {
topicSet = broker.getOffsetManager().getGroupSubInfo(group);
}
// get topic's partitionIds
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index ec97421..45b862d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -78,7 +78,13 @@ public enum WebFieldDef {
RegexDef.TMP_GROUP),
TGTCOMPSGROUPNAME(19, "targetGroupName", "tgtGroup",
WebFieldType.COMPSTRING, "Offset clone target group name",
- TBaseConstants.META_MAX_GROUPNAME_LENGTH, RegexDef.TMP_GROUP);
+ TBaseConstants.META_MAX_GROUPNAME_LENGTH, RegexDef.TMP_GROUP),
+ MANUALSET(20, "manualSet", "manSet",
+ WebFieldType.BOOLEAN, "Whether manual offset setting mode"),
+ OFFSETJSON(21, "offsetJsonSet", "offsetSet",
+ WebFieldType.JSONTYPE, "The offset set that needs to be added or modified");
+
+
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index 1202d33..fddd5de 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -478,6 +478,77 @@ public class WebParameterUtils {
}
/**
+ * Parse the parameter value from a JSON dict
+ *
+ * @param req Http Servlet Request
+ * @param fieldDef the parameter field definition
+ * @param required whether the parameter is required
+ * @param defValue the default value returned when the parameter is optional and missing or blank
+ * @return valid result for the parameter value
+ */
+    public static ProcessResult getJsonDictParamValue(HttpServletRequest req,
+                                                      WebFieldDef fieldDef,
+                                                      boolean required,
+                                                      Map<String, Long> defValue) {
+        ProcessResult procResult = new ProcessResult();
+        // get parameter value, falling back to the field's short name
+        String paramValue = req.getParameter(fieldDef.name);
+        if (paramValue == null) {
+            paramValue = req.getParameter(fieldDef.shortName);
+        }
+        if (TStringUtils.isNotBlank(paramValue)) {
+            // Cleanup value extra characters
+            paramValue = escDoubleQuotes(paramValue.trim());
+        }
+        // Check if the parameter exists
+        if (TStringUtils.isBlank(paramValue)) {
+            if (required) {
+                procResult.setFailResult(fieldDef.id,
+                        new StringBuilder(512).append("Parameter ")
+                                .append(fieldDef.name)
+                                .append(" is missing or value is null or blank!").toString());
+            } else {
+                procResult.setSuccResult(defValue);
+            }
+            return procResult;
+        }
+        try {
+            paramValue = URLDecoder.decode(paramValue,
+                    TBaseConstants.META_DEFAULT_CHARSET_NAME);
+        } catch (UnsupportedEncodingException | IllegalArgumentException e) {
+            // IllegalArgumentException is raised for malformed '%' escapes.
+            procResult.setFailResult(fieldDef.id,
+                    new StringBuilder(512).append("Parameter ")
+                            .append(fieldDef.name)
+                            .append(" decode error, exception is ")
+                            .append(e.toString()).toString());
+            // Stop here: continuing would parse the undecoded value and could
+            // overwrite this failure with a success result.
+            return procResult;
+        }
+        if (TStringUtils.isBlank(paramValue)) {
+            if (required) {
+                procResult.setFailResult(fieldDef.id,
+                        new StringBuilder(512).append("Parameter ")
+                                .append(fieldDef.name)
+                                .append("'s value is blank!").toString());
+            } else {
+                procResult.setSuccResult(defValue);
+            }
+            return procResult;
+        }
+        if (fieldDef.valMaxLen != TBaseConstants.META_VALUE_UNDEFINED) {
+            if (paramValue.length() > fieldDef.valMaxLen) {
+                procResult.setFailResult(fieldDef.id,
+                        new StringBuilder(512).append("Parameter ")
+                                .append(fieldDef.name)
+                                .append("'s length over max allowed length (")
+                                .append(fieldDef.valMaxLen).append(")!").toString());
+                return procResult;
+            }
+        }
+        // Report malformed json as a parameter failure instead of letting the
+        // parser's runtime exception escape from this method.
+        Map<String, Long> dictValue;
+        try {
+            dictValue = new Gson().fromJson(paramValue,
+                    new TypeToken<Map<String, Long>>(){}.getType());
+        } catch (com.google.gson.JsonParseException e) {
+            procResult.setFailResult(fieldDef.id,
+                    new StringBuilder(512).append("Parameter ")
+                            .append(fieldDef.name)
+                            .append("'s value is not a valid json dict, exception is ")
+                            .append(e.toString()).toString());
+            return procResult;
+        }
+        // fromJson yields null for the json literal null; never hand a null
+        // dictionary to the caller as a successful result.
+        if (dictValue == null) {
+            procResult.setFailResult(fieldDef.id,
+                    new StringBuilder(512).append("Parameter ")
+                            .append(fieldDef.name)
+                            .append("'s value is not a valid json dict!").toString());
+            return procResult;
+        }
+        procResult.setSuccResult(dictValue);
+        return procResult;
+    }
+
+ /**
* process string default value
*
* @param procResult process result
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java
index b83a966..2f32cb1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java
@@ -28,7 +28,8 @@ public enum WebFieldType {
BOOLEAN(4, "Boolean"),
DATE(5, "Date"),
COMPSTRING(6, "Compound string"),
- COMPINT(7, "Compound integer");
+ COMPINT(7, "Compound integer"),
+ JSONTYPE(8, "Json");
public int value;