You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hz...@apache.org on 2023/01/02 12:37:01 UTC
[rocketmq] 01/08: Solve merge conflicts
This is an automated email from the ASF dual-hosted git repository.
hzh0425 pushed a commit to branch dledger-controller-snapshot
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit f9a5690b043d1de4713bcb2310f29e6012ada28f
Author: hzh0425 <64...@qq.com>
AuthorDate: Sun Jan 1 14:36:20 2023 +0800
Solve merge conflicts
---
.../impl/manager/ReplicasInfoManager.java | 26 +++++++++--------
.../impl/manager/ReplicasInfoManagerTest.java | 8 ++++--
.../store/ha/autoswitch/AutoSwitchHATest.java | 33 +++++++++++-----------
3 files changed, 36 insertions(+), 31 deletions(-)
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index 591d22c16..32b6cb763 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -16,15 +16,6 @@
*/
package org.apache.rocketmq.controller.impl.manager;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.BiPredicate;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.MixAll;
@@ -54,6 +45,17 @@ import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRes
import org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToControllerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToControllerResponseHeader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
/**
* The manager that manages the replicas info for all brokers. We can think of this class as the controller's memory
* state machine It should be noted that this class is not thread safe, and the upper layer needs to ensure that it can
@@ -240,7 +242,7 @@ public class ReplicasInfoManager implements SnapshotAbleMetadataManager {
final ApplyBrokerIdEvent applyIdEvent = new ApplyBrokerIdEvent(brokerName, brokerAddress, brokerId);
result.addEvent(applyIdEvent);
} else {
- brokerId = brokerInfo.getBrokerId(brokerAddress);
+ brokerId = brokerInfo.getBrokerIdByAddress(brokerAddress);
}
response.setBrokerId(brokerId);
response.setMasterEpoch(syncStateInfo.getMasterEpoch());
@@ -315,7 +317,7 @@ public class ReplicasInfoManager implements SnapshotAbleMetadataManager {
response.setMasterAddress(masterAddress);
response.setMasterEpoch(syncStateInfo.getMasterEpoch());
if (StringUtils.isNotEmpty(request.getBrokerAddress())) {
- response.setBrokerId(brokerInfo.getBrokerId(request.getBrokerAddress()));
+ response.setBrokerId(brokerInfo.getBrokerIdByAddress(request.getBrokerAddress()));
}
result.setBody(new SyncStateSet(syncStateInfo.getSyncStateSet(), syncStateInfo.getSyncStateSetEpoch()).encode());
return result;
@@ -336,7 +338,7 @@ public class ReplicasInfoManager implements SnapshotAbleMetadataManager {
final String master = syncStateInfo.getMasterAddress();
final ArrayList<InSyncStateData.InSyncMember> inSyncMembers = new ArrayList<>();
syncStateSet.forEach(replicas -> {
- long brokerId = StringUtils.equals(master, replicas) ? MixAll.MASTER_ID : brokerInfo.getBrokerId(replicas);
+ long brokerId = StringUtils.equals(master, replicas) ? MixAll.MASTER_ID : brokerInfo.getBrokerIdByAddress(replicas);
inSyncMembers.add(new InSyncStateData.InSyncMember(replicas, brokerId));
});
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index 9eabedb9d..b46619536 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -16,9 +16,7 @@
*/
package org.apache.rocketmq.controller.impl.controller.impl.manager;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.controller.elect.ElectPolicy;
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
@@ -43,6 +41,10 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index 93f35630d..524d2aa72 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -17,21 +17,9 @@
package org.apache.rocketmq.store.ha.autoswitch;
-import java.io.File;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
@@ -46,10 +34,23 @@ import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import org.apache.rocketmq.common.MixAll;
import org.junit.After;
-import org.junit.Test;
import org.junit.Assume;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
@@ -193,7 +194,7 @@ public class AutoSwitchHATest {
}
private void checkMessage(final DefaultMessageStore messageStore, int totalNums, int startOffset) {
- await().atMost(30, TimeUnit.SECONDS)
+ await().atMost(60, TimeUnit.SECONDS)
.until(() -> {
GetMessageResult result = messageStore.getMessage("GROUP_A", "FooBar", 0, startOffset, 1024, null);
// System.out.printf(result + "%n");