You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/06/12 18:41:48 UTC

svn commit: r1492274 [1/3] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ bookkee...

Author: ivank
Date: Wed Jun 12 16:41:47 2013
New Revision: 1492274

URL: http://svn.apache.org/r1492274
Log:
BOOKKEEPER-592: allow application to recommend ledger data locality (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/Configurable.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CommonConfigurationKeys.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Shell.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1492274&r1=1492273&r2=1492274&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Jun 12 16:41:47 2013
@@ -86,6 +86,8 @@ Trunk (unreleased changes)
 
       BOOKKEEPER-603: Support Boost 1.53 for Hedwig Cpp Client (jiannan via ivank)
 
+      BOOKKEEPER-592: allow application to recommend ledger data locality (sijie via ivank)
+
     NEW FEATURE:
 
       BOOKKEEPER-562: Ability to tell if a ledger is closed or not (fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1492274&r1=1492273&r2=1492274&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Wed Jun 12 16:41:47 2013
@@ -37,8 +37,10 @@ import org.apache.bookkeeper.meta.Ledger
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
+import org.apache.commons.configuration.ConfigurationException;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
@@ -85,6 +87,9 @@ public class BookKeeper {
     final LedgerManagerFactory ledgerManagerFactory;
     final LedgerManager ledgerManager;
 
+    // Ensemble Placement Policy
+    final EnsemblePlacementPolicy placementPolicy;
+
     final ClientConfiguration conf;
 
     interface ZKConnectCallback {
@@ -131,10 +136,12 @@ public class BookKeeper {
         this.channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
                                                                 Executors.newCachedThreadPool());
         this.scheduler = Executors.newSingleThreadScheduledExecutor();
+        // initialize the ensemble placement
+        this.placementPolicy = initializeEnsemblePlacementPolicy(conf);
 
         mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads());
         bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
-        bookieWatcher = new BookieWatcher(conf, scheduler, this);
+        bookieWatcher = new BookieWatcher(conf, scheduler, placementPolicy, this);
         bookieWatcher.readBookiesBlocking();
 
         ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
@@ -195,16 +202,28 @@ public class BookKeeper {
         this.zk = zk;
         this.channelFactory = channelFactory;
         this.scheduler = Executors.newSingleThreadScheduledExecutor();
+        // initialize the ensemble placement
+        this.placementPolicy = initializeEnsemblePlacementPolicy(conf);
 
         mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads());
         bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
-        bookieWatcher = new BookieWatcher(conf, scheduler, this);
+        bookieWatcher = new BookieWatcher(conf, scheduler, placementPolicy, this);
         bookieWatcher.readBookiesBlocking();
 
         ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
         ledgerManager = ledgerManagerFactory.newLedgerManager();
     }
 
+    private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf)
+        throws IOException {
+        try {
+            Class<? extends EnsemblePlacementPolicy> policyCls = conf.getEnsemblePlacementPolicy();
+            return ReflectionUtils.newInstance(policyCls).initialize(conf);
+        } catch (ConfigurationException e) {
+            throw new IOException("Failed to initialize ensemble placement policy : ", e);
+        }
+    }
+
     LedgerManager getLedgerManager() {
         return ledgerManager;
     }
@@ -661,6 +680,7 @@ public class BookKeeper {
          * @param ctx
          *          optional control object
          */
+        @Override
         public void createComplete(int rc, LedgerHandle lh, Object ctx) {
             SyncCounter counter = (SyncCounter) ctx;
             counter.setLh(lh);
@@ -680,12 +700,13 @@ public class BookKeeper {
          * @param ctx
          *          optional control object
          */
+        @Override
         public void openComplete(int rc, LedgerHandle lh, Object ctx) {
             SyncCounter counter = (SyncCounter) ctx;
             counter.setLh(lh);
-            
+
             LOG.debug("Open complete: {}", rc);
-            
+
             counter.setrc(rc);
             counter.dec();
         }
@@ -700,6 +721,7 @@ public class BookKeeper {
          * @param ctx
          *            optional control object
          */
+        @Override
         public void deleteComplete(int rc, Object ctx) {
             SyncCounter counter = (SyncCounter) ctx;
             counter.setrc(rc);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java?rev=1492274&r1=1492273&r2=1492274&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java Wed Jun 12 16:41:47 2013
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -57,15 +56,15 @@ import org.apache.zookeeper.ZooDefs.Ids;
 class BookieWatcher implements Watcher, ChildrenCallback {
     static final Logger logger = LoggerFactory.getLogger(BookieWatcher.class);
 
+    public static int ZK_CONNECT_BACKOFF_SEC = 1;
+    private static final Set<InetSocketAddress> EMPTY_SET = new HashSet<InetSocketAddress>();
+
     // Bookie registration path in ZK
     private final String bookieRegistrationPath;
-    static final Set<InetSocketAddress> EMPTY_SET = new HashSet<InetSocketAddress>();
-    public static int ZK_CONNECT_BACKOFF_SEC = 1;
 
     final BookKeeper bk;
-
-    HashSet<InetSocketAddress> knownBookies = new HashSet<InetSocketAddress>();
     final ScheduledExecutorService scheduler;
+    final EnsemblePlacementPolicy placementPolicy;
 
     SafeRunnable reReadTask = new SafeRunnable() {
         @Override
@@ -77,16 +76,24 @@ class BookieWatcher implements Watcher, 
 
     public BookieWatcher(ClientConfiguration conf,
                          ScheduledExecutorService scheduler,
+                         EnsemblePlacementPolicy placementPolicy,
                          BookKeeper bk) throws KeeperException, InterruptedException  {
         this.bk = bk;
         // ZK bookie registration path
         this.bookieRegistrationPath = conf.getZkAvailableBookiesPath();
         this.scheduler = scheduler;
+        this.placementPolicy = placementPolicy;
         readOnlyBookieWatcher = new ReadOnlyBookieWatcher(conf, bk);
     }
 
-    public synchronized Collection<InetSocketAddress> getBookies() {
-        return new HashSet<InetSocketAddress>(knownBookies);
+    public Collection<InetSocketAddress> getBookies() {
+        try {
+            List<String> children = bk.getZkHandle().getChildren(this.bookieRegistrationPath, false);
+            return convertToBookieAddresses(children);
+        } catch (Exception e) {
+            logger.error("Failed to get bookies : ", e);
+            return new HashSet<InetSocketAddress>();
+        }
     }
 
     public void readBookies() {
@@ -118,13 +125,10 @@ class BookieWatcher implements Watcher, 
 
         HashSet<InetSocketAddress> newBookieAddrs = convertToBookieAddresses(children);
 
-        final HashSet<InetSocketAddress> deadBookies;
+        final Set<InetSocketAddress> deadBookies;
         synchronized (this) {
-            deadBookies = new HashSet<InetSocketAddress>(knownBookies);
-            deadBookies.removeAll(newBookieAddrs);
-            // No need to close readonly bookie clients.
-            deadBookies.removeAll(readOnlyBookieWatcher.getReadOnlyBookies());
-            knownBookies = newBookieAddrs;
+            Set<InetSocketAddress> readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies();
+            deadBookies = placementPolicy.onClusterChanged(newBookieAddrs, readonlyBookies);
         }
 
         if (bk.getBookieClient() != null) {
@@ -178,12 +182,16 @@ class BookieWatcher implements Watcher, 
 
     /**
      * Wrapper over the {@link #getAdditionalBookies(Set, int)} method when there is no exclusion list (or exisiting bookies)
-     * @param numBookiesNeeded
-     * @return
+     * @param ensembleSize
+     *          Ensemble Size
+     * @param writeQuorumSize
+     *          Write Quorum Size
+     * @return list of bookies for new ensemble.
      * @throws BKNotEnoughBookiesException
      */
-    public ArrayList<InetSocketAddress> getNewBookies(int numBookiesNeeded) throws BKNotEnoughBookiesException {
-        return getAdditionalBookies(EMPTY_SET, numBookiesNeeded);
+    public ArrayList<InetSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize)
+            throws BKNotEnoughBookiesException {
+        return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, EMPTY_SET);
     }
 
     /**
@@ -192,49 +200,10 @@ class BookieWatcher implements Watcher, 
      * @return
      * @throws BKNotEnoughBookiesException
      */
-    public InetSocketAddress getAdditionalBookie(List<InetSocketAddress> existingBookies)
+    public InetSocketAddress replaceBookie(List<InetSocketAddress> existingBookies, int bookieIdx)
             throws BKNotEnoughBookiesException {
-        return getAdditionalBookies(new HashSet<InetSocketAddress>(existingBookies), 1).get(0);
-    }
-
-    /**
-     * Returns additional bookies given an exclusion list and how many are needed
-     * @param existingBookies
-     * @param numAdditionalBookiesNeeded
-     * @return
-     * @throws BKNotEnoughBookiesException
-     */
-    public ArrayList<InetSocketAddress> getAdditionalBookies(Set<InetSocketAddress> existingBookies,
-            int numAdditionalBookiesNeeded) throws BKNotEnoughBookiesException {
-
-        ArrayList<InetSocketAddress> newBookies = new ArrayList<InetSocketAddress>();
-
-        if (numAdditionalBookiesNeeded <= 0) {
-            return newBookies;
-        }
-
-        List<InetSocketAddress> allBookies;
-
-        synchronized (this) {
-            allBookies = new ArrayList<InetSocketAddress>(knownBookies);
-        }
-
-        Collections.shuffle(allBookies);
-
-        for (InetSocketAddress bookie : allBookies) {
-            if (existingBookies.contains(bookie)) {
-                continue;
-            }
-
-            newBookies.add(bookie);
-            numAdditionalBookiesNeeded--;
-
-            if (numAdditionalBookiesNeeded == 0) {
-                return newBookies;
-            }
-        }
-
-        throw new BKNotEnoughBookiesException();
+        InetSocketAddress addr = existingBookies.get(bookieIdx);
+        return placementPolicy.replaceBookie(addr, new HashSet<InetSocketAddress>(existingBookies));
     }
 
     /**

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * Default Ensemble Placement Policy, which picks bookies randomly
+ */
+public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
+
+    static final Set<InetSocketAddress> EMPTY_SET = new HashSet<InetSocketAddress>();
+
+    private Set<InetSocketAddress> knownBookies = new HashSet<InetSocketAddress>();
+
+    @Override
+    public ArrayList<InetSocketAddress> newEnsemble(int ensembleSize, int quorumSize,
+            Set<InetSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+        ArrayList<InetSocketAddress> newBookies = new ArrayList<InetSocketAddress>(ensembleSize);
+        if (ensembleSize <= 0) {
+            return newBookies;
+        }
+        List<InetSocketAddress> allBookies;
+        synchronized (this) {
+            allBookies = new ArrayList<InetSocketAddress>(knownBookies);
+        }
+        Collections.shuffle(allBookies);
+        for (InetSocketAddress bookie : allBookies) {
+            if (excludeBookies.contains(bookie)) {
+                continue;
+            }
+            newBookies.add(bookie);
+            --ensembleSize;
+            if (ensembleSize == 0) {
+                return newBookies;
+            }
+        }
+        throw new BKNotEnoughBookiesException();
+    }
+
+    @Override
+    public InetSocketAddress replaceBookie(InetSocketAddress bookieToReplace,
+            Set<InetSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+        ArrayList<InetSocketAddress> addresses = newEnsemble(1, 1, excludeBookies);
+        return addresses.get(0);
+    }
+
+    @Override
+    public synchronized Set<InetSocketAddress> onClusterChanged(Set<InetSocketAddress> writableBookies,
+            Set<InetSocketAddress> readOnlyBookies) {
+        HashSet<InetSocketAddress> deadBookies;
+        deadBookies = new HashSet<InetSocketAddress>(knownBookies);
+        deadBookies.removeAll(writableBookies);
+        // readonly bookies should not be treated as dead bookies
+        deadBookies.removeAll(readOnlyBookies);
+        knownBookies = writableBookies;
+        return deadBookies;
+    }
+
+    @Override
+    public EnsemblePlacementPolicy initialize(Configuration conf) {
+        return this;
+    }
+
+    @Override
+    public void uninitalize() {
+        // do nothing
+    }
+
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * Encapsulation of the algorithm that selects a number of bookies from the cluster as an ensemble for storing
+ * data, based on the data input as well as the node properties.
+ */
+public interface EnsemblePlacementPolicy {
+
+    /**
+     * Initialize the policy.
+     *
+     * @param conf
+     *          client configuration.
+     * @return initialized ensemble placement policy
+     */
+    public EnsemblePlacementPolicy initialize(Configuration conf);
+
+    /**
+     * Uninitialize the policy
+     */
+    public void uninitalize();
+
+    /**
+     * A consistent view of the cluster (what bookies are available as writable, what bookies are available as
+     * readonly) is updated when any changes happen in the cluster.
+     *
+     * @param writableBookies
+     *          All the bookies in the cluster available for write/read.
+     * @param readOnlyBookies
+     *          All the bookies in the cluster available for readonly.
+     * @return the dead bookies during this cluster change.
+     */
+    public Set<InetSocketAddress> onClusterChanged(Set<InetSocketAddress> writableBookies,
+            Set<InetSocketAddress> readOnlyBookies);
+
+    /**
+     * Choose <i>ensembleSize</i> bookies for ensemble. If the count is more than the number of available
+     * nodes, {@link BKNotEnoughBookiesException} is thrown.
+     *
+     * @param ensembleSize
+     *          Ensemble Size
+     * @param writeQuorumSize
+     *          Write Quorum Size
+     * @param excludeBookies
+     *          Bookies that should not be considered as targets.
+     * @return list of bookies chosen as targets.
+     * @throws BKNotEnoughBookiesException if not enough bookies available.
+     */
+    public ArrayList<InetSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
+            Set<InetSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
+
+    /**
+     * Choose a new bookie to replace <i>bookieToReplace</i>. If no bookie available in the cluster,
+     * {@link BKNotEnoughBookiesException} is thrown.
+     *
+     * @param bookieToReplace
+     *          bookie to replace
+     * @param excludeBookies
+     *          bookies that should not be considered as candidate.
+     * @return the bookie chosen as target.
+     * @throws BKNotEnoughBookiesException if no bookie is available in the cluster.
+     */
+    public InetSocketAddress replaceBookie(InetSocketAddress bookieToReplace,
+            Set<InetSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java?rev=1492274&r1=1492273&r2=1492274&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java Wed Jun 12 16:41:47 2013
@@ -92,7 +92,8 @@ class LedgerCreateOp implements GenericC
 
         ArrayList<InetSocketAddress> ensemble;
         try {
-            ensemble = bk.bookieWatcher.getNewBookies(metadata.getEnsembleSize());
+            ensemble = bk.bookieWatcher
+                    .newEnsemble(metadata.getEnsembleSize(), metadata.getWriteQuorumSize());
         } catch (BKNotEnoughBookiesException e) {
             LOG.error("Not enough bookies to create ledger");
             cb.createComplete(e.getCode(), null, this.ctx);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1492274&r1=1492273&r2=1492274&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Wed Jun 12 16:41:47 2013
@@ -655,7 +655,7 @@ public class LedgerHandle {
 
         // avoid parallel ensemble changes to same ensemble.
         synchronized (metadata) {
-            newBookie = bk.bookieWatcher.getAdditionalBookie(metadata.currentEnsemble);
+            newBookie = bk.bookieWatcher.replaceBookie(metadata.currentEnsemble, bookieIndex);
 
             newEnsemble.addAll(metadata.currentEnsemble);
             newEnsemble.set(bookieIndex, newBookie);

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.conf.Configurable;
+import org.apache.bookkeeper.net.CachedDNSToSwitchMapping;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.net.Node;
+import org.apache.bookkeeper.net.NodeBase;
+import org.apache.bookkeeper.net.ScriptBasedMapping;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Simple rackaware ensemble placement policy.
+ *
+ * Make most of the class and methods as protected, so it could be extended to implement other algorithms.
+ */
+public class RackawareEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
+
+    static final Logger LOG = LoggerFactory.getLogger(RackawareEnsemblePlacementPolicy.class);
+
+    public static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass";
+
+    /**
+     * Predicate used when choosing an ensemble: decides whether a candidate
+     * bookie may be picked given the ensemble built so far.
+     */
+    protected static interface Predicate {
+        /**
+         * Check whether a candidate bookie is acceptable.
+         *
+         * @param candidate
+         *          bookie node being considered.
+         * @param chosenBookies
+         *          ensemble structure holding the bookies chosen so far.
+         * @return true if the candidate may be chosen, false if it should be skipped.
+         */
+        boolean apply(BookieNode candidate, Ensemble chosenBookies);
+    }
+
+    /**
+     * Ensemble used to hold the result of an ensemble selected for placement.
+     */
+    protected static interface Ensemble {
+
+        /**
+         * Append the new bookie node to the ensemble.
+         *
+         * @param node
+         *          new candidate bookie node.
+         */
+        public void addBookie(BookieNode node);
+
+        /**
+         * @return list of addresses representing the ensemble
+         */
+        public ArrayList<InetSocketAddress> toList();
+    }
+
+    /**
+     * Predicate that accepts every candidate, regardless of the ensemble passed in.
+     */
+    protected static class TruePredicate implements Predicate {
+
+        // the class has no fields, so this single instance can be used everywhere
+        public static final TruePredicate instance = new TruePredicate();
+
+        @Override
+        public boolean apply(BookieNode candidate, Ensemble chosenNodes) {
+            return true;
+        }
+
+    }
+
+    /**
+     * An {@link Ensemble} that discards every bookie added to it and always
+     * reports an empty list. It is passed where a selection method requires an
+     * ensemble argument but the caller collects the chosen bookies itself.
+     */
+    protected static class EnsembleForReplacement implements Ensemble {
+
+        public static final EnsembleForReplacement instance = new EnsembleForReplacement();
+        // NOTE(review): the same mutable ArrayList object is returned by every toList()
+        // call; callers presumably must not modify it -- confirm.
+        static final ArrayList<InetSocketAddress> EMPTY_LIST = new ArrayList<InetSocketAddress>(0);
+
+        @Override
+        public void addBookie(BookieNode node) {
+            // do nothing
+        }
+
+        /**
+         * @return the shared {@link #EMPTY_LIST}.
+         */
+        @Override
+        public ArrayList<InetSocketAddress> toList() {
+            return EMPTY_LIST;
+        }
+
+    }
+
+    /**
+     * A predicate checking the rack coverage for write quorum in {@link RoundRobinDistributionSchedule},
+     * which ensures that a write quorum should be covered by at least two racks.
+     *
+     * The same object acts as the {@link Ensemble} being built and as the
+     * {@link Predicate} that validates candidates against it; {@link #apply}
+     * rejects any call made with a different ensemble.
+     */
+    protected static class RRRackCoverageEnsemble implements Predicate, Ensemble {
+
+        /**
+         * Book-keeping for one slot of {@link #quorums}: the racks seen so far
+         * and how many bookies have been added to the slot.
+         */
+        class QuorumCoverageSet {
+            Set<String> racks = new HashSet<String>();
+            int seenBookies = 0;
+
+            /**
+             * Only the candidate that would bring this set to writeQuorumSize
+             * bookies is constrained: it is accepted when, after adding it, the
+             * set would contain at least two distinct racks. Earlier candidates
+             * are always accepted.
+             */
+            boolean apply(BookieNode candidate) {
+                if (seenBookies + 1 == writeQuorumSize) {
+                    // candidate's rack already present -> need a second rack already recorded;
+                    // candidate's rack new -> need at least one other rack recorded.
+                    // NOTE(review): with writeQuorumSize == 1 the rack set is still empty here,
+                    // so every candidate is rejected -- confirm that is intended.
+                    return racks.size() > (racks.contains(candidate.getNetworkLocation()) ? 1 : 0);
+                }
+                return true;
+            }
+
+            /**
+             * Record the bookie: bump the count and remember its rack.
+             */
+            void addBookie(BookieNode candidate) {
+                ++seenBookies;
+                racks.add(candidate.getNetworkLocation());
+            }
+        }
+
+        final int ensembleSize;
+        final int writeQuorumSize;
+        // bookies chosen so far, in ensemble order
+        final ArrayList<BookieNode> chosenNodes;
+        // one lazily created coverage set per ensemble index
+        private final QuorumCoverageSet[] quorums;
+
+        protected RRRackCoverageEnsemble(int ensembleSize, int writeQuorumSize) {
+            this.ensembleSize = ensembleSize;
+            this.writeQuorumSize = writeQuorumSize;
+            this.chosenNodes = new ArrayList<BookieNode>(ensembleSize);
+            this.quorums = new QuorumCoverageSet[ensembleSize];
+        }
+
+        /**
+         * Check the candidate for the next ensemble position against every
+         * coverage set in the window ending at that position.
+         *
+         * @param candidate
+         *          bookie proposed for the next position.
+         * @param ensemble
+         *          must be this very object, otherwise the candidate is rejected.
+         * @return true if every coverage set in the window accepts the candidate.
+         */
+        @Override
+        public boolean apply(BookieNode candidate, Ensemble ensemble) {
+            if (ensemble != this) {
+                return false;
+            }
+            // candidate position
+            int candidatePos = chosenNodes.size();
+            // window of writeQuorumSize indices ending at candidatePos; presumably these are
+            // the starting indices of the write quorums that contain candidatePos -- confirm
+            // against RoundRobinDistributionSchedule.
+            int startPos = candidatePos - writeQuorumSize + 1;
+            for (int i = startPos; i <= candidatePos; i++) {
+                // adding ensembleSize wraps negative indices to the end of the array
+                // (assumes writeQuorumSize <= ensembleSize -- TODO confirm)
+                int idx = (i + ensembleSize) % ensembleSize;
+                if (null == quorums[idx]) {
+                    quorums[idx] = new QuorumCoverageSet();
+                }
+                if (!quorums[idx].apply(candidate)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        /**
+         * Append the node at the next ensemble position and record it in every
+         * coverage set of the same window that {@link #apply} inspects.
+         */
+        @Override
+        public void addBookie(BookieNode node) {
+            int candidatePos = chosenNodes.size();
+            int startPos = candidatePos - writeQuorumSize + 1;
+            for (int i = startPos; i <= candidatePos; i++) {
+                int idx = (i + ensembleSize) % ensembleSize;
+                if (null == quorums[idx]) {
+                    quorums[idx] = new QuorumCoverageSet();
+                }
+                quorums[idx].addBookie(node);
+            }
+            chosenNodes.add(node);
+        }
+
+        /**
+         * @return a new list with the addresses of the chosen nodes, in the order they were added.
+         */
+        @Override
+        public ArrayList<InetSocketAddress> toList() {
+            ArrayList<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>(ensembleSize);
+            for (BookieNode bn : chosenNodes) {
+                addresses.add(bn.getAddr());
+            }
+            return addresses;
+        }
+
+        @Override
+        public String toString() {
+            return chosenNodes.toString();
+        }
+
+    }
+
+    /**
+     * Topology {@link Node} representing a single bookie. The node name is
+     * derived from the bookie's socket address, and equality and hashing are
+     * both based on that name.
+     */
+    protected static class BookieNode implements Node {
+
+        // identifier of a bookie node.
+        private final InetSocketAddress addr;
+        // node name, derived from the address
+        private final String name;
+
+        // the level in topology tree
+        private int level;
+        // its parent in topology tree
+        private Node parent;
+        // its network location
+        private String location = NetworkTopology.DEFAULT_RACK;
+
+        BookieNode(InetSocketAddress addr, String networkLoc) {
+            this.addr = addr;
+            this.name = StringUtils.addrToString(addr);
+            setNetworkLocation(networkLoc);
+        }
+
+        /**
+         * @return socket address identifying this bookie.
+         */
+        public InetSocketAddress getAddr() {
+            return this.addr;
+        }
+
+        @Override
+        public String getName() {
+            return this.name;
+        }
+
+        @Override
+        public int getLevel() {
+            return this.level;
+        }
+
+        @Override
+        public void setLevel(int level) {
+            this.level = level;
+        }
+
+        @Override
+        public Node getParent() {
+            return this.parent;
+        }
+
+        @Override
+        public void setParent(Node parent) {
+            this.parent = parent;
+        }
+
+        @Override
+        public String getNetworkLocation() {
+            return this.location;
+        }
+
+        @Override
+        public void setNetworkLocation(String location) {
+            this.location = location;
+        }
+
+        @Override
+        public int hashCode() {
+            return name.hashCode();
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj instanceof BookieNode) {
+                // two bookie nodes are equal when their names match
+                return getName().equals(((BookieNode) obj).getName());
+            }
+            return false;
+        }
+
+        @Override
+        public String toString() {
+            return "<Bookie:" + name + ">";
+        }
+
+    }
+
+    /**
+     * Resolver that maps every name to {@link NetworkTopology#DEFAULT_RACK}.
+     */
+    static class DefaultResolver implements DNSToSwitchMapping {
+
+        /**
+         * @param names
+         *          names to resolve; only the number of entries matters.
+         * @return a new list with one default-rack entry per requested name.
+         */
+        @Override
+        public List<String> resolve(List<String> names) {
+            return new ArrayList<String>(Collections.nCopies(names.size(), NetworkTopology.DEFAULT_RACK));
+        }
+
+        @Override
+        public void reloadCachedMappings() {
+            // nothing is cached here, so there is nothing to reload
+        }
+
+    };
+
+    // for now, we just maintain the writable bookies' topology
+    private final NetworkTopology topology;
+    private DNSToSwitchMapping dnsResolver;
+    private final Map<InetSocketAddress, BookieNode> knownBookies;
+    private BookieNode localNode;
+    private final ReentrantReadWriteLock rwLock;
+
+    public RackawareEnsemblePlacementPolicy() {
+        topology = new NetworkTopology();
+        knownBookies = new HashMap<InetSocketAddress, BookieNode>();
+
+        rwLock = new ReentrantReadWriteLock();
+    }
+
+    private BookieNode createBookieNode(InetSocketAddress addr) {
+        return new BookieNode(addr, resolveNetworkLocation(addr));
+    }
+
+    /**
+     * Set up the DNS resolver named by {@link #REPP_DNS_RESOLVER_CLASS}
+     * (defaulting to {@link ScriptBasedMapping}) and work out the node for the
+     * local host. If the resolver cannot be created or configured, a
+     * {@link DefaultResolver} is used instead; if the local host address is
+     * unknown, the local node is left null.
+     *
+     * @param conf
+     *          configuration to read the resolver class from and to hand to
+     *          resolvers that are {@link Configurable}.
+     * @return this policy.
+     */
+    @Override
+    public EnsemblePlacementPolicy initialize(Configuration conf) {
+        final String resolverClassName =
+                conf.getString(REPP_DNS_RESOLVER_CLASS, ScriptBasedMapping.class.getName());
+        try {
+            dnsResolver = ReflectionUtils.newInstance(resolverClassName, DNSToSwitchMapping.class);
+            if (dnsResolver instanceof Configurable) {
+                Configurable configurable = (Configurable) dnsResolver;
+                configurable.setConf(conf);
+            }
+        } catch (RuntimeException re) {
+            LOG.info("Failed to initialize DNS Resolver {}, used default subnet resolver.", resolverClassName, re);
+            dnsResolver = new DefaultResolver();
+        }
+
+        // stays null when the local host address cannot be determined
+        BookieNode local = null;
+        try {
+            String localHostAddr = InetAddress.getLocalHost().getHostAddress();
+            local = createBookieNode(new InetSocketAddress(localHostAddr, 0));
+        } catch (UnknownHostException e) {
+            LOG.error("Failed to get local host address : ", e);
+        }
+        localNode = local;
+        LOG.info("Initialize rackaware ensemble placement policy @ {} : {}.", localNode,
+                dnsResolver.getClass().getName());
+        return this;
+    }
+
+    @Override
+    public void uninitalize() {
+        // do nothing
+    }
+
+    private String resolveNetworkLocation(InetSocketAddress addr) {
+        List<String> names = new ArrayList<String>(1);
+        if (dnsResolver instanceof CachedDNSToSwitchMapping) {
+            names.add(addr.getAddress().getHostAddress());
+        } else {
+            names.add(addr.getHostName());
+        }
+        // resolve network addresses
+        List<String> rNames = dnsResolver.resolve(names);
+        String netLoc;
+        if (null == rNames) {
+            LOG.warn("Failed to resolve network location for {}, using default rack for them : {}.", names,
+                    NetworkTopology.DEFAULT_RACK);
+            netLoc = NetworkTopology.DEFAULT_RACK;
+        } else {
+            netLoc = rNames.get(0);
+        }
+        return netLoc;
+    }
+
+    /**
+     * Update the known bookies and the topology after a change in the cluster
+     * membership. Runs under the write lock.
+     *
+     * @param writableBookies
+     *          bookies that are currently writable.
+     * @param readOnlyBookies
+     *          bookies that are currently read-only.
+     * @return the bookies that were known before this call, are no longer
+     *         writable and are not read-only either. The returned set is an
+     *         immutable snapshot.
+     */
+    @Override
+    public Set<InetSocketAddress> onClusterChanged(Set<InetSocketAddress> writableBookies,
+            Set<InetSocketAddress> readOnlyBookies) {
+        rwLock.writeLock().lock();
+        try {
+            Set<InetSocketAddress> joinedBookies, leftBookies, deadBookies;
+            Set<InetSocketAddress> oldBookieSet = knownBookies.keySet();
+            // Sets.difference returns live views backed by knownBookies. They must be
+            // snapshotted before knownBookies is modified below; otherwise iterating
+            // leftBookies while removing entries can throw ConcurrentModificationException
+            // and the returned deadBookies view would change after the removals.
+
+            // left bookies : bookies in known bookies, but not in new writable bookie cluster.
+            leftBookies = Sets.difference(oldBookieSet, writableBookies).immutableCopy();
+            // joined bookies : bookies in new writable bookie cluster, but not in known bookies
+            joinedBookies = Sets.difference(writableBookies, oldBookieSet).immutableCopy();
+            // dead bookies.
+            deadBookies = Sets.difference(leftBookies, readOnlyBookies).immutableCopy();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Cluster changed : left bookies are {}, joined bookies are {}, while dead bookies are {}.",
+                        new Object[] { leftBookies, joinedBookies, deadBookies });
+            }
+
+            // node left
+            for (InetSocketAddress addr : leftBookies) {
+                BookieNode node = knownBookies.remove(addr);
+                topology.remove(node);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cluster changed : bookie {} left from cluster.", addr);
+                }
+            }
+
+            // node joined
+            for (InetSocketAddress addr : joinedBookies) {
+                BookieNode node = createBookieNode(addr);
+                topology.add(node);
+                knownBookies.put(addr, node);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cluster changed : bookie {} joined the cluster.", addr);
+                }
+            }
+
+            return deadBookies;
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Map bookie addresses to topology nodes, reusing the node of a known
+     * bookie and creating a fresh node for any address that is not known.
+     *
+     * @param excludeBookies
+     *          addresses to convert.
+     * @return a new mutable set with one node per address.
+     */
+    private Set<Node> convertBookiesToNodes(Set<InetSocketAddress> excludeBookies) {
+        Set<Node> converted = new HashSet<Node>();
+        for (InetSocketAddress bookieAddr : excludeBookies) {
+            BookieNode known = knownBookies.get(bookieAddr);
+            converted.add(null != known ? known : createBookieNode(bookieAddr));
+        }
+        return converted;
+    }
+
+    /**
+     * Choose a new ensemble under the read lock.
+     *
+     * When the topology reports fewer than two racks, the bookies are picked
+     * at random from all known bookies. Otherwise bookies are picked one at a
+     * time: the first from the local node's rack (or from the topology root
+     * when the local node is unknown), each following one from the location
+     * "~" + rack of the previously chosen bookie (presumably meaning "outside
+     * that rack" -- confirm against NetworkTopology).
+     *
+     * @param ensembleSize
+     *          number of bookies to choose.
+     * @param writeQuorumSize
+     *          write quorum size used for the rack coverage check.
+     * @param excludeBookies
+     *          bookies that must not be chosen.
+     * @return addresses of the chosen bookies.
+     * @throws BKNotEnoughBookiesException
+     *          if not enough bookies could be selected.
+     */
+    @Override
+    public ArrayList<InetSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
+            Set<InetSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+        rwLock.readLock().lock();
+        try {
+            Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
+            RRRackCoverageEnsemble ensemble = new RRRackCoverageEnsemble(ensembleSize, writeQuorumSize);
+
+            // only one rack, use the random algorithm.
+            if (topology.getNumOfRacks() < 2) {
+                List<BookieNode> randomNodes = selectRandom(ensembleSize, excludeNodes,
+                        EnsembleForReplacement.instance);
+                ArrayList<InetSocketAddress> result = new ArrayList<InetSocketAddress>(ensembleSize);
+                for (BookieNode chosen : randomNodes) {
+                    result.add(chosen.addr);
+                }
+                return result;
+            }
+
+            // pick nodes by racks, to ensure there is at least two racks per write quorum.
+            BookieNode previous = null;
+            for (int pos = 0; pos < ensembleSize; pos++) {
+                final String rackToSearch;
+                if (null != previous) {
+                    rackToSearch = "~" + previous.getNetworkLocation();
+                } else if (null != localNode) {
+                    rackToSearch = localNode.getNetworkLocation();
+                } else {
+                    rackToSearch = NodeBase.ROOT;
+                }
+                previous = selectFromRack(rackToSearch, excludeNodes, ensemble, ensemble);
+            }
+            return ensemble.toList();
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Choose a bookie to take the place of another one, preferring a bookie
+     * from the same network location as the bookie being replaced. Runs under
+     * the read lock.
+     *
+     * @param bookieToReplace
+     *          bookie that is being replaced; it is never chosen itself.
+     * @param excludeBookies
+     *          further bookies that must not be chosen.
+     * @return address of the replacement bookie.
+     * @throws BKNotEnoughBookiesException
+     *          if no suitable bookie is available.
+     */
+    @Override
+    public InetSocketAddress replaceBookie(InetSocketAddress bookieToReplace,
+            Set<InetSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+        rwLock.readLock().lock();
+        try {
+            BookieNode toReplace = knownBookies.get(bookieToReplace);
+            if (null == toReplace) {
+                // not a known bookie: build a node for it to learn its location
+                toReplace = createBookieNode(bookieToReplace);
+            }
+
+            Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
+            // add the bookie to replace in exclude set
+            excludeNodes.add(toReplace);
+            final boolean debug = LOG.isDebugEnabled();
+            if (debug) {
+                LOG.debug("Try to choose a new bookie to replace {}, excluding {}.", bookieToReplace,
+                        excludeNodes);
+            }
+            // pick a candidate from same rack to replace
+            String rack = toReplace.getNetworkLocation();
+            BookieNode replacement = selectFromRack(rack, excludeNodes,
+                    TruePredicate.instance, EnsembleForReplacement.instance);
+            if (debug) {
+                LOG.debug("Bookie {} is chosen to replace bookie {}.", replacement, toReplace);
+            }
+            return replacement.addr;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Choose a bookie from the given network location, falling back to a
+     * random bookie from the whole cluster when the location has no
+     * acceptable candidate. The fallback does not consult the predicate.
+     *
+     * @param networkLoc
+     *          network location to try first.
+     * @param excludeBookies
+     *          bookies that must not be chosen; the chosen bookie is added to it.
+     * @param predicate
+     *          predicate applied to candidates from the given location.
+     * @param ensemble
+     *          ensemble structure the chosen bookie is added to.
+     * @return the chosen bookie.
+     * @throws BKNotEnoughBookiesException
+     *          if the fallback cannot find a bookie either.
+     */
+    protected BookieNode selectFromRack(String networkLoc, Set<Node> excludeBookies, Predicate predicate,
+            Ensemble ensemble) throws BKNotEnoughBookiesException {
+        BookieNode chosen;
+        try {
+            // select one from local rack
+            chosen = selectRandomFromRack(networkLoc, excludeBookies, predicate, ensemble);
+        } catch (BKNotEnoughBookiesException notEnough) {
+            LOG.warn("Failed to choose a bookie from {} : "
+                     + "excluded {}, fallback to choose bookie randomly from the cluster.",
+                     networkLoc, excludeBookies);
+            // randomly choose one from whole cluster, ignore the provided predicate.
+            List<BookieNode> fallback = selectRandom(1, excludeBookies, ensemble);
+            chosen = fallback.get(0);
+        }
+        return chosen;
+    }
+
+    protected String getRemoteRack(BookieNode node) {
+        return "~" + node.getNetworkLocation();
+    }
+
+    /**
+     * Pick a random bookie among the leaves of a network path.
+     *
+     * The leaves are visited in shuffled order; the first one that is not
+     * excluded, is a {@link BookieNode} and passes the predicate is added to
+     * both the ensemble and the exclude set and then returned.
+     *
+     * @param netPath
+     *          network path
+     * @param excludeBookies
+     *          bookies that must not be chosen; the chosen bookie is added to it.
+     * @param predicate
+     *          predicate to check whether the target is a good target.
+     * @param ensemble
+     *          ensemble structure
+     * @return chosen bookie.
+     * @throws BKNotEnoughBookiesException
+     *          if no leaf under the path qualifies.
+     */
+    protected BookieNode selectRandomFromRack(String netPath, Set<Node> excludeBookies, Predicate predicate,
+            Ensemble ensemble) throws BKNotEnoughBookiesException {
+        List<Node> candidates = new ArrayList<Node>(topology.getLeaves(netPath));
+        Collections.shuffle(candidates);
+        for (Node leaf : candidates) {
+            boolean eligible = !excludeBookies.contains(leaf)
+                    && leaf instanceof BookieNode
+                    && predicate.apply((BookieNode) leaf, ensemble);
+            if (eligible) {
+                BookieNode picked = (BookieNode) leaf;
+                // got a good candidate
+                ensemble.addBookie(picked);
+                // add the candidate to exclude set
+                excludeBookies.add(picked);
+                return picked;
+            }
+        }
+        throw new BKNotEnoughBookiesException();
+    }
+
+    /**
+     * Choose random nodes from the whole cluster.
+     *
+     * Known bookies are visited in shuffled order; every bookie that is not
+     * excluded is added to the ensemble, to the exclude set and to the result
+     * until the requested number has been collected.
+     *
+     * @param numBookies
+     *          number bookies to choose; when 0, an empty list is returned
+     *          and nothing is added to the ensemble or the exclude set.
+     * @param excludeBookies
+     *          bookies set to exclude; chosen bookies are added to it.
+     * @param ensemble
+     *          ensemble to hold the bookie chosen.
+     * @return the bookie nodes chosen.
+     * @throws BKNotEnoughBookiesException
+     *          if fewer than the requested number of bookies are available.
+     */
+    protected List<BookieNode> selectRandom(int numBookies, Set<Node> excludeBookies, Ensemble ensemble)
+            throws BKNotEnoughBookiesException {
+        if (numBookies == 0) {
+            // Nothing requested. Without this guard the countdown below would go
+            // negative, never hit zero, consume every bookie and then throw.
+            return new ArrayList<BookieNode>(0);
+        }
+        List<BookieNode> allBookies = new ArrayList<BookieNode>(knownBookies.values());
+        Collections.shuffle(allBookies);
+        List<BookieNode> newBookies = new ArrayList<BookieNode>(numBookies);
+        // number of bookies still to be found
+        int remaining = numBookies;
+        for (BookieNode bookie : allBookies) {
+            if (excludeBookies.contains(bookie)) {
+                continue;
+            }
+            ensemble.addBookie(bookie);
+            excludeBookies.add(bookie);
+            newBookies.add(bookie);
+            --remaining;
+            if (remaining == 0) {
+                return newBookies;
+            }
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Failed to find {} bookies : excludeBookies {}, allBookies {}.", new Object[] {
+                    remaining, excludeBookies, allBookies });
+        }
+        throw new BKNotEnoughBookiesException();
+    }
+
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java?rev=1492274&r1=1492273&r2=1492274&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java Wed Jun 12 16:41:47 2013
@@ -38,12 +38,13 @@ public abstract class AbstractConfigurat
 
     static final Logger LOG = LoggerFactory.getLogger(AbstractConfiguration.class);
 
-    private static ClassLoader defaultLoader;
+    protected static final ClassLoader defaultLoader;
     static {
-        defaultLoader = Thread.currentThread().getContextClassLoader();
-        if (null == defaultLoader) {
-            defaultLoader = AbstractConfiguration.class.getClassLoader();
+        ClassLoader loader = Thread.currentThread().getContextClassLoader();
+        if (null == loader) {
+            loader = AbstractConfiguration.class.getClassLoader();
         }
+        defaultLoader = loader;
     }
 
     // Ledger Manager

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java?rev=1492274&r1=1492273&r2=1492274&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java Wed Jun 12 16:41:47 2013
@@ -20,7 +20,10 @@ package org.apache.bookkeeper.conf;
 import java.util.List;
 
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
-
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.lang.StringUtils;
 import static com.google.common.base.Charsets.UTF_8;
 
@@ -49,6 +52,9 @@ public class ClientConfiguration extends
     // Number Woker Threads
     protected final static String NUM_WORKER_THREADS = "numWorkerThreads";
 
+    // Ensemble Placement Policy
+    protected final static String ENSEMBLE_PLACEMENT_POLICY = "ensemblePlacementPolicy";
+
     /**
      * Construct a default client-side configuration
      */
@@ -312,4 +318,28 @@ public class ClientConfiguration extends
         setProperty(SPECULATIVE_READ_TIMEOUT, timeout);
         return this;
     }
+
+    /**
+     * Get Ensemble Placement Policy Class.
+     *
+     * @return ensemble placement policy class.
+     */
+    public Class<? extends EnsemblePlacementPolicy> getEnsemblePlacementPolicy()
+        throws ConfigurationException {
+        return ReflectionUtils.getClass(this, ENSEMBLE_PLACEMENT_POLICY,
+                                        RackawareEnsemblePlacementPolicy.class,
+                                        EnsemblePlacementPolicy.class,
+                                        defaultLoader);
+    }
+
+    /**
+     * Set Ensemble Placement Policy Class.
+     *
+     * @param policyClass
+     *          Ensemble Placement Policy Class.
+     */
+    public ClientConfiguration setEnsemblePlacementPolicy(Class<? extends EnsemblePlacementPolicy> policyClass) {
+        setProperty(ENSEMBLE_PLACEMENT_POLICY, policyClass.getName());
+        return this;
+    }
 }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/Configurable.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/Configurable.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/Configurable.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/Configurable.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.conf;
+
+import org.apache.commons.configuration.Configuration;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Class that may be configured with a {@link Configuration}.
+ */
+@Beta
+public interface Configurable {
+
+    /**
+     * Set the configuration to be used by this object.
+     *
+     * @param conf
+     *          Configuration object to use
+     */
+    public void setConf(Configuration conf);
+
+    /**
+     * Return the configuration used by this object.
+     *
+     * @return configuration used by this object.
+     */
+    public Configuration getConf();
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.net;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.bookkeeper.conf.Configurable;
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * This is a base class for DNS to Switch mappings. <p/> It is not mandatory to
+ * derive {@link DNSToSwitchMapping} implementations from it, but it is strongly
+ * recommended, as it makes it easy for the Hadoop developers to add new methods
+ * to this base class that are automatically picked up by all implementations.
+ * <p/>
+ *
+ * This class does not extend the <code>Configured</code>
+ * base class, and should not be changed to do so, as it causes problems
+ * for subclasses. The constructor of the <code>Configured</code> calls
+ * the  {@link #setConf(Configuration)} method, which will call into the
+ * subclasses before they have been fully constructed.
+ *
+ */
+public abstract class AbstractDNSToSwitchMapping implements DNSToSwitchMapping, Configurable {
+
+    private Configuration conf;
+
+    /**
+     * Create an unconfigured instance
+     */
+    protected AbstractDNSToSwitchMapping() {
+    }
+
+    /**
+     * Create an instance, caching the configuration file.
+     * This constructor does not call {@link #setConf(Configuration)}; if
+     * a subclass extracts information in that method, it must call it explicitly.
+     * @param conf the configuration
+     */
+    protected AbstractDNSToSwitchMapping(Configuration conf) {
+        this.conf = conf;
+    }
+
+    public Configuration getConf() {
+        return conf;
+    }
+
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    /**
+     * Predicate that indicates that the switch mapping is known to be
+     * single-switch. The base class returns false: it assumes all mappings are
+     * multi-rack. Subclasses may override this with methods that are more aware
+     * of their topologies.
+     *
+     * <p/>
+     *
+     * This method is used when parts of Hadoop need know whether to apply
+     * single rack vs multi-rack policies, such as during block placement.
+     * Such algorithms behave differently if they are on multi-switch systems.
+     * </p>
+     *
+     * @return true if the mapping thinks that it is on a single switch
+     */
+    public boolean isSingleSwitch() {
+        return false;
+    }
+
+    /**
+     * Get a copy of the map (for diagnostics)
+     * @return a clone of the map or null for none known
+     */
+    public Map<String, String> getSwitchMap() {
+        return null;
+    }
+
+    /**
+     * Generate a string listing the switch mapping implementation,
+     * the mapping for every known node and the number of nodes and
+     * unique switches known about -each entry to a separate line.
+     * @return a string that can be presented to the ops team or used in
+     * debug messages.
+     */
+    public String dumpTopology() {
+        Map<String, String> rack = getSwitchMap();
+        StringBuilder builder = new StringBuilder();
+        builder.append("Mapping: ").append(toString()).append("\n");
+        if (rack != null) {
+            builder.append("Map:\n");
+            Set<String> switches = new HashSet<String>();
+            for (Map.Entry<String, String> entry : rack.entrySet()) {
+                builder.append("  ").append(entry.getKey()).append(" -> ").append(entry.getValue()).append("\n");
+                switches.add(entry.getValue());
+            }
+            builder.append("Nodes: ").append(rack.size()).append("\n");
+            builder.append("Switches: ").append(switches.size()).append("\n");
+        } else {
+            builder.append("No topology information");
+        }
+        return builder.toString();
+    }
+
+    protected boolean isSingleSwitchByScriptPolicy() {
+        return conf != null && conf.getString(CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null;
+    }
+
+    /**
+     * Query for a {@link DNSToSwitchMapping} instance being on a single
+     * switch.
+     * <p/>
+     * This predicate simply assumes that all mappings not derived from
+     * this class are multi-switch.
+     * @param mapping the mapping to query
+     * @return true only if the mapping is derived from this class and reports
+     * that it is single switch; false otherwise.
+     */
+    public static boolean isMappingSingleSwitch(DNSToSwitchMapping mapping) {
+        return mapping != null && mapping instanceof AbstractDNSToSwitchMapping
+                && ((AbstractDNSToSwitchMapping) mapping).isSingleSwitch();
+    }
+
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.net;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A cached implementation of DNSToSwitchMapping that takes a
+ * raw DNSToSwitchMapping and stores the resolved network location in
+ * a cache. The following calls to a resolved network location
+ * will get its location from the cache, without asking the raw mapping again.
+ *
+ */
+public class CachedDNSToSwitchMapping extends AbstractDNSToSwitchMapping {
+  /**
+   * Resolved network location per host; keys are the names as returned by
+   * {@link NetUtils#normalizeHostNames(java.util.Collection)}.
+   */
+  private final Map<String, String> cache = new ConcurrentHashMap<String, String>();
+
+  /**
+   * The uncached mapping
+   */
+  protected final DNSToSwitchMapping rawMapping;
+
+  /**
+   * cache a raw DNS mapping
+   * @param rawMapping the raw mapping to cache
+   */
+  public CachedDNSToSwitchMapping(DNSToSwitchMapping rawMapping) {
+    this.rawMapping = rawMapping;
+  }
+
+  /**
+   * @param names a list of hostnames to probe for being cached
+   * @return the hosts from 'names' that have not been cached previously
+   */
+  private List<String> getUncachedHosts(List<String> names) {
+    // find out all names without cached resolved location
+    List<String> unCachedHosts = new ArrayList<String>(names.size());
+    for (String name : names) {
+      if (!cache.containsKey(name)) {
+        unCachedHosts.add(name);
+      }
+    }
+    return unCachedHosts;
+  }
+
+  /**
+   * Caches the resolved host:rack mappings. The two list
+   * parameters are expected to be of equal size.
+   * <p>
+   * Nothing is cached when the raw mapping answered with null or with fewer
+   * entries than hosts, because the entries can then not be matched to the
+   * hosts by position. A null entry leaves its host uncached. In these cases
+   * {@link #resolve(List)} ends up returning null, exactly as it does when
+   * the raw mapping returns null.
+   *
+   * @param uncachedHosts a list of hosts that were uncached
+   * @param resolvedHosts a list of resolved host entries where the element
+   * at index(i) is the resolved value for the entry in uncachedHosts[i]
+   */
+  private void cacheResolvedHosts(List<String> uncachedHosts,
+      List<String> resolvedHosts) {
+    if (resolvedHosts == null || resolvedHosts.size() < uncachedHosts.size()) {
+      return;
+    }
+    for (int i = 0; i < uncachedHosts.size(); i++) {
+      String networkLocation = resolvedHosts.get(i);
+      // ConcurrentHashMap rejects null values, so skip unresolved entries
+      if (networkLocation != null) {
+        cache.put(uncachedHosts.get(i), networkLocation);
+      }
+    }
+  }
+
+  /**
+   * @param names a list of hostnames to look up (can be empty)
+   * @return the cached resolution of the list of hostnames/addresses.
+   *  or null if any of the names are not currently in the cache
+   */
+  private List<String> getCachedHosts(List<String> names) {
+    List<String> result = new ArrayList<String>(names.size());
+    // Construct the result
+    for (String name : names) {
+      String networkLocation = cache.get(name);
+      if (networkLocation == null) {
+        return null;
+      }
+      result.add(networkLocation);
+    }
+    return result;
+  }
+
+  /**
+   * Resolve the given names, consulting the raw mapping only for the names
+   * that are not cached yet.
+   *
+   * @param names the list of hosts to resolve (can be empty)
+   * @return the network locations in the order of 'names'; an empty list if
+   * 'names' is empty; null if some name is still unresolved after asking the
+   * raw mapping
+   */
+  @Override
+  public List<String> resolve(List<String> names) {
+    // normalize all input names to be in the form of IP addresses
+    List<String> normalizedNames = NetUtils.normalizeHostNames(names);
+    if (normalizedNames.isEmpty()) {
+      return new ArrayList<String>(0);
+    }
+
+    List<String> uncachedHosts = getUncachedHosts(normalizedNames);
+    // only bother the raw mapping when there is something left to resolve
+    if (!uncachedHosts.isEmpty()) {
+      List<String> resolvedHosts = rawMapping.resolve(uncachedHosts);
+      cacheResolvedHosts(uncachedHosts, resolvedHosts);
+    }
+    //now look up the entire list in the cache
+    return getCachedHosts(normalizedNames);
+  }
+
+  /**
+   * Get the (host x switch) map.
+   * @return a copy of the cached map of hosts to rack
+   */
+  @Override
+  public Map<String, String> getSwitchMap() {
+    return new HashMap<String, String>(cache);
+  }
+
+
+  @Override
+  public String toString() {
+    return "cached switch mapping relaying to " + rawMapping;
+  }
+
+  /**
+   * Delegate the switch topology query to the raw mapping, via
+   * {@link AbstractDNSToSwitchMapping#isMappingSingleSwitch(DNSToSwitchMapping)}
+   * @return true iff the raw mapper is considered single-switch.
+   */
+  @Override
+  public boolean isSingleSwitch() {
+    return isMappingSingleSwitch(rawMapping);
+  }
+
+  /**
+   * Drop every cached entry, so that later lookups go to the raw mapping again.
+   */
+  @Override
+  public void reloadCachedMappings() {
+    cache.clear();
+  }
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CommonConfigurationKeys.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CommonConfigurationKeys.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CommonConfigurationKeys.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CommonConfigurationKeys.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.net;
+
+public interface CommonConfigurationKeys {
+
+    /** Configuration key: script file name to resolve network topology. */
+    String NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY = "networkTopologyScriptFileName";
+    /** Configuration key: number of arguments that network topology resolve script used. */
+    String NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY = "networkTopologyScriptNumberArgs";
+    /** Default value of {@link #NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY}. */
+    int NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_DEFAULT = 100;
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.net;
+
+import java.util.List;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Contract that pluggable resolvers from DNS-name/IP-address to RackID
+ * have to implement.
+ *
+ */
+@Beta
+public interface DNSToSwitchMapping {
+    /**
+     * Resolves a list of DNS-names/IP-addresses into a list of switch
+     * information (network paths), keeping a one-to-one correspondence
+     * between the elements of the two lists.
+     * <p>
+     * For an argument element such as x.y.com, the returned entry must be a
+     * network path of the form /foo/rack, where / is the root and 'foo' is
+     * the switch where 'rack' is connected. The hostname/ip-address itself
+     * is not part of the returned path. The number of components in the
+     * network path is determined by the network topology of the cluster.
+     * <p>
+     * A name that cannot be resolved to a rack should be mapped to
+     * {@link NetworkTopology#DEFAULT_RACK}. The bundled implementations do
+     * so, though it is not a formal requirement.
+     *
+     * @param names the list of hosts to resolve (can be empty)
+     * @return list of resolved network paths;
+     * if <i>names</i> is empty, the returned list is also empty
+     */
+    List<String> resolve(List<String> names);
+
+    /**
+     * Reload all of the cached mappings.
+     * <p>
+     * If there is a cache, this method will clear it, so that later
+     * accesses get a chance to see the new data.
+     */
+    void reloadCachedMappings();
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.net;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class NetUtils {
+
+    /**
+     * Turn a host identifier into the textual form of its IP address.
+     *
+     * @param name a host given either as a textual IP address or as a host name
+     * @return the IP address as a string, or <i>name</i> itself when the
+     *         lookup fails with an {@link UnknownHostException}
+     */
+    public static String normalizeHostName(String name) {
+        InetAddress address;
+        try {
+            address = InetAddress.getByName(name);
+        } catch (UnknownHostException e) {
+            // best effort: an unresolvable name is handed back untouched
+            return name;
+        }
+        return address.getHostAddress();
+    }
+
+    /**
+     * Turn every host identifier of a collection into the textual form of
+     * its IP address, in iteration order.
+     *
+     * @param names a collection of string representations of hosts
+     * @return a new list holding one normalized entry per element of <i>names</i>
+     * @see #normalizeHostName(String)
+     */
+    public static List<String> normalizeHostNames(Collection<String> names) {
+        List<String> addresses = new ArrayList<String>(names.size());
+        for (String host : names) {
+            String normalized = normalizeHostName(host);
+            addresses.add(normalized);
+        }
+        return addresses;
+    }
+
+}