You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/11 10:41:44 UTC
[inlong] branch master updated: [INLONG-6142][TubeMQ] Added client-balanced consumer group control API (#6143)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ebe724f84 [INLONG-6142][TubeMQ] Added client-balanced consumer group control API (#6143)
ebe724f84 is described below
commit ebe724f845517279ce3a20ee8a1257aed57620fe
Author: Goson Zhang <46...@qq.com>
AuthorDate: Tue Oct 11 18:41:38 2022 +0800
[INLONG-6142][TubeMQ] Added client-balanced consumer group control API (#6143)
---
.../web/handler/WebAdminGroupCtrlHandler.java | 94 ++++++++++++++++++++++
1 file changed, 94 insertions(+)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
index 5b614f79c..5d77c672b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
@@ -22,9 +22,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
+
import javax.servlet.http.HttpServletRequest;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.server.common.TServerConstants;
import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.inlong.tubemq.server.common.statusdef.EnableStatus;
@@ -98,6 +101,10 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
"adminDeleteConsumeGroupSetting");
registerModifyWebMethod("admin_rebalance_group_allocate",
"adminRebalanceGroupAllocateInfo");
+ registerModifyWebMethod("admin_set_client_balance_group_consume_from_max",
+ "adminSetBalanceGroupConsumeFromMax");
+ registerQueryWebMethod("admin_query_client_balance_group_set",
+ "adminQueryClientBalanceGroupSet");
}
/**
@@ -1495,4 +1502,91 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
return result.isSuccess();
}
+ /**
+ * Query client balance group set
+ *
+ * @param req request
+ * @param sBuffer string buffer
+ * @param result process result
+ *
+ * @return process result
+ */
+ public StringBuilder adminQueryClientBalanceGroupSet(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ try {
+ ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
+ List<String> clientGroups = consumerHolder.getAllClientBalanceGroups();
+ int j = 0;
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+ for (String groupName : clientGroups) {
+ if (TStringUtils.isEmpty(groupName)) {
+ continue;
+ }
+ if (j++ > 0) {
+ sBuffer.append(",");
+ }
+ sBuffer.append("\"").append(groupName).append("\"");
+ }
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, j);
+ } catch (Exception e) {
+ sBuffer.delete(0, sBuffer.length());
+ WebParameterUtils.buildFailResult(sBuffer, e.getMessage());
+ }
+ return sBuffer;
+ }
+
+ /**
+ * Set online client-balance group consume from max offset
+ *
+ * @param req the request object
+ * @param sBuffer string buffer
+ * @param result the result object
+ * @return the return result
+ */
+ public StringBuilder adminSetBalanceGroupConsumeFromMax(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // get group list
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+ return sBuffer;
+ }
+ final Set<String> groupNameSet = (Set<String>) result.getRetData();
+ // process offset setting
+ ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
+ List<String> clientGroups = consumerHolder.getAllClientBalanceGroups();
+ Set<String> filtedGroupSet = new TreeSet<>();
+ for (String groupName : groupNameSet) {
+ if (clientGroups.contains(groupName)) {
+ filtedGroupSet.add(groupName);
+ }
+ }
+ if (filtedGroupSet.isEmpty()) {
+ WebParameterUtils.buildFailResult(sBuffer,
+ "all consumer groups are not client balance groups");
+ } else {
+ ConsumeGroupInfo groupInfo;
+ for (String groupName : filtedGroupSet) {
+ groupInfo = consumerHolder.getConsumeGroupInfo(groupName);
+ if (groupInfo == null) {
+ continue;
+ }
+ groupInfo.updCsmFromMaxCtrlId();
+ }
+ logger.info(sBuffer.append("[Admin reset] ").append(opEntity.getModifyUser())
+ .append(" set client-balance group consume from max offset, group set = ")
+ .append(filtedGroupSet.toString()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ WebParameterUtils.buildSuccessResult(sBuffer);
+ }
+ return sBuffer;
+ }
}