You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/20 08:46:13 UTC

[incubator-tubemq] branch TUBEMQ-452 updated: [TUBEMQ-463]Adjust Master rebalance process implementation (#355)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-452
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-452 by this push:
     new e307364  [TUBEMQ-463]Adjust Master rebalance process implementation (#355)
e307364 is described below

commit e3073641827577f4a8f4d9fb53a7023b3532d559
Author: gosonzhang <46...@qq.com>
AuthorDate: Sun Dec 20 16:46:04 2020 +0800

    [TUBEMQ-463]Adjust Master rebalance process implementation (#355)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../apache/tubemq/server/master/MasterConfig.java  |  10 ++
 .../org/apache/tubemq/server/master/TMaster.java   | 180 ++++++++++++---------
 2 files changed, 115 insertions(+), 75 deletions(-)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java
index 84ff2ed..b112d66 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java
@@ -79,6 +79,7 @@ public class MasterConfig extends AbstractFileConfig {
     private String visitName = "";
     private String visitPassword = "";
     private long authValidTimeStampPeriodMs = TBaseConstants.CFG_DEFAULT_AUTH_TIMESTAMP_VALID_INTERVAL;
+    private int rebalanceParallel = 4;
 
     /**
      * getters
@@ -253,6 +254,10 @@ public class MasterConfig extends AbstractFileConfig {
         return authValidTimeStampPeriodMs;
     }
 
+    public int getRebalanceParallel() {
+        return rebalanceParallel;
+    }
+
     /**
      * Load file section attributes
      *
@@ -460,6 +465,10 @@ public class MasterConfig extends AbstractFileConfig {
             this.visitName = masterConf.get("visitName").trim();
             this.visitPassword = masterConf.get("visitPassword").trim();
         }
+        if (TStringUtils.isNotBlank(masterConf.get("rebalanceParallel"))) {
+            int tmpParallel = this.getInt(masterConf, "rebalanceParallel");
+            this.rebalanceParallel = (tmpParallel <= 0) ? 1 : (Math.min(tmpParallel, 20));
+        }
     }
 
     /**
@@ -606,6 +615,7 @@ public class MasterConfig extends AbstractFileConfig {
                 .append("useWebProxy", useWebProxy)
                 .append("visitName", visitName)
                 .append("visitPassword", visitPassword)
+                .append("rebalanceParallel", rebalanceParallel)
                 .append(",").append(replicationConfig.toString())
                 .append(",").append(tlsConfig.toString())
                 .append(",").append(zkConfig.toString())
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
index 8811cb0..34570de 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
@@ -27,7 +27,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.codec.binary.StringUtils;
 import org.apache.commons.collections.CollectionUtils;
@@ -126,6 +129,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             new RpcServiceFactory();
     private final ConsumerEventManager consumerEventManager;    //consumer event manager
     private final TopicPSInfoManager topicPSInfoManager;        //topic publish/subscribe info manager
+    private final ExecutorService executor;
     private final BrokerInfoHolder brokerHolder;                //broker holder
     private final ProducerInfoHolder producerHolder;            //producer holder
     private final ConsumerInfoHolder consumerHolder;            //consumer holder
@@ -144,11 +148,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
     private AtomicLong idGenerator = new AtomicLong(0);     //id generator
     private volatile boolean stopped = false;                   //stop flag
     private Thread balancerChore;                               //balance chore
-    private Thread resetBalancerChore;                          //reset balance chore
     private boolean initialized = false;
     private boolean startupBalance = true;
-    private boolean startupResetBalance = true;
     private int balanceDelayTimes = 0;
+    private AtomicInteger curBalanceParal = new AtomicInteger(0);
     private Sleeper stopSleeper = new Sleeper(1000, this);
     private SimpleVisitTokenManager visitTokenManager;
 
@@ -165,6 +168,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         this.checkAndCreateBdbDataPath();
         this.masterAddInfo =
                 new NodeAddrInfo(masterConfig.getHostName(), masterConfig.getPort());
+        this.executor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel());
         this.visitTokenManager = new SimpleVisitTokenManager(this.masterConfig);
         this.serverAuthHandler = new SimpleCertificateMasterHandler(this.masterConfig);
         this.producerHolder = new ProducerInfoHolder();
@@ -1832,7 +1836,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             if (!this.stopped) {
                 Thread.sleep(masterConfig.getFirstBalanceDelayAfterStartMs());
                 this.balancerChore = startBalancerChore(this);
-                this.resetBalancerChore = startResetBalancerChore(this);
                 initialized = true;
                 while (!this.stopped) {
                     stopSleeper.sleep();
@@ -1851,10 +1854,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
     /**
      * Load balance
      */
-    private void balance() {
-        // #lizard forgives
+    private void balance(final TMaster tMaster) {
         final StringBuilder strBuffer = new StringBuilder(512);
-        long rebalanceId = idGenerator.incrementAndGet();
+        final long rebalanceId = idGenerator.incrementAndGet();
         if (defaultBdbStoreService != null) {
             logger.info(strBuffer.append("[Rebalance Start] ").append(rebalanceId)
                     .append(", isMaster=").append(defaultBdbStoreService.isMaster())
@@ -1865,20 +1867,86 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                     .append(", BDB service is null isMaster= false, isPrimaryNodeActive=false").toString());
         }
         strBuffer.delete(0, strBuffer.length());
+        int curDoingTasks = this.curBalanceParal.get();
+        if (curDoingTasks > 0) {
+            logger.info(strBuffer.append("[Rebalance End] ").append(rebalanceId)
+                    .append(", the previous rebalance has ")
+                    .append(curDoingTasks).append(" task(s) in progress!").toString());
+            return;
+        }
+        final boolean isStartBalance = startupBalance;
+        List<String> groupsNeedToBalance = isStartBalance ?
+                consumerHolder.getAllGroup() : getNeedToBalanceGroupList(strBuffer);
+        strBuffer.delete(0, strBuffer.length());
+        if (!groupsNeedToBalance.isEmpty()) {
+            // set parallel rebalance signal
+            curBalanceParal.set(masterConfig.getRebalanceParallel());
+            // calculate how many groups each task handles (rounded up)
+            int unitNum = (groupsNeedToBalance.size() + masterConfig.getRebalanceParallel() - 1)
+                    / masterConfig.getRebalanceParallel();
+            // start the processors that do the rebalance
+            int startIndex = 0;
+            int endIndex = 0;
+            for (int i = 0; i < masterConfig.getRebalanceParallel(); i++) {
+                // get the sub-list of groups this task needs to rebalance
+                startIndex = Math.min((i) * unitNum, groupsNeedToBalance.size());
+                endIndex = Math.min((i + 1) * unitNum, groupsNeedToBalance.size());
+                final List<String> subGroups = groupsNeedToBalance.subList(startIndex, endIndex);
+                // execute rebalance
+                this.executor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            if (subGroups.isEmpty()) {
+                                return;
+                            }
+                            // first process reset rebalance task;
+                            try {
+                                tMaster.processResetbalance(rebalanceId,
+                                        isStartBalance, subGroups);
+                            } catch (Throwable e) {
+                                logger.warn("[Rebalance processor] Error during reset-reb", e);
+                            }
+                            if (tMaster.isStopped()) {
+                                return;
+                            }
+                            // second process normal rebalance task;
+                            try {
+                                tMaster.processRebalance(rebalanceId,
+                                        isStartBalance, subGroups);
+                            } catch (Throwable e) {
+                                logger.warn("[Rebalance processor] Error during normal-reb", e);
+                            }
+                        } catch (Throwable e) {
+                            logger.warn("[Rebalance processor] Error during process", e);
+                        } finally {
+                            curBalanceParal.decrementAndGet();
+                        }
+                    }
+                });
+            }
+        }
+        startupBalance = false;
+        logger.info(strBuffer.append("[Rebalance End] ").append(rebalanceId).toString());
+    }
+
+    // process the normal (non-reset) rebalance of the given groups
+    public void processRebalance(long rebalanceId, boolean isFirstReb, List<String> groups) {
+        // #lizard forgives
         Map<String, Map<String, List<Partition>>> finalSubInfoMap = null;
-        if (startupBalance) {
-            finalSubInfoMap =
-                    this.loadBalancer.bukAssign(consumerHolder, topicPSInfoManager,
-                            consumerHolder.getAllGroup(), defaultBrokerConfManager,
-                            masterConfig.getMaxGroupBrokerConsumeRate(), strBuffer);
-            startupBalance = false;
+        final StringBuilder strBuffer = new StringBuilder(512);
+        // choose different load balance strategy
+        if (isFirstReb) {
+            finalSubInfoMap = this.loadBalancer.bukAssign(consumerHolder,
+                    topicPSInfoManager, groups, defaultBrokerConfManager,
+                    masterConfig.getMaxGroupBrokerConsumeRate(), strBuffer);
         } else {
-            List<String> groupsNeedToBalance = getNeedToBalanceGroupList(strBuffer);
-            finalSubInfoMap =
-                    this.loadBalancer.balanceCluster(currentSubInfo, consumerHolder, brokerHolder,
-                            topicPSInfoManager, groupsNeedToBalance, defaultBrokerConfManager,
-                            masterConfig.getMaxGroupBrokerConsumeRate(), strBuffer);
+            finalSubInfoMap = this.loadBalancer.balanceCluster(currentSubInfo,
+                    consumerHolder, brokerHolder, topicPSInfoManager, groups,
+                    defaultBrokerConfManager, masterConfig.getMaxGroupBrokerConsumeRate(),
+                    strBuffer);
         }
+        // allocate partitions to consumers
         for (Map.Entry<String, Map<String, List<Partition>>> entry : finalSubInfoMap.entrySet()) {
             String consumerId = entry.getKey();
             if (consumerId == null) {
@@ -1891,7 +1959,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                     || tupleInfo.f1 == null) {
                 continue;
             }
-            List<String> blackTopicList = this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0);
+            List<String> blackTopicList =
+                    this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0);
             Map<String, List<Partition>> topicSubPartMap = entry.getValue();
             List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
             List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
@@ -1899,7 +1968,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 String topic = topicEntry.getKey();
                 List<Partition> finalPartList = topicEntry.getValue();
                 Map<String, Partition> currentPartMap = null;
-                Map<String, Map<String, Partition>> curTopicSubInfoMap = currentSubInfo.get(consumerId);
+                Map<String, Map<String, Partition>> curTopicSubInfoMap =
+                        currentSubInfo.get(consumerId);
                 if (curTopicSubInfoMap == null || curTopicSubInfoMap.get(topic) == null) {
                     currentPartMap = new HashMap<>();
                 } else {
@@ -1948,8 +2018,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                     for (Partition currentPart : currentPartMap.values()) {
                         if ((blackTopicList.contains(currentPart.getTopic()))
                                 || (!finalPartList.contains(currentPart))) {
-                            deletedSubInfoList
-                                    .add(new SubscribeInfo(consumerId, tupleInfo.f0, false, currentPart));
+                            deletedSubInfoList.add(new SubscribeInfo(consumerId,
+                                    tupleInfo.f0, false, currentPart));
                         }
                     }
                     for (Partition finalPart : finalPartList) {
@@ -1990,42 +2060,24 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 }
             }
         }
-        logger.info(strBuffer.append("[Rebalance End] ")
-                .append(rebalanceId).toString());
     }
 
     /**
-     * Reset balance
+     * Process the reset rebalance of the given groups
      */
-    private void resetBalance() {
+    public void processResetbalance(long rebalanceId, boolean isFirstReb, List<String> groups) {
         // #lizard forgives
-        //consumer need reset offset
         final StringBuilder strBuffer = new StringBuilder(512);
-        long rebalanceId = idGenerator.incrementAndGet();
-        if (defaultBdbStoreService != null) {
-            logger.info(strBuffer.append("[ResetRebalance Start] ").append(rebalanceId)
-                    .append(", isMaster=").append(defaultBdbStoreService.isMaster())
-                    .append(", isPrimaryNodeActive=")
-                    .append(defaultBdbStoreService.isPrimaryNodeActive()).toString());
-        } else {
-            logger.info(strBuffer.append("[ResetRebalance Start] ").append(rebalanceId)
-                    .append(", BDB service is null isMaster= false, isPrimaryNodeActive=false").toString());
-        }
-        strBuffer.delete(0, strBuffer.length());
         Map<String, Map<String, Map<String, Partition>>> finalSubInfoMap = null;
         // choose different load balance strategy
-        if (startupResetBalance) {
-            finalSubInfoMap =
-                    this.loadBalancer.resetBukAssign(consumerHolder, topicPSInfoManager,
-                            consumerHolder.getAllGroup(), this.zkOffsetStorage,
-                            this.defaultBrokerConfManager, strBuffer);
-            startupResetBalance = false;
+        if (isFirstReb) {
+            finalSubInfoMap =  this.loadBalancer.resetBukAssign(consumerHolder,
+                    topicPSInfoManager, groups, this.zkOffsetStorage,
+                    this.defaultBrokerConfManager, strBuffer);
         } else {
-            List<String> groupsNeedToBalance = getNeedToBalanceGroupList(strBuffer);
-            finalSubInfoMap =
-                    this.loadBalancer.resetBalanceCluster(currentSubInfo, consumerHolder,
-                            topicPSInfoManager, groupsNeedToBalance, this.zkOffsetStorage,
-                            this.defaultBrokerConfManager, strBuffer);
+            finalSubInfoMap = this.loadBalancer.resetBalanceCluster(currentSubInfo,
+                    consumerHolder, topicPSInfoManager, groups, this.zkOffsetStorage,
+                    this.defaultBrokerConfManager, strBuffer);
         }
         // filter
         for (Map.Entry<String, Map<String, Map<String, Partition>>> entry
@@ -2041,7 +2093,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                     || tupleInfo.f1 == null) {
                 continue;
             }
-
+            // allocate partitions to consumers
             List<String> blackTopicList =
                     this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0);
             Map<String, Map<String, Partition>> topicSubPartMap = entry.getValue();
@@ -2107,7 +2159,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 }
             }
         }
-        logger.info(strBuffer.append("[ResetRebalance End] ").append(rebalanceId).toString());
     }
 
     /**
@@ -2240,9 +2291,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         if (this.balancerChore != null) {
             this.balancerChore.interrupt();
         }
-        if (this.resetBalancerChore != null) {
-            this.resetBalancerChore.interrupt();
-        }
     }
 
     /**
@@ -2252,11 +2300,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
      * @return
      */
     private Thread startBalancerChore(final TMaster master) {
-        Chore chore = new Chore("BalancerChore", masterConfig.getConsumerBalancePeriodMs(), master) {
+        Chore chore = new Chore("BalancerChore",
+                masterConfig.getConsumerBalancePeriodMs(), master) {
             @Override
             protected void chore() {
                 try {
-                    master.balance();
+                    master.balance(master);
                 } catch (Throwable e) {
                     logger.warn("Rebalance throwable error: ", e);
                 }
@@ -2265,26 +2314,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         return ThreadUtils.setDaemonThreadRunning(chore.getThread());
     }
 
-    /**
-     * Start reset balance chore
-     *
-     * @param master
-     * @return
-     */
-    private Thread startResetBalancerChore(final TMaster master) {
-        Chore chore = new Chore("ResetBalancerChore", masterConfig.getConsumerBalancePeriodMs(), master) {
-            @Override
-            protected void chore() {
-                try {
-                    master.resetBalance();
-                } catch (Throwable e) {
-                    logger.warn("Reset Rebalance throwable error: ", e);
-                }
-            }
-        };
-        return ThreadUtils.setDaemonThreadRunning(chore.getThread());
-    }
-
     public void stop() {
         stop("");
     }
@@ -2301,6 +2330,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         try {
             webServer.stop();
             rpcServiceFactory.destroy();
+            executor.shutdown();
             stopChores();
             heartbeatManager.stop();
             zkOffsetStorage.close();