You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2022/07/05 02:33:33 UTC
[pulsar] branch master updated: Fix the local policy isolation groups overwriting defaultIsolationGroups, which made it impossible to roll back to defaultIsolationGroups (#16273)
This is an automated email from the ASF dual-hosted git repository.
zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 95abf3afd0c Fix the local policy isolation groups overwriting defaultIsolationGroups, which made it impossible to roll back to defaultIsolationGroups (#16273)
95abf3afd0c is described below
commit 95abf3afd0cb3fdb368f9f9fae6dcff0b95983ce
Author: lixinyang <84...@users.noreply.github.com>
AuthorDate: Tue Jul 5 10:33:25 2022 +0800
Fix the local policy isolation groups overwriting defaultIsolationGroups, which made it impossible to roll back to defaultIsolationGroups (#16273)
Co-authored-by: nicklixinyang <ni...@didiglobal.com>
---
.../IsolatedBookieEnsemblePlacementPolicy.java | 46 +++++------------
.../IsolatedBookieEnsemblePlacementPolicyTest.java | 60 +++++++++++++++++++++-
2 files changed, 72 insertions(+), 34 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index b1c08801cae..b8c288535bc 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -121,32 +121,7 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac
public PlacementResult<List<BookieId>> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
Map<String, byte[]> customMetadata, Set<BookieId> excludeBookies)
throws BKNotEnoughBookiesException {
- if (customMetadata.containsKey(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG)) {
- try {
- EnsemblePlacementPolicyConfig policy = EnsemblePlacementPolicyConfig
- .decode(customMetadata.get(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG));
- Map<String, Object> policyProperties = policy.getProperties();
- String isolationBookieGroups =
- (String) policyProperties.get(ISOLATION_BOOKIE_GROUPS);
- String secondaryIsolationBookieGroups =
- (String) policyProperties.get(SECONDARY_ISOLATION_BOOKIE_GROUPS);
- Set<String> primaryIsolationGroups = new HashSet<>();
- Set<String> secondaryIsolationGroups = new HashSet<>();
- if (isolationBookieGroups != null) {
- primaryIsolationGroups.addAll(Arrays.asList(isolationBookieGroups.split(",")));
- }
- if (secondaryIsolationBookieGroups != null) {
- secondaryIsolationGroups.addAll(Arrays.asList(secondaryIsolationBookieGroups.split(",")));
- }
- defaultIsolationGroups = ImmutablePair.of(primaryIsolationGroups, secondaryIsolationGroups);
- } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
- log.error("Failed to decode EnsemblePlacementPolicyConfig from customeMetadata when choosing ensemble, "
- + "Will use defaultIsolationGroups instead");
- }
- }
-
- Set<BookieId> blacklistedBookies = getBlacklistedBookiesWithIsolationGroups(
- ensembleSize, defaultIsolationGroups);
+ Set<BookieId> blacklistedBookies = getBlacklistedBookies(ensembleSize, customMetadata);
if (excludeBookies == null) {
excludeBookies = new HashSet<BookieId>();
}
@@ -159,10 +134,20 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac
Map<String, byte[]> customMetadata, List<BookieId> currentEnsemble,
BookieId bookieToReplace, Set<BookieId> excludeBookies)
throws BKNotEnoughBookiesException {
+ Set<BookieId> blacklistedBookies = getBlacklistedBookies(ensembleSize, customMetadata);
+ if (excludeBookies == null) {
+ excludeBookies = new HashSet<BookieId>();
+ }
+ excludeBookies.addAll(blacklistedBookies);
+ return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble,
+ bookieToReplace, excludeBookies);
+ }
+
+ private Set<BookieId> getBlacklistedBookies(int ensembleSize, Map<String, byte[]> customMetadata){
// parse the ensemble placement policy from the custom metadata, if it is present, we will apply it to
// the isolation groups for filtering the bookies.
Optional<EnsemblePlacementPolicyConfig> ensemblePlacementPolicyConfig =
- getEnsemblePlacementPolicyConfig(customMetadata);
+ getEnsemblePlacementPolicyConfig(customMetadata);
Set<BookieId> blacklistedBookies;
if (ensemblePlacementPolicyConfig.isPresent()) {
EnsemblePlacementPolicyConfig config = ensemblePlacementPolicyConfig.get();
@@ -171,12 +156,7 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac
} else {
blacklistedBookies = getBlacklistedBookiesWithIsolationGroups(ensembleSize, defaultIsolationGroups);
}
- if (excludeBookies == null) {
- excludeBookies = new HashSet<BookieId>();
- }
- excludeBookies.addAll(blacklistedBookies);
- return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble,
- bookieToReplace, excludeBookies);
+ return blacklistedBookies;
}
private static Optional<EnsemblePlacementPolicyConfig> getEnsemblePlacementPolicyConfig(
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
index d0f9b410904..3ae5fd2b0ea 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
@@ -327,7 +327,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
placementPolicyProperties2
);
Map<String, byte[]> customMetadata2 = new HashMap<>();
- customMetadata2.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, policyConfig.encode());
+ customMetadata2.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, policyConfig2.encode());
BookieId replaceBookie2 = isolationPolicy.replaceBookie(3, 3, 3, customMetadata2,
Arrays.asList(bookie1Id,bookie2Id,bookie3Id), bookie3Id, null).getResult();
assertEquals(replaceBookie2, bookie4Id);
@@ -526,4 +526,62 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
Arrays.asList(bookie1Id, bookie3Id), bookie3Id, null).getResult();
assertEquals(bookieId, bookie2Id);
}
+
+ @Test
+ public void testDefaultIsolationPolicyNotCovered() throws Exception {
+ Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
+ final String defaultIsolatedGroup = "Group1";
+ final String defaultSecondaryIsolatedGroup = "Group2";
+ final String customIsolatedGroup = "Group2";
+
+ Map<String, BookieInfo> Group1 = new HashMap<>();
+ Group1.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
+ Group1.put(BOOKIE2, BookieInfo.builder().rack("rack0").build());
+
+ Map<String, BookieInfo> Group2 = new HashMap<>();
+ Group2.put(BOOKIE3, BookieInfo.builder().rack("rack1").build());
+ Group2.put(BOOKIE4, BookieInfo.builder().rack("rack1").build());
+
+ Set<BookieId> BookieIdGroup1 = new HashSet<>();
+ BookieIdGroup1.add(new BookieSocketAddress(BOOKIE1).toBookieId());
+ BookieIdGroup1.add(new BookieSocketAddress(BOOKIE2).toBookieId());
+ Set<BookieId> BookieIdGroup2 = new HashSet<>();
+ BookieIdGroup2.add(new BookieSocketAddress(BOOKIE3).toBookieId());
+ BookieIdGroup2.add(new BookieSocketAddress(BOOKIE4).toBookieId());
+
+ bookieMapping.put(defaultIsolatedGroup, Group1);
+ bookieMapping.put(defaultSecondaryIsolatedGroup, Group2);
+
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
+ Optional.empty()).join();
+
+ IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new IsolatedBookieEnsemblePlacementPolicy();
+ ClientConfiguration bkClientConf = new ClientConfiguration();
+ bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
+ bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, defaultIsolatedGroup);
+ bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, defaultSecondaryIsolatedGroup);
+ isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL,
+ NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+ isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
+
+ Map<String, Object> placementPolicyProperties = new HashMap<>();
+ placementPolicyProperties.put(
+ IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, customIsolatedGroup);
+ placementPolicyProperties.put(
+ IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, "");
+
+ EnsemblePlacementPolicyConfig policyConfig = new EnsemblePlacementPolicyConfig(
+ IsolatedBookieEnsemblePlacementPolicy.class,
+ placementPolicyProperties
+ );
+ Map<String, byte[]> customMetadata = new HashMap<>();
+ customMetadata.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, policyConfig.encode());
+
+ List<BookieId> customBookieList = isolationPolicy
+ .newEnsemble(2, 2, 2, customMetadata, new HashSet<>()).getResult();
+ assertEquals(BookieIdGroup2.containsAll(customBookieList),true);
+ List<BookieId> defaultBookieList = isolationPolicy
+ .newEnsemble(2, 2, 2, Collections.emptyMap(), new HashSet<>()).getResult();
+ assertEquals(BookieIdGroup1.containsAll(defaultBookieList),true);
+ }
}