You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/18 14:52:53 UTC

[pulsar] branch master updated: [pulsar-broker] add support of secondary bookie-isolation-group (#4261)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 91d4495  [pulsar-broker] add support of secondary bookie-isolation-group (#4261)
91d4495 is described below

commit 91d4495e5f7b5765286e9150639f995f133f0803
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sat May 18 07:52:47 2019 -0700

    [pulsar-broker] add support of secondary bookie-isolation-group (#4261)
    
    * [pulsar-broker] add support of secondary bookie-isolation-group
    
    * fix docs
    
    * add negative test
    
    * Fix tests
---
 conf/broker.conf                                   |  8 ++
 conf/standalone.conf                               |  8 ++
 .../apache/pulsar/broker/ServiceConfiguration.java | 12 +++
 .../pulsar/broker/BookKeeperClientFactoryImpl.java |  4 +
 .../ZkIsolatedBookieEnsemblePlacementPolicy.java   | 42 ++++++++--
 ...kIsolatedBookieEnsemblePlacementPolicyTest.java | 98 ++++++++++++++++++++++
 site2/docs/reference-configuration.md              |  2 +
 7 files changed, 167 insertions(+), 7 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 4ae8522..37b1281 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -406,6 +406,14 @@ bookkeeperClientReorderReadSequenceEnabled=false
 # outside the specified groups will not be used by the broker
 bookkeeperClientIsolationGroups=
 
+# Secondary bookie isolation groups: their bookies are also used when bookkeeperClientIsolationGroups
+# doesn't have enough bookies available.
+bookkeeperClientSecondaryIsolationGroups=
+
+# Minimum number of bookies that should be available in bookkeeperClientIsolationGroups;
+# otherwise the broker also includes bookkeeperClientSecondaryIsolationGroups bookies in the isolated list.
+bookkeeperClientMinAvailableBookiesInIsolationGroups=
+
 # Enable/disable having read operations for a ledger to be sticky to a single bookie.
 # If this flag is enabled, the client will use one single bookie (by preference) to read
 # all entries for a ledger.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index a6a2621..7e8f692 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -272,6 +272,14 @@ bookkeeperClientReorderReadSequenceEnabled=false
 # outside the specified groups will not be used by the broker
 bookkeeperClientIsolationGroups=
 
+# Secondary bookie isolation groups: their bookies are also used when bookkeeperClientIsolationGroups
+# doesn't have enough bookies available.
+bookkeeperClientSecondaryIsolationGroups=
+
+# Minimum number of bookies that should be available in bookkeeperClientIsolationGroups;
+# otherwise the broker also includes bookkeeperClientSecondaryIsolationGroups bookies in the isolated list.
+bookkeeperClientMinAvailableBookiesInIsolationGroups=
+
 ### --- Managed Ledger --- ###
 
 # Number of bookies to use when creating a ledger
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5209e31..e594208 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -711,6 +711,18 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "Enable bookie isolation by specifying a list of bookie groups to choose from. \n\n"
             + "Any bookie outside the specified groups will not be used by the broker")
     private String bookkeeperClientIsolationGroups;
+    @FieldContext(
+            category = CATEGORY_STORAGE_BK,
+            required = false,
+            doc = "Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't "
+                + "have enough bookies available.")
+    private String bookkeeperClientSecondaryIsolationGroups;
+    @FieldContext(
+            category = CATEGORY_STORAGE_BK,
+            required = false,
+            doc = "Minimum bookies that should be available as part of bookkeeperClientIsolationGroups, "
+                + "else the broker will include bookkeeperClientSecondaryIsolationGroups bookies in the isolated list.")
+    private int bookkeeperClientMinAvailableBookiesInIsolationGroups = 0;
     @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Enable/disable having read operations for a ledger to be sticky to "
             + "a single bookie.\n" +
             "If this flag is enabled, the client will use one single bookie (by " +
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 65207e6..2cf5fda 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -93,6 +93,10 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
             bkConf.setEnsemblePlacementPolicy(ZkIsolatedBookieEnsemblePlacementPolicy.class);
             bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
                     conf.getBookkeeperClientIsolationGroups());
+            bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+                    conf.getBookkeeperClientSecondaryIsolationGroups());
+            bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE,
+                    conf.getBookkeeperClientMinAvailableBookiesInIsolationGroups());
             if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) {
                 ZooKeeperCache zkc = new ZooKeeperCache(zkClient, conf.getZooKeeperOperationTimeoutSeconds()) {
                 };
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
index 5db20bb..a46dba0 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
@@ -54,10 +54,16 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
     private static final Logger LOG = LoggerFactory.getLogger(ZkIsolatedBookieEnsemblePlacementPolicy.class);
 
     public static final String ISOLATION_BOOKIE_GROUPS = "isolationBookieGroups";
+    // if the policy doesn't find the minimum number of available bookies in the primary isolationBookieGroups,
+    // then it also uses bookies from the secondaryIsolationBookieGroups
+    public static final String MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE = "minAvailablePrimaryIsolatedBookies";
+    public static final String SECONDARY_ISOLATION_BOOKIE_GROUPS = "secondaryIsolationBookieGroups";
 
     private ZooKeeperCache bookieMappingCache = null;
 
-    private final List<String> isolationGroups = new ArrayList<String>();
+    private final List<String> primaryIsolationGroups = new ArrayList<String>();
+    private final List<String> secondaryIsolationGroups = new ArrayList<String>();
+    private int minAvailablePrimaryIsolatedBookies = 0;
     private final ObjectMapper jsonMapper = ObjectMapperFactory.create();
 
     public ZkIsolatedBookieEnsemblePlacementPolicy() {
@@ -72,12 +78,22 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
             String isolationGroupsString = (String) conf.getProperty(ISOLATION_BOOKIE_GROUPS);
             if (!isolationGroupsString.isEmpty()) {
                 for (String isolationGroup : isolationGroupsString.split(",")) {
-                    isolationGroups.add(isolationGroup);
+                    primaryIsolationGroups.add(isolationGroup);
                 }
                 bookieMappingCache = getAndSetZkCache(conf);
             }
         }
-
+        if (conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS) != null) {
+            String secondaryIsolationGroupsString = (String) conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS);
+            if (!secondaryIsolationGroupsString.isEmpty()) {
+                for (String isolationGroup : secondaryIsolationGroupsString.split(",")) {
+                    secondaryIsolationGroups.add(isolationGroup);
+                }
+            }
+        }
+        // getInt() accepts an Integer or a numeric String (e.g. from a properties file) and defaults to 0 when
+        // unset; an (int) cast of getProperty() would throw ClassCastException for a String value.
+        minAvailablePrimaryIsolatedBookies = conf.getInt(MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE, 0);
         return super.initialize(conf, optionalDnsResolver, timer, featureProvider, statsLogger);
     }
 
@@ -141,8 +157,9 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
                         .getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, this)
                         .orElseThrow(() -> new KeeperException.NoNodeException(
                                 ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH));
-                for (String group : allGroupsBookieMapping.keySet()) {
-                    if (!isolationGroups.contains(group)) {
+                long totalBookies = allGroupsBookieMapping.values().stream().flatMap(m -> m.keySet().stream()).distinct().count();
+                for (String group : allGroupsBookieMapping.keySet()) {
+                    if (!primaryIsolationGroups.contains(group)) {
                         for (String bookieAddress : allGroupsBookieMapping.get(group).keySet()) {
                             blacklistedBookies.add(new BookieSocketAddress(bookieAddress));
                         }
@@ -150,9 +167,9 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
                 }
                 // sometime while doing isolation, user might not want to remove isolated bookies from other default
                 // groups. so, same set of bookies could be overlapped into isolated-group and other default groups. so,
-                // try to remove those overlapped bookies from excluded-bookie list because ther are also part of
+                // try to remove those overlapped bookies from excluded-bookie list because they are also part of
                 // isolated-group bookies.
-                for (String group : isolationGroups) {
+                for (String group : primaryIsolationGroups) {
                     Map<String, BookieInfo> bookieGroup = allGroupsBookieMapping.get(group);
                     if (bookieGroup != null && !bookieGroup.isEmpty()) {
                         for (String bookieAddress : bookieGroup.keySet()) {
@@ -160,6 +177,17 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
                         }
                     }
                 }
+                // (distinct bookies - excluded bookies) = primary-group bookies; if too few, also allow secondary ones.
+                if ((totalBookies - blacklistedBookies.size()) < minAvailablePrimaryIsolatedBookies) {
+                    for (String group : secondaryIsolationGroups) {
+                        Map<String, BookieInfo> bookieGroup = allGroupsBookieMapping.get(group);
+                        if (bookieGroup != null && !bookieGroup.isEmpty()) {
+                            for (String bookieAddress : bookieGroup.keySet()) {
+                                blacklistedBookies.remove(new BookieSocketAddress(bookieAddress));
+                            }
+                        }
+                    }
+                }
             }
         } catch (Exception e) {
             LOG.warn("Error getting bookie isolation info from zk: {}", e.getMessage());
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
index da0725d..8c2576b 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -353,4 +354,101 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
 
         localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
     }
+
+    @Test
+    public void testSecondaryIsolationGroupsBookies() throws Exception {
+        Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
+        Map<String, BookieInfo> defaultBookieGroup = new HashMap<>();
+        final String isolatedGroup = "primaryGroup";
+        final String secondaryIsolatedGroup = "secondaryGroup";
+
+        defaultBookieGroup.put(BOOKIE1, new BookieInfo("rack0", null));
+        defaultBookieGroup.put(BOOKIE2, new BookieInfo("rack1", null));
+        defaultBookieGroup.put(BOOKIE3, new BookieInfo("rack1", null));
+        defaultBookieGroup.put(BOOKIE4, new BookieInfo("rack1", null));
+        defaultBookieGroup.put(BOOKIE5, new BookieInfo("rack1", null));
+
+        Map<String, BookieInfo> primaryIsolatedBookieGroup = new HashMap<>();
+        primaryIsolatedBookieGroup.put(BOOKIE1, new BookieInfo("rack1", null));
+
+        Map<String, BookieInfo> secondaryIsolatedBookieGroup = new HashMap<>();
+        secondaryIsolatedBookieGroup.put(BOOKIE2, new BookieInfo("rack0", null));
+        secondaryIsolatedBookieGroup.put(BOOKIE4, new BookieInfo("rack0", null));
+
+        bookieMapping.put("default", defaultBookieGroup);
+        bookieMapping.put(isolatedGroup, primaryIsolatedBookieGroup);
+        bookieMapping.put(secondaryIsolatedGroup, secondaryIsolatedBookieGroup);
+
+        ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
+                jsonMapper.writeValueAsBytes(bookieMapping), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        Thread.sleep(100);
+
+        ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
+        ClientConfiguration bkClientConf = new ClientConfiguration();
+        bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
+        });
+        bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolatedGroup);
+        bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, secondaryIsolatedGroup);
+        bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE, 2);
+        isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
+
+        List<BookieSocketAddress> ensemble = isolationPolicy
+                .newEnsemble(3, 3, 2, Collections.emptyMap(), new HashSet<>()).getResult();
+        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
+        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
+        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE4)));
+
+        localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
+    }
+
+    @Test
+    public void testSecondaryIsolationGroupsBookiesNegative() throws Exception {
+
+        Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
+        Map<String, BookieInfo> defaultBookieGroup = new HashMap<>();
+        final String isolatedGroup = "primaryGroup";
+        final String secondaryIsolatedGroup = "secondaryGroup";
+
+        defaultBookieGroup.put(BOOKIE1, new BookieInfo("rack0", null));
+        defaultBookieGroup.put(BOOKIE2, new BookieInfo("rack1", null));
+        defaultBookieGroup.put(BOOKIE3, new BookieInfo("rack1", null));
+        defaultBookieGroup.put(BOOKIE4, new BookieInfo("rack1", null));
+        defaultBookieGroup.put(BOOKIE5, new BookieInfo("rack1", null));
+
+        Map<String, BookieInfo> primaryIsolatedBookieGroup = new HashMap<>();
+        primaryIsolatedBookieGroup.put(BOOKIE1, new BookieInfo("rack1", null));
+
+        bookieMapping.put("default", defaultBookieGroup);
+        bookieMapping.put(isolatedGroup, primaryIsolatedBookieGroup);
+
+        ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
+                jsonMapper.writeValueAsBytes(bookieMapping), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        Thread.sleep(100);
+
+        ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
+        ClientConfiguration bkClientConf = new ClientConfiguration();
+        bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
+        });
+        bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolatedGroup);
+        bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+                secondaryIsolatedGroup);
+        bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE, 2);
+        isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
+
+        try {
+            // only BOOKIE1 is isolated and the secondary group has no bookies, so no 3-bookie ensemble exists
+            isolationPolicy.newEnsemble(3, 3, 2, Collections.emptyMap(), new HashSet<>());
+            Assert.fail("Should have thrown BKNotEnoughBookiesException");
+        } catch (BKNotEnoughBookiesException ne) {
+            // Ok..
+        }
+
+        localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
+    }
 }
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index d763e43..4f040f2 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -170,6 +170,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
 |bookkeeperClientRegionawarePolicyEnabled|  Enable region-aware bookie selection policy. BK will chose bookies from different regions and racks when forming a new bookie ensemble. If enabled, the value of bookkeeperClientRackawarePolicyEnabled is ignored  |false|
 |bookkeeperClientReorderReadSequenceEnabled|  Enable/disable reordering read sequence on reading entries.  |false|
 |bookkeeperClientIsolationGroups| Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie outside the specified groups will not be used by the broker  ||
+|bookkeeperClientSecondaryIsolationGroups| Secondary bookie isolation groups: their bookies are also used when bookkeeperClientIsolationGroups doesn't have enough bookies available.  ||
+|bookkeeperClientMinAvailableBookiesInIsolationGroups| Minimum number of bookies that should be available in bookkeeperClientIsolationGroups; otherwise the broker also includes bookkeeperClientSecondaryIsolationGroups bookies in the isolated list.  |0|
 |bookkeeperEnableStickyReads | Enable/disable having read operations for a ledger to be sticky to a single bookie. If this flag is enabled, the client will use one single bookie (by preference) to read  all entries for a ledger. | true |
 |managedLedgerDefaultEnsembleSize|  Number of bookies to use when creating a ledger |2|
 |managedLedgerDefaultWriteQuorum| Number of copies to store for each message  |2|