You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2019/01/09 00:12:52 UTC
[bookkeeper] branch master updated: Allow to configure sticky reads
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 4c85877 Allow to configure sticky reads
4c85877 is described below
commit 4c8587715104cde6dbd70d1e7de0fc1853122eda
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jan 8 16:12:48 2019 -0800
Allow to configure sticky reads
### Motivation
Currently the BK client is issuing the read requests in round-robin fashion across all the bookies in the write set.
One issue with this approach is that it does not take full advantage of the read-ahead cache, either explicit (like in `DbLedgerStorage`) or implicit (when reading data through the Linux page cache, which does some prefetching).
With `e=2`, `w=2`, when we read `e-0` from `bookie-1` and `e-1` from `bookie-2`, we fail to take advantage of the fact that `bookie-1` will already have `e-1` in memory.
Effectively, with `e=2`, `w=2` the disk read IO will be doubled compared to the amount of data served to BK clients. The larger the quorum, the bigger the overhead (e.g. `e=5`, `w=5` will lead to 5x reads from disk).
### Changes
Added a BK client flag for "sticky reads". When reading from a ledger that has `E=W` (every bookie has all the entries), sticky reads will direct all read requests to a single bookie in the ensemble.
Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
This closes #1808 from merlimat/single-bookie-per-ledger-read-master
---
.../bookkeeper/client/ClientInternalConf.java | 2 +
.../bookkeeper/client/EnsemblePlacementPolicy.java | 35 ++++
.../org/apache/bookkeeper/client/LedgerHandle.java | 65 +++++++
.../apache/bookkeeper/client/PendingReadOp.java | 9 +-
.../bookkeeper/conf/ClientConfiguration.java | 29 +++
.../bookkeeper/bookie/BookieStickyReadsTest.java | 213 +++++++++++++++++++++
6 files changed, 350 insertions(+), 3 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
index ac56a1f..da79108 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
@@ -41,6 +41,7 @@ class ClientInternalConf {
final long addEntryQuorumTimeoutNanos;
final boolean enableParallelRecoveryRead;
final boolean enableReorderReadSequence;
+ final boolean enableStickyReads;
final int recoveryReadBatchSize;
final int throttleValue;
final int bookieFailureHistoryExpirationMSec;
@@ -80,6 +81,7 @@ class ClientInternalConf {
this.timeoutMonitorIntervalSec = conf.getTimeoutMonitorIntervalSec();
this.enableBookieFailureTracking = conf.getEnableBookieFailureTracking();
this.useV2WireProtocol = conf.getUseV2WireProtocol();
+ this.enableStickyReads = conf.isStickyReadsEnabled();
if (conf.getFirstSpeculativeReadTimeout() > 0) {
this.readSpeculativeRequestPolicy =
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index e185964..23932a3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -22,9 +22,12 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.DistributionSchedule.WriteSet;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -351,4 +354,36 @@ public interface EnsemblePlacementPolicy {
*/
default void updateBookieInfo(Map<BookieSocketAddress, BookieInfo> bookieInfoMap) {
}
+
+    /**
+     * Select one bookie to be the "sticky" bookie, to which all reads for a
+     * particular ledger will be directed.
+     *
+     * <p>The default implementation picks a bookie uniformly at random from the
+     * ensemble. Other placement policies can make better decisions based on
+     * additional information (e.g. rack or region awareness).
+     *
+     * @param metadata
+     *            the {@link LedgerMetadata} object
+     * @param currentStickyBookieIndex
+     *            empty when choosing the initial sticky bookie; otherwise the
+     *            ensemble index of the bookie to move away from (e.g. after a
+     *            read failure on it), so that it is not chosen again
+     * @return the index, within the ensemble, of the bookie chosen as the
+     *         sticky bookie; in {@code [0, ensembleSize)} as long as the passed
+     *         current index (if any) is itself a valid ensemble index
+     *
+     * @since 4.9
+     */
+    default int getStickyReadBookieIndex(LedgerMetadata metadata, Optional<Integer> currentStickyBookieIndex) {
+        int ensembleSize = metadata.getEnsembleSize();
+        if (!currentStickyBookieIndex.isPresent()) {
+            // Pick one bookie randomly from the current ensemble as the initial
+            // "sticky bookie". Use the bounded nextInt(): nextInt() % n is
+            // negative for negative random values, which would produce an
+            // invalid ensemble index.
+            return ThreadLocalRandom.current().nextInt(ensembleSize);
+        } else {
+            // When choosing a new sticky bookie index (e.g. after the current
+            // one has read failures), by default we pick the next one in the
+            // ensemble, to avoid picking the same one again.
+            return (currentStickyBookieIndex.get() + 1) % ensembleSize;
+        }
+    }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index bd8ec68..7e21f97 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -42,6 +42,7 @@ import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -58,6 +59,7 @@ import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
import org.apache.bookkeeper.client.BKException.BKIncorrectParameterException;
import org.apache.bookkeeper.client.BKException.BKReadException;
+import org.apache.bookkeeper.client.DistributionSchedule.WriteSet;
import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmed;
import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmedAndEntry;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncAddCallback;
@@ -120,6 +122,18 @@ public class LedgerHandle implements WriteHandle {
*/
volatile long pendingAddsSequenceHead;
+ /**
+ * If bookie sticky reads are enabled, this will contain the index of the bookie
+ * selected as "sticky" for this ledger. The bookie is chosen through the
+ * placement policy (at random, with the default policy) when the
+ * LedgerHandle is created.
+ *
+ * <p>In case of read failures, the bookie index will be updated (by default,
+ * to the next bookie in the ensemble) to avoid continuing to attempt to read
+ * from a failed bookie.
+ *
+ * <p>If the index is -1, it means the sticky reads are disabled.
+ */
+ private int stickyBookieIndex;
+
long length;
final DigestManager macManager;
final DistributionSchedule distributionSchedule;
@@ -181,6 +195,13 @@ public class LedgerHandle implements WriteHandle {
this.ledgerId = ledgerId;
+ if (clientCtx.getConf().enableStickyReads
+ && getLedgerMetadata().getEnsembleSize() == getLedgerMetadata().getWriteQuorumSize()) {
+ stickyBookieIndex = clientCtx.getPlacementPolicy().getStickyReadBookieIndex(metadata, Optional.empty());
+ } else {
+ stickyBookieIndex = -1;
+ }
+
if (clientCtx.getConf().throttleValue > 0) {
this.throttler = RateLimiter.create(clientCtx.getConf().throttleValue);
} else {
@@ -237,6 +258,21 @@ public class LedgerHandle implements WriteHandle {
initializeWriteHandleState();
}
+    /**
+     * Notify the LedgerHandle that a read operation failed on a particular bookie.
+     *
+     * <p>If sticky reads are enabled and the failed bookie is the current sticky
+     * bookie, a new sticky bookie is chosen through the placement policy, so that
+     * we avoid continuing to read from the same failed bookie. Errors reported for
+     * other bookies (e.g. a read sent to a non-sticky replica) leave the sticky
+     * bookie unchanged.
+     *
+     * @param bookieIndex the ensemble index of the bookie on which the read failed
+     */
+    void recordReadErrorOnBookie(int bookieIndex) {
+        // Snapshot the field once so that the checks and the update below all
+        // refer to the same value
+        int currentStickyBookieIndex = stickyBookieIndex;
+        if (currentStickyBookieIndex == -1) {
+            // Sticky reads are disabled for this ledger
+            return;
+        }
+
+        if (bookieIndex != currentStickyBookieIndex) {
+            // The failure was not on the sticky bookie (or we already moved away
+            // from it after an earlier error): keep the current choice. This also
+            // keeps the update idempotent when several errors are reported for
+            // the same bookie.
+            return;
+        }
+
+        stickyBookieIndex = clientCtx.getPlacementPolicy().getStickyReadBookieIndex(getLedgerMetadata(),
+                Optional.of(bookieIndex));
+    }
+
protected void initializeWriteHandleState() {
if (clientCtx.getConf().explicitLacInterval > 0) {
explicitLacFlushPolicy = new ExplicitLacFlushPolicy.ExplicitLacFlushPolicyImpl(
@@ -1978,4 +2014,33 @@ public class LedgerHandle implements WriteHandle {
// becomes a property of the LedgerHandle itself.
return LedgerMetadataUtils.getCurrentEnsemble(versionedMetadata.getValue());
}
+
+    /**
+     * Return a {@link WriteSet} suitable for reading a particular entry.
+     *
+     * <p>When sticky reads are disabled, this is the regular write set of the
+     * entry as computed by the distribution schedule. When they are enabled,
+     * the write set is computed for the sticky bookie index instead of the
+     * entry id, so that every entry is requested from the sticky bookie by
+     * preference.
+     *
+     * @param entryId the id of the entry to read
+     * @return the write set (ensemble indexes) to read the entry from
+     */
+    WriteSet getWriteSetForReadOperation(long entryId) {
+        int currentStickyBookieIndex = stickyBookieIndex;
+        if (currentStickyBookieIndex == -1) {
+            return distributionSchedule.getWriteSet(entryId);
+        }
+
+        // When sticky reads are enabled we want to take advantage of
+        // read-ahead (or, in any case, of the efficiency of reading
+        // sequential data from disk through the page cache).
+        // For this, all the entries that a given bookie prefetches
+        // should be read from that bookie.
+        // For example, with e=2, w=2, a=2 we would have
+        //         B-1   B-2
+        //   e-0    X     X
+        //   e-1    X     X
+        //   e-2    X     X
+        //
+        // In this case we want all the requests to be issued to B-1 (by
+        // preference), so that cache hits will be maximized.
+        //
+        // We can only enable sticky reads if ensemble == writeQuorum,
+        // otherwise the same bookie will not have all the entries
+        // stored.
+        return distributionSchedule.getWriteSet(currentStickyBookieIndex);
+    }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index e820660..65a3d76 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -98,12 +98,13 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
this.eId = eId;
if (clientCtx.getConf().enableReorderReadSequence) {
- writeSet = clientCtx.getPlacementPolicy().reorderReadSequence(
+ writeSet = clientCtx.getPlacementPolicy()
+ .reorderReadSequence(
ensemble,
lh.getBookiesHealthInfo(),
- lh.distributionSchedule.getWriteSet(eId));
+ lh.getWriteSetForReadOperation(eId));
} else {
- writeSet = lh.distributionSchedule.getWriteSet(eId);
+ writeSet = lh.getWriteSetForReadOperation(eId);
}
}
@@ -209,6 +210,8 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
errMsg, lh.ledgerId, eId, host);
}
}
+
+ lh.recordReadErrorOnBookie(bookieIndex);
}
/**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 3b390d4..29c6820 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -114,6 +114,7 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
protected static final String ENABLE_PARALLEL_RECOVERY_READ = "enableParallelRecoveryRead";
protected static final String RECOVERY_READ_BATCH_SIZE = "recoveryReadBatchSize";
protected static final String REORDER_READ_SEQUENCE_ENABLED = "reorderReadSequenceEnabled";
+ protected static final String STICKY_READS_ENABLED = "stickyReadSEnabled";
// Add Parameters
protected static final String DELAY_ENSEMBLE_CHANGE = "delayEnsembleChange";
protected static final String MAX_ALLOWED_ENSEMBLE_CHANGES = "maxNumEnsembleChanges";
@@ -1136,6 +1137,34 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
}
/**
+     * Whether read operations for a ledger should be sticky to a single bookie.
+     *
+     * <p>The value is read from the {@code stickyReadSEnabled} configuration
+     * key and defaults to {@code false}.
+     *
+     * @return true if sticky reads are enabled, otherwise false.
+     */
+    public boolean isStickyReadsEnabled() {
+        return getBoolean(STICKY_READS_ENABLED, false);
+    }
+
+    /**
+     * Enable/disable having read operations for a ledger be sticky to
+     * a single bookie.
+     *
+     * <p>If this flag is enabled, the client will use one single bookie (by
+     * preference) to read all entries for a ledger. Sticky reads only take
+     * effect for ledgers whose ensemble size equals their write quorum size,
+     * since only then does every bookie store all the entries.
+     *
+     * <p>Sending all the reads to one bookie increases the chances that
+     * a read request will be fulfilled by the bookie read cache (or the OS
+     * file system cache) when doing sequential reads.
+     *
+     * @param enabled the flag to enable/disable sticky reads.
+     * @return client configuration instance.
+     */
+    public ClientConfiguration setStickyReadsEnabled(boolean enabled) {
+        setProperty(STICKY_READS_ENABLED, enabled);
+        return this;
+    }
+
+ /**
* Get Ensemble Placement Policy Class.
*
* @return ensemble placement policy class.
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStickyReadsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStickyReadsTest.java
new file mode 100644
index 0000000..a23b0e5
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStickyReadsTest.java
@@ -0,0 +1,213 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.beust.jcommander.internal.Lists;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+/**
+ * Tests of the sticky reads feature of the BookKeeper client.
+ *
+ * <p>When sticky reads are enabled and a ledger has ensemble size equal to its
+ * write quorum size, all the read requests for that ledger are expected to be
+ * served by a single bookie. Per-bookie request counts are taken from the
+ * success count of the {@code bookkeeper_server.READ_ENTRY_REQUEST} stats logger.
+ */
+@Slf4j
+public class BookieStickyReadsTest extends BookKeeperClusterTestCase {
+
+    private static final int NUM_BOOKIES = 3;
+
+    private static final String READ_ENTRY_REQUEST_METRIC = "bookkeeper_server.READ_ENTRY_REQUEST";
+
+    public BookieStickyReadsTest() {
+        super(NUM_BOOKIES);
+    }
+
+    @Test
+    public void testNormalReads() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration(baseClientConf);
+
+        // Sticky reads are disabled by default, so the flag is intentionally
+        // left unset here
+
+        writeAndReadEntries(conf, 3, 3, 3);
+
+        // All bookies should have received at least some read request
+        getBookieReadRequestStats().values().forEach(readRequests -> assertTrue(readRequests > 0));
+    }
+
+    @Test
+    public void testStickyFlagWithStriping() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration(baseClientConf);
+        conf.setStickyReadsEnabled(true);
+
+        writeAndReadEntries(conf, 3, 2, 2);
+
+        // All bookies should have received at least some read request since we
+        // don't enable sticky reads when striping is enabled
+        getBookieReadRequestStats().values().forEach(readRequests -> assertTrue(readRequests > 0));
+    }
+
+    @Test
+    public void stickyReadsWithNoFailures() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration(baseClientConf);
+        conf.setStickyReadsEnabled(true);
+
+        writeAndReadEntries(conf, 3, 3, 3);
+
+        // All read requests should have been made to a single bookie
+        Map<Integer, Long> stats = getBookieReadRequestStats();
+        boolean foundBookieWithRequests = false;
+        for (long readRequests : stats.values()) {
+            if (readRequests > 0) {
+                assertFalse("Another bookie already had received requests", foundBookieWithRequests);
+                foundBookieWithRequests = true;
+            }
+        }
+
+        // Make sure the check above is not vacuously satisfied when no bookie
+        // served any read request at all
+        assertTrue("No bookie received read requests", foundBookieWithRequests);
+    }
+
+    @Test
+    public void stickyReadsWithFailures() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration(baseClientConf);
+        conf.setStickyReadsEnabled(true);
+
+        @Cleanup
+        BookKeeper bkc = new BookKeeper(conf);
+
+        final int n = 10;
+        long ledgerId;
+
+        try (WriteHandle wh = bkc.newCreateLedgerOp()
+                .withEnsembleSize(3)
+                .withWriteQuorumSize(3)
+                .withAckQuorumSize(3)
+                .withPassword("".getBytes())
+                .execute()
+                .join()) {
+            ledgerId = wh.getId();
+
+            for (int i = 0; i < n; i++) {
+                wh.append(("entry-" + i).getBytes());
+            }
+        }
+
+        @Cleanup
+        ReadHandle rh = bkc.newOpenLedgerOp()
+                .withLedgerId(ledgerId)
+                .withPassword("".getBytes())
+                .execute()
+                .join();
+
+        // Read 1 entry and detect which bookie was being used
+        @Cleanup
+        LedgerEntries entry0 = rh.read(0, 0);
+        assertArrayEquals("entry-0".getBytes(), entry0.getEntry(0).getEntryBytes());
+
+        // All read requests should have been made to a single bookie
+        int bookieWithRequests = -1;
+        for (int i = 0; i < NUM_BOOKIES; i++) {
+            long requests = getStatsProvider(i).getOpStatsLogger(READ_ENTRY_REQUEST_METRIC)
+                    .getSuccessCount();
+
+            log.info("Bookie {} --- requests: {}", i, requests);
+
+            if (requests > 0) {
+                assertTrue("Another bookie already had received requests", bookieWithRequests == -1);
+                bookieWithRequests = i;
+            }
+        }
+
+        // Fail with a clear message rather than an index error on bs.get(-1)
+        assertTrue("No bookie received the initial read request", bookieWithRequests != -1);
+
+        // Suspend the sticky bookie. Reads should now go to a different sticky
+        // bookie
+        bs.get(bookieWithRequests).suspendProcessing();
+
+        for (int i = 0; i < n; i++) {
+            @Cleanup
+            LedgerEntries entries = rh.read(i, i);
+
+            assertArrayEquals(("entry-" + i).getBytes(), entries.getEntry(i).getEntryBytes());
+        }
+
+        // At this point, we should have 1 bookie with 1 request (the initial
+        // request), and a second bookie with 10 requests. The 3rd bookie should
+        // have no requests
+        List<Long> requestCounts = Lists.newArrayList(getBookieReadRequestStats().values());
+        Collections.sort(requestCounts);
+
+        assertEquals(0, requestCounts.get(0).longValue());
+        assertEquals(1, requestCounts.get(1).longValue());
+        assertEquals(10, requestCounts.get(2).longValue());
+    }
+
+    /**
+     * Collect, for every bookie index, the success count of its read-entry
+     * request stats logger.
+     */
+    private Map<Integer, Long> getBookieReadRequestStats() throws Exception {
+        Map<Integer, Long> stats = new TreeMap<>();
+        for (int i = 0; i < NUM_BOOKIES; i++) {
+            stats.put(i, getStatsProvider(i).getOpStatsLogger(READ_ENTRY_REQUEST_METRIC)
+                    .getSuccessCount());
+        }
+
+        return stats;
+    }
+
+    /**
+     * Create a ledger with the given quorum settings, append 10 entries and read
+     * each of them back one at a time, checking the content.
+     */
+    private void writeAndReadEntries(ClientConfiguration conf, int ensembleSize, int writeQuorum, int ackQuorum)
+            throws Exception {
+        @Cleanup
+        BookKeeper bkc = new BookKeeper(conf);
+
+        @Cleanup
+        WriteHandle wh = bkc.newCreateLedgerOp()
+                .withEnsembleSize(ensembleSize)
+                .withWriteQuorumSize(writeQuorum)
+                .withAckQuorumSize(ackQuorum)
+                .withPassword("".getBytes())
+                .execute()
+                .join();
+
+        final int n = 10;
+
+        for (int i = 0; i < n; i++) {
+            wh.append(("entry-" + i).getBytes());
+        }
+
+        for (int i = 0; i < n; i++) {
+            @Cleanup
+            LedgerEntries entries = wh.read(i, i);
+
+            assertArrayEquals(("entry-" + i).getBytes(), entries.getEntry(i).getEntryBytes());
+        }
+    }
+}