You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/18 14:52:53 UTC
[pulsar] branch master updated: [pulsar-broker] add support of
secondary bookie-isolation-group (#4261)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 91d4495 [pulsar-broker] add support of secondary bookie-isolation-group (#4261)
91d4495 is described below
commit 91d4495e5f7b5765286e9150639f995f133f0803
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sat May 18 07:52:47 2019 -0700
[pulsar-broker] add support of secondary bookie-isolation-group (#4261)
* [pulsar-broker] add support of secondary bookie-isolation-group
* fix docs
* add negative test
* Fix tests
---
conf/broker.conf | 8 ++
conf/standalone.conf | 8 ++
.../apache/pulsar/broker/ServiceConfiguration.java | 12 +++
.../pulsar/broker/BookKeeperClientFactoryImpl.java | 4 +
.../ZkIsolatedBookieEnsemblePlacementPolicy.java | 42 ++++++++--
...kIsolatedBookieEnsemblePlacementPolicyTest.java | 98 ++++++++++++++++++++++
site2/docs/reference-configuration.md | 2 +
7 files changed, 167 insertions(+), 7 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 4ae8522..37b1281 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -406,6 +406,14 @@ bookkeeperClientReorderReadSequenceEnabled=false
# outside the specified groups will not be used by the broker
bookkeeperClientIsolationGroups=
+# Enable bookie secondary-isolation groups if bookkeeperClientIsolationGroups doesn't
+# have enough bookies available.
+bookkeeperClientSecondaryIsolationGroups=
+
+# Minimum number of bookies that should be available in bookkeeperClientIsolationGroups;
+# otherwise the broker will also include bookkeeperClientSecondaryIsolationGroups bookies in the isolated list.
+bookkeeperClientMinAvailableBookiesInIsolationGroups=
+
# Enable/disable having read operations for a ledger to be sticky to a single bookie.
# If this flag is enabled, the client will use one single bookie (by preference) to read
# all entries for a ledger.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index a6a2621..7e8f692 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -272,6 +272,14 @@ bookkeeperClientReorderReadSequenceEnabled=false
# outside the specified groups will not be used by the broker
bookkeeperClientIsolationGroups=
+# Enable bookie secondary-isolation groups if bookkeeperClientIsolationGroups doesn't
+# have enough bookies available.
+bookkeeperClientSecondaryIsolationGroups=
+
+# Minimum number of bookies that should be available in bookkeeperClientIsolationGroups;
+# otherwise the broker will also include bookkeeperClientSecondaryIsolationGroups bookies in the isolated list.
+bookkeeperClientMinAvailableBookiesInIsolationGroups=
+
### --- Managed Ledger --- ###
# Number of bookies to use when creating a ledger
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5209e31..e594208 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -711,6 +711,18 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Enable bookie isolation by specifying a list of bookie groups to choose from. \n\n"
+ "Any bookie outside the specified groups will not be used by the broker")
private String bookkeeperClientIsolationGroups;
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ required = false,
+ doc = "Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't have enough bookie available."
+ )
+ private String bookkeeperClientSecondaryIsolationGroups;
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ required = false,
+ doc = "Minimum bookies that should be available as part of bookkeeperClientIsolationGroups \n\n"
+ + "else broker will include bookkeeperClientSecondaryIsolationGroups bookies in isolated list.")
+ private int bookkeeperClientMinAvailableBookiesInIsolationGroups = 0;
@FieldContext(category = CATEGORY_STORAGE_BK, doc = "Enable/disable having read operations for a ledger to be sticky to "
+ "a single bookie.\n" +
"If this flag is enabled, the client will use one single bookie (by " +
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 65207e6..2cf5fda 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -93,6 +93,10 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
bkConf.setEnsemblePlacementPolicy(ZkIsolatedBookieEnsemblePlacementPolicy.class);
bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
conf.getBookkeeperClientIsolationGroups());
+ bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+ conf.getBookkeeperClientSecondaryIsolationGroups());
+ bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE,
+ conf.getBookkeeperClientMinAvailableBookiesInIsolationGroups());
if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) {
ZooKeeperCache zkc = new ZooKeeperCache(zkClient, conf.getZooKeeperOperationTimeoutSeconds()) {
};
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
index 5db20bb..a46dba0 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
@@ -54,10 +54,16 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
private static final Logger LOG = LoggerFactory.getLogger(ZkIsolatedBookieEnsemblePlacementPolicy.class);
public static final String ISOLATION_BOOKIE_GROUPS = "isolationBookieGroups";
+ // if policy doesn't find min-available bookies in primary-isolationBookieGroups then it uses bookies from
+ // secondaryIsolationBookieGroups
+ public static final String MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE = "minAvailablePrimaryIsolatedBookies";
+ public static final String SECONDARY_ISOLATION_BOOKIE_GROUPS = "secondaryIsolationBookieGroups";
private ZooKeeperCache bookieMappingCache = null;
- private final List<String> isolationGroups = new ArrayList<String>();
+ private final List<String> primaryIsolationGroups = new ArrayList<String>();
+ private final List<String> secondaryIsolationGroups = new ArrayList<String>();
+ private int minAvailablePrimaryIsolatedBookies = 0;
private final ObjectMapper jsonMapper = ObjectMapperFactory.create();
public ZkIsolatedBookieEnsemblePlacementPolicy() {
@@ -72,12 +78,22 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
String isolationGroupsString = (String) conf.getProperty(ISOLATION_BOOKIE_GROUPS);
if (!isolationGroupsString.isEmpty()) {
for (String isolationGroup : isolationGroupsString.split(",")) {
- isolationGroups.add(isolationGroup);
+ primaryIsolationGroups.add(isolationGroup);
}
bookieMappingCache = getAndSetZkCache(conf);
}
}
-
+ if (conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS) != null) {
+ String secondaryIsolationGroupsString = (String) conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS);
+ if (!secondaryIsolationGroupsString.isEmpty()) {
+ for (String isolationGroup : secondaryIsolationGroupsString.split(",")) {
+ secondaryIsolationGroups.add(isolationGroup);
+ }
+ }
+ }
+ minAvailablePrimaryIsolatedBookies = conf.getProperty(MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE) != null
+ ? (int) conf.getProperty(MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE)
+ : 0;
return super.initialize(conf, optionalDnsResolver, timer, featureProvider, statsLogger);
}
@@ -141,8 +157,9 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
.getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, this)
.orElseThrow(() -> new KeeperException.NoNodeException(
ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH));
- for (String group : allGroupsBookieMapping.keySet()) {
- if (!isolationGroups.contains(group)) {
+ Set<String> allBookies = allGroupsBookieMapping.keySet();
+ for (String group : allBookies) {
+ if (!primaryIsolationGroups.contains(group)) {
for (String bookieAddress : allGroupsBookieMapping.get(group).keySet()) {
blacklistedBookies.add(new BookieSocketAddress(bookieAddress));
}
@@ -150,9 +167,9 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
}
// sometime while doing isolation, user might not want to remove isolated bookies from other default
// groups. so, same set of bookies could be overlapped into isolated-group and other default groups. so,
- // try to remove those overlapped bookies from excluded-bookie list because ther are also part of
+ // try to remove those overlapped bookies from excluded-bookie list because they are also part of
// isolated-group bookies.
- for (String group : isolationGroups) {
+ for (String group : primaryIsolationGroups) {
Map<String, BookieInfo> bookieGroup = allGroupsBookieMapping.get(group);
if (bookieGroup != null && !bookieGroup.isEmpty()) {
for (String bookieAddress : bookieGroup.keySet()) {
@@ -160,6 +177,17 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
}
}
}
+ // if primary-isolated-bookies are not enough then consider the secondary isolated bookie groups as well.
+ if ((allBookies.size() - blacklistedBookies.size()) < minAvailablePrimaryIsolatedBookies) {
+ for (String group : secondaryIsolationGroups) {
+ Map<String, BookieInfo> bookieGroup = allGroupsBookieMapping.get(group);
+ if (bookieGroup != null && !bookieGroup.isEmpty()) {
+ for (String bookieAddress : bookieGroup.keySet()) {
+ blacklistedBookies.remove(new BookieSocketAddress(bookieAddress));
+ }
+ }
+ }
+ }
}
} catch (Exception e) {
LOG.warn("Error getting bookie isolation info from zk: {}", e.getMessage());
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
index da0725d..8c2576b 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
+import org.junit.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -353,4 +354,101 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
}
+
+ @Test
+ public void testSecondaryIsolationGroupsBookies() throws Exception {
+ Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
+ Map<String, BookieInfo> defaultBookieGroup = new HashMap<>();
+ final String isolatedGroup = "primaryGroup";
+ final String secondaryIsolatedGroup = "secondaryGroup";
+
+ defaultBookieGroup.put(BOOKIE1, new BookieInfo("rack0", null));
+ defaultBookieGroup.put(BOOKIE2, new BookieInfo("rack1", null));
+ defaultBookieGroup.put(BOOKIE3, new BookieInfo("rack1", null));
+ defaultBookieGroup.put(BOOKIE4, new BookieInfo("rack1", null));
+ defaultBookieGroup.put(BOOKIE5, new BookieInfo("rack1", null));
+
+ Map<String, BookieInfo> primaryIsolatedBookieGroup = new HashMap<>();
+ primaryIsolatedBookieGroup.put(BOOKIE1, new BookieInfo("rack1", null));
+
+ Map<String, BookieInfo> secondaryIsolatedBookieGroup = new HashMap<>();
+ secondaryIsolatedBookieGroup.put(BOOKIE2, new BookieInfo("rack0", null));
+ secondaryIsolatedBookieGroup.put(BOOKIE4, new BookieInfo("rack0", null));
+
+ bookieMapping.put("default", defaultBookieGroup);
+ bookieMapping.put(isolatedGroup, primaryIsolatedBookieGroup);
+ bookieMapping.put(secondaryIsolatedGroup, secondaryIsolatedBookieGroup);
+
+ ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
+ jsonMapper.writeValueAsBytes(bookieMapping), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ Thread.sleep(100);
+
+ ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
+ ClientConfiguration bkClientConf = new ClientConfiguration();
+ bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
+ });
+ bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolatedGroup);
+ bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, secondaryIsolatedGroup);
+ bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE, 2);
+ isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL,
+ NullStatsLogger.INSTANCE);
+ isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
+
+ List<BookieSocketAddress> ensemble = isolationPolicy
+ .newEnsemble(3, 3, 2, Collections.emptyMap(), new HashSet<>()).getResult();
+ assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
+ assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
+ assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE4)));
+
+ localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
+ }
+
+ @Test
+ public void testSecondaryIsolationGroupsBookiesNegative() throws Exception {
+
+ Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
+ Map<String, BookieInfo> defaultBookieGroup = new HashMap<>();
+ final String isolatedGroup = "primaryGroup";
+ final String secondaryIsolatedGroup = "secondaryGroup";
+
+ defaultBookieGroup.put(BOOKIE1, new BookieInfo("rack0", null));
+ defaultBookieGroup.put(BOOKIE2, new BookieInfo("rack1", null));
+ defaultBookieGroup.put(BOOKIE3, new BookieInfo("rack1", null));
+ defaultBookieGroup.put(BOOKIE4, new BookieInfo("rack1", null));
+ defaultBookieGroup.put(BOOKIE5, new BookieInfo("rack1", null));
+
+ Map<String, BookieInfo> primaryIsolatedBookieGroup = new HashMap<>();
+ primaryIsolatedBookieGroup.put(BOOKIE1, new BookieInfo("rack1", null));
+
+ bookieMapping.put("default", defaultBookieGroup);
+ bookieMapping.put(isolatedGroup, primaryIsolatedBookieGroup);
+
+ ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
+ jsonMapper.writeValueAsBytes(bookieMapping), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ Thread.sleep(100);
+
+ ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
+ ClientConfiguration bkClientConf = new ClientConfiguration();
+ bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
+ });
+ bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolatedGroup);
+ bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+ secondaryIsolatedGroup);
+ bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE, 2);
+ isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL,
+ NullStatsLogger.INSTANCE);
+ isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
+
+ try {
+ List<BookieSocketAddress> ensemble = isolationPolicy
+ .newEnsemble(3, 3, 2, Collections.emptyMap(), new HashSet<>()).getResult();
+ Assert.fail("Should have thrown BKNotEnoughBookiesException");
+ } catch (BKNotEnoughBookiesException ne) {
+ // Ok..
+ }
+
+ localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
+ }
}
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index d763e43..4f040f2 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -170,6 +170,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|bookkeeperClientRegionawarePolicyEnabled| Enable region-aware bookie selection policy. BK will chose bookies from different regions and racks when forming a new bookie ensemble. If enabled, the value of bookkeeperClientRackawarePolicyEnabled is ignored |false|
|bookkeeperClientReorderReadSequenceEnabled| Enable/disable reordering read sequence on reading entries. |false|
|bookkeeperClientIsolationGroups| Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie outside the specified groups will not be used by the broker ||
+|bookkeeperClientSecondaryIsolationGroups| Enable bookie secondary-isolation groups if bookkeeperClientIsolationGroups doesn't have enough bookies available. ||
+|bookkeeperClientMinAvailableBookiesInIsolationGroups| Minimum number of bookies that should be available in bookkeeperClientIsolationGroups; otherwise the broker will also include bookkeeperClientSecondaryIsolationGroups bookies in the isolated list. ||
|bookkeeperEnableStickyReads | Enable/disable having read operations for a ledger to be sticky to a single bookie. If this flag is enabled, the client will use one single bookie (by preference) to read all entries for a ledger. | true |
|managedLedgerDefaultEnsembleSize| Number of bookies to use when creating a ledger |2|
|managedLedgerDefaultWriteQuorum| Number of copies to store for each message |2|