You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/05/06 02:21:21 UTC
[rocketmq] branch 5.0.0-beta updated: [ISSUE#4233] Move the capability of slaveActingMaster from container module to broker module
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta by this push:
new a4cccea9d [ISSUE#4233] Move the capability of slaveActingMaster from container module to broker module
a4cccea9d is described below
commit a4cccea9db284e92812b2c9f17043a8b4ce589d8
Author: rongtong <ji...@163.com>
AuthorDate: Fri May 6 10:21:14 2022 +0800
[ISSUE#4233] Move the capability of slaveActingMaster from container module to broker module
---
.../apache/rocketmq/broker/BrokerController.java | 384 +++++++++++++++++----
.../rocketmq/broker}/BrokerPreOnlineService.java | 8 +-
.../org/apache/rocketmq/common/BrokerConfig.java | 10 +
.../rocketmq/container/BrokerContainerConfig.java | 9 -
.../rocketmq/container/InnerBrokerController.java | 204 +----------
.../container/InnerSalveBrokerController.java | 114 ------
.../rocketmq/container/BrokerContainerTest.java | 18 +-
.../rocketmq/container/BrokerPreOnlineTest.java | 4 +-
.../container/ContainerIntegrationTestBase.java | 10 +-
9 files changed, 368 insertions(+), 393 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index bab68c481..7814c4feb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -33,8 +34,11 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -97,8 +101,10 @@ import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageC
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.HookUtils;
+import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.BrokerIdentity;
+import org.apache.rocketmq.common.BrokerSyncInfo;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
@@ -182,6 +188,8 @@ public class BrokerController {
private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
protected BrokerOuterAPI brokerOuterAPI;
protected ScheduledExecutorService scheduledExecutorService;
+ protected ScheduledExecutorService syncBrokerMemberGroupExecutorService;
+ protected ScheduledExecutorService brokerHeartbeatExecutorService;
protected final SlaveSynchronize slaveSynchronize;
protected final BlockingQueue<Runnable> sendThreadPoolQueue;
protected final BlockingQueue<Runnable> putThreadPoolQueue;
@@ -238,6 +246,12 @@ public class BrokerController {
protected EscapeBridge escapeBridge;
protected List<BrokerAttachedPlugin> brokerAttachedPlugins = new ArrayList<>();
protected volatile long shouldStartTime;
+ private BrokerPreOnlineService brokerPreOnlineService;
+ protected volatile boolean isIsolated = false;
+ protected volatile long minBrokerIdInGroup = 0;
+ protected volatile String minBrokerAddrInGroup = null;
+ private final Lock lock = new ReentrantLock();
+ protected final List<ScheduledFuture<?>> scheduledFutures = new ArrayList<>();
public BrokerController(
final BrokerConfig brokerConfig,
@@ -361,6 +375,10 @@ public class BrokerController {
this.brokerMemberGroup.getBrokerAddrs().put(this.brokerConfig.getBrokerId(), this.getBrokerAddr());
this.escapeBridge = new EscapeBridge(this);
+
+ if (!this.brokerConfig.isSkipPreOnline()) {
+ this.brokerPreOnlineService = new BrokerPreOnlineService(this);
+ }
}
public BrokerConfig getBrokerConfig() {
@@ -501,6 +519,11 @@ public class BrokerController {
this.loadBalanceThreadPoolQueue,
new ThreadFactoryImpl("LoadBalanceProcessorThread_", getBrokerIdentity()));
+ this.syncBrokerMemberGroupExecutorService = new ScheduledThreadPoolExecutor(1,
+ new ThreadFactoryImpl("BrokerControllerSyncBrokerScheduledThread", getBrokerIdentity()));
+ this.brokerHeartbeatExecutorService = new ScheduledThreadPoolExecutor(1,
+ new ThreadFactoryImpl("rokerControllerHeartbeatScheduledThread", getBrokerIdentity()));
+
this.topicQueueMappingCleanService = new TopicQueueMappingCleanService(this);
}
@@ -1273,6 +1296,13 @@ public class BrokerController {
escapeBridge.shutdown();
}
+ if (this.brokerPreOnlineService != null && !this.brokerPreOnlineService.isStopped()) {
+ this.brokerPreOnlineService.shutdown();
+ }
+
+ shutdownScheduledExecutorService(this.syncBrokerMemberGroupExecutorService);
+ shutdownScheduledExecutorService(this.brokerHeartbeatExecutorService);
+
this.topicConfigManager.persist();
this.subscriptionGroupManager.persist();
@@ -1287,6 +1317,10 @@ public class BrokerController {
shutdownBasicService();
+ for (ScheduledFuture<?> scheduledFuture : scheduledFutures) {
+ scheduledFuture.cancel(true);
+ }
+
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.shutdown();
}
@@ -1385,6 +1419,10 @@ public class BrokerController {
this.escapeBridge.start();
}
+ if (this.brokerPreOnlineService != null) {
+ this.brokerPreOnlineService.start();
+ }
+
//Init state version after messageStore initialized.
this.topicConfigManager.initStateVersion();
}
@@ -1393,33 +1431,70 @@ public class BrokerController {
this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();
+ if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster()) {
+ isIsolated = true;
+ }
+
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
startBasicService();
- if (!this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
+ if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
this.registerBrokerAll(true, false, true);
}
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-
+ scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@Override
- public void run() {
+ public void run2() {
try {
if (System.currentTimeMillis() < shouldStartTime) {
- LOG.info("Register to namesrv after {}", shouldStartTime);
+ BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
+ return;
+ }
+ if (isIsolated) {
+ BrokerController.LOG.info("Skip register for broker is isolated");
return;
}
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
- LOG.error("BrokerController#registerBrokerAll: unexpected error occurs.", e);
+ BrokerController.LOG.error("registerBrokerAll Exception", e);
}
}
- }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
+ }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));
+
+ if (this.brokerConfig.isEnableSlaveActingMaster()) {
+ scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
+ @Override
+ public void run2() {
+ if (isIsolated) {
+ return;
+ }
+ try {
+ BrokerController.this.sendHeartbeat();
+ } catch (Exception e) {
+ BrokerController.LOG.error("sendHeartbeat Exception", e);
+ }
+
+ }
+ }, 1000, brokerConfig.getBrokerHeartbeatInterval(), TimeUnit.MILLISECONDS));
+ scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
+ @Override public void run2() {
+ try {
+ BrokerController.this.syncBrokerMemberGroup();
+ } catch (Throwable e) {
+ BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);
+ }
+ }
+ }, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
+ }
+
+ if (brokerConfig.isSkipPreOnline()) {
+ startServiceWithoutCondition();
+ }
}
public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
@@ -1503,10 +1578,10 @@ public class BrokerController {
TopicConfigSerializeWrapper topicConfigWrapper) {
if (shutdown) {
- LOG.info("BrokerController#doResterBrokerAll: broker has shutdown, no need to register any more.");
+ BrokerController.LOG.info("BrokerController#doResterBrokerAll: broker has shutdown, no need to register any more.");
return;
}
- List<RegisterBrokerResult> registerBrokerResultList = this.getBrokerOuterAPI().registerBrokerAll(
+ List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
@@ -1518,11 +1593,61 @@ public class BrokerController {
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isEnableSlaveActingMaster(),
this.brokerConfig.isCompressedRegister(),
+ this.brokerConfig.isEnableSlaveActingMaster() ? this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,
this.getBrokerIdentity());
handleRegisterBrokerResult(registerBrokerResultList, checkOrderConfig);
}
+ protected void sendHeartbeat() {
+ if (this.brokerConfig.isCompatibleWithOldNameSrv()) {
+ this.brokerOuterAPI.sendHeartbeatViaDataVersion(
+ this.brokerConfig.getBrokerClusterName(),
+ this.getBrokerAddr(),
+ this.brokerConfig.getBrokerName(),
+ this.brokerConfig.getBrokerId(),
+ this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+ this.getTopicConfigManager().getDataVersion(),
+ this.brokerConfig.isInBrokerContainer());
+ } else {
+ this.brokerOuterAPI.sendHeartbeat(
+ this.brokerConfig.getBrokerClusterName(),
+ this.getBrokerAddr(),
+ this.brokerConfig.getBrokerName(),
+ this.brokerConfig.getBrokerId(),
+ this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+ this.brokerConfig.isInBrokerContainer());
+ }
+ }
+
+ protected void syncBrokerMemberGroup() {
+ try {
+ brokerMemberGroup = this.getBrokerOuterAPI()
+ .syncBrokerMemberGroup(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.brokerConfig.isCompatibleWithOldNameSrv());
+ } catch (Exception e) {
+ BrokerController.LOG.error("syncBrokerMemberGroup from namesrv failed, ", e);
+ return;
+ }
+ if (brokerMemberGroup == null || brokerMemberGroup.getBrokerAddrs().size() == 0) {
+ BrokerController.LOG.warn("Couldn't find any broker member from namesrv in {}/{}", this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName());
+ return;
+ }
+ this.messageStore.setAliveReplicaNumInGroup(calcAliveBrokerNumInGroup(brokerMemberGroup.getBrokerAddrs()));
+
+ if (!this.isIsolated) {
+ long minBrokerId = brokerMemberGroup.minimumBrokerId();
+ this.updateMinBroker(minBrokerId, brokerMemberGroup.getBrokerAddrs().get(minBrokerId));
+ }
+ }
+
+ private int calcAliveBrokerNumInGroup(Map<Long, String> brokerAddrTable) {
+ if (brokerAddrTable.containsKey(this.brokerConfig.getBrokerId())) {
+ return brokerAddrTable.size();
+ } else {
+ return brokerAddrTable.size() + 1;
+ }
+ }
+
protected void handleRegisterBrokerResult(List<RegisterBrokerResult> registerBrokerResultList,
boolean checkOrderConfig) {
for (RegisterBrokerResult registerBrokerResult : registerBrokerResultList) {
@@ -1570,6 +1695,189 @@ public class BrokerController {
return null;
}
+ public void startService(long minBrokerId, String minBrokerAddr) {
+ BrokerController.LOG.info("{} start service, min broker id is {}, min broker addr: {}",
+ this.brokerConfig.getCanonicalName(), minBrokerId, minBrokerAddr);
+ this.minBrokerIdInGroup = minBrokerId;
+ this.minBrokerAddrInGroup = minBrokerAddr;
+
+ this.changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == minBrokerId);
+ this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
+
+ isIsolated = false;
+ }
+
+ public void startServiceWithoutCondition() {
+ BrokerController.LOG.info("{} start service", this.brokerConfig.getCanonicalName());
+
+ this.changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
+ this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
+
+ isIsolated = false;
+ }
+
+ public void stopService() {
+ BrokerController.LOG.info("{} stop service", this.getBrokerConfig().getCanonicalName());
+ isIsolated = true;
+ this.changeSpecialServiceStatus(false);
+ }
+
+ public boolean isSpecialServiceRunning() {
+ if (isScheduleServiceStart() && isTransactionCheckServiceStart()) {
+ return true;
+ }
+
+ return this.ackMessageProcessor != null && this.ackMessageProcessor.isPopReviveServiceRunning();
+ }
+
+ private void onMasterOffline() {
+ // close channels with master broker
+ String masterAddr = this.slaveSynchronize.getMasterAddr();
+ if (masterAddr != null) {
+ this.brokerOuterAPI.getRemotingClient().closeChannels(
+ Arrays.asList(masterAddr, MixAll.brokerVIPChannel(true, masterAddr)));
+ }
+ // master not available, stop sync
+ this.slaveSynchronize.setMasterAddr(null);
+ this.messageStore.updateHaMasterAddress(null);
+ }
+
+ private void onMasterOnline(String masterAddr, String masterHaAddr) {
+ boolean needSyncMasterFlushOffset = this.messageStore.getMasterFlushedOffset() == 0
+ && this.messageStoreConfig.isSyncMasterFlushOffsetWhenStartup();
+ if (masterHaAddr == null || needSyncMasterFlushOffset) {
+ try {
+ BrokerSyncInfo brokerSyncInfo = this.brokerOuterAPI.retrieveBrokerHaInfo(masterAddr);
+
+ if (needSyncMasterFlushOffset) {
+ LOG.info("Set master flush offset in slave to {}", brokerSyncInfo.getMasterFlushOffset());
+ this.messageStore.setMasterFlushedOffset(brokerSyncInfo.getMasterFlushOffset());
+ }
+
+ if (masterHaAddr == null) {
+ this.messageStore.updateHaMasterAddress(brokerSyncInfo.getMasterHaAddress());
+ this.messageStore.updateMasterAddress(brokerSyncInfo.getMasterAddress());
+ }
+ } catch (Exception e) {
+ LOG.error("retrieve master ha info exception, {}", e);
+ }
+ }
+
+ // set master HA address.
+ if (masterHaAddr != null) {
+ this.messageStore.updateHaMasterAddress(masterHaAddr);
+ }
+
+ // wakeup HAClient
+ this.messageStore.wakeupHAClient();
+ }
+
+ private void onMinBrokerChange(long minBrokerId, String minBrokerAddr, String offlineBrokerAddr,
+ String masterHaAddr) {
+ LOG.info("Min broker changed, old: {}-{}, new {}-{}",
+ this.minBrokerIdInGroup, this.minBrokerAddrInGroup, minBrokerId, minBrokerAddr);
+
+ this.minBrokerIdInGroup = minBrokerId;
+ this.minBrokerAddrInGroup = minBrokerAddr;
+
+ this.changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == this.minBrokerIdInGroup);
+
+ if (offlineBrokerAddr != null && offlineBrokerAddr.equals(this.slaveSynchronize.getMasterAddr())) {
+ // master offline
+ onMasterOffline();
+ }
+
+ if (minBrokerId == MixAll.MASTER_ID && minBrokerAddr != null) {
+ // master online
+ onMasterOnline(minBrokerAddr, masterHaAddr);
+ }
+
+ // notify PullRequest on hold to pull from master.
+ if (this.minBrokerIdInGroup == MixAll.MASTER_ID) {
+ this.pullRequestHoldService.notifyMasterOnline();
+ }
+ }
+
+ public void updateMinBroker(long minBrokerId, String minBrokerAddr) {
+ if (brokerConfig.isEnableSlaveActingMaster() && brokerConfig.getBrokerId() != MixAll.MASTER_ID) {
+ if (lock.tryLock()) {
+ try {
+ if (minBrokerId != this.minBrokerIdInGroup) {
+ String offlineBrokerAddr = null;
+ if (minBrokerId > this.minBrokerIdInGroup) {
+ offlineBrokerAddr = this.minBrokerAddrInGroup;
+ }
+ onMinBrokerChange(minBrokerId, minBrokerAddr, offlineBrokerAddr, null);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+ }
+
+ public void updateMinBroker(long minBrokerId, String minBrokerAddr, String offlineBrokerAddr,
+ String masterHaAddr) {
+ if (brokerConfig.isEnableSlaveActingMaster() && brokerConfig.getBrokerId() != MixAll.MASTER_ID) {
+ try {
+ if (lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+ try {
+ if (minBrokerId != this.minBrokerIdInGroup) {
+ onMinBrokerChange(minBrokerId, minBrokerAddr, offlineBrokerAddr, masterHaAddr);
+ }
+ } finally {
+ lock.unlock();
+ }
+
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Update min broker error, {}", e);
+ }
+ }
+ }
+
+ public void changeSpecialServiceStatus(boolean shouldStart) {
+
+ for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) {
+ if (brokerAttachedPlugin != null) {
+ brokerAttachedPlugin.statusChanged(shouldStart);
+ }
+ }
+
+ changeScheduleServiceStatus(shouldStart);
+
+ changeTransactionCheckServiceStatus(shouldStart);
+
+ if (this.ackMessageProcessor != null) {
+ LOG.info("Set PopReviveService Status to {}", shouldStart);
+ this.ackMessageProcessor.setPopReviveServiceStatus(shouldStart);
+ }
+ }
+
+ private synchronized void changeTransactionCheckServiceStatus(boolean shouldStart) {
+ if (isTransactionCheckServiceStart != shouldStart) {
+ LOG.info("TransactionCheckService status changed to {}", shouldStart);
+ if (shouldStart) {
+ this.transactionalMessageCheckService.start();
+ } else {
+ this.transactionalMessageCheckService.shutdown(true);
+ }
+ isTransactionCheckServiceStart = shouldStart;
+ }
+ }
+
+ public synchronized void changeScheduleServiceStatus(boolean shouldStart) {
+ if (isScheduleServiceStart != shouldStart) {
+ LOG.info("ScheduleServiceStatus changed to {}", shouldStart);
+ if (shouldStart) {
+ this.scheduleMessageService.start();
+ } else {
+ this.scheduleMessageService.stop();
+ }
+ isScheduleServiceStart = shouldStart;
+ }
+ }
+
public TopicConfigManager getTopicConfigManager() {
return topicConfigManager;
}
@@ -1772,18 +2080,6 @@ public class BrokerController {
return this.brokerMemberGroup;
}
- public boolean isSpecialServiceRunning() {
- return this.brokerConfig.getBrokerId() == MixAll.MASTER_ID;
- }
-
- public void updateMinBroker(long minBrokerId, String minBrokerAddr) {
- //do nothing
- }
-
- public void updateMinBroker(long minBrokerId, String minBrokerAddr, String offlineBrokerAddr, String masterHaAddr) {
- //do nothing
- }
-
public int getListenPort() {
return this.nettyServerConfig.getListenPort();
}
@@ -1800,46 +2096,8 @@ public class BrokerController {
return shouldStartTime;
}
- public void changeSpecialServiceStatus(boolean shouldStart) {
-
- for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) {
- if (brokerAttachedPlugin != null) {
- brokerAttachedPlugin.statusChanged(shouldStart);
- }
- }
-
- changeScheduleServiceStatus(shouldStart);
-
- changeTransactionCheckServiceStatus(shouldStart);
-
- if (this.ackMessageProcessor != null) {
- LOG.info("Set PopReviveService Status to {}", shouldStart);
- this.ackMessageProcessor.setPopReviveServiceStatus(shouldStart);
- }
- }
-
- private synchronized void changeTransactionCheckServiceStatus(boolean shouldStart) {
- if (isTransactionCheckServiceStart != shouldStart) {
- LOG.info("TransactionCheckService status changed to {}", shouldStart);
- if (shouldStart) {
- this.transactionalMessageCheckService.start();
- } else {
- this.transactionalMessageCheckService.shutdown(true);
- }
- isTransactionCheckServiceStart = shouldStart;
- }
- }
-
- public synchronized void changeScheduleServiceStatus(boolean shouldStart) {
- if (isScheduleServiceStart != shouldStart) {
- LOG.info("ScheduleServiceStatus changed to {}", shouldStart);
- if (shouldStart) {
- this.scheduleMessageService.start();
- } else {
- this.scheduleMessageService.stop();
- }
- isScheduleServiceStart = shouldStart;
- }
+ public BrokerPreOnlineService getBrokerPreOnlineService() {
+ return brokerPreOnlineService;
}
public boolean isScheduleServiceStart() {
@@ -1872,4 +2130,8 @@ public class BrokerController {
brokerConfig.getBrokerId(), brokerConfig.isInBrokerContainer());
}
}
+
+ public boolean isIsolated() {
+ return this.isIsolated;
+ }
}
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerPreOnlineService.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java
similarity index 97%
rename from container/src/main/java/org/apache/rocketmq/container/BrokerPreOnlineService.java
rename to broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java
index c85eb654e..10757c0db 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerPreOnlineService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.container;
+package org.apache.rocketmq.broker;
import java.io.IOException;
import java.util.ArrayList;
@@ -42,11 +42,11 @@ import org.apache.rocketmq.store.ha.HAConnectionStateNotificationRequest;
public class BrokerPreOnlineService extends ServiceThread {
private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private final InnerBrokerController brokerController;
+ private final BrokerController brokerController;
private int waitBrokerIndex = 0;
- public BrokerPreOnlineService(InnerBrokerController brokerController) {
+ public BrokerPreOnlineService(BrokerController brokerController) {
this.brokerController = brokerController;
}
@@ -241,7 +241,7 @@ public class BrokerPreOnlineService extends ServiceThread {
brokerMemberGroup = this.brokerController.getBrokerOuterAPI().syncBrokerMemberGroup(
this.brokerController.getBrokerConfig().getBrokerClusterName(),
this.brokerController.getBrokerConfig().getBrokerName(),
- this.brokerController.getBrokerContainer().getBrokerContainerConfig().isCompatibleWithOldNameSrv());
+ this.brokerController.getBrokerConfig().isCompatibleWithOldNameSrv());
} catch (Exception e) {
LOGGER.error("syncBrokerMemberGroup from namesrv error, start service failed, will try later, ", e);
return false;
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index efd4ec90a..71280ea0b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -295,6 +295,8 @@ public class BrokerConfig extends BrokerIdentity {
private String metaDataHosts = "";
+ private boolean compatibleWithOldNameSrv = true;
+
public long getMaxPopPollingSize() {
return maxPopPollingSize;
}
@@ -1270,4 +1272,12 @@ public class BrokerConfig extends BrokerIdentity {
public void setIsolateLogEnable(boolean isolateLogEnable) {
this.isolateLogEnable = isolateLogEnable;
}
+
+ public boolean isCompatibleWithOldNameSrv() {
+ return compatibleWithOldNameSrv;
+ }
+
+ public void setCompatibleWithOldNameSrv(boolean compatibleWithOldNameSrv) {
+ this.compatibleWithOldNameSrv = compatibleWithOldNameSrv;
+ }
}
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
index 28a524254..564cd37ab 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
@@ -36,8 +36,6 @@ public class BrokerContainerConfig {
private String brokerConfigPaths = null;
- private boolean compatibleWithOldNameSrv = true;
-
public String getRocketmqHome() {
return rocketmqHome;
}
@@ -74,11 +72,4 @@ public class BrokerContainerConfig {
this.brokerConfigPaths = brokerConfigPaths;
}
- public boolean isCompatibleWithOldNameSrv() {
- return compatibleWithOldNameSrv;
- }
-
- public void setCompatibleWithOldNameSrv(boolean compatibleWithOldNameSrv) {
- this.compatibleWithOldNameSrv = compatibleWithOldNameSrv;
- }
}
diff --git a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
index 1f0d0fe91..2efa939fa 100644
--- a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
+++ b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
@@ -16,36 +16,20 @@
*/
package org.apache.rocketmq.container;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
-import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
public class InnerBrokerController extends BrokerController {
- private ScheduledExecutorService syncBrokerMemberGroupExecutorService;
- private ScheduledExecutorService brokerHeartbeatExecutorService;
- protected volatile long minBrokerIdInGroup = 0;
- protected volatile String minBrokerAddrInGroup = null;
- protected final List<ScheduledFuture<?>> scheduledFutures = new ArrayList<>();
- private BrokerPreOnlineService brokerPreOnlineService;
protected BrokerContainer brokerContainer;
- protected volatile boolean isIsolated = false;
public InnerBrokerController(
final BrokerContainer brokerContainer,
@@ -55,10 +39,6 @@ public class InnerBrokerController extends BrokerController {
super(brokerConfig, messageStoreConfig);
this.brokerContainer = brokerContainer;
this.brokerOuterAPI = this.brokerContainer.getBrokerOuterAPI();
-
- if (!this.brokerConfig.isSkipPreOnline()) {
- this.brokerPreOnlineService = new BrokerPreOnlineService(this);
- }
}
@Override
@@ -67,70 +47,23 @@ public class InnerBrokerController extends BrokerController {
this.fastRemotingServer = this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort() - 2);
}
- /**
- * Initialize resources for master which will be re-used by slave.
- */
- @Override
- protected void initializeResources() {
- super.initializeResources();
- this.syncBrokerMemberGroupExecutorService = new ScheduledThreadPoolExecutor(1,
- new ThreadFactoryImpl("BrokerControllerSyncBrokerScheduledThread", super.getBrokerIdentity()));
- this.brokerHeartbeatExecutorService = new ScheduledThreadPoolExecutor(1,
- new ThreadFactoryImpl("rokerControllerHeartbeatScheduledThread", super.getBrokerIdentity()));
- }
-
@Override
protected void initializeScheduledTasks() {
initializeBrokerScheduledTasks();
}
- @Override
- public void shutdown() {
-
- shutdownBasicService();
-
- if (this.remotingServer != null) {
- this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort());
- }
-
- if (this.fastRemotingServer != null) {
- this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort() - 2);
- }
-
- for (ScheduledFuture<?> scheduledFuture : scheduledFutures) {
- scheduledFuture.cancel(true);
- }
-
- if (this.brokerPreOnlineService != null && !this.brokerPreOnlineService.isStopped()) {
- this.brokerPreOnlineService.shutdown();
- }
-
- shutdownScheduledExecutorService(this.syncBrokerMemberGroupExecutorService);
- shutdownScheduledExecutorService(this.brokerHeartbeatExecutorService);
-
- }
-
- @Override
- public String getBrokerAddr() {
- return this.brokerConfig.getBrokerIP1() + ":" + this.brokerConfig.getListenPort();
- }
-
@Override
public void start() throws Exception {
-
this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();
- if (messageStoreConfig.getTotalReplicas() > 1) {
+ if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster()) {
isIsolated = true;
}
startBasicService();
- if (this.brokerPreOnlineService != null) {
- this.brokerPreOnlineService.start();
- }
-
- if (!isIsolated) {
+ if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
+ changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
this.registerBrokerAll(true, false, true);
}
@@ -178,12 +111,6 @@ public class InnerBrokerController extends BrokerController {
}
}
}, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
-
- }
-
- if (!isIsolated && !messageStoreConfig.isEnableDLegerCommitLog()
- && !messageStoreConfig.isDuplicationEnable() && !this.brokerConfig.isEnableSlaveActingMaster()) {
- changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
}
if (brokerConfig.isSkipPreOnline()) {
@@ -191,79 +118,27 @@ public class InnerBrokerController extends BrokerController {
}
}
- private void sendHeartbeat() {
- if (this.brokerContainer.getBrokerContainerConfig().isCompatibleWithOldNameSrv()) {
- this.brokerOuterAPI.sendHeartbeatViaDataVersion(
- this.brokerConfig.getBrokerClusterName(),
- this.getBrokerAddr(),
- this.brokerConfig.getBrokerName(),
- this.brokerConfig.getBrokerId(),
- this.brokerConfig.getSendHeartbeatTimeoutMillis(),
- this.getTopicConfigManager().getDataVersion(),
- this.brokerConfig.isInBrokerContainer());
- } else {
- this.brokerOuterAPI.sendHeartbeat(
- this.brokerConfig.getBrokerClusterName(),
- this.getBrokerAddr(),
- this.brokerConfig.getBrokerName(),
- this.brokerConfig.getBrokerId(),
- this.brokerConfig.getSendHeartbeatTimeoutMillis(),
- this.brokerConfig.isInBrokerContainer());
- }
- }
+ @Override
+ public void shutdown() {
- public void syncBrokerMemberGroup() {
- try {
- brokerMemberGroup = this.brokerContainer.getBrokerOuterAPI()
- .syncBrokerMemberGroup(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.brokerContainer.getBrokerContainerConfig().isCompatibleWithOldNameSrv());
- } catch (Exception e) {
- BrokerController.LOG.error("syncBrokerMemberGroup from namesrv failed, ", e);
- return;
- }
- if (brokerMemberGroup == null || brokerMemberGroup.getBrokerAddrs().size() == 0) {
- BrokerController.LOG.warn("Couldn't find any broker member from namesrv in {}/{}", this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName());
- return;
+ shutdownBasicService();
+
+ for (ScheduledFuture<?> scheduledFuture : scheduledFutures) {
+ scheduledFuture.cancel(true);
}
- this.messageStore.setAliveReplicaNumInGroup(calcAliveBrokerNumInGroup(brokerMemberGroup.getBrokerAddrs()));
- if (!this.isIsolated) {
- long minBrokerId = brokerMemberGroup.minimumBrokerId();
- this.updateMinBroker(minBrokerId, brokerMemberGroup.getBrokerAddrs().get(minBrokerId));
+ if (this.remotingServer != null) {
+ this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort());
}
- }
- private int calcAliveBrokerNumInGroup(Map<Long, String> brokerAddrTable) {
- if (brokerAddrTable.containsKey(this.brokerConfig.getBrokerId())) {
- return brokerAddrTable.size();
- } else {
- return brokerAddrTable.size() + 1;
+ if (this.fastRemotingServer != null) {
+ this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort() - 2);
}
}
@Override
- protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
- TopicConfigSerializeWrapper topicConfigWrapper) {
-
- if (shutdown) {
- BrokerController.LOG.info("BrokerController#doResterBrokerAll: broker has shutdown, no need to register any more.");
- return;
- }
- List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
- this.brokerConfig.getBrokerClusterName(),
- this.getBrokerAddr(),
- this.brokerConfig.getBrokerName(),
- this.brokerConfig.getBrokerId(),
- this.getHAServerAddr(),
- topicConfigWrapper,
- this.filterServerManager.buildNewFilterServerList(),
- oneway,
- this.brokerConfig.getRegisterBrokerTimeoutMills(),
- this.brokerConfig.isEnableSlaveActingMaster(),
- this.brokerConfig.isCompressedRegister(),
- this.brokerConfig.isEnableSlaveActingMaster() ? this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,
- this.getBrokerIdentity());
-
- handleRegisterBrokerResult(registerBrokerResultList, checkOrderConfig);
+ public String getBrokerAddr() {
+ return this.brokerConfig.getBrokerIP1() + ":" + this.brokerConfig.getListenPort();
}
@Override
@@ -287,15 +162,6 @@ public class InnerBrokerController extends BrokerController {
return this.minBrokerIdInGroup;
}
- @Override
- public boolean isSpecialServiceRunning() {
- if (isScheduleServiceStart() && isTransactionCheckServiceStart()) {
- return true;
- }
-
- return this.ackMessageProcessor != null && this.ackMessageProcessor.isPopReviveServiceRunning();
- }
-
@Override
public int getListenPort() {
return this.brokerConfig.getListenPort();
@@ -305,46 +171,10 @@ public class InnerBrokerController extends BrokerController {
return brokerContainer.getBrokerOuterAPI();
}
- public void startService(long minBrokerId, String minBrokerAddr) {
- BrokerController.LOG.info("{} start service, min broker id is {}, min broker addr: {}",
- this.brokerConfig.getCanonicalName(), minBrokerId, minBrokerAddr);
- this.minBrokerIdInGroup = minBrokerId;
- this.minBrokerAddrInGroup = minBrokerAddr;
-
- this.changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == minBrokerId);
- this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
-
- isIsolated = false;
- }
-
- public void startServiceWithoutCondition() {
- BrokerController.LOG.info("{} start service", this.brokerConfig.getCanonicalName());
-
- this.changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
- this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
-
- isIsolated = false;
- }
-
- public void stopService() {
- BrokerController.LOG.info("{} stop service", this.getBrokerConfig().getCanonicalName());
- isIsolated = true;
- this.changeSpecialServiceStatus(false);
- this.closeChannels();
- }
-
- public synchronized void closeChannels() {
- this.brokerContainer.getBrokerOuterAPI().getRemotingClient().closeChannels();
- }
-
public BrokerContainer getBrokerContainer() {
return this.brokerContainer;
}
- public boolean isIsolated() {
- return this.isIsolated;
- }
-
public NettyServerConfig getNettyServerConfig() {
return brokerContainer.getNettyServerConfig();
}
@@ -371,8 +201,4 @@ public class InnerBrokerController extends BrokerController {
}
return this.brokerContainer.peekMasterBroker();
}
-
- public BrokerPreOnlineService getBrokerPreOnlineService() {
- return brokerPreOnlineService;
- }
}
diff --git a/container/src/main/java/org/apache/rocketmq/container/InnerSalveBrokerController.java b/container/src/main/java/org/apache/rocketmq/container/InnerSalveBrokerController.java
index e00242a93..a7901bc7d 100644
--- a/container/src/main/java/org/apache/rocketmq/container/InnerSalveBrokerController.java
+++ b/container/src/main/java/org/apache/rocketmq/container/InnerSalveBrokerController.java
@@ -19,14 +19,10 @@ package org.apache.rocketmq.container;
import com.google.common.base.Preconditions;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.BrokerSyncInfo;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -47,114 +43,4 @@ public class InnerSalveBrokerController extends InnerBrokerController {
Preconditions.checkNotNull(brokerConfig.getBrokerName());
Preconditions.checkArgument(brokerConfig.getBrokerId() != MixAll.MASTER_ID);
}
-
- private void onMasterOffline() {
- // close channels with master broker
- String masterAddr = this.slaveSynchronize.getMasterAddr();
- if (masterAddr != null) {
- this.brokerOuterAPI.getRemotingClient().closeChannels(
- Arrays.asList(masterAddr, MixAll.brokerVIPChannel(true, masterAddr)));
- }
- // master not available, stop sync
- this.slaveSynchronize.setMasterAddr(null);
- this.messageStore.updateHaMasterAddress(null);
- }
-
- private void onMasterOnline(String masterAddr, String masterHaAddr) {
- boolean needSyncMasterFlushOffset = this.messageStore.getMasterFlushedOffset() == 0
- && this.messageStoreConfig.isSyncMasterFlushOffsetWhenStartup();
- if (masterHaAddr == null || needSyncMasterFlushOffset) {
- try {
- BrokerSyncInfo brokerSyncInfo = this.brokerOuterAPI.retrieveBrokerHaInfo(masterAddr);
-
- if (needSyncMasterFlushOffset) {
- LOG.info("Set master flush offset in slave to {}", brokerSyncInfo.getMasterFlushOffset());
- this.messageStore.setMasterFlushedOffset(brokerSyncInfo.getMasterFlushOffset());
- }
-
- if (masterHaAddr == null) {
- this.messageStore.updateHaMasterAddress(brokerSyncInfo.getMasterHaAddress());
- this.messageStore.updateMasterAddress(brokerSyncInfo.getMasterAddress());
- }
- } catch (Exception e) {
- LOG.error("retrieve master ha info exception, {}", e);
- }
- }
-
- // set master HA address.
- if (masterHaAddr != null) {
- this.messageStore.updateHaMasterAddress(masterHaAddr);
- }
-
- // wakeup HAClient
- this.messageStore.wakeupHAClient();
- }
-
- private void onMinBrokerChange(long minBrokerId, String minBrokerAddr, String offlineBrokerAddr,
- String masterHaAddr) {
- LOG.info("Min broker changed, old: {}-{}, new {}-{}",
- this.minBrokerIdInGroup, this.minBrokerAddrInGroup, minBrokerId, minBrokerAddr);
-
- this.minBrokerIdInGroup = minBrokerId;
- this.minBrokerAddrInGroup = minBrokerAddr;
-
- this.changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == this.minBrokerIdInGroup);
-
- if (offlineBrokerAddr != null && offlineBrokerAddr.equals(this.slaveSynchronize.getMasterAddr())) {
- // master offline
- onMasterOffline();
- }
-
- if (minBrokerId == MixAll.MASTER_ID && minBrokerAddr != null) {
- // master online
- onMasterOnline(minBrokerAddr, masterHaAddr);
- }
-
- // notify PullRequest on hold to pull from master.
- if (this.minBrokerIdInGroup == MixAll.MASTER_ID) {
- this.pullRequestHoldService.notifyMasterOnline();
- }
- }
-
- @Override
- public void updateMinBroker(long minBrokerId, String minBrokerAddr) {
- if (lock.tryLock()) {
- try {
- if (minBrokerId != this.minBrokerIdInGroup) {
- String offlineBrokerAddr = null;
- if (minBrokerId > this.minBrokerIdInGroup) {
- offlineBrokerAddr = this.minBrokerAddrInGroup;
- }
- onMinBrokerChange(minBrokerId, minBrokerAddr, offlineBrokerAddr, null);
- }
- } finally {
- lock.unlock();
- }
-
- }
- }
-
- @Override
- public void updateMinBroker(long minBrokerId, String minBrokerAddr, String offlineBrokerAddr,
- String masterHaAddr) {
- try {
- if (lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
- try {
- if (minBrokerId != this.minBrokerIdInGroup) {
- onMinBrokerChange(minBrokerId, minBrokerAddr, offlineBrokerAddr, masterHaAddr);
- }
- } finally {
- lock.unlock();
- }
-
- }
- } catch (InterruptedException e) {
- LOG.error("Update min broker error, {}", e);
- }
- }
-
- @Override
- public BrokerController peekMasterBroker() {
- return this.brokerContainer.peekMasterBroker();
- }
}
diff --git a/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java b/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java
index 351d88def..73354af88 100644
--- a/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java
+++ b/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java
@@ -76,13 +76,13 @@ public class BrokerContainerTest {
new NettyClientConfig(),
new MessageStoreConfig());
+ brokerController.getBrokerConfig().setEnableSlaveActingMaster(true);
+
BrokerOuterAPI brokerOuterAPI = mock(BrokerOuterAPI.class);
Field field = BrokerController.class.getDeclaredField("brokerOuterAPI");
field.setAccessible(true);
field.set(brokerController, brokerOuterAPI);
- // topic-0 doesn't have queueGroupConfig.
- // topic-1 has queueGroupConfig.
List<TopicConfig> topicConfigList = new ArrayList<>(2);
for (int i = 0; i < 2; i++) {
topicConfigList.add(new TopicConfig("topic-" + i));
@@ -108,13 +108,13 @@ public class BrokerContainerTest {
new NettyClientConfig(),
new MessageStoreConfig());
+ brokerController.getBrokerConfig().setEnableSlaveActingMaster(true);
+
BrokerOuterAPI brokerOuterAPI = mock(BrokerOuterAPI.class);
Field field = BrokerController.class.getDeclaredField("brokerOuterAPI");
field.setAccessible(true);
field.set(brokerController, brokerOuterAPI);
- // topic-0 doesn't have queueGroupConfig.
- // topic-1 has queueGroupConfig.
List<TopicConfig> topicConfigList = new ArrayList<>(2);
for (int i = 0; i < 2; i++) {
topicConfigList.add(new TopicConfig("topic-" + i));
@@ -128,7 +128,7 @@ public class BrokerContainerTest {
ArgumentCaptor<TopicConfigSerializeWrapper> captor = ArgumentCaptor.forClass(TopicConfigSerializeWrapper.class);
ArgumentCaptor<BrokerIdentity> brokerIdentityCaptor = ArgumentCaptor.forClass(BrokerIdentity.class);
verify(brokerOuterAPI).registerBrokerAll(anyString(), anyString(), anyString(), anyLong(), anyString(),
- captor.capture(), ArgumentMatchers.anyList(), anyBoolean(), anyInt(), anyBoolean(), anyBoolean(), brokerIdentityCaptor.capture());
+ captor.capture(), ArgumentMatchers.anyList(), anyBoolean(), anyInt(), anyBoolean(), anyBoolean(), anyLong(), brokerIdentityCaptor.capture());
TopicConfigSerializeWrapper wrapper = captor.getValue();
for (Map.Entry<String, TopicConfig> entry : wrapper.getTopicConfigTable().entrySet()) {
assertThat(entry.getValue().getPerm()).isEqualTo(brokerController.getBrokerConfig().getBrokerPermission());
@@ -245,9 +245,9 @@ public class BrokerContainerTest {
@Test
public void testAddAndRemoveDLedgerBroker() throws Exception {
BrokerContainer brokerContainer = new BrokerContainer(
- new BrokerContainerConfig(),
- new NettyServerConfig(),
- new NettyClientConfig());
+ new BrokerContainerConfig(),
+ new NettyServerConfig(),
+ new NettyClientConfig());
assertThat(brokerContainer.initialize()).isTrue();
brokerContainer.start();
@@ -354,7 +354,7 @@ public class BrokerContainerTest {
ArgumentCaptor<TopicConfigSerializeWrapper> captor = ArgumentCaptor.forClass(TopicConfigSerializeWrapper.class);
ArgumentCaptor<BrokerIdentity> brokerIdentityCaptor = ArgumentCaptor.forClass(BrokerIdentity.class);
verify(brokerOuterAPI, times(times)).registerBrokerAll(anyString(), anyString(), anyString(), anyLong(),
- anyString(), captor.capture(), ArgumentMatchers.anyList(), anyBoolean(), anyInt(), anyBoolean(), anyBoolean(), brokerIdentityCaptor.capture());
+ anyString(), captor.capture(), ArgumentMatchers.anyList(), anyBoolean(), anyInt(), anyBoolean(), anyBoolean(), anyLong(), brokerIdentityCaptor.capture());
TopicConfigSerializeWrapper wrapper = captor.getValue();
for (TopicConfig topicConfig : topicConfigList) {
diff --git a/container/src/test/java/org/apache/rocketmq/container/BrokerPreOnlineTest.java b/container/src/test/java/org/apache/rocketmq/container/BrokerPreOnlineTest.java
index 6943d28a9..d9258bbb1 100644
--- a/container/src/test/java/org/apache/rocketmq/container/BrokerPreOnlineTest.java
+++ b/container/src/test/java/org/apache/rocketmq/container/BrokerPreOnlineTest.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerPreOnlineService;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
import org.apache.rocketmq.common.BrokerConfig;
@@ -50,7 +51,6 @@ public class BrokerPreOnlineTest {
public void init() throws Exception {
when(brokerContainer.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
- when(brokerContainer.getBrokerContainerConfig()).thenReturn(new BrokerContainerConfig());
BrokerMemberGroup brokerMemberGroup1 = new BrokerMemberGroup();
Map<Long, String> brokerAddrMap = new HashMap<>();
@@ -81,7 +81,7 @@ public class BrokerPreOnlineTest {
innerBrokerController.setTransactionalMessageCheckService(new TransactionalMessageCheckService(innerBrokerController));
- Field field = InnerBrokerController.class.getDeclaredField("isIsolated");
+ Field field = BrokerController.class.getDeclaredField("isIsolated");
field.setAccessible(true);
field.set(innerBrokerController, true);
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/ContainerIntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/container/ContainerIntegrationTestBase.java
index fab4c46b8..4a2e06642 100644
--- a/test/src/test/java/org/apache/rocketmq/test/container/ContainerIntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/container/ContainerIntegrationTestBase.java
@@ -314,7 +314,6 @@ public class ContainerIntegrationTestBase {
NettyServerConfig nettyServerConfig = new NettyServerConfig();
NettyClientConfig nettyClientConfig = new NettyClientConfig();
brokerContainerConfig.setNamesrvAddr(nsAddr);
- brokerContainerConfig.setCompatibleWithOldNameSrv(false);
nettyServerConfig.setListenPort(generatePort(20000, 10000));
BrokerContainer brokerContainer = new BrokerContainer(brokerContainerConfig, nettyServerConfig, nettyClientConfig);
@@ -356,6 +355,7 @@ public class ContainerIntegrationTestBase {
brokerConfig.setLockInStrictMode(true);
brokerConfig.setConsumerOffsetUpdateVersionStep(10);
brokerConfig.setDelayOffsetUpdateVersionStep(10);
+ brokerConfig.setCompatibleWithOldNameSrv(false);
brokerConfig.setListenPort(generatePort(brokerContainer.getRemotingServer().localListenPort(), 10000));
String baseDir = createBaseDir(brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId()).getAbsolutePath();
@@ -426,7 +426,7 @@ public class ContainerIntegrationTestBase {
slaveBrokerConfig.setBrokerName(master.getBrokerConfig().getBrokerName());
slaveBrokerConfig.setBrokerId(slaveBrokerId);
slaveBrokerConfig.setBrokerClusterName(master.getBrokerConfig().getBrokerClusterName());
-
+ slaveBrokerConfig.setCompatibleWithOldNameSrv(false);
slaveBrokerConfig.setBrokerIP1("127.0.0.1");
slaveBrokerConfig.setBrokerIP2("127.0.0.1");
slaveBrokerConfig.setEnablePropertyFilter(true);
@@ -564,9 +564,9 @@ public class ContainerIntegrationTestBase {
}
protected static void changeCompatibleMode(boolean compatibleMode) {
- brokerContainer1.getBrokerContainerConfig().setCompatibleWithOldNameSrv(compatibleMode);
- brokerContainer2.getBrokerContainerConfig().setCompatibleWithOldNameSrv(compatibleMode);
- brokerContainer3.getBrokerContainerConfig().setCompatibleWithOldNameSrv(compatibleMode);
+ brokerContainer1.getBrokerControllers().forEach(brokerController -> brokerController.getBrokerConfig().setCompatibleWithOldNameSrv(compatibleMode));
+ brokerContainer2.getBrokerControllers().forEach(brokerController -> brokerController.getBrokerConfig().setCompatibleWithOldNameSrv(compatibleMode));
+ brokerContainer3.getBrokerControllers().forEach(brokerController -> brokerController.getBrokerConfig().setCompatibleWithOldNameSrv(compatibleMode));
}
protected static Set<MessageQueue> filterMessageQueue(Set<MessageQueue> mqSet, String topic) {