You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/06/14 00:41:12 UTC

[GitHub] sijie closed pull request #1504: Issue #1489: Better Prevent Read Outliers during short-term Bookie Slow-Down

sijie closed pull request #1504: Issue #1489: Better Prevent Read Outliers during short-term Bookie Slow-Down
URL: https://github.com/apache/bookkeeper/pull/1504
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 83e642186..749ac9cd2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -54,6 +54,7 @@
     String LAC_UPDATE_MISSES = "LAC_UPDATE_MISSES";
     String GET_BOOKIE_INFO_OP = "GET_BOOKIE_INFO";
     String SPECULATIVE_READ_COUNT = "SPECULATIVE_READ_COUNT";
+    String READ_REQUESTS_REORDERED = "READ_REQUESTS_REORDERED";
 
     // per channel stats
     String CHANNEL_SCOPE = "per_channel_bookie_client";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 9f90c5a1a..a2a37e968 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -99,6 +99,8 @@
 public class LedgerHandle implements WriteHandle {
     static final Logger LOG = LoggerFactory.getLogger(LedgerHandle.class);
 
+    static final long PENDINGREQ_NOTWRITABLE_MASK = 0x01L << 62;
+
     final byte[] ledgerKey;
     LedgerMetadata metadata;
     final BookKeeper bk;
@@ -224,7 +226,13 @@ public long getBookieFailureHistory(BookieSocketAddress bookieSocketAddress) {
             @Override
             public long getBookiePendingRequests(BookieSocketAddress bookieSocketAddress) {
                 PerChannelBookieClientPool pcbcPool = bk.bookieClient.lookupClient(bookieSocketAddress);
-                return pcbcPool == null ? 0 : pcbcPool.getNumPendingCompletionRequests();
+                if (pcbcPool == null) {
+                    return 0;
+                } else if (pcbcPool.isWritable(ledgerId)) {
+                    return pcbcPool.getNumPendingCompletionRequests();
+                } else {
+                    return pcbcPool.getNumPendingCompletionRequests() | PENDINGREQ_NOTWRITABLE_MASK;
+                }
             }
         };
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index 32f94f34d..e3ce8fff1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -50,19 +50,22 @@ protected RackawareEnsemblePlacementPolicy initialize(DNSToSwitchMapping dnsReso
                                                           HashedWheelTimer timer,
                                                           boolean reorderReadsRandom,
                                                           int stabilizePeriodSeconds,
+                                                          int reorderThresholdPendingRequests,
                                                           boolean isWeighted,
                                                           int maxWeightMultiple,
                                                           int minNumRacksPerWriteQuorum,
                                                           StatsLogger statsLogger) {
         if (stabilizePeriodSeconds > 0) {
-            super.initialize(dnsResolver, timer, reorderReadsRandom, 0, isWeighted, maxWeightMultiple,
-                    minNumRacksPerWriteQuorum, statsLogger);
-            slave = new RackawareEnsemblePlacementPolicyImpl(enforceDurability);
-            slave.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, isWeighted,
+            super.initialize(dnsResolver, timer, reorderReadsRandom, 0, reorderThresholdPendingRequests, isWeighted,
                     maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger);
+            slave = new RackawareEnsemblePlacementPolicyImpl(enforceDurability);
+            slave.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds,
+                    reorderThresholdPendingRequests, isWeighted, maxWeightMultiple,
+                    minNumRacksPerWriteQuorum, statsLogger);
         } else {
-            super.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, isWeighted,
-                    maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger);
+            super.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds,
+                    reorderThresholdPendingRequests, isWeighted, maxWeightMultiple,
+                    minNumRacksPerWriteQuorum, statsLogger);
             slave = null;
         }
         return this;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 7416372fe..23afaa9ae 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -198,11 +198,13 @@ public void reloadCachedMappings() {
     protected boolean reorderReadsRandom = false;
     protected boolean enforceDurability = false;
     protected int stabilizePeriodSeconds = 0;
+    protected int reorderThresholdPendingRequests = 0;
     // looks like these only assigned in the same thread as constructor, immediately after constructor;
     // no need to make volatile
     protected StatsLogger statsLogger = null;
     protected OpStatsLogger bookiesJoinedCounter = null;
     protected OpStatsLogger bookiesLeftCounter = null;
+    protected OpStatsLogger readReorderedCounter = null;
 
     private String defaultRack = NetworkTopology.DEFAULT_RACK;
 
@@ -232,6 +234,7 @@ protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dns
                                                               HashedWheelTimer timer,
                                                               boolean reorderReadsRandom,
                                                               int stabilizePeriodSeconds,
+                                                              int reorderThresholdPendingRequests,
                                                               boolean isWeighted,
                                                               int maxWeightMultiple,
                                                               int minNumRacksPerWriteQuorum,
@@ -240,8 +243,10 @@ protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dns
         this.statsLogger = statsLogger;
         this.bookiesJoinedCounter = statsLogger.getOpStatsLogger(BookKeeperServerStats.BOOKIES_JOINED);
         this.bookiesLeftCounter = statsLogger.getOpStatsLogger(BookKeeperServerStats.BOOKIES_LEFT);
+        this.readReorderedCounter = statsLogger.getOpStatsLogger(BookKeeperClientStats.READ_REQUESTS_REORDERED);
         this.reorderReadsRandom = reorderReadsRandom;
         this.stabilizePeriodSeconds = stabilizePeriodSeconds;
+        this.reorderThresholdPendingRequests = reorderThresholdPendingRequests;
         this.dnsResolver = new DNSResolverDecorator(dnsResolver, () -> this.getDefaultRack());
         this.timer = timer;
         this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
@@ -330,6 +335,7 @@ public Long load(BookieSocketAddress key) throws Exception {
                 timer,
                 conf.getBoolean(REPP_RANDOM_READ_REORDERING, false),
                 conf.getNetworkTopologyStabilizePeriodSeconds(),
+                conf.getReorderThresholdPendingRequests(),
                 conf.getDiskWeightBasedPlacementEnabled(),
                 conf.getBookieMaxWeightMultipleForWeightBasedPlacement(),
                 conf.getMinNumRacksPerWriteQuorum(),
@@ -952,7 +958,11 @@ protected BookieNode selectRandomFromRack(String netPath, Set<Node> excludeBooki
 
     @Override
     public void registerSlowBookie(BookieSocketAddress bookieSocketAddress, long entryId) {
-        slowBookies.put(bookieSocketAddress, entryId);
+        if (reorderThresholdPendingRequests <= 0) {
+            // only put bookies on slowBookies list if reorderThresholdPendingRequests is *not* set (0);
+            // otherwise, rely on reordering of reads based on reorderThresholdPendingRequests
+            slowBookies.put(bookieSocketAddress, entryId);
+        }
     }
 
     @Override
@@ -1026,7 +1036,45 @@ public void registerSlowBookie(BookieSocketAddress bookieSocketAddress, long ent
             }
         }
 
+        boolean reordered = false;
+        if (reorderThresholdPendingRequests > 0) {
+            // if there are no slow or unavailable bookies, capture each bookie's number of
+            // pending requests to reorder requests based on a threshold of pending requests
+
+            // number of pending requests per bookie (same index as writeSet)
+            long[] pendingReqs = new long[writeSet.size()];
+            int bestBookieIdx = -1;
+
+            for (int i = 0; i < writeSet.size(); i++) {
+                pendingReqs[i] = bookiesHealthInfo.getBookiePendingRequests(ensemble.get(writeSet.get(i)));
+                if (bestBookieIdx < 0 || pendingReqs[i] < pendingReqs[bestBookieIdx]) {
+                    bestBookieIdx = i;
+                }
+            }
+
+            // reorder the writeSet if the bookie currently first in our writeSet has at
+            // least reorderThresholdPendingRequests more outstanding requests than the
+            // best bookie
+            if (bestBookieIdx > 0 && pendingReqs[0] >= pendingReqs[bestBookieIdx] + reorderThresholdPendingRequests) {
+                // We're not reordering the entire write set, but only move the best bookie
+                // to the first place. Chances are good that this bookie will be fast enough
+                // to not trigger the speculativeReadTimeout. But even if it hits that timeout,
+                // things may have changed by then so much that whichever bookie we put second
+                // may actually not be the second-best choice any more.
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("read set reordered from {} ({} pending) to {} ({} pending)",
+                            ensemble.get(writeSet.get(0)), pendingReqs[0], ensemble.get(writeSet.get(bestBookieIdx)),
+                            pendingReqs[bestBookieIdx]);
+                }
+                writeSet.moveAndShift(bestBookieIdx, 0);
+                reordered = true;
+            }
+        }
+
         if (!isAnyBookieUnavailable) {
+            if (reordered) {
+                readReorderedCounter.registerSuccessfulValue(1);
+            }
             return writeSet;
         }
 
@@ -1137,6 +1185,7 @@ public void registerSlowBookie(BookieSocketAddress bookieSocketAddress, long ent
         for (int i = 0; i < writeSet.size(); i++) {
             writeSet.set(i, writeSet.get(i) & ~MASK_BITS);
         }
+        readReorderedCounter.registerSuccessfulValue(1);
         return writeSet;
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 62051741f..218781b15 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -130,7 +130,8 @@ public void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
             if (null == perRegionPlacement.get(region)) {
                 perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy()
                         .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds,
-                                this.isWeighted, this.maxWeightMultiple, this.minNumRacksPerWriteQuorum, statsLogger)
+                                this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple,
+                                this.minNumRacksPerWriteQuorum, statsLogger)
                         .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
             }
 
@@ -178,7 +179,8 @@ public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf,
             for (String region: regions) {
                 perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy(true)
                         .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds,
-                                this.isWeighted, this.maxWeightMultiple, this.minNumRacksPerWriteQuorum, statsLogger)
+                                this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple,
+                                this.minNumRacksPerWriteQuorum, statsLogger)
                         .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
             }
             minRegionsForDurability = conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 9f09c37d6..340b40b1f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -149,6 +149,7 @@
     // Ensemble Placement Policy
     protected static final String ENSEMBLE_PLACEMENT_POLICY = "ensemblePlacementPolicy";
     protected static final String NETWORK_TOPOLOGY_STABILIZE_PERIOD_SECONDS = "networkTopologyStabilizePeriodSeconds";
+    protected static final String READ_REORDER_THRESHOLD_PENDING_REQUESTS = "readReorderThresholdPendingRequests";
     protected static final String ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES =
         "ensemblePlacementPolicyOrderSlowBookies";
 
@@ -1126,6 +1127,30 @@ public ClientConfiguration setEnsemblePlacementPolicy(Class<? extends EnsemblePl
         return this;
     }
 
+    /**
+     * Get the threshold for the number of pending requests beyond which to reorder
+     * reads. If &lt;= zero, this feature is turned off.
+     *
+     * @return the threshold for the number of pending requests beyond which to
+     *         reorder reads.
+     */
+    public int getReorderThresholdPendingRequests() {
+        return getInt(READ_REORDER_THRESHOLD_PENDING_REQUESTS, 0);
+    }
+
+    /**
+     * Set the threshold for the number of pending requests beyond which to reorder
+     * reads. If &lt;= zero, this feature is turned off.
+     *
+     * @param threshold
+     *            The threshold for the number of pending requests beyond which to
+     *            reorder reads.
+     */
+    public ClientConfiguration setReorderThresholdPendingRequests(int threshold) {
+        setProperty(READ_REORDER_THRESHOLD_PENDING_REQUESTS, threshold);
+        return this;
+    }
+
     /**
      * Get the network topology stabilize period in seconds. if it is zero, this feature is turned off.
      *
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index ad9c9c88f..cdff67902 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -372,6 +372,172 @@ public void testNodeDownAndReadOnlyAndNodeSlow() throws Exception {
         assertEquals(expectedSet, reorderSet);
     }
 
+    /*
+     * Tests the reordering of the writeSet based on number of pending requests.
+     * Expect the third bookie to be placed first since its number of pending requests is
+     * at least READ_REORDER_THRESHOLD_PENDING_REQUESTS=10 less than the originally first bookie's.
+     */
+    @Test
+    public void testPendingRequestsReorder() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
+        conf.setReorderThresholdPendingRequests(10);
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 20L);
+        bookiePendingMap.put(addr2, 7L);
+        bookiePendingMap.put(addr3, 1L); // best bookie -> this one first
+        bookiePendingMap.put(addr4, 5L);
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 0, 1, 3);
+        LOG.info("reorder set : {}", reorderSet);
+        assertEquals("expect bookie idx 2 first", expectedSet, reorderSet);
+    }
+
+    /*
+     * Tests the reordering of the writeSet based on number of pending requests for
+     * an ensemble that is larger than the writeSet.
+     * Expect the sixth bookie to be placed first since its number of pending requests is
+     * at least READ_REORDER_THRESHOLD_PENDING_REQUESTS=10 less than the originally first bookie's.
+     */
+    @Test
+    public void testPendingRequestsReorderLargeEnsemble() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
+        conf.setReorderThresholdPendingRequests(10);
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181);
+        BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.7", 3181);
+        BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.8", 3181);
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        addrs.add(addr5);
+        addrs.add(addr6);
+        addrs.add(addr7);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 1L); // not in write set
+        bookiePendingMap.put(addr2, 20L);
+        bookiePendingMap.put(addr3, 0L); // not in write set
+        bookiePendingMap.put(addr4, 12L);
+        bookiePendingMap.put(addr5, 9L); // not in write set
+        bookiePendingMap.put(addr6, 2L); // best bookie -> this one first
+        bookiePendingMap.put(addr7, 10L);
+        ArrayList<BookieSocketAddress> ensemble = new ArrayList<BookieSocketAddress>();
+        ensemble.add(addr1);
+        ensemble.add(addr2);
+        ensemble.add(addr3);
+        ensemble.add(addr4);
+        ensemble.add(addr5);
+        ensemble.add(addr6);
+        ensemble.add(addr7);
+
+        DistributionSchedule.WriteSet writeSet = writeSetFromValues(1, 3, 5, 6);
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+                ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(5, 1, 3, 6);
+        LOG.info("reorder set : {}", reorderSet);
+        assertEquals("expect bookie idx 5 first", expectedSet, reorderSet);
+    }
+
+    /*
+     * Tests the reordering of the writeSet based on number of pending requests.
+     * Expect no reordering in this case since the currently first bookie's number of
+     * pending requests is less than READ_REORDER_THRESHOLD_PENDING_REQUESTS=10 higher
+     * than the best bookie's.
+     */
+    @Test
+    public void testPendingRequestsNoReorder1() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
+        conf.setReorderThresholdPendingRequests(10);
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 10L); // -> this one first
+        bookiePendingMap.put(addr2, 7L);
+        bookiePendingMap.put(addr3, 1L); // best bookie, but below threshold
+        bookiePendingMap.put(addr4, 5L);
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
+        LOG.info("reorder set : {}", reorderSet);
+        assertEquals("writeSet should be in original order", origWriteSet, reorderSet);
+    }
+
+    /*
+     * Tests the reordering of the writeSet based on number of pending requests.
+     * Expect no reordering in this case since the currently first bookie's number of
+     * pending requests is lowest among all bookies already.
+     */
+    @Test
+    public void testPendingRequestsNoReorder2() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
+        conf.setReorderThresholdPendingRequests(10);
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 1L); // -> this one first
+        bookiePendingMap.put(addr2, 7L);
+        bookiePendingMap.put(addr3, 1L);
+        bookiePendingMap.put(addr4, 5L);
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
+        LOG.info("reorder set : {}", reorderSet);
+        assertEquals("writeSet should be in original order", origWriteSet, reorderSet);
+    }
+
     @Test
     public void testReplaceBookieWithEnoughBookiesInSameRack() throws Exception {
         BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services