You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2021/01/06 08:37:54 UTC
[incubator-tubemq] 01/02: [TUBEMQ-486]Add the delete API of
consumer group offset
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 537ed34b39dfb007fea0d65773e2f1a509a1bd46
Author: gosonzhang <go...@tencent.com>
AuthorDate: Wed Jan 6 11:27:15 2021 +0800
[TUBEMQ-486]Add the delete API of consumer group offset
---
.../server/broker/offset/DefaultOffsetManager.java | 104 ++++-
.../tubemq/server/broker/offset/OffsetService.java | 4 +
.../server/broker/web/BrokerAdminServlet.java | 430 +++++++++++++--------
.../tubemq/server/common/fielddef/WebFieldDef.java | 6 +-
.../tubemq/server/common/fileconfig/ZKConfig.java | 1 -
.../server/common/offsetstorage/OffsetStorage.java | 7 +-
.../common/offsetstorage/ZkOffsetStorage.java | 175 ++++++---
.../common/offsetstorage/zookeeper/ZKUtil.java | 14 +
.../tubemq/server/common/utils/ProcessResult.java | 3 +
.../server/common/utils/WebParameterUtils.java | 278 ++++++-------
.../web/handler/WebBrokerTopicConfHandler.java | 21 +-
11 files changed, 661 insertions(+), 382 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
index 84dabb2..f052375 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -17,6 +17,7 @@
package org.apache.tubemq.server.broker.offset;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -339,7 +340,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
Set<String> groupSet = new HashSet<>();
groupSet.addAll(cfmOffsetMap.keySet());
Map<String, Set<String>> localGroups =
- zkOffsetStorage.getZkLocalGroupTopicInfos();
+ zkOffsetStorage.queryZkAllGroupTopicInfos();
groupSet.addAll(localGroups.keySet());
return groupSet;
}
@@ -364,7 +365,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
public Set<String> getUnusedGroupInfo() {
Set<String> unUsedGroups = new HashSet<>();
Map<String, Set<String>> localGroups =
- zkOffsetStorage.getZkLocalGroupTopicInfos();
+ zkOffsetStorage.queryZkAllGroupTopicInfos();
for (String groupName : localGroups.keySet()) {
if (!cfmOffsetMap.containsKey(groupName)) {
unUsedGroups.add(groupName);
@@ -383,9 +384,11 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
Set<String> result = new HashSet<>();
Map<String, OffsetStorageInfo> topicPartOffsetMap = cfmOffsetMap.get(group);
if (topicPartOffsetMap == null) {
- Map<String, Set<String>> localGroups =
- zkOffsetStorage.getZkLocalGroupTopicInfos();
- result = localGroups.get(group);
+ List<String> groupLst = new ArrayList<>(1);
+ groupLst.add(group);
+ Map<String, Set<String>> groupTopicInfo =
+ zkOffsetStorage.queryZKGroupTopicInfo(groupLst);
+ result = groupTopicInfo.get(group);
} else {
for (OffsetStorageInfo storageInfo : topicPartOffsetMap.values()) {
result.add(storageInfo.getTopic());
@@ -496,6 +499,53 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
}
/***
+ * Delete offset.
+ *
+ * @param onlyMemory
+ * @param groupTopicPartMap
+ * @param modifier
+ */
+ @Override
+ public void deleteGroupOffset(boolean onlyMemory,
+ Map<String, Map<String, Set<Integer>>> groupTopicPartMap,
+ String modifier) {
+ String printBase;
+ StringBuilder strBuidler = new StringBuilder(512);
+ for (Map.Entry<String, Map<String, Set<Integer>>> entry
+ : groupTopicPartMap.entrySet()) {
+ if (entry.getKey() == null
+ || entry.getValue() == null
+ || entry.getValue().isEmpty()) {
+ continue;
+ }
+ rmvOffset(entry.getKey(), entry.getValue());
+ }
+ if (onlyMemory) {
+ printBase = strBuidler
+ .append("[Offset Manager] delete offset from memory by modifier=")
+ .append(modifier).toString();
+ } else {
+ zkOffsetStorage.deleteGroupOffsetInfo(groupTopicPartMap);
+ printBase = strBuidler
+ .append("[Offset Manager] delete offset from memory and zk by modifier=")
+ .append(modifier).toString();
+ }
+ strBuidler.delete(0, strBuidler.length());
+ // print log
+ for (Map.Entry<String, Map<String, Set<Integer>>> entry
+ : groupTopicPartMap.entrySet()) {
+ if (entry.getKey() == null
+ || entry.getValue() == null
+ || entry.getValue().isEmpty()) {
+ continue;
+ }
+ logger.info(strBuidler.append(printBase).append(",group=").append(entry.getKey())
+ .append(",topic-partId-map=").append(entry.getValue()).toString());
+ strBuidler.delete(0, strBuidler.length());
+ }
+ }
+
+ /***
* Set temp offset.
*
* @param group
@@ -611,6 +661,50 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
return regInfo;
}
+ private void rmvOffset(String group, Map<String, Set<Integer>> topicPartMap) {
+ if (group == null
+ || topicPartMap == null
+ || topicPartMap.isEmpty()) {
+ return;
+ }
+ // remove confirm offset
+ ConcurrentHashMap<String, OffsetStorageInfo> regInfoMap = cfmOffsetMap.get(group);
+ if (regInfoMap != null) {
+ for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+ if (entry.getKey() == null
+ || entry.getValue() == null
+ || entry.getValue().isEmpty()) {
+ continue;
+ }
+ for (Integer partitionId : entry.getValue()) {
+ String offsetCacheKey = getOffsetCacheKey(entry.getKey(), partitionId);
+ regInfoMap.remove(offsetCacheKey);
+ }
+ }
+ if (regInfoMap.isEmpty()) {
+ cfmOffsetMap.remove(group);
+ }
+ }
+ // remove tmp offset
+ ConcurrentHashMap<String, Long> tmpRegInfoMap = tmpOffsetMap.get(group);
+ if (tmpRegInfoMap != null) {
+ for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+ if (entry.getKey() == null
+ || entry.getValue() == null
+ || entry.getValue().isEmpty()) {
+ continue;
+ }
+ for (Integer partitionId : entry.getValue()) {
+ String offsetCacheKey = getOffsetCacheKey(entry.getKey(), partitionId);
+ tmpRegInfoMap.remove(offsetCacheKey);
+ }
+ }
+ if (tmpRegInfoMap.isEmpty()) {
+ tmpOffsetMap.remove(group);
+ }
+ }
+ }
+
private String getOffsetCacheKey(String topic, int partitionId) {
return new StringBuilder(256).append(topic)
.append("-").append(partitionId).toString();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
index 9dcd29a..4a19798 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
@@ -72,4 +72,8 @@ public interface OffsetService {
boolean modifyGroupOffset(Set<String> groups,
List<Tuple3<String, Integer, Long>> topicPartOffsets,
String modifier);
+
+ void deleteGroupOffset(boolean onlyMemory,
+ Map<String, Map<String, Set<Integer>>> groupTopicPartMap,
+ String modifier);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index c76a6b7..1f2fb5d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -95,10 +95,13 @@ public class BrokerAdminServlet extends AbstractWebHandler {
// set or update group's offset info
innRegisterWebMethod("admin_set_offset",
"adminSetGroupOffSet");
+ // remove group's offset info
+ innRegisterWebMethod("admin_rmv_offset",
+ "adminRemoveGroupOffSet");
}
public void adminQueryAllMethods(HttpServletRequest req,
- StringBuilder sBuilder) throws Exception {
+ StringBuilder sBuilder) {
int index = 0;
List<String> methods = getSupportedMethod();
sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
@@ -120,11 +123,11 @@ public class BrokerAdminServlet extends AbstractWebHandler {
* @throws Exception
*/
public void adminQueryBrokerAllConsumerInfo(HttpServletRequest req,
- StringBuilder sBuilder) throws Exception {
+ StringBuilder sBuilder) {
int index = 0;
- ProcessResult result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSGROUPNAME, false, null);
- if (!result.success) {
+ ProcessResult result = new ProcessResult();
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, false, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
@@ -209,10 +212,10 @@ public class BrokerAdminServlet extends AbstractWebHandler {
* @throws Exception
*/
public void adminQueryBrokerAllMessageStoreInfo(HttpServletRequest req,
- StringBuilder sBuilder) throws Exception {
- ProcessResult result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSTOPICNAME, false, null);
- if (!result.success) {
+ StringBuilder sBuilder) {
+ ProcessResult result = new ProcessResult();
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
@@ -276,17 +279,16 @@ public class BrokerAdminServlet extends AbstractWebHandler {
* @throws Exception
*/
public void adminGetMemStoreStatisInfo(HttpServletRequest req,
- StringBuilder sBuilder) throws Exception {
- ProcessResult result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSTOPICNAME, false, null);
- if (!result.success) {
+ StringBuilder sBuilder) {
+ ProcessResult result = new ProcessResult();
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
Set<String> topicNameSet = (Set<String>) result.retData1;
- result = WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.NEEDREFRESH, false, false);
- if (!result.success) {
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.NEEDREFRESH, false, false, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
@@ -335,38 +337,34 @@ public class BrokerAdminServlet extends AbstractWebHandler {
* @throws Exception
*/
public void adminManualSetCurrentOffSet(HttpServletRequest req,
- StringBuilder sBuilder) throws Exception {
- ProcessResult result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.TOPICNAME, true, null);
- if (!result.success) {
+ StringBuilder sBuilder) {
+ ProcessResult result = new ProcessResult();
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.TOPICNAME, true, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
final String topicName = (String) result.retData1;
- result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.GROUPNAME, true, null);
- if (!result.success) {
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.GROUPNAME, true, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
final String groupName = (String) result.retData1;
- result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.MODIFYUSER, true, null);
- if (!result.success) {
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.MODIFYUSER, true, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
final String modifyUser = (String) result.retData1;
- result = WebParameterUtils.getIntParamValue(req,
- WebFieldDef.PARTITIONID, true, -1, 0);
- if (!result.success) {
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.PARTITIONID, true, -1, 0, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
int partitionId = (Integer) result.retData1;
- result = WebParameterUtils.getLongParamValue(req,
- WebFieldDef.MANUALOFFSET, true, -1);
- if (!result.success) {
+ if (!WebParameterUtils.getLongParamValue(req,
+ WebFieldDef.MANUALOFFSET, true, -1, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
@@ -429,23 +427,21 @@ public class BrokerAdminServlet extends AbstractWebHandler {
*/
public void adminQuerySnapshotMessageSet(HttpServletRequest req,
StringBuilder sBuilder) throws Exception {
- ProcessResult result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.TOPICNAME, true, null);
- if (!result.success) {
+ ProcessResult result = new ProcessResult();
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.TOPICNAME, true, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
final String topicName = (String) result.retData1;
- result = WebParameterUtils.getIntParamValue(req,
- WebFieldDef.PARTITIONID, true, -1, 0);
- if (!result.success) {
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.PARTITIONID, true, -1, 0, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
int partitionId = (Integer) result.retData1;
- result = WebParameterUtils.getIntParamValue(req,
- WebFieldDef.MSGCOUNT, false, 3, 3);
- if (!result.success) {
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.MSGCOUNT, false, 3, 3, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
@@ -457,9 +453,8 @@ public class BrokerAdminServlet extends AbstractWebHandler {
.append("\"}");
return;
}
- result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.FILTERCONDS, false, null);
- if (!result.success) {
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.FILTERCONDS, false, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
@@ -477,31 +472,27 @@ public class BrokerAdminServlet extends AbstractWebHandler {
*/
public void adminQueryCurrentGroupOffSet(HttpServletRequest req,
StringBuilder sBuilder) {
- ProcessResult result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.TOPICNAME, true, null);
- if (!result.success) {
+ ProcessResult result = new ProcessResult();
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.TOPICNAME, true, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
final String topicName = (String) result.retData1;
- result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.GROUPNAME, true, null);
- if (!result.success) {
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.GROUPNAME, true, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
final String groupName = (String) result.retData1;
- result = WebParameterUtils.getIntParamValue(req,
- WebFieldDef.PARTITIONID, true, -1, 0);
- if (!result.success) {
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.PARTITIONID, true, -1, 0, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
int partitionId = (Integer) result.retData1;
-
- result = WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.REQUIREREALOFFSET, false, false);
- if (!result.success) {
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.REQUIREREALOFFSET, false, false, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
@@ -567,8 +558,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
Map<String, ConsumerNodeInfo> map =
broker.getBrokerServiceServer().getConsumerRegisterMap();
int totalCnt = 0;
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",")
- .append(",\"dataSet\":[");
+ sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
for (Entry<String, ConsumerNodeInfo> entry : map.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
@@ -592,10 +582,10 @@ public class BrokerAdminServlet extends AbstractWebHandler {
*/
public void adminQueryPubInfo(HttpServletRequest req,
StringBuilder sBuilder) {
+ ProcessResult result = new ProcessResult();
// get the topic set to be queried
- ProcessResult result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSTOPICNAME, false, null);
- if (!result.success) {
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
@@ -637,9 +627,9 @@ public class BrokerAdminServlet extends AbstractWebHandler {
public void adminQueryBookedGroup(HttpServletRequest req,
StringBuilder sBuilder) {
// get divide info
- ProcessResult result = WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.WITHDIVIDE, false, false);
- if (!result.success) {
+ ProcessResult result = new ProcessResult();
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.WITHDIVIDE, false, false, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
@@ -697,16 +687,24 @@ public class BrokerAdminServlet extends AbstractWebHandler {
*/
public void adminQueryGroupOffSet(HttpServletRequest req,
StringBuilder sBuilder) {
+ ProcessResult result = new ProcessResult();
// get group list
- ProcessResult result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSGROUPNAME, false, null);
- if (!result.success) {
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, false, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
+ Set<String> inGroupNameSet = (Set<String>) result.retData1;
+ // get the topic set to be queried
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return;
+ }
+ // get target consume group name
+ Set<String> topicSet = (Set<String>) result.retData1;
// filter invalid groups
Set<String> qryGroupNameSet = new HashSet<>();
- Set<String> inGroupNameSet = (Set<String>) result.retData1;
Set<String> bookedGroupSet = broker.getOffsetManager().getBookedGroups();
if (inGroupNameSet.isEmpty()) {
qryGroupNameSet = bookedGroupSet;
@@ -717,19 +715,10 @@ public class BrokerAdminServlet extends AbstractWebHandler {
}
}
}
- // get the topic set to be queried
- result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSTOPICNAME, false, null);
- if (!result.success) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return;
- }
- // get target consume group name
- Set<String> topicSet = (Set<String>) result.retData1;
// verify the acquired Topic set and
// query the corresponding offset information
Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> groupOffsetMaps =
- getGroupOffsetInfo(qryGroupNameSet, topicSet);
+ getGroupOffsetInfo(WebFieldDef.COMPSGROUPNAME, qryGroupNameSet, topicSet);
// builder result
int totalCnt = 0;
sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
@@ -772,26 +761,24 @@ public class BrokerAdminServlet extends AbstractWebHandler {
*/
public void adminSetGroupOffSet(HttpServletRequest req,
StringBuilder sBuilder) {
+ ProcessResult result = new ProcessResult();
// get group list
- ProcessResult result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSGROUPNAME, true, null);
- if (!result.success) {
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, true, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
Set<String> groupNameSet = (Set<String>) result.retData1;
// get set mode
- result = WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.MANUALSET, true, false);
- if (!result.success) {
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.MANUALSET, true, false, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
boolean manualSet = (Boolean) result.retData1;
// get modify user
- result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.MODIFYUSER, true, null);
- if (!result.success) {
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.MODIFYUSER, true, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
@@ -799,17 +786,15 @@ public class BrokerAdminServlet extends AbstractWebHandler {
final String modifier = (String) result.retData1;
if (manualSet) {
// get offset json info
- result = WebParameterUtils.getJsonDictParamValue(req,
- WebFieldDef.OFFSETJSON, true, null);
- if (!result.success) {
+ if (!WebParameterUtils.getJsonDictParamValue(req,
+ WebFieldDef.OFFSETJSON, true, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
Map<String, Long> manOffsets =
(Map<String, Long>) result.retData1;
// valid and transfer offset format
- result = validManOffsetResetInfo(WebFieldDef.OFFSETJSON, manOffsets);
- if (!result.success) {
+ if (!validManOffsetResetInfo(WebFieldDef.OFFSETJSON, manOffsets, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
@@ -817,9 +802,8 @@ public class BrokerAdminServlet extends AbstractWebHandler {
(List<Tuple3<String, Integer, Long>>) result.retData1;
} else {
// get the topic set to be set
- result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSTOPICNAME, true, null);
- if (!result.success) {
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, true, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
@@ -840,38 +824,43 @@ public class BrokerAdminServlet extends AbstractWebHandler {
*/
public void adminCloneGroupOffSet(HttpServletRequest req,
StringBuilder sBuilder) {
+ ProcessResult result = new ProcessResult();
// get source consume group name
- ProcessResult result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.SRCGROUPNAME, true, null);
- if (!result.success) {
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.SRCGROUPNAME, true, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
final String srcGroupName = (String) result.retData1;
- // get modify user
- result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.MODIFYUSER, true, null);
- if (!result.success) {
+ // get source consume group's topic set cloned to target group
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
- final String modifier = (String) result.retData1;
- // get source consume group's topic set cloned to target group
- result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSTOPICNAME, false, null);
- if (!result.success) {
+ Set<String> srcTopicNameSet = (Set<String>) result.retData1;
+ // valid topic and get topic's partitionIds
+ if (!validAndGetTopicPartInfo(srcGroupName,
+ WebFieldDef.SRCGROUPNAME, srcTopicNameSet, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
+ Map<String, Set<Integer>> topicPartMap =
+ (Map<String, Set<Integer>>) result.retData1;
// get target consume group name
- Set<String> srcTopicNameSet = (Set<String>) result.retData1;
- result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.TGTCOMPSGROUPNAME, true, null);
- if (!result.success) {
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.TGTCOMPSGROUPNAME, true, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return;
}
Set<String> tgtGroupNameSet = (Set<String>) result.retData1;
+ // get modify user
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.MODIFYUSER, true, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return;
+ }
+ final String modifier = (String) result.retData1;
// check sourceGroup if existed
Set<String> bookedGroups = broker.getOffsetManager().getBookedGroups();
if (!bookedGroups.contains(srcGroupName)) {
@@ -882,16 +871,6 @@ public class BrokerAdminServlet extends AbstractWebHandler {
.append(" has not been registered on this Broker!").toString());
return;
}
- // valid topic and get topic's partitionIds
- Map<String, Set<Integer>> topicPartMap =
- validAndGetPartitions(srcGroupName, srcTopicNameSet);
- if (topicPartMap.isEmpty()) {
- WebParameterUtils.buildFailResult(sBuilder,
- new StringBuilder(512).append("Parameter ")
- .append(WebFieldDef.SRCGROUPNAME.name).append(": not found ")
- .append(srcGroupName).append(" subscribed topic set!").toString());
- return;
- }
// query offset from source group
Map<String, Map<Integer, Tuple2<Long, Long>>> srcGroupOffsets =
broker.getOffsetManager().queryGroupOffset(srcGroupName, topicPartMap);
@@ -904,6 +883,56 @@ public class BrokerAdminServlet extends AbstractWebHandler {
sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
}
+ /***
+ * Remove consume group offset.
+ *
+ * @param req
+ * @param sBuilder process result
+ */
+ public void adminRemoveGroupOffSet(HttpServletRequest req,
+ StringBuilder sBuilder) {
+ ProcessResult result = new ProcessResult();
+ // get consume group name
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, true, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return;
+ }
+ Set<String> groupNameSet = (Set<String>) result.retData1;
+ // get modify user
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.MODIFYUSER, true, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return;
+ }
+ final String modifier = (String) result.retData1;
+ // get need removed offset's topic
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return;
+ }
+ // get target consume group name
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ // get set mode
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.ONLYMEM, false, false, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return;
+ }
+ boolean onlyMemory = (Boolean) result.retData1;
+ if (!validAndGetGroupTopicInfo(groupNameSet, topicNameSet, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return;
+ }
+ Map<String, Map<String, Set<Integer>>> groupTopicPartMap =
+ (Map<String, Map<String, Set<Integer>>>) result.retData1;
+ broker.getOffsetManager().deleteGroupOffset(
+ onlyMemory, groupTopicPartMap, modifier);
+ // builder return result
+ sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
+ }
+
// build reset offset info
private List<Tuple3<String, Integer, Long>> buildOffsetResetInfo(
Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap) {
@@ -950,8 +979,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
List<Tuple3<String, Integer, Long>> result = new ArrayList<>();
MessageStoreManager storeManager = broker.getStoreManager();
// get topic's partition set
- Map<String, Set<Integer>> topicPartMap =
- validAndGetPartitions(null, topicSet);
+ Map<String, Set<Integer>> topicPartMap = getTopicPartitions(topicSet);
// fill current topic's max offset value
for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
if (entry.getKey() == null
@@ -979,15 +1007,15 @@ public class BrokerAdminServlet extends AbstractWebHandler {
}
// build reset offset info
- private ProcessResult validManOffsetResetInfo(WebFieldDef fieldDef,
- Map<String, Long> manOffsetInfoMap) {
+ private boolean validManOffsetResetInfo(WebFieldDef fieldDef,
+ Map<String, Long> manOffsetInfoMap,
+ ProcessResult result) {
String brokerId;
String topicName;
String strPartId;
int partitionId;
long adjOffset;
MessageStore store = null;
- ProcessResult procResult = new ProcessResult();
MessageStoreManager storeManager = broker.getStoreManager();
List<Tuple3<String, Integer, Long>> offsetVals = new ArrayList<>();
String localBrokerId = String.valueOf(broker.getTubeConfig().getBrokerId());
@@ -1001,12 +1029,12 @@ public class BrokerAdminServlet extends AbstractWebHandler {
// parse and check partitionKey value
String[] keyItems = entry.getKey().split(TokenConstants.ATTR_SEP);
if (keyItems.length != 3) {
- procResult.setFailResult(fieldDef.id,
+ result.setFailResult(fieldDef.id,
new StringBuilder(512).append("Parameter ")
.append(fieldDef.name).append("'s key invalid:")
.append(entry.getKey())
.append(" must be brokerId:topicName:partitionId !").toString());
- return procResult;
+ return result.success;
}
brokerId = keyItems[0].trim();
topicName = keyItems[1].trim();
@@ -1018,12 +1046,12 @@ public class BrokerAdminServlet extends AbstractWebHandler {
try {
partitionId = Integer.parseInt(strPartId);
} catch (NumberFormatException e) {
- procResult.setFailResult(fieldDef.id,
+ result.setFailResult(fieldDef.id,
new StringBuilder(512).append("Parameter ")
.append(fieldDef.name).append("'s key invalid:")
.append(entry.getKey())
.append("'s partitionId value not number!").toString());
- return procResult;
+ return result.success;
}
// check and adjust offset value
try {
@@ -1041,65 +1069,129 @@ public class BrokerAdminServlet extends AbstractWebHandler {
offsetVals.add(new Tuple3<>(topicName, partitionId, adjOffset));
}
if (offsetVals.isEmpty()) {
- procResult.setFailResult(fieldDef.id,
+ result.setFailResult(fieldDef.id,
new StringBuilder(512).append("Parameter ")
- .append(fieldDef.name)
- .append("'s value is invalid!").toString());
+ .append(fieldDef.name).append("'s value is invalid!").toString());
} else {
- procResult.setSuccResult(offsetVals);
+ result.setSuccResult(offsetVals);
}
- return procResult;
+ return result.success;
}
// builder group's offset info
private Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> getGroupOffsetInfo(
- Set<String> groupSet, Set<String> topicSet) {
- long curReadDataOffset = -2;
- long curDataLag = -2;
+ WebFieldDef groupFldDef, Set<String> groupSet, Set<String> topicSet) {
+ ProcessResult result = new ProcessResult();
Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> groupOffsetMaps = new HashMap<>();
for (String group : groupSet) {
Map<String, Map<Integer, GroupOffsetInfo>> topicOffsetRet = new HashMap<>();
// valid and get topic's partitionIds
- Map<String, Set<Integer>> topicPartMap = validAndGetPartitions(group, topicSet);
- // get topic's publish info
- Map<String, Map<Integer, TopicPubStoreInfo>> topicStorePubInfoMap =
- broker.getStoreManager().getTopicPublishInfos(topicPartMap.keySet());
- // get group's booked offset info
- Map<String, Map<Integer, Tuple2<Long, Long>>> groupOffsetMap =
- broker.getOffsetManager().queryGroupOffset(group, topicPartMap);
- // get offset info array
- for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
- String topic = entry.getKey();
- Map<Integer, GroupOffsetInfo> partOffsetRet = new HashMap<>();
- Map<Integer, TopicPubStoreInfo> storeInfoMap = topicStorePubInfoMap.get(topic);
- Map<Integer, Tuple2<Long, Long>> partBookedMap = groupOffsetMap.get(topic);
- for (Integer partitionId : entry.getValue()) {
- GroupOffsetInfo offsetInfo = new GroupOffsetInfo(partitionId);
- offsetInfo.setPartPubStoreInfo(storeInfoMap.get(partitionId));
- offsetInfo.setConsumeOffsetInfo(partBookedMap.get(partitionId));
- String queryKey = buildQueryID(group, topic, partitionId);
- ConsumerNodeInfo nodeInfo = broker.getConsumerNodeInfo(queryKey);
- if (nodeInfo != null) {
- offsetInfo.setConsumeDataOffsetInfo(nodeInfo.getLastDataRdOffset());
+ if (validAndGetTopicPartInfo(group, groupFldDef, topicSet, result)) {
+ Map<String, Set<Integer>> topicPartMap =
+ (Map<String, Set<Integer>>) result.retData1;
+ // get topic's publish info
+ Map<String, Map<Integer, TopicPubStoreInfo>> topicStorePubInfoMap =
+ broker.getStoreManager().getTopicPublishInfos(topicPartMap.keySet());
+ // get group's booked offset info
+ Map<String, Map<Integer, Tuple2<Long, Long>>> groupOffsetMap =
+ broker.getOffsetManager().queryGroupOffset(group, topicPartMap);
+ // get offset info array
+ for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+ String topic = entry.getKey();
+ Map<Integer, GroupOffsetInfo> partOffsetRet = new HashMap<>();
+ Map<Integer, TopicPubStoreInfo> storeInfoMap = topicStorePubInfoMap.get(topic);
+ Map<Integer, Tuple2<Long, Long>> partBookedMap = groupOffsetMap.get(topic);
+ for (Integer partitionId : entry.getValue()) {
+ GroupOffsetInfo offsetInfo = new GroupOffsetInfo(partitionId);
+ offsetInfo.setPartPubStoreInfo(
+ storeInfoMap == null ? null :storeInfoMap.get(partitionId));
+ offsetInfo.setConsumeOffsetInfo(
+ partBookedMap == null ? null : partBookedMap.get(partitionId));
+ String queryKey = buildQueryID(group, topic, partitionId);
+ ConsumerNodeInfo nodeInfo = broker.getConsumerNodeInfo(queryKey);
+ if (nodeInfo != null) {
+ offsetInfo.setConsumeDataOffsetInfo(nodeInfo.getLastDataRdOffset());
+ }
+ offsetInfo.calculateLag();
+ partOffsetRet.put(partitionId, offsetInfo);
}
- offsetInfo.calculateLag();
- partOffsetRet.put(partitionId, offsetInfo);
+ topicOffsetRet.put(topic, partOffsetRet);
}
- topicOffsetRet.put(topic, partOffsetRet);
}
groupOffsetMaps.put(group, topicOffsetRet);
}
return groupOffsetMaps;
}
+ // valid and get need removed group-topic info
+ private boolean validAndGetGroupTopicInfo(Set<String> groupSet,
+ Set<String> topicSet,
+ ProcessResult result) {
+ Map<String, Map<String, Set<Integer>>> groupTopicPartMap = new HashMap<>();
+ // filter group
+ Set<String> targetGroupSet = new HashSet<>();
+ Set<String> bookedGroups = broker.getOffsetManager().getBookedGroups();
+ for (String orgGroup : groupSet) {
+ if (bookedGroups.contains(orgGroup)) {
+ targetGroupSet.add(orgGroup);
+ }
+ }
+ // valid specified topic set
+ for (String group : targetGroupSet) {
+ if (validAndGetTopicPartInfo(group, WebFieldDef.GROUPNAME, topicSet, result)) {
+ Map<String, Set<Integer>> topicPartMap =
+ (Map<String, Set<Integer>>) result.retData1;
+ groupTopicPartMap.put(group, topicPartMap);
+ }
+ }
+ result.setSuccResult(groupTopicPartMap);
+ return true;
+ }
- private Map<String, Set<Integer>> validAndGetPartitions(String group, Set<String> topicSet) {
- Map<String, Set<Integer>> topicPartMap = new HashMap<>();
- // query stored topic set stored in memory or zk
- if (topicSet.isEmpty() && group != null) {
- topicSet = broker.getOffsetManager().getGroupSubInfo(group);
+ private boolean validAndGetTopicPartInfo(String groupName,
+ WebFieldDef groupFldDef,
+ Set<String> topicSet,
+ ProcessResult result) {
+ Set<String> subTopicSet =
+ broker.getOffsetManager().getGroupSubInfo(groupName);
+ if (subTopicSet == null || subTopicSet.isEmpty()) {
+ result.setFailResult(400, new StringBuilder(512)
+ .append("Parameter ").append(groupFldDef.name)
+ .append(": subscribed topic set of ").append(groupName)
+ .append(" query result is null!").toString());
+ return result.success;
}
- // get topic's partitionIds
+ // filter valid topic set
+ Set<String> tgtTopicSet = new HashSet<>();
+ if (topicSet.isEmpty()) {
+ tgtTopicSet = subTopicSet;
+ } else {
+ for (String topic : topicSet) {
+ if (subTopicSet.contains(topic)) {
+ tgtTopicSet.add(topic);
+ }
+ }
+ if (tgtTopicSet.isEmpty()) {
+ result.setFailResult(400, new StringBuilder(512)
+ .append("Parameter ").append(groupFldDef.name)
+ .append(": ").append(groupName)
+ .append(" unsubscribed to the specified topic set!").toString());
+ return result.success;
+ }
+ }
+ Map<String, Set<Integer>> topicPartMap = getTopicPartitions(tgtTopicSet);
+ if (topicPartMap.isEmpty()) {
+ result.setFailResult(400, new StringBuilder(512)
+ .append("Parameter ").append(groupFldDef.name)
+ .append(": all topics subscribed by the group have been deleted!").toString());
+ return result.success;
+ }
+ result.setSuccResult(topicPartMap);
+ return result.success;
+ }
+
+ private Map<String, Set<Integer>> getTopicPartitions(Set<String> topicSet) {
+ Map<String, Set<Integer>> topicPartMap = new HashMap<>();
if (topicSet != null) {
Map<String, TopicMetadata> topicConfigMap =
broker.getMetadataManager().getTopicConfigMap();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index 45b862d..a65a223 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -81,8 +81,10 @@ public enum WebFieldDef {
TBaseConstants.META_MAX_GROUPNAME_LENGTH, RegexDef.TMP_GROUP),
MANUALSET(20, "manualSet", "manSet",
WebFieldType.BOOLEAN, "Whether manual offset setting mode"),
- OFFSETJSON(21, "offsetJsonSet", "offsetSet",
- WebFieldType.JSONTYPE, "The offset set that needs to be added or modified");
+ OFFSETJSON(21, "offsetJsonInfo", "offsetInfo",
+ WebFieldType.JSONTYPE, "The offset info that needs to be added or modified"),
+ ONLYMEM(22, "onlyMemory", "onlyMem", WebFieldType.BOOLEAN,
+ "Only clear the offset data in the memory cache, default is false");
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fileconfig/ZKConfig.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fileconfig/ZKConfig.java
index 5ac3683..8e677a5 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fileconfig/ZKConfig.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fileconfig/ZKConfig.java
@@ -28,7 +28,6 @@ public class ZKConfig {
private int zkSyncTimeMs = 1000;
private long zkCommitPeriodMs = 5000L;
private int zkCommitFailRetries = TServerConstants.CFG_ZK_COMMIT_DEFAULT_RETRIES;
-
public ZKConfig() {
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java
index dca7ca8..f5eee40 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java
@@ -18,6 +18,7 @@
package org.apache.tubemq.server.common.offsetstorage;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -33,10 +34,12 @@ public interface OffsetStorage {
final Collection<OffsetStorageInfo> offsetInfoList,
boolean isFailRetry);
- Map<String, Map<String, Set<String>>> getZkGroupTopicBrokerInfos();
+ Map<String, Set<String>> queryZkAllGroupTopicInfos();
- Map<String, Set<String>> getZkLocalGroupTopicInfos();
+ Map<String, Set<String>> queryZKGroupTopicInfo(List<String> groupSet);
Map<Integer, Long> queryGroupOffsetInfo(String group, String topic,
Set<Integer> partitionIds);
+
+ void deleteGroupOffsetInfo(Map<String, Map<String, Set<Integer>>> groupTopicPartMap);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
index 8094151..3700753 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
@@ -35,6 +35,8 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+
/**
* A offset storage implementation with zookeeper
*/
@@ -64,18 +66,16 @@ public class ZkOffsetStorage implements OffsetStorage {
private final String consumerZkDir;
private final boolean isBroker;
private final int brokerId;
+ private final String strBrokerId;
private ZKConfig zkConfig;
private ZooKeeperWatcher zkw;
- // group-topic-brokerid
- private final Map<String, Map<String, Set<String>>> zkGroupTopicBrokerInfos = new HashMap<>();
- // group-topic
- private final Map<String, Set<String>> zkLocalGroupTopicInfos = new HashMap<>();
public ZkOffsetStorage(final ZKConfig zkConfig, boolean isBroker, int brokerId) {
this.zkConfig = zkConfig;
- this.brokerId = brokerId;
this.isBroker = isBroker;
+ this.brokerId = brokerId;
+ this.strBrokerId = String.valueOf(brokerId);
this.tubeZkRoot = normalize(this.zkConfig.getZkNodeRoot());
this.consumerZkDir = this.tubeZkRoot + "/consumers-v3";
try {
@@ -86,8 +86,6 @@ public class ZkOffsetStorage implements OffsetStorage {
.append(this.zkConfig.getZkServerAddr()).append(") !").toString(), e);
System.exit(1);
}
- logger.info("[ZkOffsetStorage] Get group-topic-broker info from ZooKeeper");
- queryAllZKGroupTopicInfo();
logger.info("[ZkOffsetStorage] ZooKeeper Offset Storage initiated!");
}
@@ -102,16 +100,6 @@ public class ZkOffsetStorage implements OffsetStorage {
}
@Override
- public Map<String, Map<String, Set<String>>> getZkGroupTopicBrokerInfos() {
- return zkGroupTopicBrokerInfos;
- }
-
- @Override
- public Map<String, Set<String>> getZkLocalGroupTopicInfos() {
- return zkLocalGroupTopicInfos;
- }
-
- @Override
public void commitOffset(final String group,
final Collection<OffsetStorageInfo> offsetInfoList,
boolean isFailRetry) {
@@ -242,63 +230,130 @@ public class ZkOffsetStorage implements OffsetStorage {
}
/**
- * Get group-topic-brokerid map info stored in zookeeper.
- * <p/>
- * The broker only cares about the content of its own node,
- * so this part only queries when the node starts, and
- * caches relevant data in the memory for finding
- *
+ * Query booked topic info of groups stored in zookeeper.
+ * @param groupSet query groups
+ * @return group--topic map info
*/
- private void queryAllZKGroupTopicInfo() {
+ @Override
+ public Map<String, Set<String>> queryZKGroupTopicInfo(List<String> groupSet) {
+ String qryBrokerId;
+ Map<String, Set<String>> groupTopicMap = new HashMap<>();
StringBuilder sBuider = new StringBuilder(512);
- // get all booked groups name
+ if (groupSet == null || groupSet.isEmpty()) {
+ return groupTopicMap;
+ }
+ // build path base
String groupNode = sBuider.append(this.consumerZkDir).toString();
- List<String> bookedGroups = ZKUtil.getChildren(this.zkw, groupNode);
sBuider.delete(0, sBuider.length());
- if (bookedGroups != null) {
- // get topic info by group
- for (String group : bookedGroups) {
- String topicNode = sBuider.append(groupNode)
- .append("/").append(group).append("/offsets").toString();
- List<String> consumeTopics = ZKUtil.getChildren(this.zkw, topicNode);
- sBuider.delete(0, sBuider.length());
- Set<String> topicSet = new HashSet<>();
- Map<String, Set<String>> topicBrokerSet = new HashMap<>();
- if (consumeTopics != null) {
- // get broker info by topic
- for (String topic : consumeTopics) {
- String brokerNode = sBuider.append(topicNode)
- .append("/").append(topic).toString();
- List<String> brokerIds = ZKUtil.getChildren(this.zkw, brokerNode);
- sBuider.delete(0, sBuider.length());
- Set<String> brokerIdSet = new HashSet<>();
- if (brokerIds != null) {
- for (String idStr : brokerIds) {
- if (idStr != null) {
- String[] brokerPartIdStrs =
- idStr.split(TokenConstants.HYPHEN);
- brokerIdSet.add(brokerPartIdStrs[0]);
+ // get the group managed by this broker
+ for (String group : groupSet) {
+ String topicNode = sBuider.append(groupNode)
+ .append("/").append(group).append("/offsets").toString();
+ List<String> consumeTopics = ZKUtil.getChildren(this.zkw, topicNode);
+ sBuider.delete(0, sBuider.length());
+ Set<String> topicSet = new HashSet<>();
+ if (consumeTopics != null) {
+ for (String topic : consumeTopics) {
+ if (topic == null) {
+ continue;
+ }
+ String brokerNode = sBuider.append(topicNode)
+ .append("/").append(topic).toString();
+ List<String> brokerIds = ZKUtil.getChildren(this.zkw, brokerNode);
+ sBuider.delete(0, sBuider.length());
+ if (brokerIds != null) {
+ for (String idStr : brokerIds) {
+ if (idStr != null) {
+ String[] brokerPartIdStrs =
+ idStr.split(TokenConstants.HYPHEN);
+ qryBrokerId = brokerPartIdStrs[0];
+ if (qryBrokerId != null
+ && strBrokerId.equals(qryBrokerId.trim())) {
+ topicSet.add(topic);
+ break;
}
}
- if (isBroker && brokerIdSet.contains(String.valueOf(brokerId))) {
- topicSet.add(topic);
- }
}
- topicBrokerSet.put(topic, brokerIdSet);
}
}
- if (!topicSet.isEmpty()) {
- zkLocalGroupTopicInfos.put(group, topicSet);
+ }
+ if (!topicSet.isEmpty()) {
+ groupTopicMap.put(group, topicSet);
+ }
+ }
+ return groupTopicMap;
+ }
+
+ /**
+ * Get group-topic map info stored in zookeeper.
+ * <p/>
+ * The broker only cares about the content of its own node
+ *
+ */
+ @Override
+ public Map<String, Set<String>> queryZkAllGroupTopicInfos() {
+ StringBuilder sBuider = new StringBuilder(512);
+ // get all booked groups name
+ String groupNode = sBuider.append(this.consumerZkDir).toString();
+ List<String> bookedGroups = ZKUtil.getChildren(this.zkw, groupNode);
+ return queryZKGroupTopicInfo(bookedGroups);
+ }
+
+ /**
+ * Get offset stored in zookeeper, if not found or error, set null
+ * <p/>
+ *
+ * @return partitionId--offset map info
+ */
+ @Override
+ public void deleteGroupOffsetInfo(
+ Map<String, Map<String, Set<Integer>>> groupTopicPartMap) {
+ StringBuilder sBuider = new StringBuilder(512);
+ for (Map.Entry<String, Map<String, Set<Integer>>> entry
+ : groupTopicPartMap.entrySet()) {
+ if (entry.getKey() == null
+ || entry.getValue() == null
+ || entry.getValue().isEmpty()) {
+ continue;
+ }
+ String basePath = sBuider.append(this.consumerZkDir).append("/")
+ .append(entry.getKey()).append("/offsets").toString();
+ sBuider.delete(0, sBuider.length());
+ Map<String, Set<Integer>> topicPartMap = entry.getValue();
+ for (Map.Entry<String, Set<Integer>> topicEntry : topicPartMap.entrySet()) {
+ if (topicEntry.getKey() == null
+ || topicEntry.getValue() == null
+ || topicEntry.getValue().isEmpty()) {
+ continue;
+ }
+ Set<Integer> partIdSet = topicEntry.getValue();
+ for (Integer partitionId : partIdSet) {
+ String offsetNode = sBuider.append(basePath).append("/")
+ .append(topicEntry.getKey()).append("/")
+ .append(brokerId).append(TokenConstants.HYPHEN)
+ .append(partitionId).toString();
+ sBuider.delete(0, sBuider.length());
+ ZKUtil.delZNode(this.zkw, offsetNode);
}
- zkGroupTopicBrokerInfos.put(group, topicBrokerSet);
+ String parentNode = sBuider.append(basePath).append("/")
+ .append(topicEntry.getKey()).toString();
+ sBuider.delete(0, sBuider.length());
+ chkAndRmvBlankParentNode(parentNode);
}
+ chkAndRmvBlankParentNode(basePath);
+ String parentNode = sBuider.append(this.consumerZkDir)
+ .append("/").append(entry.getKey()).toString();
+ sBuider.delete(0, sBuider.length());
+ chkAndRmvBlankParentNode(parentNode);
}
- logger.info(new StringBuilder(256)
- .append("[ZkOffsetStorage] query from zookeeper, total group size = ")
- .append(zkGroupTopicBrokerInfos.size()).append(", local group size = ")
- .append(zkLocalGroupTopicInfos.size()).toString());
}
+ private void chkAndRmvBlankParentNode(String parentNode) {
+ List<String> nodeSet = ZKUtil.getChildren(zkw, parentNode);
+ if (nodeSet != null && nodeSet.isEmpty()) {
+ ZKUtil.delZNode(this.zkw, parentNode);
+ }
+ }
private String normalize(final String root) {
if (root.startsWith("/")) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java
index da86191..5eb6cea 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java
@@ -167,6 +167,20 @@ public class ZKUtil {
}
}
+ /**
+ * delete the specified znode.
+ *
+ * @param zkw zk reference
+ * @param znode path of node
+ */
+ public static void delZNode(ZooKeeperWatcher zkw, String znode) {
+ try {
+ zkw.getRecoverableZooKeeper().delete(znode, -1);
+ } catch (Throwable e) {
+ //
+ }
+ }
+
private static byte[] getDataInternal(ZooKeeperWatcher zkw, String znode, Stat stat,
boolean watcherSet) throws KeeperException {
try {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java
index 3688b1c..8cabf67 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java
@@ -44,10 +44,13 @@ public class ProcessResult {
this.success = false;
this.errCode = errCode;
this.errInfo = errMsg;
+ this.retData1 = null;
}
public void setSuccResult(Object retData) {
this.success = true;
+ this.errInfo = "";
+ this.errCode = TErrCodeConstants.SUCCESS;
this.retData1 = retData;
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index fddd5de..2318bdb 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -249,32 +249,32 @@ public class WebParameterUtils {
* @param fieldDef the parameter field definition
* @param required a boolean value represent whether the parameter is must required
* @param defValue a default value returned if failed to parse value from the given object
- * @return valid result for the parameter value
+ * @param result process result of parameter value
+ * @return process result
*/
- public static ProcessResult getLongParamValue(HttpServletRequest req,
- WebFieldDef fieldDef,
- boolean required,
- long defValue) {
- ProcessResult procResult =
- getStringParamValue(req, fieldDef, required, null);
- if (!procResult.success) {
- return procResult;
- }
- String paramValue = (String) procResult.retData1;
+ public static boolean getLongParamValue(HttpServletRequest req,
+ WebFieldDef fieldDef,
+ boolean required,
+ long defValue,
+ ProcessResult result) {
+ if (!getStringParamValue(req, fieldDef, required, null, result)) {
+ return result.success;
+ }
+ String paramValue = (String) result.retData1;
if (paramValue == null) {
- procResult.setSuccResult(defValue);
- return procResult;
+ result.setSuccResult(defValue);
+ return result.success;
}
try {
long paramIntVal = Long.parseLong(paramValue);
- procResult.setSuccResult(paramIntVal);
+ result.setSuccResult(paramIntVal);
} catch (Throwable e) {
- procResult.setFailResult(400,
+ result.setFailResult(400,
new StringBuilder(512).append("Parameter ")
.append(fieldDef.name).append(" parse error: ")
.append(e.getMessage()).toString());
}
- return procResult;
+ return result.success;
}
/**
@@ -283,43 +283,44 @@ public class WebParameterUtils {
* @param req Http Servlet Request
* @param fieldDef the parameter field definition
* @param required a boolean value represent whether the parameter is must required
- * @return valid result for the parameter value
+ * @param result process result of parameter value
+ * @return process result
*/
- public static ProcessResult getIntParamValue(HttpServletRequest req,
- WebFieldDef fieldDef,
- boolean required) {
- ProcessResult procResult =
- getStringParamValue(req, fieldDef, required, null);
- if (!procResult.success) {
- return procResult;
+ public static boolean getIntParamValue(HttpServletRequest req,
+ WebFieldDef fieldDef,
+ boolean required,
+ ProcessResult result) {
+ if (!getStringParamValue(req, fieldDef,
+ required, null, result)) {
+ return result.success;
}
- ProcessResult procRet = new ProcessResult();
Set<Integer> tgtValueSet = new HashSet<Integer>();
if (fieldDef.isCompFieldType()) {
- Set<String> valItemSet = (Set<String>) procResult.retData1;
+ Set<String> valItemSet = (Set<String>) result.retData1;
if (valItemSet.isEmpty()) {
- procResult.setSuccResult(tgtValueSet);
- return procResult;
+ result.setSuccResult(tgtValueSet);
+ return result.success;
}
for (String itemVal : valItemSet) {
- if (!checkIntValueNorms(procRet, fieldDef, itemVal, false, -1)) {
- return procRet;
+ if (!checkIntValueNorms(fieldDef,
+ itemVal, false, -1, result)) {
+ return result.success;
}
- tgtValueSet.add((Integer) procRet.retData1);
+ tgtValueSet.add((Integer) result.retData1);
}
} else {
- String paramValue = (String) procResult.retData1;
+ String paramValue = (String) result.retData1;
if (paramValue == null) {
- procResult.setSuccResult(tgtValueSet);
- return procResult;
+ result.setSuccResult(tgtValueSet);
+ return result.success;
}
- if (!checkIntValueNorms(procRet,
- fieldDef, paramValue, false, -1)) {
- tgtValueSet.add((Integer) procRet.retData1);
+ if (!checkIntValueNorms(fieldDef,
+ paramValue, false, -1, result)) {
+ tgtValueSet.add((Integer) result.retData1);
}
}
- procResult.setSuccResult(tgtValueSet);
- return procResult;
+ result.setSuccResult(tgtValueSet);
+ return result.success;
}
/**
@@ -330,43 +331,44 @@ public class WebParameterUtils {
* @param required a boolean value represent whether the parameter is must required
* @param defValue a default value returned if failed to parse value from the given object
* @param minValue min value required
- * @return valid result for the parameter value
+ * @param result process result of parameter value
+ * @return process result
*/
- public static ProcessResult getIntParamValue(HttpServletRequest req,
+ public static boolean getIntParamValue(HttpServletRequest req,
WebFieldDef fieldDef,
boolean required,
int defValue,
- int minValue) {
- ProcessResult procResult =
- getStringParamValue(req, fieldDef, required, null);
- if (!procResult.success) {
- return procResult;
+ int minValue,
+ ProcessResult result) {
+ if (!getStringParamValue(req, fieldDef, required, null, result)) {
+ return result.success;
}
if (fieldDef.isCompFieldType()) {
Set<Integer> tgtValueSet = new HashSet<Integer>();
- Set<String> valItemSet = (Set<String>) procResult.retData1;
+ Set<String> valItemSet = (Set<String>) result.retData1;
if (valItemSet.isEmpty()) {
tgtValueSet.add(defValue);
- procResult.setSuccResult(tgtValueSet);
- return procResult;
+ result.setSuccResult(tgtValueSet);
+ return result.success;
}
- ProcessResult procRet = new ProcessResult();
for (String itemVal : valItemSet) {
- if (!checkIntValueNorms(procRet, fieldDef, itemVal, true, minValue)) {
- return procRet;
+ if (!checkIntValueNorms(fieldDef,
+ itemVal, true, minValue, result)) {
+ return result.success;
}
- tgtValueSet.add((Integer) procRet.retData1);
+ tgtValueSet.add((Integer) result.retData1);
}
- procResult.setSuccResult(tgtValueSet);
+ result.setSuccResult(tgtValueSet);
} else {
- String paramValue = (String) procResult.retData1;
+ String paramValue = (String) result.retData1;
if (paramValue == null) {
- procResult.setSuccResult(defValue);
- return procResult;
+ result.setSuccResult(defValue);
+ return result.success;
}
- checkIntValueNorms(procResult, fieldDef, paramValue, true, minValue);
+ checkIntValueNorms(fieldDef,
+ paramValue, true, minValue, result);
}
- return procResult;
+ return result.success;
}
/**
@@ -376,24 +378,24 @@ public class WebParameterUtils {
* @param fieldDef the parameter field definition
* @param required a boolean value represent whether the parameter is must required
* @param defValue a default value returned if failed to parse value from the given object
+ * @param result process result
* @return valid result for the parameter value
*/
- public static ProcessResult getBooleanParamValue(HttpServletRequest req,
- WebFieldDef fieldDef,
- boolean required,
- boolean defValue) {
- ProcessResult procResult =
- getStringParamValue(req, fieldDef, required, null);
- if (!procResult.success) {
- return procResult;
- }
- String paramValue = (String) procResult.retData1;
+ public static boolean getBooleanParamValue(HttpServletRequest req,
+ WebFieldDef fieldDef,
+ boolean required,
+ boolean defValue,
+ ProcessResult result) {
+ if (!getStringParamValue(req, fieldDef, required, null, result)) {
+ return result.success;
+ }
+ String paramValue = (String) result.retData1;
if (paramValue == null) {
- procResult.setSuccResult(defValue);
- return procResult;
+ result.setSuccResult(defValue);
+ return result.success;
}
- procResult.setSuccResult(Boolean.parseBoolean(paramValue));
- return procResult;
+ result.setSuccResult(Boolean.parseBoolean(paramValue));
+ return result.success;
}
/**
@@ -403,13 +405,14 @@ public class WebParameterUtils {
* @param fieldDef the parameter field definition
* @param required a boolean value represent whether the parameter is must required
* @param defValue a default value returned if failed to parse value from the given object
+ * @param result process result
* @return valid result for the parameter value
*/
- public static ProcessResult getStringParamValue(HttpServletRequest req,
- WebFieldDef fieldDef,
- boolean required,
- String defValue) {
- ProcessResult procResult = new ProcessResult();
+ public static boolean getStringParamValue(HttpServletRequest req,
+ WebFieldDef fieldDef,
+ boolean required,
+ String defValue,
+ ProcessResult result) {
// get parameter value
String paramValue = req.getParameter(fieldDef.name);
if (paramValue == null) {
@@ -422,14 +425,14 @@ public class WebParameterUtils {
// Check if the parameter exists
if (TStringUtils.isBlank(paramValue)) {
if (required) {
- procResult.setFailResult(fieldDef.id,
+ result.setFailResult(fieldDef.id,
new StringBuilder(512).append("Parameter ")
.append(fieldDef.name)
.append(" is missing or value is null or blank!").toString());
} else {
- procStringDefValue(procResult, fieldDef.isCompFieldType(), defValue);
+ procStringDefValue(fieldDef.isCompFieldType(), defValue, result);
}
- return procResult;
+ return result.success;
}
// check if value is norm;
if (fieldDef.isCompFieldType()) {
@@ -440,41 +443,41 @@ public class WebParameterUtils {
if (TStringUtils.isBlank(strParamValueItem)) {
continue;
}
- if (!checkStrValueNorms(procResult, fieldDef, strParamValueItem)) {
- return procResult;
+ if (!checkStrValueNorms(fieldDef, strParamValueItem, result)) {
+ return result.success;
}
- valItemSet.add((String) procResult.retData1);
+ valItemSet.add((String) result.retData1);
}
// check if is empty result
if (valItemSet.isEmpty()) {
if (required) {
- procResult.setFailResult(fieldDef.id,
+ result.setFailResult(fieldDef.id,
new StringBuilder(512).append("Parameter ")
.append(fieldDef.name)
.append(" is missing or value is null or blank!").toString());
} else {
- procStringDefValue(procResult, fieldDef.isCompFieldType(), defValue);
+ procStringDefValue(fieldDef.isCompFieldType(), defValue, result);
}
- return procResult;
+ return result.success;
}
// check max item count
if (fieldDef.itemMaxCnt != TBaseConstants.META_VALUE_UNDEFINED) {
if (valItemSet.size() > fieldDef.itemMaxCnt) {
- procResult.setFailResult(fieldDef.id,
+ result.setFailResult(fieldDef.id,
new StringBuilder(512).append("Parameter ")
.append(fieldDef.name)
.append("'s item count over max allowed count (")
.append(fieldDef.itemMaxCnt).append(")!").toString());
}
}
- procResult.setSuccResult(valItemSet);
+ result.setSuccResult(valItemSet);
} else {
- if (!checkStrValueNorms(procResult, fieldDef, paramValue)) {
- return procResult;
+ if (!checkStrValueNorms(fieldDef, paramValue, result)) {
+ return result.success;
}
- procResult.setSuccResult(paramValue);
+ result.setSuccResult(paramValue);
}
- return procResult;
+ return result.success;
}
/**
@@ -484,13 +487,14 @@ public class WebParameterUtils {
* @param fieldDef the parameter field definition
* @param required a boolean value represent whether the parameter is must required
* @param defValue a default value returned if failed to parse value from the given object
+ * @param result process result
* @return valid result for the parameter value
*/
- public static ProcessResult getJsonDictParamValue(HttpServletRequest req,
- WebFieldDef fieldDef,
- boolean required,
- Map<String, Long> defValue) {
- ProcessResult procResult = new ProcessResult();
+ public static boolean getJsonDictParamValue(HttpServletRequest req,
+ WebFieldDef fieldDef,
+ boolean required,
+ Map<String, Long> defValue,
+ ProcessResult result) {
// get parameter value
String paramValue = req.getParameter(fieldDef.name);
if (paramValue == null) {
@@ -503,20 +507,20 @@ public class WebParameterUtils {
// Check if the parameter exists
if (TStringUtils.isBlank(paramValue)) {
if (required) {
- procResult.setFailResult(fieldDef.id,
+ result.setFailResult(fieldDef.id,
new StringBuilder(512).append("Parameter ")
.append(fieldDef.name)
.append(" is missing or value is null or blank!").toString());
} else {
- procResult.setSuccResult(defValue);
+ result.setSuccResult(defValue);
}
- return procResult;
+ return result.success;
}
try {
paramValue = URLDecoder.decode(paramValue,
TBaseConstants.META_DEFAULT_CHARSET_NAME);
} catch (UnsupportedEncodingException e) {
- procResult.setFailResult(fieldDef.id,
+ result.setFailResult(fieldDef.id,
new StringBuilder(512).append("Parameter ")
.append(fieldDef.name)
.append(" decode error, exception is ")
@@ -524,73 +528,83 @@ public class WebParameterUtils {
}
if (TStringUtils.isBlank(paramValue)) {
if (required) {
- procResult.setFailResult(fieldDef.id,
+ result.setFailResult(fieldDef.id,
new StringBuilder(512).append("Parameter ")
.append(fieldDef.name)
.append("'s value is blank!").toString());
} else {
- procResult.setSuccResult(defValue);
+ result.setSuccResult(defValue);
}
- return procResult;
+ return result.success;
}
if (fieldDef.valMaxLen != TBaseConstants.META_VALUE_UNDEFINED) {
if (paramValue.length() > fieldDef.valMaxLen) {
- procResult.setFailResult(fieldDef.id,
+ result.setFailResult(fieldDef.id,
new StringBuilder(512).append("Parameter ")
.append(fieldDef.name)
.append("'s length over max allowed length (")
.append(fieldDef.valMaxLen).append(")!").toString());
- return procResult;
+ return result.success;
}
}
- procResult.setSuccResult(new Gson().fromJson(paramValue,
- new TypeToken<Map<String, Long>>(){}.getType()));
- return procResult;
+ // parse data
+ try {
+ Map<String, Long> manOffsets = new Gson().fromJson(paramValue,
+ new TypeToken<Map<String, Long>>(){}.getType());
+ result.setSuccResult(manOffsets);
+ } catch (Throwable e) {
+ result.setFailResult(fieldDef.id,
+ new StringBuilder(512).append("Parameter ")
+ .append(fieldDef.name)
+ .append(" value parse failure, error is ")
+ .append(e.getMessage()).append("!").toString());
+ }
+ return result.success;
}
/**
* process string default value
*
- * @param procResult process result
* @param isCompFieldType the parameter if compound field type
* @param defValue the parameter default value
+ * @param result process result
* @return process result for default value of parameter
*/
- private static ProcessResult procStringDefValue(ProcessResult procResult,
- boolean isCompFieldType,
- String defValue) {
+ private static boolean procStringDefValue(boolean isCompFieldType,
+ String defValue,
+ ProcessResult result) {
if (isCompFieldType) {
Set<String> valItemSet = new HashSet<>();
if (TStringUtils.isNotBlank(defValue)) {
valItemSet.add(defValue);
}
- procResult.setSuccResult(valItemSet);
+ result.setSuccResult(valItemSet);
} else {
- procResult.setSuccResult(defValue);
+ result.setSuccResult(defValue);
}
- return procResult;
+ return result.success;
}
/**
* Parse the parameter string value by regex define
*
- * @param procResult process result
* @param fieldDef the parameter field definition
* @param paramVal the parameter value
+ * @param result process result
* @return check result for string value of parameter
*/
- private static boolean checkStrValueNorms(ProcessResult procResult,
- WebFieldDef fieldDef,
- String paramVal) {
+ private static boolean checkStrValueNorms(WebFieldDef fieldDef,
+ String paramVal,
+ ProcessResult result) {
paramVal = paramVal.trim();
if (TStringUtils.isBlank(paramVal)) {
- procResult.setSuccResult(null);
+ result.setSuccResult(null);
return true;
}
// check value's max length
if (fieldDef.valMaxLen != TBaseConstants.META_VALUE_UNDEFINED) {
if (paramVal.length() > fieldDef.valMaxLen) {
- procResult.setFailResult(fieldDef.id,
+ result.setFailResult(fieldDef.id,
new StringBuilder(512).append("over max length for ")
.append(fieldDef.name).append(", only allow ")
.append(fieldDef.valMaxLen).append(" length").toString());
@@ -600,44 +614,44 @@ public class WebParameterUtils {
// check value's pattern
if (fieldDef.regexCheck) {
if (!paramVal.matches(fieldDef.regexDef.getPattern())) {
- procResult.setFailResult(fieldDef.id,
+ result.setFailResult(fieldDef.id,
new StringBuilder(512).append("illegal value for ")
.append(fieldDef.name).append(", value ")
.append(fieldDef.regexDef.getErrMsgTemp()).toString());
return false;
}
}
- procResult.setSuccResult(paramVal);
+ result.setSuccResult(paramVal);
return true;
}
/**
* Parse the parameter string value by regex define
*
- * @param procResult process result
* @param fieldDef the parameter field definition
* @param paramValue the parameter value
* @param hasMinVal whether there is a minimum
* param minValue the parameter min value
+ * @param result process result
* @return check result for string value of parameter
*/
- private static boolean checkIntValueNorms(ProcessResult procResult,
- WebFieldDef fieldDef,
+ private static boolean checkIntValueNorms(WebFieldDef fieldDef,
String paramValue,
boolean hasMinVal,
- int minValue) {
+ int minValue,
+ ProcessResult result) {
try {
int paramIntVal = Integer.parseInt(paramValue);
if (hasMinVal && paramIntVal < minValue) {
- procResult.setFailResult(400,
+ result.setFailResult(400,
new StringBuilder(512).append("Parameter ")
.append(fieldDef.name).append(" value must >= ")
.append(minValue).toString());
return false;
}
- procResult.setSuccResult(paramIntVal);
+ result.setSuccResult(paramIntVal);
} catch (Throwable e) {
- procResult.setFailResult(400,
+ result.setFailResult(400,
new StringBuilder(512).append("Parameter ")
.append(fieldDef.name).append(" parse error: ")
.append(e.getMessage()).toString());
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
index 3bdfe16..8bef5ab 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
@@ -207,7 +207,7 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB",
req.getParameter("memCacheMsgSizeInMB"),
false, defmemCacheMsgSizeInMB, 2);
- memCacheMsgSizeInMB = memCacheMsgSizeInMB >= 2048 ? 2048 : memCacheMsgSizeInMB;
+ memCacheMsgSizeInMB = Math.min(memCacheMsgSizeInMB, 2048);
int memCacheFlushIntvl =
WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
req.getParameter("memCacheFlushIntvl"),
@@ -354,7 +354,7 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB",
jsonObject.get("memCacheMsgSizeInMB"),
false, brokerConfEntity.getDftMemCacheMsgSizeInMB(), 2);
- memCacheMsgSizeInMB = memCacheMsgSizeInMB >= 2048 ? 2048 : memCacheMsgSizeInMB;
+ memCacheMsgSizeInMB = Math.min(memCacheMsgSizeInMB, 2048);
int memCacheFlushIntvl =
WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
jsonObject.get("memCacheFlushIntvl"),
@@ -714,10 +714,10 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
* @return
*/
public StringBuilder adminQuerySimpleTopicName(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
StringBuilder strBuffer = new StringBuilder(512);
- ProcessResult result = WebParameterUtils.getIntParamValue(req,
- WebFieldDef.COMPSBROKERID, false);
- if (!result.success) {
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, false, result)) {
WebParameterUtils.buildFailResult(strBuffer, result.errInfo);
return strBuffer;
}
@@ -752,17 +752,16 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
* @return
*/
public StringBuilder adminQuerySimpleBrokerId(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
StringBuilder strBuffer = new StringBuilder(512);
- ProcessResult result = WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSTOPICNAME, false, null);
- if (!result.success) {
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, result)) {
WebParameterUtils.buildFailResult(strBuffer, result.errInfo);
return strBuffer;
}
Set<String> topicNameSet = (Set<String>) result.retData1;
- result = WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.WITHIP, false, false);
- if (!result.success) {
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.WITHIP, false, false, result)) {
WebParameterUtils.buildFailResult(strBuffer, result.errInfo);
return strBuffer;
}