You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/14 00:41:09 UTC

[bookkeeper] branch master updated: Issue #1489: Better Prevent Read Outliers during short-term Bookie Slow-Down

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 134a6c2  Issue #1489: Better Prevent Read Outliers during short-term Bookie Slow-Down
134a6c2 is described below

commit 134a6c25a846731dc2ad0dc6c3e848170557b7b9
Author: Nicolas Michael <nm...@salesforce.com>
AuthorDate: Wed Jun 13 17:41:02 2018 -0700

    Issue #1489: Better Prevent Read Outliers during short-term Bookie Slow-Down
    
    ### Motivation
    
    Bookies can temporarily be slow for a large number of reasons, often only briefly (a few milliseconds to seconds), such as during Java garbage collection or EntryLog compaction. For writes, latencies of individual bookies are masked by acknowledging the client after a quorum of bookies have replied. However, for reads we don't yet have an equivalent feature to mask short-term latencies of individual bookies (in the case of SequenceReadRequests). This PR implements such a feature b [...]
    
    ### Changes
    This change implements a configurable reordering of read requests in the BookKeeper client based on the number of pending requests to each bookie that could service the request. The intention is to mask the latency of one bookie by directing a read request to another bookie that could potentially service the request faster. This should help prevent read time outliers due to bookies that are temporarily responding slowly, for example due to Java garbage collection, compaction, or any other ki [...]
    
    Reordering of reads is based on a threshold of relative queue length to other bookies. Setting the threshold very low will reorder the read set more frequently and potentially result in better latency, but will also reduce data affinity of reads. Reads sent to a bookie other than the preferred one have a low chance of being served from the file system cache on that bookie, and will likely result in a physical read. Small thresholds therefore shuffle read requests more among bookies and may lead to [...]
    
    Master Issue: #1489
    
    Author: Nicolas Michael <nm...@salesforce.com>
    
    Reviewers: Yiming Zang <yz...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1504 from nicmichael/ReadReordering, closes #1489
---
 .../bookkeeper/client/BookKeeperClientStats.java   |   1 +
 .../org/apache/bookkeeper/client/LedgerHandle.java |  10 +-
 .../client/RackawareEnsemblePlacementPolicy.java   |  15 +-
 .../RackawareEnsemblePlacementPolicyImpl.java      |  51 ++++++-
 .../client/RegionAwareEnsemblePlacementPolicy.java |   6 +-
 .../bookkeeper/conf/ClientConfiguration.java       |  25 ++++
 .../TestRackawareEnsemblePlacementPolicy.java      | 166 +++++++++++++++++++++
 7 files changed, 264 insertions(+), 10 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 83e6421..749ac9c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -54,6 +54,7 @@ public interface BookKeeperClientStats {
     String LAC_UPDATE_MISSES = "LAC_UPDATE_MISSES";
     String GET_BOOKIE_INFO_OP = "GET_BOOKIE_INFO";
     String SPECULATIVE_READ_COUNT = "SPECULATIVE_READ_COUNT";
+    String READ_REQUESTS_REORDERED = "READ_REQUESTS_REORDERED";
 
     // per channel stats
     String CHANNEL_SCOPE = "per_channel_bookie_client";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 9f90c5a..a2a37e9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -99,6 +99,8 @@ import org.slf4j.LoggerFactory;
 public class LedgerHandle implements WriteHandle {
     static final Logger LOG = LoggerFactory.getLogger(LedgerHandle.class);
 
+    static final long PENDINGREQ_NOTWRITABLE_MASK = 0x01L << 62;
+
     final byte[] ledgerKey;
     LedgerMetadata metadata;
     final BookKeeper bk;
@@ -224,7 +226,13 @@ public class LedgerHandle implements WriteHandle {
             @Override
             public long getBookiePendingRequests(BookieSocketAddress bookieSocketAddress) {
                 PerChannelBookieClientPool pcbcPool = bk.bookieClient.lookupClient(bookieSocketAddress);
-                return pcbcPool == null ? 0 : pcbcPool.getNumPendingCompletionRequests();
+                if (pcbcPool == null) {
+                    return 0;
+                } else if (pcbcPool.isWritable(ledgerId)) {
+                    return pcbcPool.getNumPendingCompletionRequests();
+                } else {
+                    return pcbcPool.getNumPendingCompletionRequests() | PENDINGREQ_NOTWRITABLE_MASK;
+                }
             }
         };
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index 32f94f3..e3ce8ff 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -50,19 +50,22 @@ public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacement
                                                           HashedWheelTimer timer,
                                                           boolean reorderReadsRandom,
                                                           int stabilizePeriodSeconds,
+                                                          int reorderThresholdPendingRequests,
                                                           boolean isWeighted,
                                                           int maxWeightMultiple,
                                                           int minNumRacksPerWriteQuorum,
                                                           StatsLogger statsLogger) {
         if (stabilizePeriodSeconds > 0) {
-            super.initialize(dnsResolver, timer, reorderReadsRandom, 0, isWeighted, maxWeightMultiple,
-                    minNumRacksPerWriteQuorum, statsLogger);
-            slave = new RackawareEnsemblePlacementPolicyImpl(enforceDurability);
-            slave.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, isWeighted,
+            super.initialize(dnsResolver, timer, reorderReadsRandom, 0, reorderThresholdPendingRequests, isWeighted,
                     maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger);
+            slave = new RackawareEnsemblePlacementPolicyImpl(enforceDurability);
+            slave.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds,
+                    reorderThresholdPendingRequests, isWeighted, maxWeightMultiple,
+                    minNumRacksPerWriteQuorum, statsLogger);
         } else {
-            super.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, isWeighted,
-                    maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger);
+            super.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds,
+                    reorderThresholdPendingRequests, isWeighted, maxWeightMultiple,
+                    minNumRacksPerWriteQuorum, statsLogger);
             slave = null;
         }
         return this;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 7416372..23afaa9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -198,11 +198,13 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
     protected boolean reorderReadsRandom = false;
     protected boolean enforceDurability = false;
     protected int stabilizePeriodSeconds = 0;
+    protected int reorderThresholdPendingRequests = 0;
     // looks like these only assigned in the same thread as constructor, immediately after constructor;
     // no need to make volatile
     protected StatsLogger statsLogger = null;
     protected OpStatsLogger bookiesJoinedCounter = null;
     protected OpStatsLogger bookiesLeftCounter = null;
+    protected OpStatsLogger readReorderedCounter = null;
 
     private String defaultRack = NetworkTopology.DEFAULT_RACK;
 
@@ -232,6 +234,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
                                                               HashedWheelTimer timer,
                                                               boolean reorderReadsRandom,
                                                               int stabilizePeriodSeconds,
+                                                              int reorderThresholdPendingRequests,
                                                               boolean isWeighted,
                                                               int maxWeightMultiple,
                                                               int minNumRacksPerWriteQuorum,
@@ -240,8 +243,10 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
         this.statsLogger = statsLogger;
         this.bookiesJoinedCounter = statsLogger.getOpStatsLogger(BookKeeperServerStats.BOOKIES_JOINED);
         this.bookiesLeftCounter = statsLogger.getOpStatsLogger(BookKeeperServerStats.BOOKIES_LEFT);
+        this.readReorderedCounter = statsLogger.getOpStatsLogger(BookKeeperClientStats.READ_REQUESTS_REORDERED);
         this.reorderReadsRandom = reorderReadsRandom;
         this.stabilizePeriodSeconds = stabilizePeriodSeconds;
+        this.reorderThresholdPendingRequests = reorderThresholdPendingRequests;
         this.dnsResolver = new DNSResolverDecorator(dnsResolver, () -> this.getDefaultRack());
         this.timer = timer;
         this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
@@ -330,6 +335,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
                 timer,
                 conf.getBoolean(REPP_RANDOM_READ_REORDERING, false),
                 conf.getNetworkTopologyStabilizePeriodSeconds(),
+                conf.getReorderThresholdPendingRequests(),
                 conf.getDiskWeightBasedPlacementEnabled(),
                 conf.getBookieMaxWeightMultipleForWeightBasedPlacement(),
                 conf.getMinNumRacksPerWriteQuorum(),
@@ -952,7 +958,11 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
 
     @Override
     public void registerSlowBookie(BookieSocketAddress bookieSocketAddress, long entryId) {
-        slowBookies.put(bookieSocketAddress, entryId);
+        if (reorderThresholdPendingRequests <= 0) {
+            // only put bookies on slowBookies list if reorderThresholdPendingRequests is *not* set (0);
+            // otherwise, rely on reordering of reads based on reorderThresholdPendingRequests
+            slowBookies.put(bookieSocketAddress, entryId);
+        }
     }
 
     @Override
@@ -1026,7 +1036,45 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
             }
         }
 
+        boolean reordered = false;
+        if (reorderThresholdPendingRequests > 0) {
+            // if there are no slow or unavailable bookies, capture each bookie's number of
+            // pending request to reorder requests based on a threshold of pending requests
+
+            // number of pending requests per bookie (same index as writeSet)
+            long[] pendingReqs = new long[writeSet.size()];
+            int bestBookieIdx = -1;
+
+            for (int i = 0; i < writeSet.size(); i++) {
+                pendingReqs[i] = bookiesHealthInfo.getBookiePendingRequests(ensemble.get(writeSet.get(i)));
+                if (bestBookieIdx < 0 || pendingReqs[i] < pendingReqs[bestBookieIdx]) {
+                    bestBookieIdx = i;
+                }
+            }
+
+            // reorder the writeSet if the currently first bookie in our writeSet has at
+            // least
+            // reorderThresholdPendingRequests more outstanding request than the best bookie
+            if (bestBookieIdx > 0 && pendingReqs[0] >= pendingReqs[bestBookieIdx] + reorderThresholdPendingRequests) {
+                // We're not reordering the entire write set, but only move the best bookie
+                // to the first place. Chances are good that this bookie will be fast enough
+                // to not trigger the speculativeReadTimeout. But even if it hits that timeout,
+                // things may have changed by then so much that whichever bookie we put second
+                // may actually not be the second-best choice any more.
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("read set reordered from {} ({} pending) to {} ({} pending)",
+                            ensemble.get(writeSet.get(0)), pendingReqs[0], ensemble.get(writeSet.get(bestBookieIdx)),
+                            pendingReqs[bestBookieIdx]);
+                }
+                writeSet.moveAndShift(bestBookieIdx, 0);
+                reordered = true;
+            }
+        }
+
         if (!isAnyBookieUnavailable) {
+            if (reordered) {
+                readReorderedCounter.registerSuccessfulValue(1);
+            }
             return writeSet;
         }
 
@@ -1137,6 +1185,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
         for (int i = 0; i < writeSet.size(); i++) {
             writeSet.set(i, writeSet.get(i) & ~MASK_BITS);
         }
+        readReorderedCounter.registerSuccessfulValue(1);
         return writeSet;
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 6205174..218781b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -130,7 +130,8 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
             if (null == perRegionPlacement.get(region)) {
                 perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy()
                         .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds,
-                                this.isWeighted, this.maxWeightMultiple, this.minNumRacksPerWriteQuorum, statsLogger)
+                                this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple,
+                                this.minNumRacksPerWriteQuorum, statsLogger)
                         .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
             }
 
@@ -178,7 +179,8 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
             for (String region: regions) {
                 perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy(true)
                         .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds,
-                                this.isWeighted, this.maxWeightMultiple, this.minNumRacksPerWriteQuorum, statsLogger)
+                                this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple,
+                                this.minNumRacksPerWriteQuorum, statsLogger)
                         .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
             }
             minRegionsForDurability = conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 9f09c37..340b40b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -149,6 +149,7 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
     // Ensemble Placement Policy
     protected static final String ENSEMBLE_PLACEMENT_POLICY = "ensemblePlacementPolicy";
     protected static final String NETWORK_TOPOLOGY_STABILIZE_PERIOD_SECONDS = "networkTopologyStabilizePeriodSeconds";
+    protected static final String READ_REORDER_THRESHOLD_PENDING_REQUESTS = "readReorderThresholdPendingRequests";
     protected static final String ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES =
         "ensemblePlacementPolicyOrderSlowBookies";
 
@@ -1127,6 +1128,30 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
     }
 
     /**
+     * Get the threshold for the number of pending requests beyond which to reorder
+     * reads. If &lt;= zero, this feature is turned off.
+     *
+     * @return the threshold for the number of pending requests beyond which to
+     *         reorder reads.
+     */
+    public int getReorderThresholdPendingRequests() {
+        return getInt(READ_REORDER_THRESHOLD_PENDING_REQUESTS, 0);
+    }
+
+    /**
+     * Set the threshold for the number of pending requests beyond which to reorder
+     * reads. If zero, this feature is turned off.
+     *
+     * @param threshold
+     *            The threshold for the number of pending requests beyond which to
+     *            reorder reads.
+     */
+    public ClientConfiguration setReorderThresholdPendingRequests(int threshold) {
+        setProperty(READ_REORDER_THRESHOLD_PENDING_REQUESTS, threshold);
+        return this;
+    }
+
+    /**
      * Get the network topology stabilize period in seconds. if it is zero, this feature is turned off.
      *
      * @return network topology stabilize period in seconds.
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index ad9c9c8..cdff679 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -372,6 +372,172 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         assertEquals(expectedSet, reorderSet);
     }
 
+    /*
+     * Tests the reordering of the writeSet based on number of pending requests.
+     * Expect the third bookie to be placed first since its number of pending requests
+     * is READ_REORDER_THRESHOLD_PENDING_REQUESTS=10 less than the originally first bookie.
+     */
+    @Test
+    public void testPendingRequestsReorder() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
+        conf.setReorderThresholdPendingRequests(10);
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 20L);
+        bookiePendingMap.put(addr2, 7L);
+        bookiePendingMap.put(addr3, 1L); // best bookie -> this one first
+        bookiePendingMap.put(addr4, 5L);
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 0, 1, 3);
+        LOG.info("reorder set : {}", reorderSet);
+        assertEquals("expect bookie idx 2 first", expectedSet, reorderSet);
+    }
+
+    /*
+     * Tests the reordering of the writeSet based on number of pending requests for
+     * an ensemble that is larger than the writeSet.
+     * Expect the sixth bookie to be placed first since its number of pending requests
+     * is READ_REORDER_THRESHOLD_PENDING_REQUESTS=10 less than the originally first bookie.
+     */
+    @Test
+    public void testPendingRequestsReorderLargeEnsemble() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
+        conf.setReorderThresholdPendingRequests(10);
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181);
+        BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.7", 3181);
+        BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.8", 3181);
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        addrs.add(addr5);
+        addrs.add(addr6);
+        addrs.add(addr7);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 1L); // not in write set
+        bookiePendingMap.put(addr2, 20L);
+        bookiePendingMap.put(addr3, 0L); // not in write set
+        bookiePendingMap.put(addr4, 12L);
+        bookiePendingMap.put(addr5, 9L); // not in write set
+        bookiePendingMap.put(addr6, 2L); // best bookie -> this one first
+        bookiePendingMap.put(addr7, 10L);
+        ArrayList<BookieSocketAddress> ensemble = new ArrayList<BookieSocketAddress>();
+        ensemble.add(addr1);
+        ensemble.add(addr2);
+        ensemble.add(addr3);
+        ensemble.add(addr4);
+        ensemble.add(addr5);
+        ensemble.add(addr6);
+        ensemble.add(addr7);
+
+        DistributionSchedule.WriteSet writeSet = writeSetFromValues(1, 3, 5, 6);
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+                ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(5, 1, 3, 6);
+        LOG.info("reorder set : {}", reorderSet);
+        assertEquals("expect bookie idx 5 first", expectedSet, reorderSet);
+    }
+
+    /*
+     * Tests the reordering of the writeSet based on number of pending requests.
+     * Expect no reordering in this case since the currently first bookie's number of
+     * pending requests is less than READ_REORDER_THRESHOLD_PENDING_REQUESTS=10 lower
+     * than the best bookie.
+     */
+    @Test
+    public void testPendingRequestsNoReorder1() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
+        conf.setReorderThresholdPendingRequests(10);
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 10L); // -> this one first
+        bookiePendingMap.put(addr2, 7L);
+        bookiePendingMap.put(addr3, 1L); // best bookie, but below threshold
+        bookiePendingMap.put(addr4, 5L);
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
+        LOG.info("reorder set : {}", reorderSet);
+        assertEquals("writeSet should be in original order", origWriteSet, reorderSet);
+    }
+
+    /*
+     * Tests the reordering of the writeSet based on number of pending requests.
+     * Expect no reordering in this case since the currently first bookie's number of
+     * pending requests is lowest among all bookies already.
+     */
+    @Test
+    public void testPendingRequestsNoReorder2() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
+        conf.setReorderThresholdPendingRequests(10);
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 1L); // -> this one first
+        bookiePendingMap.put(addr2, 7L);
+        bookiePendingMap.put(addr3, 1L);
+        bookiePendingMap.put(addr4, 5L);
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
+        LOG.info("reorder set : {}", reorderSet);
+        assertEquals("writeSet should be in original order", origWriteSet, reorderSet);
+    }
+
     @Test
     public void testReplaceBookieWithEnoughBookiesInSameRack() throws Exception {
         BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.