You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/11/17 23:44:10 UTC

bookkeeper git commit: BOOKKEEPER-912: Allow EnsemblePlacementPolicy to choose bookies using ledger custom data (multitenancy support)

Repository: bookkeeper
Updated Branches:
  refs/heads/master 9359d682a -> 95ea4815b


BOOKKEEPER-912: Allow EnsemblePlacementPolicy to choose bookies using ledger custom data (multitenancy support)

Author: eolivelli <eo...@gmail.com>

Reviewers: sijie@apache.org <si...@apache.org>

Closes #68 from eolivelli/BOOKKEEPER-912 and squashes the following commits:

6c1eaca [eolivelli] BOOKKEEPER-912 Allow EnsemblePlacementPolicy to choose bookies using ledger custom data (multitenancy support)
7c0ab37 [eolivelli] BOOKKEEPER-912 Allow EnsemblePlacementPolicy to choose bookies using ledger custom data (multitenancy support)


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/95ea4815
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/95ea4815
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/95ea4815

Branch: refs/heads/master
Commit: 95ea4815b5313ea2ab02e5192c9f577d1c0fe24e
Parents: 9359d68
Author: eolivelli <eo...@gmail.com>
Authored: Thu Nov 17 15:44:06 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Thu Nov 17 15:44:06 2016 -0800

----------------------------------------------------------------------
 .../LocalBookieEnsemblePlacementPolicy.java     |   6 +-
 .../apache/bookkeeper/client/BookieWatcher.java |  13 +-
 .../client/DefaultEnsemblePlacementPolicy.java  |   9 +-
 .../client/EnsemblePlacementPolicy.java         |  30 ++--
 .../bookkeeper/client/LedgerCreateOp.java       |   3 +-
 .../apache/bookkeeper/client/LedgerHandle.java  |   4 +-
 .../RackawareEnsemblePlacementPolicy.java       |  20 +--
 .../RackawareEnsemblePlacementPolicyImpl.java   |   8 +-
 .../RegionAwareEnsemblePlacementPolicy.java     |   6 +-
 .../GenericEnsemblePlacementPolicyTest.java     | 148 +++++++++++++++++++
 .../TestRackawareEnsemblePlacementPolicy.java   |  22 +--
 .../TestRegionAwareEnsemblePlacementPolicy.java |  58 ++++----
 12 files changed, 235 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index 508511b..e7bfe94 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -74,7 +74,9 @@ public class LocalBookieEnsemblePlacementPolicy implements EnsemblePlacementPoli
     }
 
     @Override
-    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+        java.util.Map<String, byte[]> customMetadata, Collection<BookieSocketAddress> currentEnsemble,
+        BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
         throw new BKNotEnoughBookiesException();
     }
 
@@ -90,7 +92,7 @@ public class LocalBookieEnsemblePlacementPolicy implements EnsemblePlacementPoli
 
     @Override
     public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
-            Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+        java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
         if (ensembleSize > 1) {
             throw new IllegalArgumentException("Local ensemble policy can only return 1 bookie");
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index b8d8951..cec6920 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -49,6 +49,7 @@ import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
+import java.util.Map;
 
 /**
  * This class is responsible for maintaining a consistent view of what bookies
@@ -255,18 +256,19 @@ class BookieWatcher implements Watcher, ChildrenCallback {
      * @return list of bookies for new ensemble.
      * @throws BKNotEnoughBookiesException
      */
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize)
+    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
+        int ackQuorumSize, Map<String, byte[]> customMetadata)
             throws BKNotEnoughBookiesException {
         try {
             // we try to only get from the healthy bookies first
             return placementPolicy.newEnsemble(ensembleSize,
-                    writeQuorumSize, ackQuorumSize, new HashSet<BookieSocketAddress>(
+                    writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<BookieSocketAddress>(
                     quarantinedBookies.asMap().keySet()));
         } catch (BKNotEnoughBookiesException e) {
             if (logger.isDebugEnabled()) {
                 logger.debug("Not enough healthy bookies available, using quarantined bookies");
             }
-            return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, EMPTY_SET);
+            return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, EMPTY_SET);
         }
     }
 
@@ -280,6 +282,7 @@ class BookieWatcher implements Watcher, ChildrenCallback {
      * @throws BKNotEnoughBookiesException
      */
     public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+                                             Map<String, byte[]> customMetadata,
                                              List<BookieSocketAddress> existingBookies, int bookieIdx,
                                              Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
@@ -288,13 +291,13 @@ class BookieWatcher implements Watcher, ChildrenCallback {
             // we exclude the quarantined bookies also first
             Set<BookieSocketAddress> existingAndQuarantinedBookies = new HashSet<BookieSocketAddress>(existingBookies);
             existingAndQuarantinedBookies.addAll(quarantinedBookies.asMap().keySet());
-            return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+            return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
                     existingAndQuarantinedBookies, addr, excludeBookies);
         } catch (BKNotEnoughBookiesException e) {
             if (logger.isDebugEnabled()) {
                 logger.debug("Not enough healthy bookies available, using quarantined bookies");
             }
-            return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+            return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
                     new HashSet<BookieSocketAddress>(existingBookies), addr, excludeBookies);
         }
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index 640bdb7..5a2c1f2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -45,8 +45,7 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
     private Set<BookieSocketAddress> knownBookies = new HashSet<BookieSocketAddress>();
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int quorumSize, int ackQuorumSize,
-            Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int quorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
         ArrayList<BookieSocketAddress> newBookies = new ArrayList<BookieSocketAddress>(ensembleSize);
         if (ensembleSize <= 0) {
             return newBookies;
@@ -70,11 +69,9 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
     }
 
     @Override
-    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Collection<BookieSocketAddress> currentEnsemble,
-                                           BookieSocketAddress bookieToReplace,
-                                           Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
         excludeBookies.addAll(currentEnsemble);
-        ArrayList<BookieSocketAddress> addresses = newEnsemble(1, 1, 1, excludeBookies);
+        ArrayList<BookieSocketAddress> addresses = newEnsemble(1, 1, 1, customMetadata, excludeBookies);
         return addresses.get(0);
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 2af8108..4a0f307 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -80,28 +80,32 @@ public interface EnsemblePlacementPolicy {
      *          Ensemble Size
      * @param writeQuorumSize
      *          Write Quorum Size
-     * @param excludeBookies
-     *          Bookies that should not be considered as targets.
-     * @return list of bookies chosen as targets.
+     * @param ackQuorumSize
+     *          the value of ackQuorumSize
+     * @param customMetadata the value of customMetadata
+     * @param excludeBookies Bookies that should not be considered as targets.
      * @throws BKNotEnoughBookiesException if not enough bookies available.
+     * @return the java.util.ArrayList<org.apache.bookkeeper.net.BookieSocketAddress>
      */
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
-                                                      Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
+    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
 
     /**
      * Choose a new bookie to replace <i>bookieToReplace</i>. If no bookie available in the cluster,
      * {@link BKNotEnoughBookiesException} is thrown.
      *
-     * @param bookieToReplace
-     *          bookie to replace
-     * @param excludeBookies
-     *          bookies that should not be considered as candidate.
-     * @return the bookie chosen as target.
+     * @param ensembleSize
+     *          the value of ensembleSize
+     * @param writeQuorumSize
+     *          the value of writeQuorumSize
+     * @param ackQuorumSize the value of ackQuorumSize
+     * @param customMetadata the value of customMetadata
+     * @param currentEnsemble the value of currentEnsemble
+     * @param bookieToReplace bookie to replace
+     * @param excludeBookies bookies that should not be considered as candidate.
      * @throws BKNotEnoughBookiesException
+     * @return the org.apache.bookkeeper.net.BookieSocketAddress
      */
-    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
-                                             Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace,
-                                             Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
+    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
 
     /**
      * Reorder the read sequence of a given write quorum <i>writeSet</i>.

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index e88df31..52a5cb6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -107,7 +107,8 @@ class LedgerCreateOp implements GenericCallback<Void> {
             ensemble = bk.bookieWatcher
                     .newEnsemble(metadata.getEnsembleSize(),
                             metadata.getWriteQuorumSize(),
-                            metadata.getAckQuorumSize());
+                            metadata.getAckQuorumSize(),
+                            metadata.getCustomMetadata());
         } catch (BKNotEnoughBookiesException e) {
             LOG.error("Not enough bookies to create ledger");
             createComplete(e.getCode(), null);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 5c33929..11212a7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -995,7 +995,9 @@ public class LedgerHandle implements AutoCloseable {
             newEnsemble.addAll(metadata.currentEnsemble);
             newBookie = bk.bookieWatcher.replaceBookie(metadata.getEnsembleSize(),
                     metadata.getWriteQuorumSize(),
-                    metadata.getAckQuorumSize(), newEnsemble,
+                    metadata.getAckQuorumSize(),
+                    metadata.getCustomMetadata(),
+                    newEnsemble,
                     bookieIndex, new HashSet<>(Arrays.asList(addr)));
 
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index f42e42a..c306ca0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -79,39 +79,31 @@ public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacement
 
     @Override
     public ArrayList<BookieSocketAddress> newEnsemble(
-            int ensembleSize,
-            int writeQuorumSize,
-            int ackQuorumSize,
-            Set<BookieSocketAddress> excludeBookies)
+        int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
         try {
-            return super.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, excludeBookies);
+            return super.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, excludeBookies);
         } catch (BKException.BKNotEnoughBookiesException bnebe) {
             if (slave == null) {
                 throw bnebe;
             } else {
-                return slave.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, excludeBookies);
+                return slave.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, excludeBookies);
             }
         }
     }
 
     @Override
     public BookieSocketAddress replaceBookie(
-            int ensembleSize,
-            int writeQuorumSize,
-            int ackQuorumSize,
-            Collection<BookieSocketAddress> currentEnsemble,
-            BookieSocketAddress bookieToReplace,
-            Set<BookieSocketAddress> excludeBookies)
+        int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
         try {
-            return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+            return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
                     currentEnsemble, bookieToReplace, excludeBookies);
         } catch (BKException.BKNotEnoughBookiesException bnebe) {
             if (slave == null) {
                 throw bnebe;
             } else {
-                return slave.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+                return slave.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,customMetadata,
                         currentEnsemble, bookieToReplace, excludeBookies);
             }
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 3c41a7c..79ff0da 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -259,8 +259,7 @@ class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacemen
     }
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
-                                                      Set<BookieSocketAddress> excludeBookies)
+    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
         return newEnsembleInternal(ensembleSize, writeQuorumSize, excludeBookies, null, null);
     }
@@ -355,10 +354,7 @@ class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacemen
     }
 
     @Override
-    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
-                                             Collection<BookieSocketAddress> currentEnsemble,
-                                             BookieSocketAddress bookieToReplace,
-                                             Set<BookieSocketAddress> excludeBookies)
+    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
         rwLock.readLock().lock();
         try {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 181feca..265499c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -212,8 +212,7 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
 
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
-                                                    Set<BookieSocketAddress> excludeBookies) throws BKException.BKNotEnoughBookiesException {
+    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies) throws BKException.BKNotEnoughBookiesException {
 
         int effectiveMinRegionsForDurability = disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;
 
@@ -392,8 +391,7 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
     }
 
     @Override
-    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace,
-                                           Set<BookieSocketAddress> excludeBookies) throws BKException.BKNotEnoughBookiesException {
+    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies) throws BKException.BKNotEnoughBookiesException {
         rwLock.readLock().lock();
         try {
             boolean enforceDurability = enforceDurabilityInReplace && !disableDurabilityFeature.isAvailable();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
new file mode 100644
index 0000000..e28a691
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import org.junit.Before;
+import org.junit.Test;
+
+public class GenericEnsemblePlacementPolicyTest extends BookKeeperClusterTestCase {
+
+    private BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;
+    private static final String PASSWORD = "testPasswd";
+    private static final String property = "foo";
+    private static final byte[] value = "bar".getBytes(StandardCharsets.UTF_8);
+    private static List<Map<String, byte[]>> customMetadataOnNewEnsembleStack = new ArrayList<>();
+    private static List<Map<String, byte[]>> customMetadataOnReplaceBookieStack = new ArrayList<>();
+
+    public GenericEnsemblePlacementPolicyTest() {
+        super(0);
+        baseClientConf.setEnsemblePlacementPolicy(CustomEnsemblePlacementPolicy.class);
+    }
+
+    public static final class CustomEnsemblePlacementPolicy extends DefaultEnsemblePlacementPolicy {
+
+        @Override
+        public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize,
+            int ackQuorumSize, Map<String, byte[]> customMetadata, Collection<BookieSocketAddress> currentEnsemble,
+            BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies)
+            throws BKException.BKNotEnoughBookiesException {
+            new Exception("replaceBookie " + ensembleSize + "," + customMetadata).printStackTrace();
+            assertNotNull(customMetadata);
+            customMetadataOnReplaceBookieStack.add(customMetadata);
+            return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
+                currentEnsemble, bookieToReplace, excludeBookies);
+        }
+
+        @Override
+        public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int quorumSize,
+            int ackQuorumSize, Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
+            throws BKException.BKNotEnoughBookiesException {
+            assertNotNull(customMetadata);
+            customMetadataOnNewEnsembleStack.add(customMetadata);
+            return super.newEnsemble(ensembleSize, quorumSize, ackQuorumSize, customMetadata, excludeBookies);
+        }
+        
+    }
+
+    @Before
+    public void reset() {
+        customMetadataOnNewEnsembleStack.clear();
+        customMetadataOnReplaceBookieStack.clear();
+    }
+
+    @Test(timeout = 60000)
+    public void testNewEnsemble() throws Exception {
+        numBookies = 1;
+        startBKCluster();
+        try {
+            Map<String, byte[]> customMetadata = new HashMap<>();
+            customMetadata.put(property, value);
+            try (BookKeeper bk = new BookKeeper(baseClientConf, zkc);) {
+                bk.createLedger(1, 1, 1, digestType, PASSWORD.getBytes(), customMetadata);
+            }
+            assertEquals(1, customMetadataOnNewEnsembleStack.size());
+            assertArrayEquals(value, customMetadataOnNewEnsembleStack.get(0).get(property));
+        } finally {
+            stopBKCluster();
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testNewEnsembleWithNotEnoughtBookies() throws Exception {
+        numBookies = 0;
+        try {
+            startBKCluster();
+            Map<String, byte[]> customMetadata = new HashMap<>();
+            customMetadata.put(property, value);
+            try (BookKeeper bk = new BookKeeper(baseClientConf, zkc);) {
+                bk.createLedger(1, 1, 1, digestType, PASSWORD.getBytes(), customMetadata);
+                fail("creation should fail");
+            } catch (BKException.BKNotEnoughBookiesException bneb) {
+            }
+            assertEquals(2, customMetadataOnNewEnsembleStack.size());
+            assertArrayEquals(value, customMetadataOnNewEnsembleStack.get(0).get(property));
+            assertArrayEquals(value, customMetadataOnNewEnsembleStack.get(1).get(property));
+        } finally {
+            stopBKCluster();
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testReplaceBookie() throws Exception {
+        numBookies = 3;
+        startBKCluster();
+        try {
+            Map<String, byte[]> customMetadata = new HashMap<>();
+            customMetadata.put(property, value);
+            try (BookKeeper bk = new BookKeeper(baseClientConf, zkc);) {
+                try (LedgerHandle lh = bk.createLedger(2, 2, 2, digestType, PASSWORD.getBytes(), customMetadata);) {
+                    lh.addEntry(value);
+                    long lId = lh.getId();
+                    ArrayList<BookieSocketAddress> ensembleAtFirstEntry = lh.getLedgerMetadata().getEnsemble(lId);
+                    assertEquals(2, ensembleAtFirstEntry.size());
+                    killBookie(ensembleAtFirstEntry.get(0));
+                    lh.addEntry(value);
+                }
+            }
+            assertEquals(2, customMetadataOnNewEnsembleStack.size());
+            assertArrayEquals(value, customMetadataOnNewEnsembleStack.get(0).get(property));
+            // replaceBookie by default calls newEnsemble, so newEnsemble gets called twice
+            assertArrayEquals(value, customMetadataOnNewEnsembleStack.get(0).get(property));
+
+            assertEquals(1, customMetadataOnReplaceBookieStack.size());
+            assertArrayEquals(value, customMetadataOnReplaceBookieStack.get(0).get(property));
+
+        } finally {
+            stopBKCluster();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 4f36902..bef6bc2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -240,7 +240,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         // replace node under r2
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, new HashSet<BookieSocketAddress>(), addr2, new HashSet<BookieSocketAddress>());
+        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, new HashSet<BookieSocketAddress>());
         assertEquals(addr3, replacedBookie);
     }
 
@@ -265,7 +265,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         // replace node under r2
         Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
         excludedAddrs.add(addr1);
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
 
         assertFalse(addr1.equals(replacedBookie));
         assertTrue(addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
@@ -295,7 +295,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         excludedAddrs.add(addr3);
         excludedAddrs.add(addr4);
         try {
-            repp.replaceBookie(1, 1, 1, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+            repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
             fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
         } catch (BKNotEnoughBookiesException bnebe) {
             // should throw not enou
@@ -316,9 +316,9 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, new HashSet<BookieSocketAddress>());
+            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<BookieSocketAddress>());
             assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2));
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, new HashSet<BookieSocketAddress>());
+            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<BookieSocketAddress>());
             assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is only one rack.");
@@ -344,10 +344,10 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, new HashSet<BookieSocketAddress>());
+            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<BookieSocketAddress>());
             int numCovered = getNumCoveredWriteQuorums(ensemble, 2);
             assertTrue(numCovered >= 1 && numCovered < 3);
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, new HashSet<BookieSocketAddress>());
+            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<BookieSocketAddress>());
             numCovered = getNumCoveredWriteQuorums(ensemble2, 2);
             assertTrue(numCovered >= 1 && numCovered < 3);
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -386,9 +386,9 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr8);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, new HashSet<BookieSocketAddress>());
+            ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, null, new HashSet<BookieSocketAddress>());
             assertEquals(3, getNumCoveredWriteQuorums(ensemble1, 2));
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, new HashSet<BookieSocketAddress>());
+            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<BookieSocketAddress>());
             assertEquals(4, getNumCoveredWriteQuorums(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is only one rack.");
@@ -489,12 +489,12 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         // we will never use addr4 even it is in the stabilized network topology
         for (int i = 0 ; i < 5; i++) {
             ArrayList<BookieSocketAddress> ensemble =
-                    repp.newEnsemble(3, 3, 3, new HashSet<BookieSocketAddress>());
+                    repp.newEnsemble(3, 3, 3, null, new HashSet<BookieSocketAddress>());
             assertFalse(ensemble.contains(addr4));
         }
 
         // we could still use addr4 for urgent allocation if it is just bookie flapping
-        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(4, 4, 4, new HashSet<BookieSocketAddress>());
+        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(4, 4, 4, null, new HashSet<BookieSocketAddress>());
         assertTrue(ensemble.contains(addr4));
     }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index 9a32986..5c61ae3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -269,7 +269,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         // replace node under r2
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, new HashSet<BookieSocketAddress>(), addr2, new HashSet<BookieSocketAddress>());
+        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, new HashSet<BookieSocketAddress>());
         assertEquals(addr3, replacedBookie);
     }
 
@@ -294,7 +294,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         // replace node under r2
         Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
         excludedAddrs.add(addr1);
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
 
         assertFalse(addr1.equals(replacedBookie));
         assertTrue(addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
@@ -319,7 +319,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> list = repp.newEnsemble(5, 5, 3, new HashSet<BookieSocketAddress>());
+            ArrayList<BookieSocketAddress> list = repp.newEnsemble(5, 5, 3, null, new HashSet<BookieSocketAddress>());
             LOG.info("Ensemble : {}", list);
             fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -351,7 +351,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         excludedAddrs.add(addr3);
         excludedAddrs.add(addr4);
         try {
-            repp.replaceBookie(1, 1, 1, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+            repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
             fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
         } catch (BKNotEnoughBookiesException bnebe) {
             // should throw not enou
@@ -380,9 +380,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, new HashSet<BookieSocketAddress>());
+            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<BookieSocketAddress>());
             assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble, 2));
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, new HashSet<BookieSocketAddress>());
+            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<BookieSocketAddress>());
             assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is only one rack.");
@@ -411,7 +411,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, new HashSet<BookieSocketAddress>());
+            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<BookieSocketAddress>());
             int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble, 2);
             assertTrue(numCovered >= 1);
             assertTrue(numCovered < 3);
@@ -419,7 +419,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
             fail("Should not get not enough bookies exception even there is only one rack.");
         }
         try {
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, new HashSet<BookieSocketAddress>());
+            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<BookieSocketAddress>());
             int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble2, 2);
             assertTrue(numCovered >= 1 && numCovered < 3);
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -458,9 +458,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr8);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, new HashSet<BookieSocketAddress>());
+            ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, null, new HashSet<BookieSocketAddress>());
             assertEquals(3, getNumCoveredRegionsInWriteQuorum(ensemble1, 2));
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, new HashSet<BookieSocketAddress>());
+            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<BookieSocketAddress>());
             assertEquals(4, getNumCoveredRegionsInWriteQuorum(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is only one rack.");
@@ -507,22 +507,22 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr10);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, new HashSet<BookieSocketAddress>());
+            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null, new HashSet<BookieSocketAddress>());
             assert(ensemble.contains(addr4));
             assert(ensemble.contains(addr8));
             assert(ensemble.size() == 6);
             assertEquals(3, getNumRegionsInEnsemble(ensemble));
-            ensemble = repp.newEnsemble(7, 7, 4, new HashSet<BookieSocketAddress>());
+            ensemble = repp.newEnsemble(7, 7, 4, null, new HashSet<BookieSocketAddress>());
             assert(ensemble.contains(addr4));
             assert(ensemble.contains(addr8));
             assert(ensemble.size() == 7);
             assertEquals(3, getNumRegionsInEnsemble(ensemble));
-            ensemble = repp.newEnsemble(8, 8, 5, new HashSet<BookieSocketAddress>());
+            ensemble = repp.newEnsemble(8, 8, 5, null, new HashSet<BookieSocketAddress>());
             assert(ensemble.contains(addr4));
             assert(ensemble.contains(addr8));
             assert(ensemble.size() == 8);
             assertEquals(3, getNumRegionsInEnsemble(ensemble));
-            ensemble = repp.newEnsemble(9, 9, 5, new HashSet<BookieSocketAddress>());
+            ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
             assert(ensemble.contains(addr4));
             assert(ensemble.contains(addr8));
             assert(ensemble.size() == 9);
@@ -575,7 +575,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
             ((SettableFeature) featureProvider.scope("region1").getFeature("disallowBookies")).set(true);
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, new HashSet<BookieSocketAddress>());
+            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null, new HashSet<BookieSocketAddress>());
             assertEquals(2, getNumRegionsInEnsemble(ensemble));
             assert(ensemble.contains(addr1));
             assert(ensemble.contains(addr3));
@@ -589,14 +589,14 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         }
         try {
             ((SettableFeature) featureProvider.scope("region2").getFeature("disallowBookies")).set(true);
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, new HashSet<BookieSocketAddress>());
+            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null, new HashSet<BookieSocketAddress>());
             fail("Should get not enough bookies exception even there is only one region with insufficient bookies.");
         } catch (BKNotEnoughBookiesException bnebe) {
             // Expected
         }
         try {
             ((SettableFeature) featureProvider.scope("region2").getFeature("disallowBookies")).set(false);
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, new HashSet<BookieSocketAddress>());
+            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null, new HashSet<BookieSocketAddress>());
             assert(ensemble.contains(addr1));
             assert(ensemble.contains(addr3));
             assert(ensemble.contains(addr4));
@@ -669,7 +669,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
 
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10, new HashSet<BookieSocketAddress>());
+            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10, null, new HashSet<BookieSocketAddress>());
             assert(ensemble.size() == 10);
             assertEquals(5, getNumRegionsInEnsemble(ensemble));
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -680,7 +680,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         try{
             Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
             excludedAddrs.add(addr10);
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10, excludedAddrs);
+            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10, null, excludedAddrs);
             assert(ensemble.contains(addr11) && ensemble.contains(addr12));
             assert(ensemble.size() == 10);
             assertEquals(5, getNumRegionsInEnsemble(ensemble));
@@ -771,7 +771,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
 
         ArrayList<BookieSocketAddress> ensemble;
         try {
-            ensemble = repp.newEnsemble(6, 6, ackQuorum, new HashSet<BookieSocketAddress>());
+            ensemble = repp.newEnsemble(6, 6, ackQuorum, null, new HashSet<BookieSocketAddress>());
             assert(ensemble.size() == 6);
             assertEquals(3, getNumRegionsInEnsemble(ensemble));
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -792,7 +792,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
             Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
             for(BookieSocketAddress addr: region2Bookies) {
                 if (ensemble.contains(addr)) {
-                    BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, ackQuorum, ensemble, addr, excludedAddrs);
+                    BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, ackQuorum, null, ensemble, addr, excludedAddrs);
                     ensemble.remove(addr);
                     ensemble.add(replacedBookie);
                 }
@@ -816,7 +816,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
             Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
 
             try {
-                BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, ackQuorum, ensemble, bookieToReplace, excludedAddrs);
+                BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, ackQuorum, null, ensemble, bookieToReplace, excludedAddrs);
                 assert (replacedBookie.equals(replacedBookieExpected));
                 assertEquals(3, getNumRegionsInEnsemble(ensemble));
             } catch (BKNotEnoughBookiesException bnebe) {
@@ -825,7 +825,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
 
             excludedAddrs.add(replacedBookieExpected);
             try {
-                BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, ackQuorum, ensemble, bookieToReplace, excludedAddrs);
+                BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, ackQuorum, null, ensemble, bookieToReplace, excludedAddrs);
                 if (minDurability > 1 && !disableDurabilityFeature.isAvailable()) {
                     fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
                 }
@@ -900,7 +900,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
 
         ArrayList<BookieSocketAddress> ensemble;
         try {
-            ensemble = repp.newEnsemble(6, 6, 4, new HashSet<BookieSocketAddress>());
+            ensemble = repp.newEnsemble(6, 6, 4, null, new HashSet<BookieSocketAddress>());
             assert(ensemble.size() == 6);
         } catch (BKNotEnoughBookiesException bnebe) {
             LOG.error("BKNotEnoughBookiesException", bnebe);
@@ -911,7 +911,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
 
         try{
-            repp.replaceBookie(6, 6, 4, ensemble, addr4, excludedAddrs);
+            repp.replaceBookie(6, 6, 4, null, ensemble, addr4, excludedAddrs);
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is only one rack.");
         }
@@ -964,7 +964,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         excludedAddrs.add(addr10);
         excludedAddrs.add(addr9);
         try {
-            ArrayList<BookieSocketAddress> list = repp.newEnsemble(5, 5, 5, excludedAddrs);
+            ArrayList<BookieSocketAddress> list = repp.newEnsemble(5, 5, 5, null, excludedAddrs);
             LOG.info("Ensemble : {}", list);
             fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -1025,7 +1025,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
     private void basicReorderReadSequenceWithLocalRegionTest(String myRegion, boolean isReadLAC) throws Exception {
         prepareNetworkTopologyForReorderTests(myRegion);
 
-        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, new HashSet<BookieSocketAddress>());
+        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
         assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
 
         DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1076,7 +1076,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
     private void basicReorderReadSequenceWithRemoteRegionTest(String myRegion, boolean isReadLAC) throws Exception {
         prepareNetworkTopologyForReorderTests(myRegion);
 
-        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, new HashSet<BookieSocketAddress>());
+        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
         assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
 
         DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1139,7 +1139,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
 
         prepareNetworkTopologyForReorderTests(myRegion);
 
-        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, new HashSet<BookieSocketAddress>());
+        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
         assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
 
         DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);