You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/20 08:46:13 UTC
[incubator-tubemq] branch TUBEMQ-452 updated: [TUBEMQ-463]Adjust
Master rebalance process implementation (#355)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-452
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-452 by this push:
new e307364 [TUBEMQ-463]Adjust Master rebalance process implementation (#355)
e307364 is described below
commit e3073641827577f4a8f4d9fb53a7023b3532d559
Author: gosonzhang <46...@qq.com>
AuthorDate: Sun Dec 20 16:46:04 2020 +0800
[TUBEMQ-463]Adjust Master rebalance process implementation (#355)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../apache/tubemq/server/master/MasterConfig.java | 10 ++
.../org/apache/tubemq/server/master/TMaster.java | 180 ++++++++++++---------
2 files changed, 115 insertions(+), 75 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java
index 84ff2ed..b112d66 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java
@@ -79,6 +79,7 @@ public class MasterConfig extends AbstractFileConfig {
private String visitName = "";
private String visitPassword = "";
private long authValidTimeStampPeriodMs = TBaseConstants.CFG_DEFAULT_AUTH_TIMESTAMP_VALID_INTERVAL;
+ private int rebalanceParallel = 4;
/**
* getters
@@ -253,6 +254,10 @@ public class MasterConfig extends AbstractFileConfig {
return authValidTimeStampPeriodMs;
}
+ public int getRebalanceParallel() {
+ return rebalanceParallel;
+ }
+
/**
* Load file section attributes
*
@@ -460,6 +465,10 @@ public class MasterConfig extends AbstractFileConfig {
this.visitName = masterConf.get("visitName").trim();
this.visitPassword = masterConf.get("visitPassword").trim();
}
+ if (TStringUtils.isNotBlank(masterConf.get("rebalanceParallel"))) {
+ int tmpParallel = this.getInt(masterConf, "rebalanceParallel");
+ this.rebalanceParallel = (tmpParallel <= 0) ? 1 : (Math.min(tmpParallel, 20));
+ }
}
/**
@@ -606,6 +615,7 @@ public class MasterConfig extends AbstractFileConfig {
.append("useWebProxy", useWebProxy)
.append("visitName", visitName)
.append("visitPassword", visitPassword)
+ .append("rebalanceParallel", rebalanceParallel)
.append(",").append(replicationConfig.toString())
.append(",").append(tlsConfig.toString())
.append(",").append(zkConfig.toString())
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
index 8811cb0..34570de 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
@@ -27,7 +27,10 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.commons.collections.CollectionUtils;
@@ -126,6 +129,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
new RpcServiceFactory();
private final ConsumerEventManager consumerEventManager; //consumer event manager
private final TopicPSInfoManager topicPSInfoManager; //topic publish/subscribe info manager
+ private final ExecutorService executor;
private final BrokerInfoHolder brokerHolder; //broker holder
private final ProducerInfoHolder producerHolder; //producer holder
private final ConsumerInfoHolder consumerHolder; //consumer holder
@@ -144,11 +148,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
private AtomicLong idGenerator = new AtomicLong(0); //id generator
private volatile boolean stopped = false; //stop flag
private Thread balancerChore; //balance chore
- private Thread resetBalancerChore; //reset balance chore
private boolean initialized = false;
private boolean startupBalance = true;
- private boolean startupResetBalance = true;
private int balanceDelayTimes = 0;
+ private AtomicInteger curBalanceParal = new AtomicInteger(0);
private Sleeper stopSleeper = new Sleeper(1000, this);
private SimpleVisitTokenManager visitTokenManager;
@@ -165,6 +168,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
this.checkAndCreateBdbDataPath();
this.masterAddInfo =
new NodeAddrInfo(masterConfig.getHostName(), masterConfig.getPort());
+ this.executor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel());
this.visitTokenManager = new SimpleVisitTokenManager(this.masterConfig);
this.serverAuthHandler = new SimpleCertificateMasterHandler(this.masterConfig);
this.producerHolder = new ProducerInfoHolder();
@@ -1832,7 +1836,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if (!this.stopped) {
Thread.sleep(masterConfig.getFirstBalanceDelayAfterStartMs());
this.balancerChore = startBalancerChore(this);
- this.resetBalancerChore = startResetBalancerChore(this);
initialized = true;
while (!this.stopped) {
stopSleeper.sleep();
@@ -1851,10 +1854,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
/**
* Load balance
*/
- private void balance() {
- // #lizard forgives
+ private void balance(final TMaster tMaster) {
final StringBuilder strBuffer = new StringBuilder(512);
- long rebalanceId = idGenerator.incrementAndGet();
+ final long rebalanceId = idGenerator.incrementAndGet();
if (defaultBdbStoreService != null) {
logger.info(strBuffer.append("[Rebalance Start] ").append(rebalanceId)
.append(", isMaster=").append(defaultBdbStoreService.isMaster())
@@ -1865,20 +1867,86 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
.append(", BDB service is null isMaster= false, isPrimaryNodeActive=false").toString());
}
strBuffer.delete(0, strBuffer.length());
+ int curDoingTasks = this.curBalanceParal.get();
+ if (curDoingTasks > 0) {
+ logger.info(strBuffer.append("[Rebalance End] ").append(rebalanceId)
+ .append(", the previous rebalance has ")
+ .append(curDoingTasks).append(" task(s) in progress!").toString());
+ return;
+ }
+ final boolean isStartBalance = startupBalance;
+ List<String> groupsNeedToBalance = isStartBalance ?
+ consumerHolder.getAllGroup() : getNeedToBalanceGroupList(strBuffer);
+ strBuffer.delete(0, strBuffer.length());
+ if (!groupsNeedToBalance.isEmpty()) {
+ // set parallel rebalance signal
+ curBalanceParal.set(masterConfig.getRebalanceParallel());
+ // calculate process count
+ int unitNum = (groupsNeedToBalance.size() + masterConfig.getRebalanceParallel() - 1)
+ / masterConfig.getRebalanceParallel();
+ // start processors to do rebalance;
+ int startIndex = 0;
+ int endIndex = 0;
+ for (int i = 0; i < masterConfig.getRebalanceParallel(); i++) {
+ // get groups need to rebalance
+ startIndex = Math.min((i) * unitNum, groupsNeedToBalance.size());
+ endIndex = Math.min((i + 1) * unitNum, groupsNeedToBalance.size());
+ final List<String> subGroups = groupsNeedToBalance.subList(startIndex, endIndex);
+ // execute rebalance
+ this.executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (subGroups.isEmpty()) {
+ return;
+ }
+ // first process reset rebalance task;
+ try {
+ tMaster.processResetbalance(rebalanceId,
+ isStartBalance, subGroups);
+ } catch (Throwable e) {
+ logger.warn("[Rebalance processor] Error during reset-reb", e);
+ }
+ if (tMaster.isStopped()) {
+ return;
+ }
+ // second process normal rebalance task;
+ try {
+ tMaster.processRebalance(rebalanceId,
+ isStartBalance, subGroups);
+ } catch (Throwable e) {
+ logger.warn("[Rebalance processor] Error during normal-reb", e);
+ }
+ } catch (Throwable e) {
+ logger.warn("[Rebalance processor] Error during process", e);
+ } finally {
+ curBalanceParal.decrementAndGet();
+ }
+ }
+ });
+ }
+ }
+ startupBalance = false;
+ logger.info(strBuffer.append("[Rebalance End] ").append(rebalanceId).toString());
+ }
+
+ // process unReset group rebalance
+ public void processRebalance(long rebalanceId, boolean isFirstReb, List<String> groups) {
+ // #lizard forgives
Map<String, Map<String, List<Partition>>> finalSubInfoMap = null;
- if (startupBalance) {
- finalSubInfoMap =
- this.loadBalancer.bukAssign(consumerHolder, topicPSInfoManager,
- consumerHolder.getAllGroup(), defaultBrokerConfManager,
- masterConfig.getMaxGroupBrokerConsumeRate(), strBuffer);
- startupBalance = false;
+ final StringBuilder strBuffer = new StringBuilder(512);
+ // choose different load balance strategy
+ if (isFirstReb) {
+ finalSubInfoMap = this.loadBalancer.bukAssign(consumerHolder,
+ topicPSInfoManager, groups, defaultBrokerConfManager,
+ masterConfig.getMaxGroupBrokerConsumeRate(), strBuffer);
} else {
- List<String> groupsNeedToBalance = getNeedToBalanceGroupList(strBuffer);
- finalSubInfoMap =
- this.loadBalancer.balanceCluster(currentSubInfo, consumerHolder, brokerHolder,
- topicPSInfoManager, groupsNeedToBalance, defaultBrokerConfManager,
- masterConfig.getMaxGroupBrokerConsumeRate(), strBuffer);
+ finalSubInfoMap = this.loadBalancer.balanceCluster(currentSubInfo,
+ consumerHolder, brokerHolder, topicPSInfoManager, groups,
+ defaultBrokerConfManager, masterConfig.getMaxGroupBrokerConsumeRate(),
+ strBuffer);
}
+ // allocate partitions to consumers
for (Map.Entry<String, Map<String, List<Partition>>> entry : finalSubInfoMap.entrySet()) {
String consumerId = entry.getKey();
if (consumerId == null) {
@@ -1891,7 +1959,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
|| tupleInfo.f1 == null) {
continue;
}
- List<String> blackTopicList = this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0);
+ List<String> blackTopicList =
+ this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0);
Map<String, List<Partition>> topicSubPartMap = entry.getValue();
List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
@@ -1899,7 +1968,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
String topic = topicEntry.getKey();
List<Partition> finalPartList = topicEntry.getValue();
Map<String, Partition> currentPartMap = null;
- Map<String, Map<String, Partition>> curTopicSubInfoMap = currentSubInfo.get(consumerId);
+ Map<String, Map<String, Partition>> curTopicSubInfoMap =
+ currentSubInfo.get(consumerId);
if (curTopicSubInfoMap == null || curTopicSubInfoMap.get(topic) == null) {
currentPartMap = new HashMap<>();
} else {
@@ -1948,8 +2018,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
for (Partition currentPart : currentPartMap.values()) {
if ((blackTopicList.contains(currentPart.getTopic()))
|| (!finalPartList.contains(currentPart))) {
- deletedSubInfoList
- .add(new SubscribeInfo(consumerId, tupleInfo.f0, false, currentPart));
+ deletedSubInfoList.add(new SubscribeInfo(consumerId,
+ tupleInfo.f0, false, currentPart));
}
}
for (Partition finalPart : finalPartList) {
@@ -1990,42 +2060,24 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
}
}
- logger.info(strBuffer.append("[Rebalance End] ")
- .append(rebalanceId).toString());
}
/**
- * Reset balance
+ * Process reset rebalance
*/
- private void resetBalance() {
+ public void processResetbalance(long rebalanceId, boolean isFirstReb, List<String> groups) {
// #lizard forgives
- //consumer need reset offset
final StringBuilder strBuffer = new StringBuilder(512);
- long rebalanceId = idGenerator.incrementAndGet();
- if (defaultBdbStoreService != null) {
- logger.info(strBuffer.append("[ResetRebalance Start] ").append(rebalanceId)
- .append(", isMaster=").append(defaultBdbStoreService.isMaster())
- .append(", isPrimaryNodeActive=")
- .append(defaultBdbStoreService.isPrimaryNodeActive()).toString());
- } else {
- logger.info(strBuffer.append("[ResetRebalance Start] ").append(rebalanceId)
- .append(", BDB service is null isMaster= false, isPrimaryNodeActive=false").toString());
- }
- strBuffer.delete(0, strBuffer.length());
Map<String, Map<String, Map<String, Partition>>> finalSubInfoMap = null;
// choose different load balance strategy
- if (startupResetBalance) {
- finalSubInfoMap =
- this.loadBalancer.resetBukAssign(consumerHolder, topicPSInfoManager,
- consumerHolder.getAllGroup(), this.zkOffsetStorage,
- this.defaultBrokerConfManager, strBuffer);
- startupResetBalance = false;
+ if (isFirstReb) {
+ finalSubInfoMap = this.loadBalancer.resetBukAssign(consumerHolder,
+ topicPSInfoManager, groups, this.zkOffsetStorage,
+ this.defaultBrokerConfManager, strBuffer);
} else {
- List<String> groupsNeedToBalance = getNeedToBalanceGroupList(strBuffer);
- finalSubInfoMap =
- this.loadBalancer.resetBalanceCluster(currentSubInfo, consumerHolder,
- topicPSInfoManager, groupsNeedToBalance, this.zkOffsetStorage,
- this.defaultBrokerConfManager, strBuffer);
+ finalSubInfoMap = this.loadBalancer.resetBalanceCluster(currentSubInfo,
+ consumerHolder, topicPSInfoManager, groups, this.zkOffsetStorage,
+ this.defaultBrokerConfManager, strBuffer);
}
// filter
for (Map.Entry<String, Map<String, Map<String, Partition>>> entry
@@ -2041,7 +2093,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
|| tupleInfo.f1 == null) {
continue;
}
-
+ // allocate partitions to consumers
List<String> blackTopicList =
this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0);
Map<String, Map<String, Partition>> topicSubPartMap = entry.getValue();
@@ -2107,7 +2159,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
}
}
- logger.info(strBuffer.append("[ResetRebalance End] ").append(rebalanceId).toString());
}
/**
@@ -2240,9 +2291,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if (this.balancerChore != null) {
this.balancerChore.interrupt();
}
- if (this.resetBalancerChore != null) {
- this.resetBalancerChore.interrupt();
- }
}
/**
@@ -2252,11 +2300,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
* @return
*/
private Thread startBalancerChore(final TMaster master) {
- Chore chore = new Chore("BalancerChore", masterConfig.getConsumerBalancePeriodMs(), master) {
+ Chore chore = new Chore("BalancerChore",
+ masterConfig.getConsumerBalancePeriodMs(), master) {
@Override
protected void chore() {
try {
- master.balance();
+ master.balance(master);
} catch (Throwable e) {
logger.warn("Rebalance throwable error: ", e);
}
@@ -2265,26 +2314,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
return ThreadUtils.setDaemonThreadRunning(chore.getThread());
}
- /**
- * Start reset balance chore
- *
- * @param master
- * @return
- */
- private Thread startResetBalancerChore(final TMaster master) {
- Chore chore = new Chore("ResetBalancerChore", masterConfig.getConsumerBalancePeriodMs(), master) {
- @Override
- protected void chore() {
- try {
- master.resetBalance();
- } catch (Throwable e) {
- logger.warn("Reset Rebalance throwable error: ", e);
- }
- }
- };
- return ThreadUtils.setDaemonThreadRunning(chore.getThread());
- }
-
public void stop() {
stop("");
}
@@ -2301,6 +2330,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
try {
webServer.stop();
rpcServiceFactory.destroy();
+ executor.shutdown();
stopChores();
heartbeatManager.stop();
zkOffsetStorage.close();