You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2017/12/19 18:44:55 UTC

[GitHub] sijie closed pull request #883: Issue 709: Add Slow Bookkeeper Servers to Placement Policy for read ordering

sijie closed pull request #883: Issue 709: Add Slow Bookkeeper Servers to Placement Policy for read ordering
URL: https://github.com/apache/bookkeeper/pull/883
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it after the merge:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index 2e873984b..7c6bad93d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -28,6 +28,7 @@
 import java.util.Set;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.BookiesHealthInfo;
 import org.apache.bookkeeper.client.DistributionSchedule;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -87,10 +88,15 @@ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize,
         throw new BKNotEnoughBookiesException();
     }
 
+    @Override
+    public void registerSlowBookie(BookieSocketAddress bookieSocketAddress, long entryId) {
+        return;
+    }
+
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return null;
     }
@@ -98,7 +104,7 @@ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize,
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return null;
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookiesHealthInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookiesHealthInfo.java
new file mode 100644
index 000000000..b04044075
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookiesHealthInfo.java
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import org.apache.bookkeeper.net.BookieSocketAddress;
+
+/**
+ * This interface returns heuristics used to determine the health of a Bookkeeper server for read
+ * ordering.
+ */
+public interface BookiesHealthInfo {
+
+    /**
+     * Return the failure history for a bookie.
+     *
+     * @param bookieSocketAddress
+     * @return failed entries on a bookie, -1 if there have been no failures
+     */
+    long getBookieFailureHistory(BookieSocketAddress bookieSocketAddress);
+
+    /**
+     * Returns pending requests to a bookie.
+     *
+     * @param bookieSocketAddress
+     * @return number of pending requests
+     */
+    long getBookiePendingRequests(BookieSocketAddress bookieSocketAddress);
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index eab8954bb..219086df4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -152,10 +152,15 @@ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize,
         }
     }
 
+    @Override
+    public void registerSlowBookie(BookieSocketAddress bookieSocketAddress, long entryId) {
+        return;
+    }
+
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return writeSet;
     }
@@ -163,7 +168,7 @@ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize,
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         writeSet.addMissingIndices(ensemble.size());
         return writeSet;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 855ee7590..7656a4960 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -20,12 +20,12 @@
 import io.netty.util.HashedWheelTimer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.DistributionSchedule.WriteSet;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -151,7 +151,8 @@
  *
  * <h3>How to choose bookies to do speculative reads?</h3>
  *
- * <p>{@link #reorderReadSequence(ArrayList, List, Map)} and {@link #reorderReadLACSequence(ArrayList, List, Map)} are
+ * <p>{@link #reorderReadSequence(ArrayList, BookiesHealthInfo, WriteSet)} and
+ * {@link #reorderReadLACSequence(ArrayList, BookiesHealthInfo, WriteSet)} are
  * two methods exposed by the placement policy, to help client determine a better read sequence according to the
  * network topology and the bookie failure history.
  *
@@ -289,13 +290,23 @@ BookieSocketAddress replaceBookie(int ensembleSize,
                                       Set<BookieSocketAddress> excludeBookies)
         throws BKNotEnoughBookiesException;
 
+    /**
+     * Register a bookie as slow so that it is tried after available and read-only bookies.
+     *
+     * @param bookieSocketAddress
+     *          Address of bookie host
+     * @param entryId
+     *          Entry ID that caused a speculative timeout on the bookie.
+     */
+    void registerSlowBookie(BookieSocketAddress bookieSocketAddress, long entryId);
+
     /**
      * Reorder the read sequence of a given write quorum <i>writeSet</i>.
      *
      * @param ensemble
      *          Ensemble to read entries.
-     * @param bookieFailureHistory
-     *          Observed failures on the bookies
+     * @param bookiesHealthInfo
+     *          Health info for bookies
      * @param writeSet
      *          Write quorum to read entries. This will be modified, rather than
      *          allocating a new WriteSet.
@@ -305,7 +316,7 @@ BookieSocketAddress replaceBookie(int ensembleSize,
      */
     DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet);
 
 
@@ -314,8 +325,8 @@ BookieSocketAddress replaceBookie(int ensembleSize,
      *
      * @param ensemble
      *          Ensemble to read entries.
-     * @param bookieFailureHistory
-     *          Observed failures on the bookies
+     * @param bookiesHealthInfo
+     *          Health info for bookies
      * @param writeSet
      *          Write quorum to read entries. This will be modified, rather than
      *          allocating a new WriteSet.
@@ -325,7 +336,7 @@ BookieSocketAddress replaceBookie(int ensembleSize,
      */
     DistributionSchedule.WriteSet reorderReadLACSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet);
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 32eeab812..d7615302b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -77,6 +77,7 @@
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.TimedGenericCallback;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
+import org.apache.bookkeeper.proto.PerChannelBookieClientPool;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
@@ -106,6 +107,7 @@
     final LoadingCache<BookieSocketAddress, Long> bookieFailureHistory;
     final boolean enableParallelRecoveryRead;
     final int recoveryReadBatchSize;
+    final BookiesHealthInfo bookiesHealthInfo;
     final EnumSet<WriteFlag> writeFlags;
     ScheduledFuture<?> timeoutFuture = null;
 
@@ -174,6 +176,19 @@ public Long load(BookieSocketAddress key) {
                 return -1L;
             }
         });
+        this.bookiesHealthInfo = new BookiesHealthInfo() {
+            @Override
+            public long getBookieFailureHistory(BookieSocketAddress bookieSocketAddress) {
+                Long lastFailure = bookieFailureHistory.getIfPresent(bookieSocketAddress);
+                return lastFailure == null ? -1L : lastFailure;
+            }
+
+            @Override
+            public long getBookiePendingRequests(BookieSocketAddress bookieSocketAddress) {
+                PerChannelBookieClientPool pcbcPool = bk.bookieClient.lookupClient(bookieSocketAddress);
+                return pcbcPool == null ? 0 : pcbcPool.getNumPendingCompletionRequests();
+            }
+        };
 
         ensembleChangeCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.ENSEMBLE_CHANGES);
         lacUpdateHitsCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_HITS);
@@ -333,6 +348,15 @@ DistributionSchedule getDistributionSchedule() {
         return distributionSchedule;
     }
 
+    /**
+     * Get the health info for bookies for this ledger.
+     *
+     * @return BookiesHealthInfo for every bookie in the write set.
+     */
+    BookiesHealthInfo getBookiesHealthInfo() {
+        return bookiesHealthInfo;
+    }
+
     void writeLedgerConfig(GenericCallback<Void> writeCb) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Writing metadata to ledger manager: {}, {}", this.ledgerId, metadata.getVersion());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 29202315b..32816bf17 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -86,16 +86,18 @@
         final ArrayList<BookieSocketAddress> ensemble;
         final DistributionSchedule.WriteSet writeSet;
         final LedgerEntryImpl entryImpl;
+        final long eId;
 
         LedgerEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
             this.entryImpl = LedgerEntryImpl.create(lId, eId);
             this.ensemble = ensemble;
+            this.eId = eId;
 
             if (lh.bk.isReorderReadSequence()) {
                 writeSet = lh.bk.getPlacementPolicy()
                     .reorderReadSequence(
                             ensemble,
-                            lh.bookieFailureHistory.asMap(),
+                            lh.getBookiesHealthInfo(),
                             lh.distributionSchedule.getWriteSet(eId));
             } else {
                 writeSet = lh.distributionSchedule.getWriteSet(eId);
@@ -420,6 +422,21 @@ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress
             }
         }
 
+        @Override
+        boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf buffer) {
+            boolean completed = super.complete(bookieIndex, host, buffer);
+            if (completed) {
+                int numReplicasTried = getNextReplicaIndexToReadFrom();
+                // Check if any speculative reads were issued and mark any slow bookies before
+                // the first successful speculative read as "slow"
+                for (int i = 0; i < numReplicasTried - 1; i++) {
+                    int slowBookieIndex = writeSet.get(i);
+                    BookieSocketAddress slowBookieSocketAddress = ensemble.get(slowBookieIndex);
+                    lh.bk.placementPolicy.registerSlowBookie(slowBookieSocketAddress, eId);
+                }
+            }
+            return completed;
+        }
     }
 
     PendingReadOp(LedgerHandle lh,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index db8797a18..1691f1ec1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -121,18 +121,18 @@ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize,
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
-        return super.reorderReadSequence(ensemble, bookieFailureHistory,
+        return super.reorderReadSequence(ensemble, bookiesHealthInfo,
                                          writeSet);
     }
 
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
-        return super.reorderReadLACSequence(ensemble, bookieFailureHistory,
+        return super.reorderReadLACSequence(ensemble, bookiesHealthInfo,
                                             writeSet);
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index f0572a696..735324bca 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -18,7 +18,11 @@
 package org.apache.bookkeeper.client;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.UNKNOWN_REGION;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
@@ -37,6 +41,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 
@@ -87,7 +92,8 @@
     static final int REMOTE_MASK      = 0x04 << 24;
     static final int REMOTE_FAIL_MASK = 0x08 << 24;
     static final int READ_ONLY_MASK   = 0x10 << 24;
-    static final int UNAVAIL_MASK     = 0x20 << 24;
+    static final int SLOW_MASK        = 0x20 << 24;
+    static final int UNAVAIL_MASK     = 0x40 << 24;
     static final int MASK_BITS        = 0xFFF << 20;
 
     static class DefaultResolver implements DNSToSwitchMapping {
@@ -183,6 +189,8 @@ public void reloadCachedMappings() {
     protected DNSToSwitchMapping dnsResolver;
     protected HashedWheelTimer timer;
     protected final Map<BookieSocketAddress, BookieNode> knownBookies;
+    // Use a loading cache so slow bookies are expired. Use entryId as values.
+    protected Cache<BookieSocketAddress, Long> slowBookies;
     protected BookieNode localNode;
     protected final ReentrantReadWriteLock rwLock;
     protected ImmutableSet<BookieSocketAddress> readOnlyBookies = null;
@@ -302,6 +310,14 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
                 dnsResolver = new DefaultResolver(() -> this.getDefaultRack());
             }
         }
+        slowBookies = CacheBuilder.newBuilder()
+            .expireAfterWrite(conf.getBookieFailureHistoryExpirationMSec(), TimeUnit.MILLISECONDS)
+            .build(new CacheLoader<BookieSocketAddress, Long>() {
+                @Override
+                public Long load(BookieSocketAddress key) throws Exception {
+                    return -1L;
+                }
+            });
         return initialize(
                 dnsResolver,
                 timer,
@@ -789,35 +805,105 @@ protected BookieNode selectRandomFromRack(String netPath, Set<Node> excludeBooki
         throw new BKNotEnoughBookiesException();
     }
 
+    @Override
+    public void registerSlowBookie(BookieSocketAddress bookieSocketAddress, long entryId) {
+        slowBookies.put(bookieSocketAddress, entryId);
+    }
+
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
+        Map<Integer, String> writeSetWithRegion = new HashMap<>();
+        for (int i = 0; i < writeSet.size(); i++) {
+            writeSetWithRegion.put(writeSet.get(i), "");
+        }
+        return reorderReadSequenceWithRegion(
+            ensemble, writeSet, writeSetWithRegion, bookiesHealthInfo, false, "", writeSet.size());
+    }
+
+    /**
+     * This function orders the read sequence with a given region. For region-unaware policies (e.g.
+     * RackAware), we pass in false for regionAware and an empty myRegion. When this happens, any
+     * remote list will stay empty. The ordering is as follows (the R* at the beginning of each list item
+     * is only present for region aware policies).
+     *      1. available (local) bookies
+     *      2. R* a remote bookie (based on remoteNodeInReorderSequence
+     *      3. R* remaining (local) bookies
+     *      4. R* remaining remote bookies
+     *      5. read only bookies
+     *      6. slow bookies
+     *      7. unavailable bookies
+     *
+     * @param ensemble
+     *          ensemble of bookies
+     * @param writeSet
+     *          write set
+     * @param writeSetWithRegion
+     *          write set with region information
+     * @param bookiesHealthInfo
+     *          heuristics about health of boookies
+     * @param regionAware
+     *          whether or not a region-aware policy is used
+     * @param myRegion
+     *          current region of policy
+     * @param remoteNodeInReorderSequence
+     *          number of local bookies to try before trying a remote bookie
+     * @return ordering of bookies to send read to
+     */
+    DistributionSchedule.WriteSet reorderReadSequenceWithRegion(
+        ArrayList<BookieSocketAddress> ensemble,
+        DistributionSchedule.WriteSet writeSet,
+        Map<Integer, String> writeSetWithRegion,
+        BookiesHealthInfo bookiesHealthInfo,
+        boolean regionAware,
+        String myRegion,
+        int remoteNodeInReorderSequence) {
+        boolean useRegionAware = regionAware && (!myRegion.equals(UNKNOWN_REGION));
         int ensembleSize = ensemble.size();
 
         for (int i = 0; i < writeSet.size(); i++) {
             int idx = writeSet.get(i);
             BookieSocketAddress address = ensemble.get(idx);
-            Long lastFailedEntryOnBookie = bookieFailureHistory.get(address);
+            String region = writeSetWithRegion.get(idx);
+            Long lastFailedEntryOnBookie = bookiesHealthInfo.getBookieFailureHistory(address);
             if (null == knownBookies.get(address)) {
                 // there isn't too much differences between readonly bookies
                 // from unavailable bookies. since there
                 // is no write requests to them, so we shouldn't try reading
-                // from readonly bookie in prior to writable
-                // bookies.
+                // from readonly bookie prior to writable bookies.
                 if ((null == readOnlyBookies)
-                        || !readOnlyBookies.contains(address)) {
+                    || !readOnlyBookies.contains(address)) {
                     writeSet.set(i, idx | UNAVAIL_MASK);
                 } else {
-                    writeSet.set(i, idx | READ_ONLY_MASK);
+                    if (slowBookies.getIfPresent(address) != null) {
+                        long numPendingReqs = bookiesHealthInfo.getBookiePendingRequests(address);
+                        // use slow bookies with less pending requests first
+                        long slowIdx = numPendingReqs * ensembleSize + idx;
+                        writeSet.set(i, (int) (slowIdx & ~MASK_BITS) | SLOW_MASK);
+                    } else {
+                        writeSet.set(i, idx | READ_ONLY_MASK);
+                    }
+                }
+            } else if (lastFailedEntryOnBookie < 0) {
+                if (slowBookies.getIfPresent(address) != null) {
+                    long numPendingReqs = bookiesHealthInfo.getBookiePendingRequests(address);
+                    long slowIdx = numPendingReqs * ensembleSize + idx;
+                    writeSet.set(i, (int) (slowIdx & ~MASK_BITS) | SLOW_MASK);
+                } else {
+                    if (useRegionAware && !myRegion.equals(region)) {
+                        writeSet.set(i, idx | REMOTE_MASK);
+                    } else {
+                        writeSet.set(i, idx | LOCAL_MASK);
+                    }
                 }
             } else {
-                if ((lastFailedEntryOnBookie == null)
-                        || (lastFailedEntryOnBookie < 0)) {
-                    writeSet.set(i, idx | LOCAL_MASK);
+                // use bookies with earlier failed entryIds first
+                long failIdx = lastFailedEntryOnBookie * ensembleSize + idx;
+                if (useRegionAware && !myRegion.equals(region)) {
+                    writeSet.set(i, (int) (failIdx & ~MASK_BITS) | REMOTE_FAIL_MASK);
                 } else {
-                    long failIdx = lastFailedEntryOnBookie * ensembleSize + idx;
                     writeSet.set(i, (int) (failIdx & ~MASK_BITS) | LOCAL_FAIL_MASK);
                 }
             }
@@ -836,15 +922,54 @@ protected BookieNode selectRandomFromRack(String netPath, Set<Node> excludeBooki
 
         if (reorderReadsRandom) {
             shuffleWithMask(writeSet, LOCAL_MASK, MASK_BITS);
+            shuffleWithMask(writeSet, REMOTE_MASK, MASK_BITS);
             shuffleWithMask(writeSet, READ_ONLY_MASK, MASK_BITS);
             shuffleWithMask(writeSet, UNAVAIL_MASK, MASK_BITS);
         }
 
-        // remove all masks
+        // nodes within a region are ordered as follows
+        // (Random?) list of nodes that have no history of failure
+        // Nodes with Failure history are ordered in the reverse
+        // order of the most recent entry that generated an error
+        // The sort will have put them in correct order,
+        // so remove the bits that sort by age.
+        for (int i = 0; i < writeSet.size(); i++) {
+            int mask = writeSet.get(i) & MASK_BITS;
+            int idx = (writeSet.get(i) & ~MASK_BITS) % ensembleSize;
+            if (mask == LOCAL_FAIL_MASK) {
+                writeSet.set(i, LOCAL_MASK | idx);
+            } else if (mask == REMOTE_FAIL_MASK) {
+                writeSet.set(i, REMOTE_MASK | idx);
+            } else if (mask == SLOW_MASK) {
+                writeSet.set(i, SLOW_MASK | idx);
+            }
+        }
+
+        // Insert a node from the remote region at the specified location so
+        // we try more than one region within the max allowed latency
+        int firstRemote = -1;
         for (int i = 0; i < writeSet.size(); i++) {
-            writeSet.set(i, (writeSet.get(i) & ~MASK_BITS) % ensembleSize);
+            if ((writeSet.get(i) & MASK_BITS) == REMOTE_MASK) {
+                firstRemote = i;
+                break;
+            }
+        }
+        if (firstRemote != -1) {
+            int i = 0;
+            for (; i < remoteNodeInReorderSequence
+                && i < writeSet.size(); i++) {
+                if ((writeSet.get(i) & MASK_BITS) != LOCAL_MASK) {
+                    break;
+                }
+            }
+            writeSet.moveAndShift(firstRemote, i);
         }
 
+
+        // remove all masks
+        for (int i = 0; i < writeSet.size(); i++) {
+            writeSet.set(i, writeSet.get(i) & ~MASK_BITS);
+        }
         return writeSet;
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index fe2f64a52..07cb31826 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -83,7 +83,7 @@
             this.writeSet = lh.distributionSchedule.getWriteSet(eId);
             if (lh.bk.reorderReadSequence) {
                 this.orderedEnsemble = lh.bk.placementPolicy.reorderReadLACSequence(ensemble,
-                        lh.bookieFailureHistory.asMap(), writeSet.copy());
+                        lh.getBookiesHealthInfo(), writeSet.copy());
             } else {
                 this.orderedEnsemble = writeSet.copy();
             }
@@ -407,6 +407,21 @@ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress
             }
         }
 
+        @Override
+        boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf buffer, long entryId) {
+            boolean completed = super.complete(bookieIndex, host, buffer, entryId);
+            if (completed) {
+                int numReplicasTried = getNextReplicaIndexToReadFrom();
+                // Check if any speculative reads were issued and mark any bookies before the
+                // first speculative read as slow
+                for (int i = 0; i < numReplicasTried; i++) {
+                    int slowBookieIndex = orderedEnsemble.get(i);
+                    BookieSocketAddress slowBookieSocketAddress = ensemble.get(slowBookieIndex);
+                    lh.bk.placementPolicy.registerSlowBookie(slowBookieSocketAddress, entryId);
+                }
+            }
+            return completed;
+        }
     }
 
     ReadLastConfirmedAndEntryOp(LedgerHandle lh,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index ba8798eae..f990b3f24 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -521,122 +521,30 @@ protected BookieNode replaceFromRack(BookieNode bookieNodeToReplace,
     @Override
     public final DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         if (UNKNOWN_REGION.equals(myRegion)) {
-            return super.reorderReadSequence(ensemble, bookieFailureHistory,
-                                             writeSet);
+            return super.reorderReadSequence(ensemble, bookiesHealthInfo, writeSet);
         } else {
-            int ensembleSize = ensemble.size();
-
+            Map<Integer, String> writeSetWithRegion = new HashMap<>(); // ensemble index -> region of that bookie
             for (int i = 0; i < writeSet.size(); i++) {
                 int idx = writeSet.get(i);
-                BookieSocketAddress address = ensemble.get(idx);
-                String region = getRegion(address);
-                Long lastFailedEntryOnBookie = bookieFailureHistory.get(address);
-                if (null == knownBookies.get(address)) {
-                    // there isn't too much differences between readonly bookies
-                    // from unavailable bookies. since there
-                    // is no write requests to them, so we shouldn't try reading
-                    // from readonly bookie in prior to writable bookies.
-                    if ((null == readOnlyBookies)
-                            || !readOnlyBookies.contains(address)) {
-                        writeSet.set(i, idx | UNAVAIL_MASK);
-                    } else {
-                        writeSet.set(i, idx | READ_ONLY_MASK);
-                    }
-                } else if (region.equals(myRegion)) {
-                    if ((lastFailedEntryOnBookie == null)
-                            || (lastFailedEntryOnBookie < 0)) {
-                        writeSet.set(i, idx | LOCAL_MASK);
-                    } else {
-                        long failIdx = lastFailedEntryOnBookie * ensembleSize + idx;
-                        writeSet.set(i, (int) (failIdx & ~MASK_BITS) | LOCAL_FAIL_MASK);
-                    }
-                } else {
-                    if ((lastFailedEntryOnBookie == null)
-                            || (lastFailedEntryOnBookie < 0)) {
-                        writeSet.set(i, idx | REMOTE_MASK);
-                    } else {
-                        long failIdx = lastFailedEntryOnBookie * ensembleSize + idx;
-                        writeSet.set(i, (int) (failIdx & ~MASK_BITS)
-                                     | REMOTE_FAIL_MASK);
-                    }
-                }
-            }
-
-            // Add a mask to ensure the sort is stable, sort,
-            // and then remove mask. This maintains stability as
-            // long as there are fewer than 16 bookies in the write set.
-            for (int i = 0; i < writeSet.size(); i++) {
-                writeSet.set(i, writeSet.get(i) | ((i & 0xF) << 20));
-            }
-            writeSet.sort();
-            for (int i = 0; i < writeSet.size(); i++) {
-                writeSet.set(i, writeSet.get(i) & ~((0xF) << 20));
-            }
-
-            if (reorderReadsRandom) {
-                shuffleWithMask(writeSet, LOCAL_MASK, MASK_BITS);
-                shuffleWithMask(writeSet, REMOTE_MASK, MASK_BITS);
-                shuffleWithMask(writeSet, READ_ONLY_MASK, MASK_BITS);
-                shuffleWithMask(writeSet, UNAVAIL_MASK, MASK_BITS);
-            }
-
-            // nodes within a region are ordered as follows
-            // (Random?) list of nodes that have no history of failure
-            // Nodes with Failure history are ordered in the reverse
-            // order of the most recent entry that generated an error
-            // The sort will have put them in correct order,
-            // so remove the bits that sort by age.
-            for (int i = 0; i < writeSet.size(); i++) {
-                int mask = writeSet.get(i) & MASK_BITS;
-                int idx = (writeSet.get(i) & ~MASK_BITS) % ensembleSize;
-                if (mask == LOCAL_FAIL_MASK) {
-                    writeSet.set(i, LOCAL_MASK | idx);
-                } else if (mask == REMOTE_FAIL_MASK) {
-                    writeSet.set(i, REMOTE_MASK | idx);
-                }
-            }
-
-            // Insert a node from the remote region at the specified location so
-            // we try more than one region within the max allowed latency
-            int firstRemote = -1;
-            for (int i = 0; i < writeSet.size(); i++) {
-                if ((writeSet.get(i) & MASK_BITS) == REMOTE_MASK) {
-                    firstRemote = i;
-                    break;
-                }
-            }
-            if (firstRemote != -1) {
-                int i = 0;
-                for (; i < REMOTE_NODE_IN_REORDER_SEQUENCE && i < writeSet.size(); i++) {
-                    if ((writeSet.get(i) & MASK_BITS) != LOCAL_MASK) {
-                        break;
-                    }
-                }
-                writeSet.moveAndShift(firstRemote, i);
-            }
-
-
-            // remove all masks
-            for (int i = 0; i < writeSet.size(); i++) {
-                writeSet.set(i, writeSet.get(i) & ~MASK_BITS);
+                writeSetWithRegion.put(idx, getRegion(ensemble.get(idx))); // idx is an ensemble index
             }
-            return writeSet;
+            return super.reorderReadSequenceWithRegion(ensemble, writeSet, writeSetWithRegion,
+                bookiesHealthInfo, true, myRegion, REMOTE_NODE_IN_REORDER_SEQUENCE);
         }
     }
 
     @Override
     public final DistributionSchedule.WriteSet reorderReadLACSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo, // per-bookie failure history and pending request counts
             DistributionSchedule.WriteSet writeSet) {
         if (UNKNOWN_REGION.equals(myRegion)) {
-            return super.reorderReadLACSequence(ensemble, bookieFailureHistory,
-                                                writeSet);
+            return super.reorderReadLACSequence(ensemble, bookiesHealthInfo, writeSet); // unknown region
         }
-        DistributionSchedule.WriteSet finalList = reorderReadSequence(ensemble, bookieFailureHistory, writeSet);
+        DistributionSchedule.WriteSet finalList = reorderReadSequence(ensemble, bookiesHealthInfo, writeSet);
         finalList.addMissingIndices(ensemble.size());
         return finalList;
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
index 88953b396..b70cb0f15 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
@@ -24,7 +24,6 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -461,7 +460,7 @@ public String toString() {
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo, // unused: this implementation keeps the write set order
             DistributionSchedule.WriteSet writeSet) {
         return writeSet;
     }
@@ -469,10 +468,10 @@ public String toString() {
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo, // passed through to reorderReadSequence
             DistributionSchedule.WriteSet writeSet) {
         DistributionSchedule.WriteSet retList = reorderReadSequence(
-                ensemble, bookieFailureHistory, writeSet);
+                ensemble, bookiesHealthInfo, writeSet);
         retList.addMissingIndices(ensemble.size());
         return retList;
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 257c36734..4d9c77478 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -112,6 +112,8 @@
     // Ensemble Placement Policy
     protected static final String ENSEMBLE_PLACEMENT_POLICY = "ensemblePlacementPolicy";
     protected static final String NETWORK_TOPOLOGY_STABILIZE_PERIOD_SECONDS = "networkTopologyStabilizePeriodSeconds";
+    protected static final String ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES =
+        "ensemblePlacementPolicyOrderSlowBookies"; // boolean flag, defaults to false
 
     // Ledger Metadata Parameters
     protected static final String STORE_SYSTEMTIME_AS_LEDGER_CREATION_TIME = "storeSystemTimeAsLedgerCreationTime";
@@ -1088,6 +1090,27 @@ public ClientConfiguration setNetworkTopologyStabilizePeriodSeconds(int seconds)
         return this;
     }
 
+    /**
+     * Whether the placement policy orders slow bookies ({@code ensemblePlacementPolicyOrderSlowBookies}).
+     *
+     * @return true if ordering of slow bookies is enabled; false (the default) otherwise.
+     */
+    public boolean getEnsemblePlacementPolicySlowBookies() {
+        return getBoolean(ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES, false);
+    }
+
+    /**
+     * Enable or disable ordering of slow bookies in the placement policy (disabled by default).
+     *
+     * @param enabled
+     *          true to let the placement policy order slow bookies, false to turn it off.
+     * @return this client configuration, for chaining.
+     */
+    public ClientConfiguration setEnsemblePlacementPolicySlowBookies(boolean enabled) {
+        setProperty(ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES, enabled);
+        return this;
+    }
+
     /**
      * Whether to enable recording task execution stats.
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 9b1ca9cc9..08ed19437 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -157,7 +157,7 @@ public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBook
                                           authProviderFactory, registry, pcbcPool, shFactory);
     }
 
-    private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
+    public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
         PerChannelBookieClientPool clientPool = channels.get(addr);
         if (null == clientPool) {
             closeLock.readLock().lock();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
index 41233cf83..e57b8a746 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
@@ -119,4 +119,13 @@ public void close(boolean wait) {
             pcbc.close(wait);
         }
     }
+
+    @Override
+    public long getNumPendingCompletionRequests() {
+        long numPending = 0; // total across every client in the pool
+        for (PerChannelBookieClient pcbc : clients) {
+            numPending += pcbc.getNumPendingCompletionRequests();
+        }
+        return numPending;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 6e64b37ee..e0199ffb9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -355,6 +355,10 @@ private void completeOperation(GenericCallback<PerChannelBookieClient> op, int r
         }
     }
 
+    protected long getNumPendingCompletionRequests() {
+        return completionObjects.size(); // NOTE(review): assumes completionObjects holds only in-flight requests
+    }
+
     protected ChannelFuture connect() {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Connecting to bookie: {}", addr);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
index 80f00a595..97a6a2694 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
@@ -25,7 +25,7 @@
 /**
  * An interface to manage channel pooling for bookie client.
  */
-interface PerChannelBookieClientPool {
+public interface PerChannelBookieClientPool {
 
     /**
      * intialize the pool. the implementation should not be blocked.
@@ -67,4 +67,8 @@
      */
     void close(boolean wait);
 
+    /**
+     * Get the total number of pending completion requests across the channels managed by this pool.
+     */
+    long getNumPendingCompletionRequests();
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 1ce8f0ce0..637df1b19 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -104,6 +104,25 @@ protected void tearDown() throws Exception {
         super.tearDown();
     }
 
+    static BookiesHealthInfo getBookiesHealthInfo() {
+        return getBookiesHealthInfo(new HashMap<>(), new HashMap<>()); // no failures, no pending requests
+    }
+
+    static BookiesHealthInfo getBookiesHealthInfo(Map<BookieSocketAddress, Long> bookieFailureHistory,
+                                                  Map<BookieSocketAddress, Long> bookiePendingRequests) {
+        return new BookiesHealthInfo() { // stub backed by the two maps above
+            @Override
+            public long getBookieFailureHistory(BookieSocketAddress bookieSocketAddress) {
+                return bookieFailureHistory.getOrDefault(bookieSocketAddress, -1L); // -1: no failure recorded
+            }
+
+            @Override
+            public long getBookiePendingRequests(BookieSocketAddress bookieSocketAddress) {
+                return bookiePendingRequests.getOrDefault(bookieSocketAddress, 0L); // 0: nothing pending
+            }
+        };
+    }
+
     static void updateMyRack(String rack) throws Exception {
         StaticDNSResolver.addNodeToRack(InetAddress.getLocalHost().getHostAddress(), rack);
         StaticDNSResolver.addNodeToRack(InetAddress.getLocalHost().getHostName(), rack);
@@ -131,7 +150,7 @@ public void testNodeDown() throws Exception {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(),
+                ensemble, getBookiesHealthInfo(),
                 writeSet);
         DistributionSchedule.WriteSet expectedSet = writeSetFromValues(1, 2, 3, 0);
         LOG.info("reorder set : {}", reorderSet);
@@ -162,13 +181,75 @@ public void testNodeReadOnly() throws Exception {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(1, 2, 3, 0);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
+    @Test
+    public void testNodeSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        repp.registerSlowBookie(addr1, 0L); // only addr1 is registered as slow
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 1L); // ...and it has one pending request
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
         DistributionSchedule.WriteSet expectedSet = writeSetFromValues(1, 2, 3, 0);
         LOG.info("reorder set : {}", reorderSet);
         assertFalse(reorderSet.equals(origWriteSet));
         assertEquals(expectedSet, reorderSet);
     }
 
+    @Test
+    public void testTwoNodesSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        repp.registerSlowBookie(addr1, 0L); // both addr1 and addr2 are registered as slow
+        repp.registerSlowBookie(addr2, 0L);
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 1L);
+        bookiePendingMap.put(addr2, 2L); // addr2 has more pending requests than addr1
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 3, 0, 1);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
     @Test
     public void testTwoNodesDown() throws Exception {
         repp.uninitalize();
@@ -191,7 +272,7 @@ public void testTwoNodesDown() throws Exception {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
         DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 3, 0, 1);
         LOG.info("reorder set : {}", reorderSet);
         assertFalse(reorderSet.equals(origWriteSet));
@@ -221,12 +302,78 @@ public void testNodeDownAndReadOnly() throws Exception {
         repp.onClusterChanged(addrs, roAddrs);
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
         DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 3, 1, 0);
         assertFalse(reorderSet.equals(origWriteSet));
         assertEquals(expectedSet, reorderSet);
     }
 
+    @Test
+    public void testNodeDownAndNodeSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        repp.registerSlowBookie(addr1, 0L); // addr1 is slow with one pending request
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 1L);
+        addrs.remove(addr2); // drop addr2 from the available bookies
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 3, 0, 1);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
+    @Test
+    public void testNodeDownAndReadOnlyAndNodeSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        addrs.remove(addr1); // addr1 becomes unavailable
+        addrs.remove(addr2); // addr2 is only reported as read-only below
+        Set<BookieSocketAddress> ro = new HashSet<BookieSocketAddress>();
+        ro.add(addr2);
+        repp.registerSlowBookie(addr3, 0L);
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr3, 1L);
+        // resulting cluster: addr1 unavailable, addr2 read-only, addr3 slow, addr4 healthy
+        repp.onClusterChanged(addrs, ro);
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 1, 2, 0);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
     @Test
     public void testReplaceBookieWithEnoughBookiesInSameRack() throws Exception {
         BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
@@ -738,7 +885,7 @@ public void testNodeWithFailures() throws Exception {
         bookieFailures.put(addr2, 22L);
 
         DistributionSchedule.WriteSet reoderSet = repp.reorderReadSequence(
-                ensemble, bookieFailures, writeSet);
+                ensemble, getBookiesHealthInfo(bookieFailures, new HashMap<>()), writeSet);
         LOG.info("reorder set : {}", reoderSet);
         assertEquals(ensemble.get(reoderSet.get(2)), addr1);
         assertEquals(ensemble.get(reoderSet.get(3)), addr2);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index 1dcf0e10c..fe75b3105 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -35,6 +35,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -118,6 +119,26 @@ protected void tearDown() throws Exception {
         super.tearDown();
     }
 
+    static BookiesHealthInfo getBookiesHealthInfo() {
+        return getBookiesHealthInfo(new HashMap<>(), new HashMap<>()); // no failures, no pending requests
+    }
+
+    static BookiesHealthInfo getBookiesHealthInfo(Map<BookieSocketAddress, Long> bookieFailureHistory,
+                                                  Map<BookieSocketAddress, Long> bookiePendingRequests) {
+        return new BookiesHealthInfo() { // stub backed by the two maps above
+            @Override
+            public long getBookieFailureHistory(BookieSocketAddress bookieSocketAddress) {
+                return bookieFailureHistory.getOrDefault(bookieSocketAddress, -1L); // -1: no failure recorded
+            }
+
+            @Override
+            public long getBookiePendingRequests(BookieSocketAddress bookieSocketAddress) {
+                return bookiePendingRequests.getOrDefault(bookieSocketAddress, 0L); // 0: nothing pending
+            }
+        };
+    }
+
+
     @Test
     public void testNotReorderReadIfInDefaultRack() throws Exception {
         repp.uninitalize();
@@ -128,7 +149,7 @@ public void testNotReorderReadIfInDefaultRack() throws Exception {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
         assertEquals(origWriteSet, reorderSet);
     }
 
@@ -151,7 +172,7 @@ public void testNodeInSameRegion() throws Exception {
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
 
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet.copy());
+                ensemble, getBookiesHealthInfo(), writeSet.copy());
         DistributionSchedule.WriteSet expectedSet = writeSetFromValues(0, 3, 1, 2);
         LOG.info("write set : {}", writeSet);
         LOG.info("reorder set : {}", reorderSet);
@@ -171,7 +192,7 @@ public void testNodeNotInSameRegions() throws Exception {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
         LOG.info("reorder set : {}", reorderSet);
         assertEquals(origWriteSet, reorderSet);
     }
@@ -196,7 +217,7 @@ public void testNodeDown() throws Exception {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
         DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 1, 2, 0);
         LOG.info("reorder set : {}", reorderSet);
         assertFalse(reorderSet.equals(origWriteSet));
@@ -225,13 +246,73 @@ public void testNodeReadOnly() throws Exception {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
         DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 1, 2, 0);
         LOG.info("reorder set : {}", reorderSet);
         assertFalse(reorderSet.equals(origWriteSet));
         assertEquals(expectedSet, reorderSet);
     }
 
+    @Test
+    public void testNodeSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RegionAwareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        repp.registerSlowBookie(addr1, 0L); // only addr1 is registered as slow
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 1L); // ...and it has one pending request
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 1, 2, 0);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
+    @Test
+    public void testTwoNodesSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RegionAwareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        repp.registerSlowBookie(addr1, 0L); // both addr1 and addr2 are registered as slow
+        repp.registerSlowBookie(addr2, 0L);
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 1L);
+        bookiePendingMap.put(addr2, 2L); // addr2 has more pending requests than addr1
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 2, 0, 1);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
     @Test
     public void testTwoNodesDown() throws Exception {
         repp.uninitalize();
@@ -253,13 +334,76 @@ public void testTwoNodesDown() throws Exception {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 2, 0, 1);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
+    @Test
+    public void testNodeDownAndNodeSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+        // Fresh policy instance for this scenario (the previous one is uninitialized above).
+        repp = new RegionAwareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+
+        // Update cluster: addr1..addr4 writable; then addr1 is marked slow and addr2 drops out.
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        repp.registerSlowBookie(addr1, 0L);
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>(); // pending requests per bookie
+        bookiePendingMap.put(addr1, 1L);
+        addrs.remove(addr2); // addr2: neither writable nor read-only, i.e. down
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        // Expect addr4, addr3, then slow addr1, then down addr2 (assumes slot i = addr(i+1)).
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
         DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 2, 0, 1);
         LOG.info("reorder set : {}", reorderSet);
         assertFalse(reorderSet.equals(origWriteSet));
         assertEquals(expectedSet, reorderSet);
     }
 
+    @Test
+    public void testNodeDownAndReadOnlyAndNodeSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+        // Fresh policy instance for this scenario (the previous one is uninitialized above).
+        repp = new RegionAwareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+
+        // Update cluster: addr1..addr4 writable; then addr1 down, addr2 read-only, addr3 slow.
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        addrs.remove(addr1); // addr1: neither writable nor read-only, i.e. down
+        addrs.remove(addr2); // addr2: moved to the read-only set below
+        Set<BookieSocketAddress> ro = new HashSet<>();
+        ro.add(addr2);
+        repp.registerSlowBookie(addr3, 0L);
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>(); // pending requests per bookie
+        bookiePendingMap.put(addr3, 1L);
+        repp.onClusterChanged(addrs, ro);
+        // Expect addr4, read-only addr2, slow addr3, down addr1 (assumes slot i = addr(i+1)).
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 1, 2, 0);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
     @Test
     public void testReplaceBookieWithEnoughBookiesInSameRegion() throws Exception {
         BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
@@ -1068,11 +1212,11 @@ private void basicReorderReadSequenceWithLocalRegionTest(String myRegion, boolea
             if (isReadLAC) {
                 readSet = repp.reorderReadLACSequence(
                         ensemble,
-                        new HashMap<BookieSocketAddress, Long>(), writeSet);
+                        getBookiesHealthInfo(), writeSet);
             } else {
                 readSet = repp.reorderReadSequence(
                         ensemble,
-                        new HashMap<BookieSocketAddress, Long>(), writeSet);
+                        getBookiesHealthInfo(), writeSet);
             }
 
             LOG.info("Reorder {} => {}.", origWriteSet, readSet);
@@ -1124,12 +1268,12 @@ private void basicReorderReadSequenceWithRemoteRegionTest(String myRegion, boole
             if (isReadLAC) {
                 readSet = repp.reorderReadLACSequence(
                         ensemble,
-                        new HashMap<BookieSocketAddress, Long>(),
+                        getBookiesHealthInfo(),
                         writeSet.copy());
             } else {
                 readSet = repp.reorderReadSequence(
                         ensemble,
-                        new HashMap<BookieSocketAddress, Long>(),
+                        getBookiesHealthInfo(),
                         writeSet.copy());
             }
 
@@ -1199,11 +1343,11 @@ private void reorderReadSequenceWithUnavailableOrReadOnlyBookiesTest(boolean isR
             DistributionSchedule.WriteSet readSet;
             if (isReadLAC) {
                 readSet = repp.reorderReadLACSequence(
-                        ensemble, new HashMap<BookieSocketAddress, Long>(),
+                        ensemble, getBookiesHealthInfo(),
                         writeSet.copy());
             } else {
                 readSet = repp.reorderReadSequence(
-                        ensemble, new HashMap<BookieSocketAddress, Long>(),
+                        ensemble, getBookiesHealthInfo(),
                         writeSet.copy());
             }
 
@@ -1293,7 +1437,7 @@ public void testNodeWithFailures() throws Exception {
 
         LOG.info("write set : {}", writeSet2);
         DistributionSchedule.WriteSet reoderSet = repp.reorderReadSequence(
-                ensemble, bookieFailures, writeSet2);
+                ensemble, getBookiesHealthInfo(bookieFailures, new HashMap<>()), writeSet2);
         LOG.info("reorder set : {}", reoderSet);
         assertEquals(ensemble.get(reoderSet.get(0)), addr6);
         assertEquals(ensemble.get(reoderSet.get(1)), addr7);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
index 41e5b2e1c..d692cea95 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -27,6 +28,8 @@
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -69,7 +72,9 @@ long getLedgerToRead(int ensemble, int quorum) throws Exception {
     BookKeeper createClient(int specTimeout) throws Exception {
         ClientConfiguration conf = new ClientConfiguration()
             .setSpeculativeReadTimeout(specTimeout)
-            .setReadTimeout(30000);
+            .setReadTimeout(30000)
+            .setReorderReadSequenceEnabled(true) // let the placement policy reorder read sequences
+            .setEnsemblePlacementPolicySlowBookies(true); // tests below inspect placementPolicy.slowBookies
         conf.setZkServers(zkUtil.getZooKeeperConnectString());
         return new BookKeeper(conf);
     }
@@ -150,6 +155,10 @@ public void testSpeculativeRead() throws Exception {
             lspec.asyncReadEntries(1, 1, speccb, null);
             speccb.expectSuccess(4000);
             nospeccb.expectTimeout(4000);
+            // Check that the second bookie is registered as slow at entryId 1
+            RackawareEnsemblePlacementPolicy rep = (RackawareEnsemblePlacementPolicy) lspec.bk.placementPolicy;
+            assertTrue(rep.slowBookies.asMap().size() == 1);
+            assertTrue(rep.slowBookies.asMap().get(second) == 1L);
         } finally {
             sleepLatch.countDown();
             lspec.close();
@@ -195,16 +204,18 @@ public void testSpeculativeReadMultipleReplicasDown() throws Exception {
                        latch1.getDuration() >= timeout * 2
                        && latch1.getDuration() < timeout * 3);
 
-            // third should have to hit one timeouts (bookie 2)
+            // bookies 1 & 2 should be registered as slow bookies because of speculative reads
+            Set<BookieSocketAddress> expectedSlowBookies = new HashSet<>();
+            expectedSlowBookies.add(l.getLedgerMetadata().getEnsembles().get(0L).get(1));
+            expectedSlowBookies.add(l.getLedgerMetadata().getEnsembles().get(0L).get(2));
+            assertEquals(((RackawareEnsemblePlacementPolicy) l.bk.placementPolicy).slowBookies.asMap().keySet(),
+                expectedSlowBookies);
+
+            // third should not hit timeouts since bookies 1 & 2 are registered as slow
             // bookie 3 has the entry
             LatchCallback latch2 = new LatchCallback();
             l.asyncReadEntries(2, 2, latch2, null);
-            latch2.expectTimeout(timeout / 2);
             latch2.expectSuccess(timeout);
-            LOG.info("Timeout {} latch2 duration {}", timeout, latch2.getDuration());
-            assertTrue("should have taken longer than one timeout, but less than 2",
-                       latch2.getDuration() >= timeout
-                       && latch2.getDuration() < timeout * 2);
 
             // fourth should have no timeout
             // bookie 3 has the entry


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services