You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2019/06/01 06:35:01 UTC

[pulsar] branch master updated: [pulsar-broker] tenant based bookie isolation (#3933)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ac44fe  [pulsar-broker] tenant based bookie isolation (#3933)
5ac44fe is described below

commit 5ac44fe06852aa6dc445725efd5abffd81aa6d75
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Fri May 31 23:34:54 2019 -0700

    [pulsar-broker] tenant based bookie isolation (#3933)
    
    fix tests
    
    add admin cli
    
    fix test
---
 .../bookkeeper/mledger/ManagedLedgerConfig.java    |  45 ++++
 .../mledger/impl/ManagedLedgerFactoryImpl.java     | 101 ++++++-
 .../pulsar/broker/BookKeeperClientFactory.java     |   8 +-
 .../pulsar/broker/BookKeeperClientFactoryImpl.java |  63 ++++-
 .../pulsar/broker/ManagedLedgerClientFactory.java  |  59 +++-
 .../org/apache/pulsar/broker/PulsarService.java    |   4 +
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  92 +++++++
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  24 ++
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  24 ++
 .../pulsar/broker/service/BrokerService.java       |  16 ++
 .../service/schema/BookkeeperSchemaStorage.java    |   4 +-
 .../apache/pulsar/compaction/CompactorTool.java    |   3 +-
 .../broker/MockedBookKeeperClientFactory.java      |   7 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   7 +-
 .../broker/service/BrokerBookieIsolationTest.java  | 300 +++++++++++++++++++++
 .../pulsar/compaction/CompactedTopicTest.java      |   5 +-
 .../apache/pulsar/compaction/CompactionTest.java   |   3 +-
 .../apache/pulsar/compaction/CompactorTest.java    |   5 +-
 .../org/apache/pulsar/client/admin/Namespaces.java |   4 +
 .../client/admin/internal/NamespacesImpl.java      |  22 ++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |   6 +
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  32 +++
 .../pulsar/common/policies/data/LocalPolicies.java |   6 +-
 .../zookeeper/ZkBookieRackAffinityMapping.java     |   2 +
 24 files changed, 797 insertions(+), 45 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 0e77aff..4af66eb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -24,7 +24,10 @@ import com.google.common.annotations.Beta;
 import com.google.common.base.Charsets;
 import java.time.Clock;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.api.DigestType;
 
 import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
@@ -61,6 +64,8 @@ public class ManagedLedgerConfig {
     private DigestType digestType = DigestType.CRC32C;
     private byte[] password = "".getBytes(Charsets.UTF_8);
     private boolean unackedRangesOpenCacheSetEnabled = true;
+    private Class<? extends EnsemblePlacementPolicy>  bookKeeperEnsemblePlacementPolicyClassName;
+    private Map<String, Object> bookKeeperEnsemblePlacementPolicyProperties;
     private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
     private Clock clock = Clock.systemUTC();
 
@@ -581,4 +586,44 @@ public class ManagedLedgerConfig {
         this.addEntryTimeoutSeconds = addEntryTimeoutSeconds;
         return this;
     }
+
+    /**
+     * Managed-ledger can setup different custom EnsemblePlacementPolicy (eg: affinity to write ledgers to only setup of
+     * group of bookies).
+     * 
+     * @return
+     */
+    public Class<? extends EnsemblePlacementPolicy> getBookKeeperEnsemblePlacementPolicyClassName() {
+        return bookKeeperEnsemblePlacementPolicyClassName;
+    }
+
+    /**
+     * Returns EnsemblePlacementPolicy configured for the Managed-ledger.
+     * 
+     * @param bookKeeperEnsemblePlacementPolicy
+     */
+    public void setBookKeeperEnsemblePlacementPolicyClassName(
+            Class<? extends EnsemblePlacementPolicy> bookKeeperEnsemblePlacementPolicyClassName) {
+        this.bookKeeperEnsemblePlacementPolicyClassName = bookKeeperEnsemblePlacementPolicyClassName;
+    }
+
+    /**
+     * Returns properties required by configured bookKeeperEnsemblePlacementPolicy.
+     * 
+     * @return
+     */
+    public Map<String, Object> getBookKeeperEnsemblePlacementPolicyProperties() {
+        return bookKeeperEnsemblePlacementPolicyProperties;
+    }
+
+    /**
+     * Managed-ledger can setup different custom EnsemblePlacementPolicy which needs
+     * bookKeeperEnsemblePlacementPolicy-Properties.
+     * 
+     * @param bookKeeperEnsemblePlacementPolicyProperties
+     */
+    public void setBookKeeperEnsemblePlacementPolicyProperties(
+            Map<String, Object> bookKeeperEnsemblePlacementPolicyProperties) {
+        this.bookKeeperEnsemblePlacementPolicyProperties = bookKeeperEnsemblePlacementPolicyProperties;
+    }
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 7399233..95ee6b4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
 
+import com.google.common.base.Objects;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Maps;
 
@@ -41,6 +42,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -79,7 +81,7 @@ import org.slf4j.LoggerFactory;
 
 public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
     private final MetaStore store;
-    private final BookKeeper bookKeeper;
+    private final BookkeeperFactoryForCustomEnsemblePlacementPolicy bookkeeperFactory;
     private final boolean isBookkeeperManaged;
     private final ZooKeeper zookeeper;
     private final ManagedLedgerFactoryConfig config;
@@ -116,20 +118,30 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
     private ManagedLedgerFactoryImpl(ZooKeeper zkc, ClientConfiguration bkClientConfiguration,
             ManagedLedgerFactoryConfig config)
             throws Exception {
-        this(new BookKeeper(bkClientConfiguration, zkc), true /* isBookkeeperManaged */, zkc,
-                config);
+        this((policyConfig) -> {
+            try {
+                return new BookKeeper(bkClientConfiguration, zkc);
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
+        }, true /* isBookkeeperManaged */, zkc, config);
     }
 
     public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper) throws Exception {
-        this(bookKeeper, zooKeeper, new ManagedLedgerFactoryConfig());
+        this((policyConfig) -> bookKeeper, zooKeeper, new ManagedLedgerFactoryConfig());
     }
 
     public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper, ManagedLedgerFactoryConfig config)
             throws Exception {
-        this(bookKeeper, false /* isBookkeeperManaged */, zooKeeper, config);
+        this((policyConfig) -> bookKeeper, false /* isBookkeeperManaged */, zooKeeper, config);
     }
 
-    private ManagedLedgerFactoryImpl(BookKeeper bookKeeper, boolean isBookkeeperManaged, ZooKeeper zooKeeper,
+    public ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory, ZooKeeper zooKeeper, ManagedLedgerFactoryConfig config)
+            throws Exception {
+        this(bookKeeperGroupFactory, false /* isBookkeeperManaged */, zooKeeper, config);
+    }
+    
+    private ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory, boolean isBookkeeperManaged, ZooKeeper zooKeeper,
             ManagedLedgerFactoryConfig config) throws Exception {
         scheduledExecutor = OrderedScheduler.newSchedulerBuilder()
                 .numThreads(config.getNumManagedLedgerSchedulerThreads())
@@ -142,7 +154,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         cacheEvictionExecutor = Executors
                 .newSingleThreadExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction"));
 
-        this.bookKeeper = bookKeeper;
+        this.bookkeeperFactory = bookKeeperGroupFactory;
         this.isBookkeeperManaged = isBookkeeperManaged;
         this.zookeeper = isBookkeeperManaged ? zooKeeper : null;
         this.store = new MetaStoreImplZookeeper(zooKeeper, orderedExecutor);
@@ -283,7 +295,11 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         ledgers.computeIfAbsent(name, (mlName) -> {
             // Create the managed ledger
             CompletableFuture<ManagedLedgerImpl> future = new CompletableFuture<>();
-            final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this, bookKeeper, store, config, scheduledExecutor,
+            final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this,
+                    bookkeeperFactory.get(
+                            new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
+                                    config.getBookKeeperEnsemblePlacementPolicyProperties())),
+                    store, config, scheduledExecutor,
                     orderedExecutor, name);
             newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
                 @Override
@@ -344,8 +360,11 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
     public void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, ManagedLedgerConfig config,
             OpenReadOnlyCursorCallback callback, Object ctx) {
         checkArgument(startPosition instanceof PositionImpl);
-        ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this, bookKeeper, store, config,
-                scheduledExecutor, orderedExecutor, managedLedgerName);
+        ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this,
+                bookkeeperFactory
+                        .get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
+                                config.getBookKeeperEnsemblePlacementPolicyProperties())),
+                store, config, scheduledExecutor, orderedExecutor, managedLedgerName);
 
         roManagedLedger.initializeAndCreateCursor((PositionImpl) startPosition).thenAccept(roCursor -> {
             callback.openReadOnlyCursorComplete(roCursor, ctx);
@@ -409,7 +428,10 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
 
         if (isBookkeeperManaged) {
             try {
-                bookKeeper.close();
+                BookKeeper bkFactory = bookkeeperFactory.get();
+                if (bkFactory != null) {
+                    bkFactory.close();
+                }
             } catch (BKException e) {
                 throw new ManagedLedgerException(e);
             }
@@ -580,8 +602,63 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
     }
 
     public BookKeeper getBookKeeper() {
-        return bookKeeper;
+        return bookkeeperFactory.get();
     }
 
+    /**
+     * Factory to create Bookkeeper-client for a given ensemblePlacementPolicy
+     *
+     */
+    public static interface BookkeeperFactoryForCustomEnsemblePlacementPolicy {
+        default BookKeeper get() {
+            return get(null);
+        }
+        
+        /**
+         * Returns Bk-Client for a given ensemblePlacementPolicyMetadata. It returns default bK-client if
+         * ensemblePlacementPolicyMetadata is null.
+         * 
+         * @param ensemblePlacementPolicyMetadata
+         * @return
+         */
+        BookKeeper get(EnsemblePlacementPolicyConfig ensemblePlacementPolicyMetadata);
+    }
+    
+    public static class EnsemblePlacementPolicyConfig {
+        private final Class<? extends EnsemblePlacementPolicy> policyClass;
+        private final Map<String, Object> properties;
+        
+        public EnsemblePlacementPolicyConfig(Class<? extends EnsemblePlacementPolicy> policyClass,
+                Map<String, Object> properties) {
+            super();
+            this.policyClass = policyClass;
+            this.properties = properties;
+        }
+
+        public Class<? extends EnsemblePlacementPolicy> getPolicyClass() {
+            return policyClass;
+        }
+
+        public Map<String, Object> getProperties() {
+            return properties;
+        }
+        
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(policyClass != null ? policyClass.getName() : "", properties);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj instanceof EnsemblePlacementPolicyConfig) {
+                EnsemblePlacementPolicyConfig other = (EnsemblePlacementPolicyConfig) obj;
+                return Objects.equal(this.policyClass == null ? null : this.policyClass.getName(),
+                        other.policyClass == null ? null : other.policyClass.getName())
+                        && Objects.equal(this.properties, other.properties);
+            }
+            return false;
+        }
+    }
+    
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerFactoryImpl.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
index 9f91fcf..c0c43ab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
@@ -19,8 +19,11 @@
 package org.apache.pulsar.broker;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
 
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.zookeeper.ZooKeeper;
 
@@ -28,6 +31,9 @@ import org.apache.zookeeper.ZooKeeper;
  * Provider of a new BookKeeper client instance
  */
 public interface BookKeeperClientFactory {
-    BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException;
+    BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+            Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+            Map<String, Object> ensemblePlacementPolicyProperties) throws IOException;
+
     void close();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index cc1a8e5..2105af5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -19,31 +19,55 @@
 package org.apache.pulsar.broker;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
 import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 
+@SuppressWarnings("deprecation")
 public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
 
     private final AtomicReference<ZooKeeperCache> rackawarePolicyZkCache = new AtomicReference<>();
     private final AtomicReference<ZooKeeperCache> clientIsolationZkCache = new AtomicReference<>();
+    private final AtomicReference<ZooKeeperCache> zkCache = new AtomicReference<>();
 
-    @SuppressWarnings("deprecation")
     @Override
-    public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException {
-        ClientConfiguration bkConf = new ClientConfiguration();
+    public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+            Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass, Map<String, Object> properties) throws IOException {
+        ClientConfiguration bkConf = createBkClientConfiguration(conf);
+        if (properties != null) {
+            properties.forEach((key, value) -> bkConf.setProperty(key, value));
+        }
+        if (ensemblePlacementPolicyClass.isPresent()) {
+            setEnsemblePlacementPolicy(bkConf, conf, zkClient, ensemblePlacementPolicyClass.get());
+        } else {
+            setDefaultEnsemblePlacementPolicy(bkConf, conf, zkClient);
+        }
+        try {
+            return BookKeeper.forConfig(bkConf)
+                    .allocator(PulsarByteBufAllocator.DEFAULT)
+                    .zk(zkClient)
+                    .build();
+        } catch (InterruptedException | BKException e) {
+            throw new IOException(e);
+        }
+    }
 
+    private ClientConfiguration createBkClientConfiguration(ServiceConfiguration conf) {
+        ClientConfiguration bkConf = new ClientConfiguration();
         if (conf.getBookkeeperClientAuthenticationPlugin() != null
                 && conf.getBookkeeperClientAuthenticationPlugin().trim().length() > 0) {
             bkConf.setClientAuthProviderFactoryClass(conf.getBookkeeperClientAuthenticationPlugin());
@@ -69,6 +93,13 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
                     TimeUnit.SECONDS);
         }
 
+        bkConf.setReorderReadSequenceEnabled(conf.isBookkeeperClientReorderReadSequenceEnabled());
+
+        return bkConf;
+    }
+
+    private void setDefaultEnsemblePlacementPolicy(ClientConfiguration bkConf, ServiceConfiguration conf,
+            ZooKeeper zkClient) {
         if (conf.isBookkeeperClientRackawarePolicyEnabled() || conf.isBookkeeperClientRegionawarePolicyEnabled()) {
             if (conf.isBookkeeperClientRegionawarePolicyEnabled()) {
                 bkConf.setEnsemblePlacementPolicy(RegionAwareEnsemblePlacementPolicy.class);
@@ -86,7 +117,6 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
 
             bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.rackawarePolicyZkCache.get());
         }
-        bkConf.setReorderReadSequenceEnabled(conf.isBookkeeperClientReorderReadSequenceEnabled());
 
         if (conf.getBookkeeperClientIsolationGroups() != null && !conf.getBookkeeperClientIsolationGroups().isEmpty()) {
             bkConf.setEnsemblePlacementPolicy(ZkIsolatedBookieEnsemblePlacementPolicy.class);
@@ -106,14 +136,18 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
                 bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.clientIsolationZkCache.get());
             }
         }
+    }
 
-        try {
-            return BookKeeper.forConfig(bkConf)
-                    .allocator(PulsarByteBufAllocator.DEFAULT)
-                    .zk(zkClient)
-                    .build();
-        } catch (InterruptedException | BKException e) {
-            throw new IOException(e);
+    private void setEnsemblePlacementPolicy(ClientConfiguration bkConf, ServiceConfiguration conf, ZooKeeper zkClient,
+            Class<? extends EnsemblePlacementPolicy> policyClass) {
+        bkConf.setEnsemblePlacementPolicy(policyClass);
+        if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) {
+            ZooKeeperCache zkc = new ZooKeeperCache(zkClient, conf.getZooKeeperOperationTimeoutSeconds()) {
+            };
+            if (!zkCache.compareAndSet(null, zkc)) {
+                zkc.stop();
+            }
+            bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.zkCache.get());
         }
     }
 
@@ -124,5 +158,8 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
         if (this.clientIsolationZkCache.get() != null) {
             this.clientIsolationZkCache.get().stop();
         }
+        if (this.zkCache.get() != null) {
+            this.zkCache.get().stop();
+        }
     }
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 1f401d5..bb286ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -20,28 +20,34 @@ package org.apache.pulsar.broker;
 
 import java.io.Closeable;
 import java.io.IOException;
-
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.RejectedExecutionException;
+
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.BookkeeperFactoryForCustomEnsemblePlacementPolicy;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+
 public class ManagedLedgerClientFactory implements Closeable {
 
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerClientFactory.class);
 
     private final ManagedLedgerFactory managedLedgerFactory;
-    private final BookKeeper bkClient;
+    private final BookKeeper defaultBkClient;
+    private final Map<EnsemblePlacementPolicyConfig, BookKeeper> bkEnsemblePolicyToBkClientMap = Maps.newConcurrentMap();
 
     public ManagedLedgerClientFactory(ServiceConfiguration conf, ZooKeeper zkClient,
             BookKeeperClientFactory bookkeeperProvider) throws Exception {
-        this.bkClient = bookkeeperProvider.create(conf, zkClient);
-
         ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
         managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L);
         managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
@@ -52,7 +58,30 @@ public class ManagedLedgerClientFactory implements Closeable {
         managedLedgerFactoryConfig.setThresholdBackloggedCursor(conf.getManagedLedgerCursorBackloggedThreshold());
         managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
 
-        this.managedLedgerFactory = new ManagedLedgerFactoryImpl(bkClient, zkClient, managedLedgerFactoryConfig);
+        this.defaultBkClient = bookkeeperProvider.create(conf, zkClient, Optional.empty(), null);
+        
+        BookkeeperFactoryForCustomEnsemblePlacementPolicy bkFactory = (
+                EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) -> {
+            BookKeeper bkClient = null;
+            // find or create bk-client in cache for a specific ensemblePlacementPolicy
+            if (ensemblePlacementPolicyConfig != null && ensemblePlacementPolicyConfig.getPolicyClass() != null) {
+                bkClient = bkEnsemblePolicyToBkClientMap.computeIfAbsent(ensemblePlacementPolicyConfig, (key) -> {
+                    try {
+                        return bookkeeperProvider.create(conf, zkClient,
+                                Optional.ofNullable(ensemblePlacementPolicyConfig.getPolicyClass()),
+                                ensemblePlacementPolicyConfig.getProperties());
+                    } catch (Exception e) {
+                        log.error("Failed to initialize bk-client for policy {}, properties {}",
+                                ensemblePlacementPolicyConfig.getPolicyClass(),
+                                ensemblePlacementPolicyConfig.getProperties(), e);
+                    }
+                    return this.defaultBkClient;
+                });
+            }
+            return bkClient != null ? bkClient : defaultBkClient;
+        };
+
+        this.managedLedgerFactory = new ManagedLedgerFactoryImpl(bkFactory, zkClient, managedLedgerFactoryConfig);
     }
 
     public ManagedLedgerFactory getManagedLedgerFactory() {
@@ -60,7 +89,12 @@ public class ManagedLedgerClientFactory implements Closeable {
     }
 
     public BookKeeper getBookKeeperClient() {
-        return bkClient;
+        return defaultBkClient;
+    }
+
+    @VisibleForTesting
+    public Map<EnsemblePlacementPolicyConfig, BookKeeper> getBkEnsemblePolicyToBookKeeperMap() {
+        return bkEnsemblePolicyToBkClientMap;
     }
 
     public void close() throws IOException {
@@ -69,7 +103,7 @@ public class ManagedLedgerClientFactory implements Closeable {
             log.info("Closed managed ledger factory");
 
             try {
-                bkClient.close();
+                defaultBkClient.close();
             } catch (RejectedExecutionException ree) {
                 // when closing bookkeeper client, it will error outs all pending metadata operations.
                 // those callbacks of those operations will be triggered, and submitted to the scheduler
@@ -80,6 +114,17 @@ public class ManagedLedgerClientFactory implements Closeable {
                 // factory, however that might be introducing more unknowns.
                 log.warn("Encountered exceptions on closing bookkeeper client", ree);
             }
+            if (bkEnsemblePolicyToBkClientMap != null) {
+                bkEnsemblePolicyToBkClientMap.forEach((policy, bk) -> {
+                    try {
+                        if (bk != null) {
+                            bk.close();
+                        }
+                    } catch (Exception e) {
+                        log.warn("Failed to close bookkeeper-client for policy {}", policy, e);
+                    }
+                });
+            }
             log.info("Closed BookKeeper client");
         } catch (Exception e) {
             log.warn(e.getMessage(), e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 934ce10..098d52d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -691,6 +691,10 @@ public class PulsarService implements AutoCloseable {
         return managedLedgerClientFactory.getManagedLedgerFactory();
     }
 
+    public ManagedLedgerClientFactory getManagedLedgerClientFactory() {
+        return managedLedgerClientFactory;
+    }
+    
     public LedgerOffloader getManagedLedgerOffloader() {
         return offloader;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index f587735..48a3c5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -20,9 +20,12 @@ package org.apache.pulsar.broker.admin.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
+import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
+import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -72,6 +75,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -79,6 +83,7 @@ import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrat
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -577,6 +582,93 @@ public abstract class NamespacesBase extends AdminResource {
         log.info("[{}] Successfully unloaded all the bundles in namespace {}", clientAppId(), namespaceName);
     }
 
+    
+    protected void internalSetBookieAffinityGroup(String bookieAffinityGroup) {
+        log.info("[{}] Setting bookie-affinity-group {} for namespace {}", clientAppId(), bookieAffinityGroup,
+                this.namespaceName);
+
+        validateSuperUserAccess();
+
+        if (namespaceName.isGlobal()) {
+            // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
+            validateGlobalNamespaceOwnership(namespaceName);
+        } else {
+            validateClusterOwnership(namespaceName.getCluster());
+            validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster());
+        }
+
+        try {
+            String path = joinPath(LOCAL_POLICIES_ROOT, this.namespaceName.toString());
+            Stat nodeStat = new Stat();
+
+            LocalPolicies localPolicies = null;
+            int version = -1;
+            try {
+                byte[] content = pulsar().getLocalZkCache().getZooKeeper().getData(path, false, nodeStat);
+                localPolicies = jsonMapper().readValue(content, LocalPolicies.class);
+                version = nodeStat.getVersion();
+            } catch (KeeperException.NoNodeException e) {
+                log.info("local-policies for {} is not setup at path {}", this.namespaceName, path);
+                // if policies is not present into localZk then create new policies
+                this.pulsar().getLocalZkCacheService().createPolicies(path, false)
+                        .get(pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
+                localPolicies = new LocalPolicies();
+            }
+            localPolicies.bookkeeperAffinityGroup = bookieAffinityGroup;
+            byte[] data = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(localPolicies);
+            pulsar().getLocalZkCache().getZooKeeper().setData(path, data, Math.toIntExact(version));
+            // invalidate namespace's local-policies
+            pulsar().getLocalZkCacheService().policiesCache().invalidate(path);
+            log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(),
+                    namespaceName, jsonMapper().writeValueAsString(localPolicies));
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update local-policy configuration for namespace {}: does not exist", clientAppId(),
+                    namespaceName);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to update persistence configuration for namespace {}: concurrent modification",
+                    clientAppId(), namespaceName);
+            throw new RestException(Status.CONFLICT, "Concurrent modification");
+        } catch (Exception e) {
+            log.error("[{}] Failed to update local-policy configuration for namespace {}", clientAppId(), namespaceName,
+                    e);
+            throw new RestException(e);
+        }
+    }
+
+    protected String internalGetBookieAffinityGroup() {
+        validateSuperUserAccess();
+
+        if (namespaceName.isGlobal()) {
+            // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
+            validateGlobalNamespaceOwnership(namespaceName);
+        } else {
+            validateClusterOwnership(namespaceName.getCluster());
+            validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster());
+        }
+
+        String path = joinPath(LOCAL_POLICIES_ROOT, this.namespaceName.toString());
+        try {
+            Optional<LocalPolicies> policies = pulsar().getLocalZkCacheService().policiesCache().get(path);
+            final String bookkeeperAffinityGroup = policies.orElseThrow(() -> new RestException(Status.NOT_FOUND,
+                    "Namespace local-policies does not exist")).bookkeeperAffinityGroup;
+            if (StringUtils.isBlank(bookkeeperAffinityGroup)) {
+                throw new RestException(Status.NOT_FOUND, "bookie-affinity group does not exist");
+            }
+            return bookkeeperAffinityGroup;
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update local-policy configuration for namespace {}: does not exist", clientAppId(),
+                    namespaceName);
+            throw new RestException(Status.NOT_FOUND, "Namespace policies does not exist");
+        } catch (RestException re) {
+            throw re;
+        } catch (Exception e) {
+            log.error("[{}] Failed to get local-policy configuration for namespace {} at path {}", clientAppId(),
+                    namespaceName, path, e);
+            throw new RestException(e);
+        }
+    }
+
     @SuppressWarnings("deprecation")
     public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritative) {
         log.info("[{}] Unloading namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 35b5713..4dfa6ce 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -551,6 +551,30 @@ public class Namespaces extends NamespacesBase {
         internalSetPersistence(persistence);
     }
 
+    @POST
+    @Path("/{property}/{cluster}/{namespace}/persistence/bookieAffinity/{bookieAffinityGroup}")
+    @ApiOperation(hidden = true, value = "Set the bookie-affinity-group to namespace-local policy.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public void setBookieAffinityGroup(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, @PathParam("bookieAffinityGroup") String bookieAffinityGroup) {
+        validateNamespaceName(property, cluster, namespace);
+        internalSetBookieAffinityGroup(bookieAffinityGroup);
+    }
+
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/persistence/bookieAffinity")
+    @ApiOperation(hidden = true, value = "Get the bookie-affinity-group from namespace-local policy.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public String getBookieAffinityGroup(@PathParam("property") String property,
+            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, cluster, namespace);
+        return internalGetBookieAffinityGroup();
+    }
+    
     @GET
     @Path("/{property}/{cluster}/{namespace}/persistence")
     @ApiOperation(hidden = true, value = "Get the persistence configuration for a namespace.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index b0715ff..d6b9042 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -491,6 +491,30 @@ public class Namespaces extends NamespacesBase {
         internalSetPersistence(persistence);
     }
 
+    @POST
+    @Path("/{tenant}/{namespace}/persistence/bookieAffinity/{bookieAffinityGroup}")
+    @ApiOperation(value = "Set the bookie-affinity-group to namespace-persistent policy.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public void setBookieAffinityGroup(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+            @PathParam("bookieAffinityGroup") String bookieAffinityGroup) {
+        validateNamespaceName(tenant, namespace);
+        internalSetBookieAffinityGroup(bookieAffinityGroup);
+    }
+
+    @GET
+    @Path("/{property}/{namespace}/persistence/bookieAffinity")
+    @ApiOperation(hidden = true, value = "Get the bookie-affinity-group from namespace-local policy.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public String getBookieAffinityGroup(@PathParam("property") String property,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, namespace);
+        return internalGetBookieAffinityGroup();
+    }
+    
     @GET
     @Path("/{tenant}/{namespace}/persistence")
     @ApiOperation(value = "Get the persistence configuration for a namespace.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 8ceaee8..1254088 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -24,6 +24,8 @@ import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static org.apache.commons.collections.CollectionUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
+import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -109,6 +111,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -122,12 +125,14 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
 import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -712,10 +717,13 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
 
             // Get persistence policy for this topic
             Optional<Policies> policies = Optional.empty();
+            Optional<LocalPolicies> localPolicies = Optional.empty();
             try {
                 policies = pulsar
                         .getConfigurationCache().policiesCache().get(AdminResource.path(POLICIES,
                                 namespace.toString()));
+                String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString());
+                localPolicies = pulsar().getLocalZkCacheService().policiesCache().get(path);
             } catch (Throwable t) {
                 // Ignoring since if we don't have policies, we fallback on the default
                 log.warn("Got exception when reading persistence policy for {}: {}", topicName, t.getMessage(), t);
@@ -738,6 +746,14 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
             managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
             managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+            if (localPolicies.isPresent() && StringUtils.isNotBlank(localPolicies.get().bookkeeperAffinityGroup)) {
+                managedLedgerConfig
+                        .setBookKeeperEnsemblePlacementPolicyClassName(ZkIsolatedBookieEnsemblePlacementPolicy.class);
+                Map<String, Object> properties = Maps.newHashMap();
+                properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+                        localPolicies.get().bookkeeperAffinityGroup);
+                managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+            }
             managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
             managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index b7a531b..56fca20 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -101,7 +101,9 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
     public void start() throws IOException {
         this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
             pulsar.getConfiguration(),
-            pulsar.getZkClient()
+            pulsar.getZkClient(),
+            Optional.empty(),
+            null
         );
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index 0816625..f003b04 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -26,6 +26,7 @@ import com.beust.jcommander.Parameter;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.nio.file.Paths;
+import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -106,7 +107,7 @@ public class CompactorTool {
                                               ZooKeeperClientFactory.SessionType.ReadWrite,
                                               (int)brokerConfig.getZooKeeperSessionTimeoutMillis()).get();
         BookKeeperClientFactory bkClientFactory = new BookKeeperClientFactoryImpl();
-        BookKeeper bk = bkClientFactory.create(brokerConfig, zk);
+        BookKeeper bk = bkClientFactory.create(brokerConfig, zk, Optional.empty(), null);
         try (PulsarClient pulsar = clientBuilder.build()) {
             Compactor compactor = new TwoPhaseCompactor(brokerConfig, pulsar, bk, scheduler);
             long ledgerId = compactor.compact(arguments.topic).get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
index 728bc67..4b7f49b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
@@ -22,10 +22,13 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.io.IOException;
 import java.util.concurrent.Executors;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 
 import org.apache.zookeeper.ZooKeeper;
@@ -51,7 +54,9 @@ public class MockedBookKeeperClientFactory implements BookKeeperClientFactory {
     }
 
     @Override
-    public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException {
+    public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+            Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+            Map<String, Object> properties) throws IOException {
         return mockedBk;
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index b987503..341b96a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -24,12 +24,12 @@ import static org.mockito.Mockito.spy;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -39,6 +39,7 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.bookkeeper.util.ZkUtils;
@@ -274,7 +275,9 @@ public abstract class MockedPulsarServiceBaseTest {
     private BookKeeperClientFactory mockBookKeeperClientFactory = new BookKeeperClientFactory() {
 
         @Override
-        public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) {
+        public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+                Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+                Map<String, Object> properties) {
             // Always return the same instance (so that we don't loose the mock BK content on broker restart
             return mockBookKeeper;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
new file mode 100644
index 0000000..5fe3f1d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+
+import java.lang.reflect.Field;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.EnsemblePlacementPolicyConfig;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.pulsar.broker.ManagedLedgerClientFactory;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ */
+public class BrokerBookieIsolationTest {
+
+    private LocalBookkeeperEnsemble bkEnsemble;
+    private PulsarService pulsarService;
+
+    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+    private final int PRIMARY_BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
+    private final int PRIMARY_BROKER_PORT = PortManager.nextFreePort();
+
+    private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+    private final ObjectMapper jsonMapper = ObjectMapperFactory.create();
+
+    @BeforeMethod
+    protected void setup() throws Exception {
+        // Start local bookkeeper ensemble
+        bkEnsemble = new LocalBookkeeperEnsemble(4, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
+        bkEnsemble.start();
+    }
+
+    @AfterMethod
+    protected void cleanup() throws Exception {
+        if (pulsarService != null) {
+            pulsarService.close();
+        }
+        bkEnsemble.stop();
+    }
+
+    /**
+     * Validate that broker can support tenant based bookie isolation.
+     * 
+     * <pre>
+     * 1. create two bookie-info group : default-group and isolated-group
+     * 2. namespace ns1 : uses default-group 
+     *    validate: bookie-ensemble for ns1-topics's ledger will be from default-group 
+     * 3. namespace ns2,ns3,ns4: uses isolated-group
+     *    validate: bookie-ensemble for above namespace-topics's ledger will be from isolated-group
+     * </pre>
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testBookieIsolation() throws Exception {
+
+        final String tenant1 = "tenant1";
+        final String cluster = "use";
+        final String ns1 = String.format("%s/%s/%s", tenant1, cluster, "ns1");
+        final String ns2 = String.format("%s/%s/%s", tenant1, cluster, "ns2");
+        final String ns3 = String.format("%s/%s/%s", tenant1, cluster, "ns3");
+        final String ns4 = String.format("%s/%s/%s", tenant1, cluster, "ns4");
+        final int totalPublish = 100;
+
+        final String brokerBookkeeperClientIsolationGroups = "default-group";
+        final String tenantNamespaceIsolationGroups = "tenant1-isolation";
+
+        BookieServer[] bookies = bkEnsemble.getBookies();
+        ZooKeeper zkClient = bkEnsemble.getZkClient();
+
+        Set<BookieSocketAddress> defaultBookies = Sets.newHashSet(bookies[0].getLocalAddress(),
+                bookies[1].getLocalAddress());
+        Set<BookieSocketAddress> isolatedBookies = Sets.newHashSet(bookies[2].getLocalAddress(),
+                bookies[3].getLocalAddress());
+
+        setDefaultIsolationGroup(brokerBookkeeperClientIsolationGroups, zkClient, defaultBookies);
+        setDefaultIsolationGroup(tenantNamespaceIsolationGroups, zkClient, isolatedBookies);
+
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+        config.setClusterName(cluster);
+        config.setWebServicePort(Optional.of(PRIMARY_BROKER_WEBSERVICE_PORT));
+        config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+        config.setBrokerServicePort(Optional.of(PRIMARY_BROKER_PORT));
+        config.setAdvertisedAddress("localhost");
+        config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups);
+
+        config.setManagedLedgerDefaultEnsembleSize(2);
+        config.setManagedLedgerDefaultWriteQuorum(2);
+        config.setManagedLedgerDefaultAckQuorum(2);
+
+        int totalEntriesPerLedger = 20;
+        int totalLedgers = totalPublish / totalEntriesPerLedger;
+        config.setManagedLedgerMaxEntriesPerLedger(totalEntriesPerLedger);
+        config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+        pulsarService = new PulsarService(config);
+        pulsarService.start();
+
+        URL brokerUrl = new URL("http://127.0.0.1" + ":" + PRIMARY_BROKER_WEBSERVICE_PORT);
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).build();
+
+        ClusterData clusterData = new ClusterData(pulsarService.getWebServiceAddress());
+        admin.clusters().createCluster(cluster, clusterData);
+        TenantInfo tenantInfo = new TenantInfo(null, Sets.newHashSet(cluster));
+        admin.tenants().createTenant(tenant1, tenantInfo);
+        admin.namespaces().createNamespace(ns1);
+        admin.namespaces().createNamespace(ns2);
+        admin.namespaces().createNamespace(ns3);
+        admin.namespaces().createNamespace(ns4);
+        admin.namespaces().setBookieAffinityGroup(ns2, tenantNamespaceIsolationGroups);
+        admin.namespaces().setBookieAffinityGroup(ns3, tenantNamespaceIsolationGroups);
+        admin.namespaces().setBookieAffinityGroup(ns4, tenantNamespaceIsolationGroups);
+
+        assertEquals(admin.namespaces().getBookieAffinityGroup(ns2), tenantNamespaceIsolationGroups);
+        assertEquals(admin.namespaces().getBookieAffinityGroup(ns3), tenantNamespaceIsolationGroups);
+        assertEquals(admin.namespaces().getBookieAffinityGroup(ns4), tenantNamespaceIsolationGroups);
+
+        try {
+            admin.namespaces().getBookieAffinityGroup(ns1);
+        } catch (PulsarAdminException.NotFoundException e) {
+            // Ok
+        }
+
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(brokerUrl.toString())
+                .statsInterval(-1, TimeUnit.SECONDS).build();
+
+        PersistentTopic topic1 = (PersistentTopic) createTopicAndPublish(pulsarClient, ns1, "topic1", totalPublish);
+        PersistentTopic topic2 = (PersistentTopic) createTopicAndPublish(pulsarClient, ns2, "topic1", totalPublish);
+        PersistentTopic topic3 = (PersistentTopic) createTopicAndPublish(pulsarClient, ns3, "topic1", totalPublish);
+        PersistentTopic topic4 = (PersistentTopic) createTopicAndPublish(pulsarClient, ns4, "topic1", totalPublish);
+
+        Bookie bookie1 = bookies[0].getBookie();
+        Field ledgerManagerField = Bookie.class.getDeclaredField("ledgerManager");
+        ledgerManagerField.setAccessible(true);
+        LedgerManager ledgerManager = (LedgerManager) ledgerManagerField.get(bookie1);
+
+        // namespace: ns1
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1.getManagedLedger();
+        assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+        // validate ledgers' ensemble with affinity bookies
+        assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), defaultBookies);
+
+        // namespace: ns2
+        ml = (ManagedLedgerImpl) topic2.getManagedLedger();
+        assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+        // validate ledgers' ensemble with affinity bookies
+        assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+
+        // namespace: ns3
+        ml = (ManagedLedgerImpl) topic3.getManagedLedger();
+        assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+        // validate ledgers' ensemble with affinity bookies
+        assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+
+        // namespace: ns4
+        ml = (ManagedLedgerImpl) topic4.getManagedLedger();
+        assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+        // validate ledgers' ensemble with affinity bookies
+        assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+
+        ManagedLedgerClientFactory mlFactory = pulsarService.getManagedLedgerClientFactory();
+        Map<EnsemblePlacementPolicyConfig, BookKeeper> bkPlacementPolicyToBkClientMap = mlFactory
+                .getBkEnsemblePolicyToBookKeeperMap();
+
+        // broker should create only 1 bk-client and factory per isolation-group
+        assertEquals(bkPlacementPolicyToBkClientMap.size(), 1);
+        Class<? extends EnsemblePlacementPolicy> clazz = bkPlacementPolicyToBkClientMap.keySet().iterator().next()
+                .getPolicyClass();
+        System.out.println(clazz);
+
+    }
+
+    private void assertAffinityBookies(LedgerManager ledgerManager, List<LedgerInfo> ledgers1,
+            Set<BookieSocketAddress> defaultBookies) throws Exception {
+        for (LedgerInfo lInfo : ledgers1) {
+            long ledgerId = lInfo.getLedgerId();
+            CompletableFuture<Versioned<LedgerMetadata>> ledgerMetaFuture = ledgerManager.readLedgerMetadata(ledgerId);
+            LedgerMetadata ledgerMetadata = ledgerMetaFuture.get().getValue();
+            Set<BookieSocketAddress> ledgerBookies = Sets.newHashSet();
+            ledgerBookies.addAll(ledgerMetadata.getAllEnsembles().values().iterator().next());
+            assertEquals(ledgerBookies.size(), defaultBookies.size());
+            ledgerBookies.removeAll(defaultBookies);
+            assertEquals(ledgerBookies.size(), 0);
+        }
+    }
+
+    private Topic createTopicAndPublish(PulsarClient pulsarClient, String ns, String topicLocalName, int totalPublish)
+            throws Exception {
+        final String topicName = String.format("persistent://%s/%s", ns, topicLocalName);
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+                .subscribe();
+        consumer.close();
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
+
+        Producer<byte[]> producer = producerBuilder.create();
+        for (int i = 0; i < totalPublish; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        producer.close();
+
+        return pulsarService.getBrokerService().getTopicReference(topicName).get();
+    }
+
+    private void setDefaultIsolationGroup(String brokerBookkeeperClientIsolationGroups, ZooKeeper zkClient,
+            Set<BookieSocketAddress> bookieAddresses) throws Exception {
+        BookiesRackConfiguration bookies = null;
+        try {
+            byte[] data = zkClient.getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false, null);
+            System.out.println(new String(data));
+            bookies = jsonMapper.readValue(data, BookiesRackConfiguration.class);
+        } catch (KeeperException.NoNodeException e) {
+            // Ok.. create new bookie znode
+            zkClient.create(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, "".getBytes(), Acl,
+                    CreateMode.PERSISTENT);
+        }
+        if (bookies == null) {
+            bookies = new BookiesRackConfiguration();
+        }
+
+        Map<String, BookieInfo> bookieInfoMap = Maps.newHashMap();
+        for (BookieSocketAddress bkSocket : bookieAddresses) {
+            BookieInfo info = new BookieInfo("use", bkSocket.getHostName() + ":" + bkSocket.getPort());
+            bookieInfoMap.put(bkSocket.toString(), info);
+        }
+        bookies.put(brokerBookkeeperClientIsolationGroups, bookieInfoMap);
+
+        zkClient.setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookies), -1);
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(BrokerBookieIsolationTest.class);
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 444d3e4..504de2a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -27,6 +27,7 @@ import io.netty.buffer.Unpooled;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
@@ -136,7 +137,7 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testEntryLookup() throws Exception {
         BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
-                this.conf, null);
+                this.conf, null, Optional.empty(), null);
 
         Triple<Long, List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData, Long>>> compactedLedgerData
             = buildCompactedLedger(bk, 500);
@@ -192,7 +193,7 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testCleanupOldCompactedTopicLedger() throws Exception {
         BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
-                this.conf, null);
+                this.conf, null, Optional.empty(), null);
 
         LedgerHandle oldCompactedLedger = bk.createLedger(1, 1,
                 Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 1928a32..1e68f80 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Executors;
@@ -81,7 +82,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
 
         compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                 new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
-        bk = pulsar.getBookKeeperClientFactory().create(this.conf, null);
+        bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, Optional.empty(), null);
     }
 
     @AfterMethod
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 32765ba..bddc2bc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -31,6 +31,7 @@ import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
@@ -81,7 +82,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
 
     private List<String> compactAndVerify(String topic, Map<String, byte[]> expected) throws Exception {
         BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
-                this.conf, null);
+                this.conf, null, Optional.empty(), null);
         Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
         long compactedLedgerId = compactor.compact(topic).get();
 
@@ -214,7 +215,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
 
         BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
-                this.conf, null);
+                this.conf, null, Optional.empty(), null);
         Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
         compactor.compact(topic).get();
     }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 29f509b..098a8f8 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -761,6 +761,10 @@ public interface Namespaces {
      */
     PersistencePolicies getPersistence(String namespace) throws PulsarAdminException;
 
+    void setBookieAffinityGroup(String namespace, String bookieAffinityGroup) throws PulsarAdminException;
+
+    String getBookieAffinityGroup(String namespace) throws PulsarAdminException;
+
     /**
      * Set the retention configuration for all the topics on a namespace.
      * <p/>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 42cdfbd..a577e53 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -404,6 +404,28 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     }
 
     @Override
+    public void setBookieAffinityGroup(String namespace, String bookieAffinityGroup) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "persistence", "bookieAffinity", bookieAffinityGroup);
+            request(path).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public String getBookieAffinityGroup(String namespace) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "persistence", "bookieAffinity");
+            return request(path).get(String.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
     public PersistencePolicies getPersistence(String namespace) throws PulsarAdminException {
         try {
             NamespaceName ns = NamespaceName.get(namespace);
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 025a818..4721929 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -280,6 +280,12 @@ public class PulsarAdminToolTest {
         namespaces.run(split("get-clusters myprop/clust/ns1"));
         verify(mockNamespaces).getNamespaceReplicationClusters("myprop/clust/ns1");
 
+        namespaces.run(split("set-bookie-affinity-group myprop/clust/ns1 --group test"));
+        verify(mockNamespaces).setBookieAffinityGroup("myprop/clust/ns1", "test");
+        
+        namespaces.run(split("get-bookie-affinity-group myprop/clust/ns1"));
+        verify(mockNamespaces).getBookieAffinityGroup("myprop/clust/ns1");
+        
         namespaces.run(split("unload myprop/clust/ns1"));
         verify(mockNamespaces).unload("myprop/clust/ns1");
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 7b7043f..359f25b 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -421,6 +421,35 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Set the bookie-affinity group name")
+    private class SetBookieAffinityGroup extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--group",
+                "-g" }, description = "Bookie-affinity group name where namespace messages should be written", required = true)
+        private String bookieAffinityGroupName;
+
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().setBookieAffinityGroup(namespace, bookieAffinityGroupName);
+        }
+    }
+    
+    @Parameters(commandDescription = "Get the bookie-affinity group name")
+    private class GetBookieAffinityGroup extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(admin.namespaces().getBookieAffinityGroup(namespace));
+        }
+    }
+
     @Parameters(commandDescription = "Get message TTL for a namespace")
     private class GetMessageTTL extends CliCommand {
         @Parameter(description = "tenant/namespace\n", required = true)
@@ -1125,6 +1154,9 @@ public class CmdNamespaces extends CmdBase {
 
         jcommander.addCommand("get-retention", new GetRetention());
         jcommander.addCommand("set-retention", new SetRetention());
+        
+        jcommander.addCommand("set-bookie-affinity-group", new SetBookieAffinityGroup());
+        jcommander.addCommand("get-bookie-affinity-group", new GetBookieAffinityGroup());
 
         jcommander.addCommand("unload", new Unload());
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
index d664d73..dc8b6b1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
@@ -25,6 +25,7 @@ import com.google.common.base.Objects;
 public class LocalPolicies {
 
     public BundlesData bundles;
+    public String bookkeeperAffinityGroup;
 
     public LocalPolicies() {
         bundles = defaultBundle();
@@ -32,14 +33,15 @@ public class LocalPolicies {
 
     @Override
     public int hashCode() {
-        return Objects.hashCode(bundles);
+        return Objects.hashCode(bundles, bookkeeperAffinityGroup);
     }
 
     @Override
     public boolean equals(Object obj) {
         if (obj instanceof LocalPolicies) {
             LocalPolicies other = (LocalPolicies) obj;
-            return Objects.equal(bundles, other.bundles);
+            return Objects.equal(bundles, other.bundles)
+                    && Objects.equal(bookkeeperAffinityGroup, other.bookkeeperAffinityGroup);
         }
         return false;
     }
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
index 010d45a..ca0d95d 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.zookeeper;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import java.net.UnknownHostException;
@@ -38,6 +39,7 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.pulsar.common.policies.data.BookieInfo;
 import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;