You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2019/01/09 00:12:52 UTC

[bookkeeper] branch master updated: Allow to configure sticky reads

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c85877  Allow to configure sticky reads
4c85877 is described below

commit 4c8587715104cde6dbd70d1e7de0fc1853122eda
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jan 8 16:12:48 2019 -0800

    Allow to configure sticky reads
    
    ### Motivation
    
    Currently the BK client is issuing the read requests in round-robin fashion across all the bookies in the write set.
    
    One issue with this approach is that it does not take full advantage of the read-ahead cache, either explicit (like in `DbLedgerStorage`) or implicit (by reading data through the Linux page cache, which does some prefetching).
    
    With `e=2`, `w=2`, when we read `e-0` from `bookie-1` and `e-1` from `bookie-2`, we fail to take advantage of the fact that `bookie-1` will already have `e-1` in memory.
    
    Effectively, with `e=2`, `w=2` the disk read IO will be doubled compared to the amount of data served to BK clients. The larger the quorum, the bigger the overhead (eg: `e=5`, `w=5` will lead to 5x reads from disk).
    
    ### Changes
    
    Added a BK client flag for "sticky reads". When reading from a ledger that has `E=W` (every bookie has all the entries), sticky reads direct all read requests to a single bookie in the ensemble.
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1808 from merlimat/single-bookie-per-ledger-read-master
---
 .../bookkeeper/client/ClientInternalConf.java      |   2 +
 .../bookkeeper/client/EnsemblePlacementPolicy.java |  35 ++++
 .../org/apache/bookkeeper/client/LedgerHandle.java |  65 +++++++
 .../apache/bookkeeper/client/PendingReadOp.java    |   9 +-
 .../bookkeeper/conf/ClientConfiguration.java       |  29 +++
 .../bookkeeper/bookie/BookieStickyReadsTest.java   | 213 +++++++++++++++++++++
 6 files changed, 350 insertions(+), 3 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
index ac56a1f..da79108 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
@@ -41,6 +41,7 @@ class ClientInternalConf {
     final long addEntryQuorumTimeoutNanos;
     final boolean enableParallelRecoveryRead;
     final boolean enableReorderReadSequence;
+    final boolean enableStickyReads;
     final int recoveryReadBatchSize;
     final int throttleValue;
     final int bookieFailureHistoryExpirationMSec;
@@ -80,6 +81,7 @@ class ClientInternalConf {
         this.timeoutMonitorIntervalSec = conf.getTimeoutMonitorIntervalSec();
         this.enableBookieFailureTracking = conf.getEnableBookieFailureTracking();
         this.useV2WireProtocol = conf.getUseV2WireProtocol();
+        this.enableStickyReads = conf.isStickyReadsEnabled();
 
         if (conf.getFirstSpeculativeReadTimeout() > 0) {
             this.readSpeculativeRequestPolicy =
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index e185964..23932a3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -22,9 +22,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.DistributionSchedule.WriteSet;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -351,4 +354,36 @@ public interface EnsemblePlacementPolicy {
      */
     default void updateBookieInfo(Map<BookieSocketAddress, BookieInfo> bookieInfoMap) {
     }
+
+    /**
+     * Select one bookie to be the "sticky" bookie where all reads for a particular
+     * ledger will be directed to.
+     *
+     * <p>The default implementation will pick a bookie randomly from the ensemble.
+     * Other placement policies will be able to do better decisions based on
+     * additional information (eg: rack or region awareness).
+     *
+     * @param metadata
+     *            the {@link LedgerMetadata} object
+     * @param currentStickyBookieIndex
+     *            if we are changing the sticky bookie after a read failure, the
+     *            current sticky bookie is passed in so that we will avoid
+     *            choosing it again
+     * @return the index, within the ensemble, of the bookie chosen as the sticky
+     *         bookie; the default implementation returns a value in [0, ensembleSize)
+     *
+     * @since 4.9
+     */
+    default int getStickyReadBookieIndex(LedgerMetadata metadata, Optional<Integer> currentStickyBookieIndex) {
+        if (!currentStickyBookieIndex.isPresent()) {
+            // Pick one bookie uniformly at random from the current ensemble as the initial
+            // "sticky bookie". nextInt(bound) is never negative, unlike nextInt() % bound.
+            return ThreadLocalRandom.current().nextInt(metadata.getEnsembleSize());
+        } else {
+            // When choosing a new sticky bookie index (eg: after the current
+            // one has read failures), by default we pick the next one in the
+            // ensemble, to avoid picking up the same one again.
+            return (currentStickyBookieIndex.get() + 1) % metadata.getEnsembleSize();
+        }
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index bd8ec68..7e21f97 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -42,6 +42,7 @@ import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -58,6 +59,7 @@ import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
 import org.apache.bookkeeper.client.BKException.BKIncorrectParameterException;
 import org.apache.bookkeeper.client.BKException.BKReadException;
+import org.apache.bookkeeper.client.DistributionSchedule.WriteSet;
 import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmed;
 import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmedAndEntry;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncAddCallback;
@@ -120,6 +122,18 @@ public class LedgerHandle implements WriteHandle {
       */
     volatile long pendingAddsSequenceHead;
 
+    /**
+     * Index, within the ensemble, of the bookie selected as "sticky" for this
+     * ledger, or -1 if sticky reads are disabled. The bookie is chosen by the
+     * placement policy (at random, by default) when the LedgerHandle is created.
+     *
+     * <p>On read failures the index is updated (by default, to the next bookie in
+     * the ensemble) to avoid continuing to attempt to read from a failed bookie.
+     *
+     * <p>Volatile since it is updated on read errors and read by later read ops.
+     */
+    private volatile int stickyBookieIndex;
+
     long length;
     final DigestManager macManager;
     final DistributionSchedule distributionSchedule;
@@ -181,6 +195,13 @@ public class LedgerHandle implements WriteHandle {
 
         this.ledgerId = ledgerId;
 
+        if (clientCtx.getConf().enableStickyReads
+                && getLedgerMetadata().getEnsembleSize() == getLedgerMetadata().getWriteQuorumSize()) {
+            stickyBookieIndex = clientCtx.getPlacementPolicy().getStickyReadBookieIndex(metadata, Optional.empty());
+        } else {
+            stickyBookieIndex = -1;
+        }
+
         if (clientCtx.getConf().throttleValue > 0) {
             this.throttler = RateLimiter.create(clientCtx.getConf().throttleValue);
         } else {
@@ -237,6 +258,21 @@ public class LedgerHandle implements WriteHandle {
         initializeWriteHandleState();
     }
 
+    /**
+     * Notify the LedgerHandle that a read operation failed on a particular bookie.
+     */
+    void recordReadErrorOnBookie(int bookieIndex) {
+        // Only switch the sticky bookie when sticky reads are enabled and the
+        // failed bookie is the current sticky one: errors on other bookies
+        // (eg: speculative reads) must not displace a healthy sticky bookie,
+        // and repeated errors on the same bookie are no-ops once we moved away.
+        int currentStickyIndex = stickyBookieIndex;
+        if (currentStickyIndex != -1 && currentStickyIndex == bookieIndex) {
+            stickyBookieIndex = clientCtx.getPlacementPolicy().getStickyReadBookieIndex(getLedgerMetadata(),
+                    Optional.of(bookieIndex));
+        }
+    }
+
     protected void initializeWriteHandleState() {
         if (clientCtx.getConf().explicitLacInterval > 0) {
             explicitLacFlushPolicy = new ExplicitLacFlushPolicy.ExplicitLacFlushPolicyImpl(
@@ -1978,4 +2014,33 @@ public class LedgerHandle implements WriteHandle {
         // becomes a property of the LedgerHandle itself.
         return LedgerMetadataUtils.getCurrentEnsemble(versionedMetadata.getValue());
     }
+
+    /**
+     * Return a {@link WriteSet} suitable for reading a particular entry.
+     * This will include all bookies that store the entry; when sticky reads
+     * are enabled, it is the write set computed for {@code stickyBookieIndex}.
+     */
+    WriteSet getWriteSetForReadOperation(long entryId) {
+        int stickyIndex = stickyBookieIndex; // read once: it may change on read errors
+        if (stickyIndex != -1) {
+            // When sticky reads are enabled we want to take advantage of
+            // read-ahead (or, more generally, of the efficiency of reading
+            // sequential data from disk through the page cache).
+            // For this, all the entries that a bookie prefetches should be read from it.
+            // For example, with e=2, w=2, a=2 we would have
+            // B-1 B-2
+            // e-0 X X
+            // e-1 X X
+            // e-2 X X
+            //
+            // In this case we want all the requests to be issued to B-1 (by
+            // preference), so that cache hits will be maximized.
+            //
+            // We can only enable sticky reads if ensemble == writeQuorum,
+            // otherwise the same bookie will not have all the entries stored
+            return distributionSchedule.getWriteSet(stickyIndex);
+        } else {
+            return distributionSchedule.getWriteSet(entryId);
+        }
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index e820660..65a3d76 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -98,12 +98,13 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
             this.eId = eId;
 
             if (clientCtx.getConf().enableReorderReadSequence) {
-                writeSet = clientCtx.getPlacementPolicy().reorderReadSequence(
+                writeSet = clientCtx.getPlacementPolicy()
+                    .reorderReadSequence(
                             ensemble,
                             lh.getBookiesHealthInfo(),
-                            lh.distributionSchedule.getWriteSet(eId));
+                            lh.getWriteSetForReadOperation(eId));
             } else {
-                writeSet = lh.distributionSchedule.getWriteSet(eId);
+                writeSet = lh.getWriteSetForReadOperation(eId);
             }
         }
 
@@ -209,6 +210,8 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
                             errMsg, lh.ledgerId, eId, host);
                 }
             }
+
+            lh.recordReadErrorOnBookie(bookieIndex);
         }
 
         /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 3b390d4..29c6820 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -114,6 +114,7 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
     protected static final String ENABLE_PARALLEL_RECOVERY_READ = "enableParallelRecoveryRead";
     protected static final String RECOVERY_READ_BATCH_SIZE = "recoveryReadBatchSize";
     protected static final String REORDER_READ_SEQUENCE_ENABLED = "reorderReadSequenceEnabled";
+    protected static final String STICKY_READS_ENABLED = "stickyReadsEnabled";
     // Add Parameters
     protected static final String DELAY_ENSEMBLE_CHANGE = "delayEnsembleChange";
     protected static final String MAX_ALLOWED_ENSEMBLE_CHANGES = "maxNumEnsembleChanges";
@@ -1136,6 +1137,34 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
     }
 
     /**
+     * Whether read operations should be sticky to a single bookie or not.
+     *
+     * @return true if sticky reads are enabled, otherwise false (the default).
+     */
+    public boolean isStickyReadsEnabled() {
+        return getBoolean(STICKY_READS_ENABLED, false);
+    }
+
+    /**
+     * Enable/disable having read operations for a ledger be sticky to
+     * a single bookie.
+     *
+     * <p>If enabled, the client reads all entries of a ledger from one bookie (by
+     * preference). Only applies to ledgers where ensemble size == write quorum size.
+     *
+     * <p>Directing all reads to one bookie increases the chances that a
+     * read request will be fulfilled by the bookie read cache (or the OS file
+     * system cache) when doing sequential reads.
+     *
+     * @param enabled the flag to enable/disable sticky reads.
+     * @return client configuration instance.
+     */
+    public ClientConfiguration setStickyReadsEnabled(boolean enabled) {
+        setProperty(STICKY_READS_ENABLED, enabled);
+        return this;
+    }
+
+    /**
      * Get Ensemble Placement Policy Class.
      *
      * @return ensemble placement policy class.
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStickyReadsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStickyReadsTest.java
new file mode 100644
index 0000000..a23b0e5
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStickyReadsTest.java
@@ -0,0 +1,213 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.beust.jcommander.internal.Lists;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+/**
+ * Tests for the sticky reads feature of the BookKeeper client.
+ */
+@Slf4j
+public class BookieStickyReadsTest extends BookKeeperClusterTestCase {
+
+    private static final int NUM_BOOKIES = 3;
+
+    private static final String READ_ENTRY_REQUEST_METRIC = "bookkeeper_server.READ_ENTRY_REQUEST";
+
+    public BookieStickyReadsTest() {
+        super(NUM_BOOKIES);
+    }
+
+    @Test
+    public void testNormalReads() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration(baseClientConf);
+
+        // Sticky reads are disabled by default, so the flag is left unset
+
+        writeAndReadEntries(conf, 3, 3, 3);
+
+        // All bookies should have received at least some read request
+        getBookieReadRequestStats().values().forEach(readRequests -> assertTrue(readRequests > 0));
+    }
+
+    @Test
+    public void testStickyFlagWithStriping() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration(baseClientConf);
+        conf.setStickyReadsEnabled(true);
+
+        writeAndReadEntries(conf, 3, 2, 2);
+
+        // All bookies should have received at least some read request since we
+        // don't enable sticky reads when striping is enabled
+        getBookieReadRequestStats().values().forEach(readRequests -> assertTrue(readRequests > 0));
+    }
+
+    @Test
+    public void stickyReadsWithNoFailures() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration(baseClientConf);
+        conf.setStickyReadsEnabled(true);
+
+        writeAndReadEntries(conf, 3, 3, 3);
+
+        // All read requests should have been made to a single bookie
+        Map<Integer, Long> stats = getBookieReadRequestStats();
+        boolean foundBookieWithRequests = false;
+        for (long readRequests : stats.values()) {
+            if (readRequests > 0) {
+                assertFalse("Another bookie already had received requests", foundBookieWithRequests);
+                foundBookieWithRequests = true;
+            }
+        }
+        assertTrue("No bookie received any read request", foundBookieWithRequests);
+    }
+
+    @Test
+    public void stickyReadsWithFailures() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration(baseClientConf);
+        conf.setStickyReadsEnabled(true);
+
+        @Cleanup
+        BookKeeper bkc = new BookKeeper(conf);
+
+        final int n = 10;
+        long ledgerId;
+
+        try (WriteHandle wh = bkc.newCreateLedgerOp()
+                .withEnsembleSize(3)
+                .withWriteQuorumSize(3)
+                .withAckQuorumSize(3)
+                .withPassword("".getBytes())
+                .execute()
+                .join()) {
+            ledgerId = wh.getId();
+
+            for (int i = 0; i < n; i++) {
+                wh.append(("entry-" + i).getBytes());
+            }
+        }
+
+        @Cleanup
+        ReadHandle rh = bkc.newOpenLedgerOp()
+                .withLedgerId(ledgerId)
+                .withPassword("".getBytes())
+                .execute()
+                .join();
+
+        // Read 1 entry and detect which bookie was being used
+        @Cleanup
+        LedgerEntries entry0 = rh.read(0, 0);
+        assertArrayEquals("entry-0".getBytes(), entry0.getEntry(0).getEntryBytes());
+
+        // All read requests should have been made to a single bookie
+        int bookieWithRequests = -1;
+        for (int i = 0; i < NUM_BOOKIES; i++) {
+            long requests = getStatsProvider(i).getOpStatsLogger(READ_ENTRY_REQUEST_METRIC)
+                    .getSuccessCount();
+
+            log.info("Bookie {} --- requests: {}", i, requests);
+
+            if (requests > 0) {
+                assertTrue("Another bookie already had received requests", bookieWithRequests == -1);
+                bookieWithRequests = i;
+            }
+        }
+        assertTrue("No bookie received the initial read request", bookieWithRequests != -1);
+
+        // Suspend the sticky bookie: reads should now go to a different sticky bookie
+        bs.get(bookieWithRequests).suspendProcessing();
+
+        for (int i = 0; i < n; i++) {
+            @Cleanup
+            LedgerEntries entries = rh.read(i, i);
+
+            assertArrayEquals(("entry-" + i).getBytes(), entries.getEntry(i).getEntryBytes());
+        }
+
+        // At this point, we should have 1 bookie with 1 request (the initial
+        // request), and a second bookie with 10 requests. The 3rd bookie should
+        // have no requests
+        List<Long> requestCounts = Lists.newArrayList(getBookieReadRequestStats().values());
+        Collections.sort(requestCounts);
+
+        assertEquals(0, requestCounts.get(0).longValue());
+        assertEquals(1, requestCounts.get(1).longValue());
+        assertEquals(10, requestCounts.get(2).longValue());
+    }
+
+    private Map<Integer, Long> getBookieReadRequestStats() throws Exception {
+        Map<Integer, Long> stats = new TreeMap<>();
+        for (int i = 0; i < NUM_BOOKIES; i++) {
+            stats.put(i, getStatsProvider(i).getOpStatsLogger(READ_ENTRY_REQUEST_METRIC)
+                    .getSuccessCount());
+        }
+
+        return stats;
+    }
+
+    private void writeAndReadEntries(ClientConfiguration conf, int ensembleSize, int writeQuorum, int ackQuorum)
+            throws Exception {
+        @Cleanup
+        BookKeeper bkc = new BookKeeper(conf);
+
+        @Cleanup
+        WriteHandle wh = bkc.newCreateLedgerOp()
+                .withEnsembleSize(ensembleSize)
+                .withWriteQuorumSize(writeQuorum)
+                .withAckQuorumSize(ackQuorum)
+                .withPassword("".getBytes())
+                .execute()
+                .join();
+
+        final int n = 10;
+
+        for (int i = 0; i < n; i++) {
+            wh.append(("entry-" + i).getBytes());
+        }
+
+        for (int i = 0; i < n; i++) {
+            @Cleanup
+            LedgerEntries entries = wh.read(i, i);
+
+            assertArrayEquals(("entry-" + i).getBytes(), entries.getEntry(i).getEntryBytes());
+        }
+    }
+}