You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/11 10:41:44 UTC

[inlong] branch master updated: [INLONG-6142][TubeMQ] Added client-balanced consumer group control API (#6143)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ebe724f84 [INLONG-6142][TubeMQ] Added client-balanced consumer group control API (#6143)
ebe724f84 is described below

commit ebe724f845517279ce3a20ee8a1257aed57620fe
Author: Goson Zhang <46...@qq.com>
AuthorDate: Tue Oct 11 18:41:38 2022 +0800

    [INLONG-6142][TubeMQ] Added client-balanced consumer group control API (#6143)
---
 .../web/handler/WebAdminGroupCtrlHandler.java      | 94 ++++++++++++++++++++++
 1 file changed, 94 insertions(+)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
index 5b614f79c..5d77c672b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
@@ -22,9 +22,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
+
 import javax.servlet.http.HttpServletRequest;
 import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
 import org.apache.inlong.tubemq.server.common.TServerConstants;
 import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
 import org.apache.inlong.tubemq.server.common.statusdef.EnableStatus;
@@ -98,6 +101,10 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
                 "adminDeleteConsumeGroupSetting");
         registerModifyWebMethod("admin_rebalance_group_allocate",
                 "adminRebalanceGroupAllocateInfo");
+        registerModifyWebMethod("admin_set_client_balance_group_consume_from_max",
+                "adminSetBalanceGroupConsumeFromMax");
+        registerQueryWebMethod("admin_query_client_balance_group_set",
+                "adminQueryClientBalanceGroupSet");
     }
 
     /**
@@ -1495,4 +1502,91 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
         return result.isSuccess();
     }
 
+    /**
+     * Query client balance group set
+     *
+     * @param req      request
+     * @param sBuffer  string buffer
+     * @param result   process result
+     *
+     * @return   process result
+     */
+    public StringBuilder adminQueryClientBalanceGroupSet(HttpServletRequest req,
+                                                         StringBuilder sBuffer,
+                                                         ProcessResult result) {
+        try {
+            ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
+            List<String> clientGroups = consumerHolder.getAllClientBalanceGroups();
+            int j = 0;
+            WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+            for (String groupName : clientGroups) {
+                if (TStringUtils.isEmpty(groupName)) {
+                    continue;
+                }
+                if (j++ > 0) {
+                    sBuffer.append(",");
+                }
+                sBuffer.append("\"").append(groupName).append("\"");
+            }
+            WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, j);
+        } catch (Exception e) {
+            sBuffer.delete(0, sBuffer.length());
+            WebParameterUtils.buildFailResult(sBuffer, e.getMessage());
+        }
+        return sBuffer;
+    }
+
+    /**
+     * Set online client-balance group consume from max offset
+     *
+     * @param req     the request object
+     * @param sBuffer  string buffer
+     * @param result   the result object
+     * @return  the return result
+     */
+    public StringBuilder adminSetBalanceGroupConsumeFromMax(HttpServletRequest req,
+                                                            StringBuilder sBuffer,
+                                                            ProcessResult result) {
+        // check and get operation info
+        if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return sBuffer;
+        }
+        BaseEntity opEntity = (BaseEntity) result.getRetData();
+        // get group list
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return sBuffer;
+        }
+        final Set<String> groupNameSet = (Set<String>) result.getRetData();
+        // process offset setting
+        ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
+        List<String> clientGroups = consumerHolder.getAllClientBalanceGroups();
+        Set<String> filtedGroupSet = new TreeSet<>();
+        for (String groupName : groupNameSet) {
+            if (clientGroups.contains(groupName)) {
+                filtedGroupSet.add(groupName);
+            }
+        }
+        if (filtedGroupSet.isEmpty()) {
+            WebParameterUtils.buildFailResult(sBuffer,
+                    "all consumer groups are not client balance groups");
+        } else {
+            ConsumeGroupInfo groupInfo;
+            for (String groupName : filtedGroupSet) {
+                groupInfo = consumerHolder.getConsumeGroupInfo(groupName);
+                if (groupInfo == null) {
+                    continue;
+                }
+                groupInfo.updCsmFromMaxCtrlId();
+            }
+            logger.info(sBuffer.append("[Admin reset] ").append(opEntity.getModifyUser())
+                    .append(" set client-balance group consume from max offset, group set = ")
+                    .append(filtedGroupSet.toString()).toString());
+            sBuffer.delete(0, sBuffer.length());
+            WebParameterUtils.buildSuccessResult(sBuffer);
+        }
+        return sBuffer;
+    }
 }