You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ay...@apache.org on 2018/06/01 23:28:32 UTC
[bookkeeper] branch master updated: Issue #1409: Added server side
backpressure (@bug W-3651831@) (#1410)
This is an automated email from the ASF dual-hosted git repository.
ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 319f761 Issue #1409: Added server side backpressure (@bug W-3651831@) (#1410)
319f761 is described below
commit 319f761ca1769d0b35aa70c99b3a1ea8795acaf8
Author: Andrey Yegorov <dl...@users.noreply.github.com>
AuthorDate: Fri Jun 1 16:28:17 2018 -0700
Issue #1409: Added server side backpressure (@bug W-3651831@) (#1410)
(@bug W-3651831@) backpressure: server-side backpressure
---
.../bookkeeper/bookie/BookKeeperServerStats.java | 8 +
.../apache/bookkeeper/bookie/EntryMemTable.java | 38 +-
.../bookie/EntryMemTableWithParallelFlusher.java | 1 +
.../java/org/apache/bookkeeper/bookie/Journal.java | 23 +-
.../apache/bookkeeper/bookie/JournalChannel.java | 18 +-
.../apache/bookkeeper/bookie/LedgerStorage.java | 6 +
.../bookkeeper/bookie/SlowBufferedChannel.java | 93 ++++
.../bookkeeper/bookie/SortedLedgerStorage.java | 125 +++++-
.../bookkeeper/conf/ClientConfiguration.java | 1 +
.../bookkeeper/conf/ServerConfiguration.java | 152 ++++++-
.../bookkeeper/processor/RequestProcessor.java | 1 -
.../apache/bookkeeper/proto/BookieNettyServer.java | 5 +
.../bookkeeper/proto/BookieRequestProcessor.java | 196 +++++++++
.../org/apache/bookkeeper/proto/BookieServer.java | 11 +
.../bookkeeper/proto/GetBookieInfoProcessorV3.java | 10 +-
.../bookkeeper/proto/PacketProcessorBaseV3.java | 36 +-
.../bookkeeper/proto/PerChannelBookieClient.java | 18 +-
.../bookkeeper/proto/ReadEntryProcessorV3.java | 3 +
.../bookkeeper/proto/WriteEntryProcessorV3.java | 8 +
.../org/apache/bookkeeper/util/StringUtils.java | 18 +
.../org/apache/bookkeeper/bookie/EntryLogTest.java | 5 +-
.../apache/bookkeeper/bookie/LedgerCacheTest.java | 9 +-
.../bookie/SlowInterleavedLedgerStorage.java | 140 ++++++
.../bookie/SlowSortedLedgerStorage.java} | 31 +-
.../bookie/SortedLedgerStorageCheckpointTest.java | 6 +-
.../bookkeeper/proto/BookieBackpressureTest.java | 469 +++++++++++++++++++++
.../proto/ForceLedgerProcessorV3Test.java | 1 +
.../proto/TestBookieRequestProcessor.java | 8 +-
.../proto/WriteEntryProcessorV3Test.java | 40 ++
.../main/resources/bookkeeper/findbugsExclude.xml | 6 +
30 files changed, 1394 insertions(+), 92 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index d488bc9..19db7e9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -77,6 +77,13 @@ public interface BookKeeperServerStats {
String BOOKIE_ADD_ENTRY_BYTES = "BOOKIE_ADD_ENTRY_BYTES";
String BOOKIE_READ_ENTRY_BYTES = "BOOKIE_READ_ENTRY_BYTES";
+ String ADD_ENTRY_IN_PROGRESS = "ADD_ENTRY_IN_PROGRESS";
+ String ADD_ENTRY_BLOCKED = "ADD_ENTRY_BLOCKED";
+ String ADD_ENTRY_BLOCKED_WAIT = "ADD_ENTRY_BLOCKED_WAIT";
+ String READ_ENTRY_IN_PROGRESS = "READ_ENTRY_IN_PROGRESS";
+ String READ_ENTRY_BLOCKED = "READ_ENTRY_BLOCKED";
+ String READ_ENTRY_BLOCKED_WAIT = "READ_ENTRY_BLOCKED_WAIT";
+
//
// Journal Stats (scoped under SERVER_SCOPE)
//
@@ -137,6 +144,7 @@ public interface BookKeeperServerStats {
String JOURNAL_NUM_FLUSH_MAX_WAIT = "JOURNAL_NUM_FLUSH_MAX_WAIT";
String SKIP_LIST_FLUSH_BYTES = "SKIP_LIST_FLUSH_BYTES";
String SKIP_LIST_THROTTLING = "SKIP_LIST_THROTTLING";
+ String SKIP_LIST_THROTTLING_LATENCY = "SKIP_LIST_THROTTLING_LATENCY";
String READ_LAST_ENTRY_NOENTRY_ERROR = "READ_LAST_ENTRY_NOENTRY_ERROR";
String LEDGER_CACHE_NUM_EVICTED_LEDGERS = "LEDGER_CACHE_NUM_EVICTED_LEDGERS";
String PENDING_GET_FILE_INFO = "PENDING_GET_FILE_INFO";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
index 70f437c..7328398 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
@@ -24,10 +24,12 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_GET_E
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_PUT_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_SNAPSHOT;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_THROTTLING;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_THROTTLING_LATENCY;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -103,6 +105,7 @@ public class EntryMemTable implements AutoCloseable{
final AtomicLong size;
final long skipListSizeLimit;
+ final Semaphore skipListSemaphore;
SkipListArena allocator;
@@ -119,6 +122,7 @@ public class EntryMemTable implements AutoCloseable{
private final OpStatsLogger getEntryStats;
final Counter flushBytesCounter;
private final Counter throttlingCounter;
+ private final OpStatsLogger throttlingStats;
/**
* Constructor.
@@ -136,12 +140,22 @@ public class EntryMemTable implements AutoCloseable{
// skip list size limit
this.skipListSizeLimit = conf.getSkipListSizeLimit();
+ if (skipListSizeLimit > (Integer.MAX_VALUE - 1) / 2) {
+ // gives 2*1023MB for mem table.
+ // consider a way to create semaphore with long num of permits
+ // until that 1023MB should be enough for everything (tm)
+ throw new IllegalArgumentException("skiplist size over " + ((Integer.MAX_VALUE - 1) / 2));
+ }
+ // double the size for snapshot in progress + incoming data
+ this.skipListSemaphore = new Semaphore((int) skipListSizeLimit * 2);
+
// Stats
this.snapshotStats = statsLogger.getOpStatsLogger(SKIP_LIST_SNAPSHOT);
this.putEntryStats = statsLogger.getOpStatsLogger(SKIP_LIST_PUT_ENTRY);
this.getEntryStats = statsLogger.getOpStatsLogger(SKIP_LIST_GET_ENTRY);
this.flushBytesCounter = statsLogger.getCounter(SKIP_LIST_FLUSH_BYTES);
this.throttlingCounter = statsLogger.getCounter(SKIP_LIST_THROTTLING);
+ this.throttlingStats = statsLogger.getOpStatsLogger(SKIP_LIST_THROTTLING_LATENCY);
}
void dump() {
@@ -264,6 +278,7 @@ public class EntryMemTable implements AutoCloseable{
}
}
+ skipListSemaphore.release((int) size);
return size;
}
@@ -286,18 +301,6 @@ public class EntryMemTable implements AutoCloseable{
}
/**
- * Throttling writer w/ 1 ms delay.
- */
- private void throttleWriters() {
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- throttlingCounter.inc();
- }
-
- /**
* Write an update.
*
* @param entry
@@ -314,11 +317,18 @@ public class EntryMemTable implements AutoCloseable{
Checkpoint cp = snapshot();
if ((null != cp) || (!previousFlushSucceeded.get())) {
cb.onSizeLimitReached(cp);
- } else {
- throttleWriters();
}
}
+ final int len = entry.remaining();
+ if (!skipListSemaphore.tryAcquire(len)) {
+ throttlingCounter.inc();
+ final long throttlingStartTimeNanos = MathUtils.nowInNano();
+ skipListSemaphore.acquireUninterruptibly(len);
+ throttlingStats.registerSuccessfulEvent(MathUtils.elapsedNanos(throttlingStartTimeNanos),
+ TimeUnit.NANOSECONDS);
+ }
+
this.lock.readLock().lock();
try {
EntryKeyValue toAdd = cloneWithAllocator(ledgerId, entryId, entry);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java
index a3849e9..4f2cf02 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java
@@ -144,6 +144,7 @@ class EntryMemTableWithParallelFlusher extends EntryMemTable {
}
}
}
+ skipListSemaphore.release(flushedSize.intValue());
return flushedSize.longValue();
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 71477c0..1da0435 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -36,6 +36,7 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -77,6 +78,19 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
}
/**
+ * For testability.
+ */
+ @FunctionalInterface
+ public interface BufferedChannelBuilder {
+ BufferedChannelBuilder DEFAULT_BCBUILDER =
+ (FileChannel fc, int capacity) -> new BufferedChannel(fc, capacity);
+
+ BufferedChannel create(FileChannel fc, int capacity) throws IOException;
+ }
+
+
+
+ /**
* List all journal ids by a specified journal id filter.
*
* @param journalDir journal dir
@@ -935,11 +949,14 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
while (true) {
// new journal file to write
if (null == logFile) {
+
logId = logId + 1;
journalCreationWatcher.reset().start();
logFile = new JournalChannel(journalDirectory, logId, journalPreAllocSize, journalWriteBufferSize,
- journalAlignmentSize, removePagesFromCache, journalFormatVersionToWrite);
+ journalAlignmentSize, removePagesFromCache,
+ journalFormatVersionToWrite, getBufferedChannelBuilder());
+
journalCreationStats.registerSuccessfulEvent(
journalCreationWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
@@ -1121,6 +1138,10 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
LOG.info("Journal exited loop!");
}
+ public BufferedChannelBuilder getBufferedChannelBuilder() {
+ return BufferedChannelBuilder.DEFAULT_BCBUILDER;
+ }
+
/**
* Shuts down the journal.
*/
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
index 6d5cae0..507c933 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
@@ -93,7 +93,8 @@ class JournalChannel implements Closeable {
// Open journal for scanning starting from given position.
JournalChannel(File journalDirectory, long logId,
long preAllocSize, int writeBufferSize, long position) throws IOException {
- this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE, position, false, V5);
+ this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE,
+ position, false, V5, Journal.BufferedChannelBuilder.DEFAULT_BCBUILDER);
}
// Open journal to write
@@ -101,11 +102,20 @@ class JournalChannel implements Closeable {
long preAllocSize, int writeBufferSize, int journalAlignSize,
boolean fRemoveFromPageCache, int formatVersionToWrite) throws IOException {
this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize,
- START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite);
+ fRemoveFromPageCache, formatVersionToWrite, Journal.BufferedChannelBuilder.DEFAULT_BCBUILDER);
+ }
+
+ JournalChannel(File journalDirectory, long logId,
+ long preAllocSize, int writeBufferSize, int journalAlignSize,
+ boolean fRemoveFromPageCache, int formatVersionToWrite,
+ Journal.BufferedChannelBuilder bcBuilder) throws IOException {
+ this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize,
+ START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite, bcBuilder);
}
/**
* Create a journal file.
+ * Allows injection of BufferedChannelBuilder for testing purposes.
*
* @param journalDirectory
* directory to store the journal file.
@@ -128,7 +138,7 @@ class JournalChannel implements Closeable {
private JournalChannel(File journalDirectory, long logId,
long preAllocSize, int writeBufferSize, int journalAlignSize,
long position, boolean fRemoveFromPageCache,
- int formatVersionToWrite) throws IOException {
+ int formatVersionToWrite, Journal.BufferedChannelBuilder bcBuilder) throws IOException {
this.journalAlignSize = journalAlignSize;
this.zeros = ByteBuffer.allocate(journalAlignSize);
this.preAllocSize = preAllocSize - preAllocSize % journalAlignSize;
@@ -160,7 +170,7 @@ class JournalChannel implements Closeable {
bb.clear();
fc.write(bb);
- bc = new BufferedChannel(fc, writeBufferSize);
+ bc = bcBuilder.create(fc, writeBufferSize);
forceWrite(true);
nextPrealloc = this.preAllocSize;
fc.write(zeros, nextPrealloc - journalAlignSize);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index dcfac31..34e32b9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -169,4 +169,10 @@ public interface LedgerStorage {
void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException;
ByteBuf getExplicitLac(long ledgerId);
+
+ // for testability
+ default LedgerStorage getUnderlyingLedgerStorage() {
+ return this;
+ }
+
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java
new file mode 100644
index 0000000..9fdc34c
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java
@@ -0,0 +1,93 @@
+package org.apache.bookkeeper.bookie;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Strictly for testing.
+ * Has to live alongside production code so that Journal can inject it in tests.
+ */
+public class SlowBufferedChannel extends BufferedChannel {
+ public volatile long getDelay = 0;
+ public volatile long addDelay = 0;
+ public volatile long flushDelay = 0;
+
+ public SlowBufferedChannel(FileChannel fc, int capacity) throws IOException {
+ super(fc, capacity);
+ }
+
+ public SlowBufferedChannel(FileChannel fc, int writeCapacity, int readCapacity) throws IOException {
+ super(fc, writeCapacity, readCapacity);
+ }
+
+ public void setAddDelay(long delay) {
+ addDelay = delay;
+ }
+
+ public void setGetDelay(long delay) {
+ getDelay = delay;
+ }
+
+ public void setFlushDelay(long delay) {
+ flushDelay = delay;
+ }
+
+ @Override
+ public synchronized void write(ByteBuf src) throws IOException {
+ delayMs(addDelay);
+ super.write(src);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ delayMs(flushDelay);
+ super.flush();
+ }
+
+ @Override
+ public long forceWrite(boolean forceMetadata) throws IOException {
+ delayMs(flushDelay);
+ return super.forceWrite(forceMetadata);
+ }
+
+ @Override
+ public synchronized int read(ByteBuf dest, long pos) throws IOException {
+ delayMs(getDelay);
+ return super.read(dest, pos);
+ }
+
+ private static void delayMs(long delay) {
+ if (delay < 1) {
+ return;
+ }
+ try {
+ TimeUnit.MILLISECONDS.sleep(delay);
+ } catch (InterruptedException e) {
+ //noop
+ }
+ }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 815c65e..5c4f75a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookieProtocol;
@@ -41,16 +42,23 @@ import org.slf4j.LoggerFactory;
* entries will be first added into a {@code MemTable}, and then be flushed back to the
* {@code InterleavedLedgerStorage} when the {@code MemTable} becomes full.
*/
-public class SortedLedgerStorage extends InterleavedLedgerStorage
- implements LedgerStorage, CacheCallback, SkipListFlusher {
+public class SortedLedgerStorage
+ implements LedgerStorage, CacheCallback, SkipListFlusher,
+ CompactableLedgerStorage, EntryLogger.EntryLogListener {
private static final Logger LOG = LoggerFactory.getLogger(SortedLedgerStorage.class);
EntryMemTable memTable;
private ScheduledExecutorService scheduler;
private StateManager stateManager;
+ private final InterleavedLedgerStorage interleavedLedgerStorage;
public SortedLedgerStorage() {
- super();
+ this(new InterleavedLedgerStorage());
+ }
+
+ @VisibleForTesting
+ protected SortedLedgerStorage(InterleavedLedgerStorage ils) {
+ interleavedLedgerStorage = ils;
}
@Override
@@ -63,7 +71,8 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
Checkpointer checkpointer,
StatsLogger statsLogger)
throws IOException {
- super.initialize(
+
+ interleavedLedgerStorage.initialize(
conf,
ledgerManager,
ledgerDirsManager,
@@ -72,6 +81,7 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
checkpointSource,
checkpointer,
statsLogger);
+
if (conf.isEntryLogPerLedgerEnabled()) {
this.memTable = new EntryMemTableWithParallelFlusher(conf, checkpointSource, statsLogger);
} else {
@@ -96,7 +106,7 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
} catch (IOException e) {
LOG.error("Exception thrown while flushing ledger cache.", e);
}
- super.start();
+ interleavedLedgerStorage.start();
}
@Override
@@ -111,30 +121,50 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
} catch (Exception e) {
LOG.error("Error while closing the memtable", e);
}
- super.shutdown();
+ interleavedLedgerStorage.shutdown();
}
@Override
public boolean ledgerExists(long ledgerId) throws IOException {
// Done this way because checking the skip list is an O(logN) operation compared to
// the O(1) for the ledgerCache.
- if (!super.ledgerExists(ledgerId)) {
+ if (!interleavedLedgerStorage.ledgerExists(ledgerId)) {
EntryKeyValue kv = memTable.getLastEntry(ledgerId);
if (null == kv) {
- return super.ledgerExists(ledgerId);
+ return interleavedLedgerStorage.ledgerExists(ledgerId);
}
}
return true;
}
@Override
+ public boolean setFenced(long ledgerId) throws IOException {
+ return interleavedLedgerStorage.setFenced(ledgerId);
+ }
+
+ @Override
+ public boolean isFenced(long ledgerId) throws IOException {
+ return interleavedLedgerStorage.isFenced(ledgerId);
+ }
+
+ @Override
+ public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
+ interleavedLedgerStorage.setMasterKey(ledgerId, masterKey);
+ }
+
+ @Override
+ public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
+ return interleavedLedgerStorage.readMasterKey(ledgerId);
+ }
+
+ @Override
public long addEntry(ByteBuf entry) throws IOException {
long ledgerId = entry.getLong(entry.readerIndex() + 0);
long entryId = entry.getLong(entry.readerIndex() + 8);
long lac = entry.getLong(entry.readerIndex() + 16);
memTable.addEntry(ledgerId, entryId, entry.nioBuffer(), this);
- ledgerCache.updateLastAddConfirmed(ledgerId, lac);
+ interleavedLedgerStorage.ledgerCache.updateLastAddConfirmed(ledgerId, lac);
return entryId;
}
@@ -149,7 +179,7 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
return kv.getValueAsByteBuffer();
}
// If it doesn't exist in the skip list, then fallback to the ledger cache+index.
- return super.getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
+ return interleavedLedgerStorage.getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
}
@Override
@@ -159,13 +189,13 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
}
ByteBuf buffToRet;
try {
- buffToRet = super.getEntry(ledgerId, entryId);
+ buffToRet = interleavedLedgerStorage.getEntry(ledgerId, entryId);
} catch (Bookie.NoEntryException nee) {
EntryKeyValue kv = memTable.getEntry(ledgerId, entryId);
if (null == kv) {
// The entry might have been flushed since we last checked, so query the ledger cache again.
// If the entry truly doesn't exist, then this will throw a NoEntryException
- buffToRet = super.getEntry(ledgerId, entryId);
+ buffToRet = interleavedLedgerStorage.getEntry(ledgerId, entryId);
} else {
buffToRet = kv.getValueAsByteBuffer();
}
@@ -175,22 +205,55 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
}
@Override
+ public long getLastAddConfirmed(long ledgerId) throws IOException {
+ return interleavedLedgerStorage.getLastAddConfirmed(ledgerId);
+ }
+
+ @Override
+ public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+ Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException {
+ return interleavedLedgerStorage.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
+ }
+
+ @Override
public void checkpoint(final Checkpoint checkpoint) throws IOException {
long numBytesFlushed = memTable.flush(this, checkpoint);
- entryLogger.prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
- super.checkpoint(checkpoint);
+ interleavedLedgerStorage.getEntryLogger().prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
+ interleavedLedgerStorage.checkpoint(checkpoint);
+ }
+
+ @Override
+ public void deleteLedger(long ledgerId) throws IOException {
+ interleavedLedgerStorage.deleteLedger(ledgerId);
+ }
+
+ @Override
+ public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
+ interleavedLedgerStorage.registerLedgerDeletionListener(listener);
+ }
+
+ @Override
+ public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
+ interleavedLedgerStorage.setExplicitlac(ledgerId, lac);
+ }
+
+ @Override
+ public ByteBuf getExplicitLac(long ledgerId) {
+ return interleavedLedgerStorage.getExplicitLac(ledgerId);
}
@Override
public void process(long ledgerId, long entryId,
ByteBuf buffer) throws IOException {
- processEntry(ledgerId, entryId, buffer, false);
+ interleavedLedgerStorage.processEntry(ledgerId, entryId, buffer, false);
}
@Override
public void flush() throws IOException {
memTable.flush(this, Checkpoint.MAX);
- super.flush();
+ interleavedLedgerStorage.flush();
}
// CacheCallback functions.
@@ -212,10 +275,10 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
public void run() {
try {
LOG.info("Started flushing mem table.");
- entryLogger.prepareEntryMemTableFlush();
+ interleavedLedgerStorage.getEntryLogger().prepareEntryMemTableFlush();
memTable.flush(SortedLedgerStorage.this);
- if (entryLogger.commitEntryMemTableFlush()) {
- checkpointer.startCheckpoint(cp);
+ if (interleavedLedgerStorage.getEntryLogger().commitEntryMemTableFlush()) {
+ interleavedLedgerStorage.checkpointer.startCheckpoint(cp);
}
} catch (Exception e) {
stateManager.transitionToReadOnlyMode();
@@ -237,4 +300,28 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
return (BookieStateManager) stateManager;
}
+ @Override
+ public EntryLogger getEntryLogger() {
+ return interleavedLedgerStorage.getEntryLogger();
+ }
+
+ @Override
+ public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) throws IOException {
+ return interleavedLedgerStorage.getActiveLedgersInRange(firstLedgerId, lastLedgerId);
+ }
+
+ @Override
+ public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
+ interleavedLedgerStorage.updateEntriesLocations(locations);
+ }
+
+ @Override
+ public void flushEntriesLocationsIndex() throws IOException {
+ interleavedLedgerStorage.flushEntriesLocationsIndex();
+ }
+
+ @Override
+ public LedgerStorage getUnderlyingLedgerStorage() {
+ return interleavedLedgerStorage;
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 1630d1a..9f09c37 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -125,6 +125,7 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
protected static final String PCBC_TIMEOUT_TIMER_NUM_TICKS = "pcbcTimeoutTimerNumTicks";
protected static final String TIMEOUT_TIMER_TICK_DURATION_MS = "timeoutTimerTickDurationMs";
protected static final String TIMEOUT_TIMER_NUM_TICKS = "timeoutTimerNumTicks";
+ // backpressure configuration
protected static final String WAIT_TIMEOUT_ON_BACKPRESSURE = "waitTimeoutOnBackpressureMs";
// Bookie health check settings
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index d51e93d..faf671d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -81,6 +81,12 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
protected static final String JOURNAL_ALIGNMENT_SIZE = "journalAlignmentSize";
protected static final String NUM_JOURNAL_CALLBACK_THREADS = "numJournalCallbackThreads";
protected static final String JOURNAL_FORMAT_VERSION_TO_WRITE = "journalFormatVersionToWrite";
+ // backpressure control
+ protected static final String MAX_ADDS_IN_PROGRESS_LIMIT = "maxAddsInProgressLimit";
+ protected static final String MAX_READS_IN_PROGRESS_LIMIT = "maxReadsInProgressLimit";
+ protected static final String CLOSE_CHANNEL_ON_RESPONSE_TIMEOUT = "closeChannelOnResponseTimeout";
+ protected static final String WAIT_TIMEOUT_ON_RESPONSE_BACKPRESSURE = "waitTimeoutOnResponseBackpressureMs";
+
// Bookie Parameters
protected static final String BOOKIE_PORT = "bookiePort";
protected static final String LISTENING_INTERFACE = "listeningInterface";
@@ -93,10 +99,12 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
protected static final String LEDGER_DIRS = "ledgerDirectories";
protected static final String INDEX_DIRS = "indexDirectories";
protected static final String ALLOW_STORAGE_EXPANSION = "allowStorageExpansion";
- // NIO Parameters
+ // NIO and Netty Parameters
protected static final String SERVER_TCP_NODELAY = "serverTcpNoDelay";
protected static final String SERVER_SOCK_KEEPALIVE = "serverSockKeepalive";
protected static final String SERVER_SOCK_LINGER = "serverTcpLinger";
+ protected static final String SERVER_WRITEBUFFER_LOW_WATER_MARK = "serverWriteBufferLowWaterMark";
+ protected static final String SERVER_WRITEBUFFER_HIGH_WATER_MARK = "serverWriteBufferHighWaterMark";
// Zookeeper Parameters
protected static final String ZK_RETRY_BACKOFF_START_MS = "zkRetryBackoffStartMs";
@@ -635,6 +643,102 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
}
/**
+ * Get max number of adds in progress. 0 == unlimited.
+ *
+ * @return Max number of adds in progress.
+ */
+ public int getMaxAddsInProgressLimit() {
+ return this.getInt(MAX_ADDS_IN_PROGRESS_LIMIT, 0);
+ }
+
+ /**
+ * Set max number of adds in progress. 0 == unlimited.
+ *
+ * @param value
+ * max number of adds in progress.
+ * @return server configuration.
+ */
+ public ServerConfiguration setMaxAddsInProgressLimit(int value) {
+ this.setProperty(MAX_ADDS_IN_PROGRESS_LIMIT, value);
+ return this;
+ }
+
+ /**
+ * Get max number of reads in progress. 0 == unlimited.
+ *
+ * @return Max number of reads in progress.
+ */
+ public int getMaxReadsInProgressLimit() {
+ return this.getInt(MAX_READS_IN_PROGRESS_LIMIT, 0);
+ }
+
+ /**
+ * Set max number of reads in progress. 0 == unlimited.
+ *
+ * @param value
+ * max number of reads in progress.
+ * @return server configuration.
+ */
+ public ServerConfiguration setMaxReadsInProgressLimit(int value) {
+ this.setProperty(MAX_READS_IN_PROGRESS_LIMIT, value);
+ return this;
+ }
+
+ /**
+ * Configures the action taken when the server times out sending a response to the client.
+ * true == close the channel and drop response
+ * false == drop response
+ * Requires waitTimeoutOnResponseBackpressureMs >= 0, otherwise ignored.
+ *
+ * @return value indicating if channel should be closed.
+ */
+ public boolean getCloseChannelOnResponseTimeout(){
+ return this.getBoolean(CLOSE_CHANNEL_ON_RESPONSE_TIMEOUT, false);
+ }
+
+ /**
+ * Configures the action taken when the server times out sending a response to the client.
+ * true == close the channel and drop response
+ * false == drop response
+ * Requires waitTimeoutOnResponseBackpressureMs >= 0, otherwise ignored.
+ *
+ * @param value
+ * @return server configuration.
+ */
+ public ServerConfiguration setCloseChannelOnResponseTimeout(boolean value) {
+ this.setProperty(CLOSE_CHANNEL_ON_RESPONSE_TIMEOUT, value);
+ return this;
+ }
+
+ /**
+ * Timeout controlling wait on response send in case of unresponsive client
+ * (i.e. client in long GC etc.)
+ *
+ * @return timeout value
+ * negative value disables the feature
+ * 0 to allow request to fail immediately
+ * Default is -1 (disabled)
+ */
+ public long getWaitTimeoutOnResponseBackpressureMillis() {
+ return getLong(WAIT_TIMEOUT_ON_RESPONSE_BACKPRESSURE, -1);
+ }
+
+ /**
+ * Timeout controlling wait on response send in case of unresponsive client
+ * (i.e. client in long GC etc.)
+ *
+ * @param value
+ * negative value disables the feature
+ * 0 to allow request to fail immediately
+ * Default is -1 (disabled)
+ * @return server configuration.
+ */
+ public ServerConfiguration setWaitTimeoutOnResponseBackpressureMillis(long value) {
+ setProperty(WAIT_TIMEOUT_ON_RESPONSE_BACKPRESSURE, value);
+ return this;
+ }
+
+ /**
* Get bookie port that bookie server listen on.
*
* @return bookie port
@@ -1540,6 +1644,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
/**
* Get skip list data size limitation (default 64MB).
+ * Max value is 1,073,741,823
*
* @return skip list data size limitation
*/
@@ -1554,6 +1659,10 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
* @return server configuration object.
*/
public ServerConfiguration setSkipListSizeLimit(int size) {
+ if (size > (Integer.MAX_VALUE - 1) / 2) {
+ // gives max of 2*1023MB for mem table (one being checkpointed and still writable).
+ throw new IllegalArgumentException("skiplist size over " + ((Integer.MAX_VALUE - 1) / 2));
+ }
setProperty(SKIP_LIST_SIZE_LIMIT, size);
return this;
}
@@ -2750,6 +2859,47 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
}
/**
+ * Get server netty channel write buffer low water mark.
+ *
+ * @return netty channel write buffer low water mark.
+ */
+ public int getServerWriteBufferLowWaterMark() {
+ return getInt(SERVER_WRITEBUFFER_LOW_WATER_MARK, 384 * 1024);
+ }
+
+ /**
+ * Set server netty channel write buffer low water mark.
+ *
+ * @param waterMark
+ * netty channel write buffer low water mark.
+ * @return server configuration.
+ */
+ public ServerConfiguration setServerWriteBufferLowWaterMark(int waterMark) {
+ setProperty(SERVER_WRITEBUFFER_LOW_WATER_MARK, waterMark);
+ return this;
+ }
+
+ /**
+ * Get server netty channel write buffer high water mark.
+ *
+ * @return netty channel write buffer high water mark.
+ */
+ public int getServerWriteBufferHighWaterMark() {
+ return getInt(SERVER_WRITEBUFFER_HIGH_WATER_MARK, 512 * 1024);
+ }
+
+ /**
+ * Set server netty channel write buffer high water mark.
+ *
+ * @param waterMark
+ * netty channel write buffer high water mark.
+ * @return server configuration.
+ */
+ public ServerConfiguration setServerWriteBufferHighWaterMark(int waterMark) {
+ setProperty(SERVER_WRITEBUFFER_HIGH_WATER_MARK, waterMark);
+ return this;
+ }
+ /**
* Set registration manager class.
*
* @param regManagerClass
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
index 2c8cf7a..8b328ef 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
@@ -41,5 +41,4 @@ public interface RequestProcessor extends AutoCloseable {
* channel received the given request <i>r</i>
*/
void processRequest(Object r, Channel channel);
-
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index bc303b9..d687a5c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -35,6 +35,7 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.group.ChannelGroup;
@@ -284,6 +285,8 @@ class BookieNettyServer {
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(conf.getRecvByteBufAllocatorSizeMin(),
conf.getRecvByteBufAllocatorSizeInitial(), conf.getRecvByteBufAllocatorSizeMax()));
+ bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
+ conf.getServerWriteBufferLowWaterMark(), conf.getServerWriteBufferHighWaterMark()));
if (eventLoopGroup instanceof EpollEventLoopGroup) {
bootstrap.channel(EpollServerSocketChannel.class);
@@ -343,6 +346,8 @@ class BookieNettyServer {
jvmBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(conf.getRecvByteBufAllocatorSizeMin(),
conf.getRecvByteBufAllocatorSizeInitial(), conf.getRecvByteBufAllocatorSizeMax()));
+ jvmBootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
+ conf.getServerWriteBufferLowWaterMark(), conf.getServerWriteBufferHighWaterMark()));
if (jvmEventLoopGroup instanceof DefaultEventLoopGroup) {
jvmBootstrap.channel(LocalServerChannel.class);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 2aebbb9..becfe68 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -22,6 +22,9 @@ package org.apache.bookkeeper.proto;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_IN_PROGRESS;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CHANNEL_WRITE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER;
@@ -29,9 +32,12 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER_RE
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED_WAIT;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_READ;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_REQUEST;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_IN_PROGRESS;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_PRE_WAIT;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_READ;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_REQUEST;
@@ -45,6 +51,9 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC_REQUEST;
import static org.apache.bookkeeper.proto.RequestUtils.hasFlag;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
@@ -54,19 +63,25 @@ import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
+import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.auth.AuthToken;
import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.tls.SecurityException;
@@ -88,6 +103,7 @@ public class BookieRequestProcessor implements RequestProcessor {
* worker threads.
*/
private final ServerConfiguration serverCfg;
+ private final long waitTimeoutOnBackpressureMillis;
/**
* This is the Bookie instance that is used to handle all read and write requests.
@@ -150,10 +166,27 @@ public class BookieRequestProcessor implements RequestProcessor {
final OpStatsLogger getBookieInfoRequestStats;
final OpStatsLogger getBookieInfoStats;
final OpStatsLogger channelWriteStats;
+ final OpStatsLogger addEntryBlockedStats;
+ final OpStatsLogger readEntryBlockedStats;
+
+ final AtomicInteger addsInProgress = new AtomicInteger(0);
+ final AtomicInteger maxAddsInProgress = new AtomicInteger(0);
+ final AtomicInteger addsBlocked = new AtomicInteger(0);
+ final AtomicInteger readsInProgress = new AtomicInteger(0);
+ final AtomicInteger readsBlocked = new AtomicInteger(0);
+ final AtomicInteger maxReadsInProgress = new AtomicInteger(0);
+
+ final Semaphore addsSemaphore;
+ final Semaphore readsSemaphore;
+
+ // to temporarily blacklist channels
+ final Optional<Cache<Channel, Boolean>> blacklistedChannels;
+ final Consumer<Channel> onResponseTimeout;
public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
StatsLogger statsLogger, SecurityHandlerFactory shFactory) throws SecurityException {
this.serverCfg = serverCfg;
+ this.waitTimeoutOnBackpressureMillis = serverCfg.getWaitTimeoutOnResponseBackpressureMillis();
this.bookie = bookie;
this.readThreadPool = createExecutor(
this.serverCfg.getNumReadWorkerThreads(),
@@ -190,6 +223,25 @@ public class BookieRequestProcessor implements RequestProcessor {
shFactory.init(NodeType.Server, serverCfg);
}
+ if (waitTimeoutOnBackpressureMillis > 0) {
+ blacklistedChannels = Optional.of(CacheBuilder.newBuilder()
+ .expireAfterWrite(waitTimeoutOnBackpressureMillis, TimeUnit.MILLISECONDS)
+ .build());
+ } else {
+ blacklistedChannels = Optional.empty();
+ }
+
+ if (serverCfg.getCloseChannelOnResponseTimeout()) {
+ onResponseTimeout = (ch) -> {
+ LOG.warn("closing channel {} because it was non-writable for longer than {} ms",
+ ch, waitTimeoutOnBackpressureMillis);
+ ch.close();
+ };
+ } else {
+ // noop
+ onResponseTimeout = (ch) -> {};
+ }
+
// Expose Stats
this.statsEnabled = serverCfg.isStatisticsEnabled();
this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
@@ -214,6 +266,126 @@ public class BookieRequestProcessor implements RequestProcessor {
this.getBookieInfoStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO);
this.getBookieInfoRequestStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO_REQUEST);
this.channelWriteStats = statsLogger.getOpStatsLogger(CHANNEL_WRITE);
+
+ this.addEntryBlockedStats = statsLogger.getOpStatsLogger(ADD_ENTRY_BLOCKED_WAIT);
+ this.readEntryBlockedStats = statsLogger.getOpStatsLogger(READ_ENTRY_BLOCKED_WAIT);
+
+ int maxAdds = serverCfg.getMaxAddsInProgressLimit();
+ addsSemaphore = maxAdds > 0 ? new Semaphore(maxAdds, true) : null;
+
+ int maxReads = serverCfg.getMaxReadsInProgressLimit();
+ readsSemaphore = maxReads > 0 ? new Semaphore(maxReads, true) : null;
+
+ statsLogger.registerGauge(ADD_ENTRY_IN_PROGRESS, new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return addsInProgress;
+ }
+ });
+
+ statsLogger.registerGauge(ADD_ENTRY_BLOCKED, new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return addsBlocked;
+ }
+ });
+
+ statsLogger.registerGauge(READ_ENTRY_IN_PROGRESS, new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return readsInProgress;
+ }
+ });
+
+ statsLogger.registerGauge(READ_ENTRY_BLOCKED, new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return readsBlocked;
+ }
+ });
+
+ }
+
+ protected void onAddRequestStart(Channel channel) {
+ if (addsSemaphore != null) {
+ if (!addsSemaphore.tryAcquire()) {
+ final long throttlingStartTimeNanos = MathUtils.nowInNano();
+ channel.config().setAutoRead(false);
+ LOG.info("Too many add requests in progress, disabling autoread on channel {}", channel);
+ addsBlocked.incrementAndGet();
+ addsSemaphore.acquireUninterruptibly();
+ channel.config().setAutoRead(true);
+ final long delayNanos = MathUtils.elapsedNanos(throttlingStartTimeNanos);
+ LOG.info("Re-enabled autoread on channel {} after AddRequest delay of {} nanos", channel, delayNanos);
+ addEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS);
+ addsBlocked.decrementAndGet();
+ }
+ }
+ final int curr = addsInProgress.incrementAndGet();
+ maxAddsInProgress.accumulateAndGet(curr, Integer::max);
+ }
+
+ protected void onAddRequestFinish() {
+ addsInProgress.decrementAndGet();
+ if (addsSemaphore != null) {
+ addsSemaphore.release();
+ }
+ }
+
+ protected void onReadRequestStart(Channel channel) {
+ if (readsSemaphore != null) {
+ if (!readsSemaphore.tryAcquire()) {
+ final long throttlingStartTimeNanos = MathUtils.nowInNano();
+ channel.config().setAutoRead(false);
+ LOG.info("Too many read requests in progress, disabling autoread on channel {}", channel);
+ readsBlocked.incrementAndGet();
+ readsSemaphore.acquireUninterruptibly();
+ channel.config().setAutoRead(true);
+ final long delayNanos = MathUtils.elapsedNanos(throttlingStartTimeNanos);
+ LOG.info("Re-enabled autoread on channel {} after ReadRequest delay of {} nanos", channel, delayNanos);
+ readEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS);
+ readsBlocked.decrementAndGet();
+ }
+ }
+ final int curr = readsInProgress.incrementAndGet();
+ maxReadsInProgress.accumulateAndGet(curr, Integer::max);
+ }
+
+ protected void onReadRequestFinish() {
+ readsInProgress.decrementAndGet();
+ if (readsSemaphore != null) {
+ readsSemaphore.release();
+ }
+ }
+
+ @VisibleForTesting
+ int maxAddsInProgressCount() {
+ return maxAddsInProgress.get();
+ }
+
+ @VisibleForTesting
+ int maxReadsInProgressCount() {
+ return maxReadsInProgress.get();
}
@Override
@@ -572,4 +744,28 @@ public class BookieRequestProcessor implements RequestProcessor {
}
}
}
+
+ public long getWaitTimeoutOnBackpressureMillis() {
+ return waitTimeoutOnBackpressureMillis;
+ }
+
+ public void blacklistChannel(Channel channel) {
+ blacklistedChannels
+ .ifPresent(x -> x.put(channel, true));
+ }
+
+ public void invalidateBlacklist(Channel channel) {
+ blacklistedChannels
+ .ifPresent(x -> x.invalidate(channel));
+ }
+
+ public boolean isBlacklisted(Channel channel) {
+ return blacklistedChannels
+ .map(x -> x.getIfPresent(channel))
+ .orElse(false);
+ }
+
+ public void handleNonWritableChannel(Channel channel) {
+ onResponseTimeout.accept(channel);
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index c0214d3..d386b91 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -29,6 +29,8 @@ import java.io.IOException;
import java.net.UnknownHostException;
import java.security.AccessControlException;
import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieCriticalThread;
import org.apache.bookkeeper.bookie.BookieException;
@@ -127,6 +129,10 @@ public class BookieServer {
running = true;
deathWatcher = new DeathWatcher(conf);
deathWatcher.start();
+
+ // fixes test flappers at random places until ISSUE#1400 is resolved
+ // https://github.com/apache/bookkeeper/issues/1400
+ TimeUnit.MILLISECONDS.sleep(250);
}
@VisibleForTesting
@@ -139,6 +145,11 @@ public class BookieServer {
return bookie;
}
+ @VisibleForTesting
+ public BookieRequestProcessor getBookieRequestProcessor() {
+ return (BookieRequestProcessor) requestProcessor;
+ }
+
/**
* Suspend processing of requests in the bookie (for testing).
*/
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
index fc55e4a..d964957 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
@@ -54,7 +54,7 @@ public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements R
if (!isVersionCompatible()) {
getBookieInfoResponse.setStatus(StatusCode.EBADVERSION);
- requestProcessor.getBookieInfoStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+ requestProcessor.getGetBookieInfoStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
return getBookieInfoResponse.build();
}
@@ -66,11 +66,11 @@ public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements R
long freeDiskSpace = 0L, totalDiskSpace = 0L;
try {
if ((requested & GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE) != 0) {
- freeDiskSpace = requestProcessor.bookie.getTotalFreeSpace();
+ freeDiskSpace = requestProcessor.getBookie().getTotalFreeSpace();
getBookieInfoResponse.setFreeDiskSpace(freeDiskSpace);
}
if ((requested & GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE) != 0) {
- totalDiskSpace = requestProcessor.bookie.getTotalDiskSpace();
+ totalDiskSpace = requestProcessor.getBookie().getTotalDiskSpace();
getBookieInfoResponse.setTotalDiskCapacity(totalDiskSpace);
}
LOG.debug("FreeDiskSpace info is " + freeDiskSpace + " totalDiskSpace is: " + totalDiskSpace);
@@ -80,7 +80,7 @@ public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements R
}
getBookieInfoResponse.setStatus(status);
- requestProcessor.getBookieInfoStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
+ requestProcessor.getGetBookieInfoStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
return getBookieInfoResponse.build();
}
@@ -98,6 +98,6 @@ public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements R
.setGetBookieInfoResponse(getBookieInfoResponse);
sendResponse(response.getStatus(),
response.build(),
- requestProcessor.getBookieInfoRequestStats);
+ requestProcessor.getGetBookieInfoRequestStats());
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
index 9cd9fd5..7dc29a3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
@@ -32,6 +32,7 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.bookkeeper.util.StringUtils;
/**
* A base class for bookkeeper protocol v3 packet processors.
@@ -53,6 +54,40 @@ public abstract class PacketProcessorBaseV3 extends SafeRunnable {
protected void sendResponse(StatusCode code, Object response, OpStatsLogger statsLogger) {
final long writeNanos = MathUtils.nowInNano();
+
+ final long timeOut = requestProcessor.getWaitTimeoutOnBackpressureMillis();
+ if (timeOut >= 0 && !channel.isWritable()) {
+ if (!requestProcessor.isBlacklisted(channel)) {
+ synchronized (channel) {
+ if (!channel.isWritable() && !requestProcessor.isBlacklisted(channel)) {
+ final long waitUntilNanos = writeNanos + TimeUnit.MILLISECONDS.toNanos(timeOut);
+ while (!channel.isWritable() && MathUtils.nowInNano() < waitUntilNanos) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(1);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ if (!channel.isWritable()) {
+ requestProcessor.blacklistChannel(channel);
+ requestProcessor.handleNonWritableChannel(channel);
+ }
+ }
+ }
+ }
+
+ if (!channel.isWritable()) {
+ LOGGER.warn("cannot write response to non-writable channel {} for request {}", channel,
+ StringUtils.requestToString(request));
+ requestProcessor.getChannelWriteStats()
+ .registerFailedEvent(MathUtils.elapsedNanos(writeNanos), TimeUnit.NANOSECONDS);
+ statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
+ return;
+ } else {
+ requestProcessor.invalidateBlacklist(channel);
+ }
+ }
+
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@@ -71,7 +106,6 @@ public abstract class PacketProcessorBaseV3 extends SafeRunnable {
}
}
});
-
}
protected boolean isVersionCompatible() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 4d69acf..39436d4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -126,6 +126,7 @@ import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -939,7 +940,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
final Object request,
final boolean allowFastFail) {
if (channel == null) {
- LOG.warn("Operation {} failed: channel == null", requestToString(request));
+ LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request));
errorOut(key);
return;
}
@@ -952,7 +953,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
if (allowFastFail && !isWritable) {
LOG.warn("Operation {} failed: TooManyRequestsException",
- requestToString(request));
+ StringUtils.requestToString(request));
errorOut(key, BKException.Code.TooManyRequestsException);
return;
@@ -975,22 +976,11 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
channel.writeAndFlush(request, promise);
} catch (Throwable e) {
- LOG.warn("Operation {} failed", requestToString(request), e);
+ LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
errorOut(key);
}
}
- private static String requestToString(Object request) {
- if (request instanceof BookkeeperProtocol.Request) {
- BookkeeperProtocol.BKPacketHeader header = ((BookkeeperProtocol.Request) request).getHeader();
- return String.format("Req(txnId=%d,op=%s,version=%s)",
- header.getTxnId(), header.getOperation(),
- header.getVersion());
- } else {
- return request.toString();
- }
- }
-
void errorOut(final CompletionKey key) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing completion key: {}", key);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index 3d2a1ea..12c0201 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -65,6 +65,8 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
BookieRequestProcessor requestProcessor,
ExecutorService fenceThreadPool) {
super(request, channel, requestProcessor);
+ requestProcessor.onReadRequestStart(channel);
+
this.readRequest = request.getReadRequest();
this.ledgerId = readRequest.getLedgerId();
this.entryId = readRequest.getEntryId();
@@ -313,6 +315,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
sendResponse(response.getStatus(),
response.build(),
reqStats);
+ requestProcessor.onReadRequestFinish();
}
//
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index fe68da4..7747e5c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -37,6 +37,7 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +48,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
public WriteEntryProcessorV3(Request request, Channel channel,
BookieRequestProcessor requestProcessor) {
super(request, channel, requestProcessor);
+ requestProcessor.onAddRequestStart(channel);
}
// Returns null if there is no exception thrown
@@ -173,6 +175,12 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
}
}
+ @Override
+ protected void sendResponse(StatusCode code, Object response, OpStatsLogger statsLogger) {
+ super.sendResponse(code, response, statsLogger);
+ requestProcessor.onAddRequestFinish();
+ }
+
/**
* this toString method filters out body and masterKey from the output.
* masterKey contains the password of the ledger and body is customer data,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
index f55edd1..73bf018 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
@@ -20,6 +20,8 @@ package org.apache.bookkeeper.util;
import java.io.IOException;
+import org.apache.bookkeeper.proto.BookkeeperProtocol;
+
/**
* Provided utilites for parsing network addresses, ledger-id from node paths
* etc.
@@ -161,4 +163,20 @@ public class StringUtils {
}
}
+ /**
+ * Builds string representation of the request without extra (i.e. binary) data
+ *
+ * @param request
+ * @return string representation of request
+ */
+ public static String requestToString(Object request) {
+ if (request instanceof BookkeeperProtocol.Request) {
+ BookkeeperProtocol.BKPacketHeader header = ((BookkeeperProtocol.Request) request).getHeader();
+ return String.format("Req(txnId=%d,op=%s,version=%s)",
+ header.getTxnId(), header.getOperation(),
+ header.getVersion());
+ } else {
+ return request.toString();
+ }
+ }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index f5e73a1..fbfe7c9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -303,7 +303,8 @@ public class EntryLogTest {
Bookie bookie = new Bookie(conf);
EntryLogger entryLogger = new EntryLogger(conf,
bookie.getLedgerDirsManager());
- InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage);
+ InterleavedLedgerStorage ledgerStorage =
+ ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage());
ledgerStorage.entryLogger = entryLogger;
// Create ledgers
ledgerStorage.setMasterKey(1, "key".getBytes());
@@ -675,7 +676,7 @@ public class EntryLogTest {
conf.setLedgerStorageClass(ledgerStorageClass);
conf.setEntryLogPerLedgerEnabled(entryLogPerLedgerEnabled);
Bookie bookie = new Bookie(conf);
- InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage);
+ CompactableLedgerStorage ledgerStorage = (CompactableLedgerStorage) bookie.ledgerStorage;
Random rand = new Random(0);
if (ledgerStorageClass.equals(SortedLedgerStorage.class.getName())) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index ae3b4cd..1a01299 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -89,7 +89,7 @@ public class LedgerCacheTest {
bookie = new Bookie(conf);
activeLedgers = new SnapshotMap<Long, Boolean>();
- ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage).ledgerCache;
+ ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage()).ledgerCache;
}
@After
@@ -116,8 +116,8 @@ public class LedgerCacheTest {
if (ledgerCache != null) {
ledgerCache.close();
}
- ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage).ledgerCache = new LedgerCacheImpl(
- conf, activeLedgers, bookie.getIndexDirsManager());
+ ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage())
+ .ledgerCache = new LedgerCacheImpl(conf, activeLedgers, bookie.getIndexDirsManager());
flushThread = new Thread() {
public void run() {
while (true) {
@@ -275,7 +275,8 @@ public class LedgerCacheTest {
conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(), ledgerDir2.getAbsolutePath() });
Bookie bookie = new Bookie(conf);
- InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage);
+ InterleavedLedgerStorage ledgerStorage =
+ ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage());
LedgerCacheImpl ledgerCache = (LedgerCacheImpl) ledgerStorage.ledgerCache;
// Create ledger index file
ledgerStorage.setMasterKey(1, "key".getBytes());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
new file mode 100644
index 0000000..c3a8f63
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
@@ -0,0 +1,140 @@
+package org.apache.bookkeeper.bookie;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Strictly for testing.
+ * Has to be in org.apache.bookkeeper.bookie so that InterleavedLedgerStorage does not need to be changed.
+ */
+public class SlowInterleavedLedgerStorage extends InterleavedLedgerStorage {
+
+ public static final String PROP_SLOW_STORAGE_FLUSH_DELAY = "test.slowStorage.flushDelay";
+ public static final String PROP_SLOW_STORAGE_ADD_DELAY = "test.slowStorage.addDelay";
+ public static final String PROP_SLOW_STORAGE_GET_DELAY = "test.slowStorage.getDelay";
+
+ /**
+ * Strictly for testing.
+ */
+ public static class SlowEntryLogger extends EntryLogger {
+ public volatile long getDelay = 0;
+ public volatile long addDelay = 0;
+ public volatile long flushDelay = 0;
+
+ public SlowEntryLogger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, EntryLogListener listener)
+ throws IOException {
+ super(conf, ledgerDirsManager, listener);
+ }
+
+ public SlowEntryLogger setAddDelay(long delay) {
+ addDelay = delay;
+ return this;
+ }
+
+ public SlowEntryLogger setGetDelay(long delay) {
+ getDelay = delay;
+ return this;
+ }
+
+ public SlowEntryLogger setFlushDelay(long delay) {
+ flushDelay = delay;
+ return this;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ delayMs(flushDelay);
+ super.flush();
+ }
+
+ @Override
+ public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+ delayMs(addDelay);
+ return super.addEntry(ledger, entry, rollLog);
+ }
+
+ @Override
+ public ByteBuf readEntry(long ledgerId, long entryId, long location)
+ throws IOException, Bookie.NoEntryException {
+ delayMs(getDelay);
+ return super.readEntry(ledgerId, entryId, location);
+ }
+
+ /** Sleeps for {@code delay} ms (no-op when below 1); an interrupt ends the sleep early and is re-asserted. */
+ private static void delayMs(long delay) {
+ if (delay >= 1) {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // do not swallow: keep the interrupt visible to the calling thread
+ }
+ }
+ }
+
+ }
+
+ public SlowInterleavedLedgerStorage() {
+ super();
+ }
+
+ @Override
+ public void initialize(ServerConfiguration conf,
+ LedgerManager ledgerManager,
+ LedgerDirsManager ledgerDirsManager,
+ LedgerDirsManager indexDirsManager,
+ StateManager stateManager,
+ CheckpointSource checkpointSource,
+ Checkpointer checkpointer,
+ StatsLogger statsLogger)
+ throws IOException {
+ super.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
+ stateManager, checkpointSource, checkpointer, statsLogger);
+ // do not want to add these to the config class, so they are read through the "raw" property interface
+ long getDelay = conf.getLong(PROP_SLOW_STORAGE_GET_DELAY, 0); // 0 when the property is not set
+ long addDelay = conf.getLong(PROP_SLOW_STORAGE_ADD_DELAY, 0);
+ long flushDelay = conf.getLong(PROP_SLOW_STORAGE_FLUSH_DELAY, 0);
+
+ entryLogger = new SlowEntryLogger(conf, ledgerDirsManager, this) // reassigns the field after super.initialize()
+ .setAddDelay(addDelay)
+ .setGetDelay(getDelay)
+ .setFlushDelay(flushDelay);
+ }
+
+ public void setAddDelay(long delay) {
+ ((SlowEntryLogger) entryLogger).setAddDelay(delay);
+ }
+
+ public void setGetDelay(long delay) {
+ ((SlowEntryLogger) entryLogger).setGetDelay(delay);
+ }
+
+ public void setFlushDelay(long delay) {
+ ((SlowEntryLogger) entryLogger).setFlushDelay(delay);
+ }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowSortedLedgerStorage.java
similarity index 61%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
copy to bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowSortedLedgerStorage.java
index 2c8cf7a..e12976d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowSortedLedgerStorage.java
@@ -1,4 +1,6 @@
-/**
+package org.apache.bookkeeper.bookie;
+
+/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,28 +20,17 @@
* under the License.
*
*/
-package org.apache.bookkeeper.processor;
-
-import io.netty.channel.Channel;
/**
- * A request processor that is used for processing requests at bookie side.
+ * Strictly for unit testing.
*/
-public interface RequestProcessor extends AutoCloseable {
-
- /**
- * Close the request processor.
- */
- void close();
+public class SlowSortedLedgerStorage extends SortedLedgerStorage {
- /**
- * Process request.
- *
- * @param r
- * request to process
- * @param channel
- * channel received the given request <i>r</i>
- */
- void processRequest(Object r, Channel channel);
+ public SlowSortedLedgerStorage() {
+ this(new SlowInterleavedLedgerStorage());
+ }
+ SlowSortedLedgerStorage(InterleavedLedgerStorage ils) {
+ super(ils);
+ }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index f3ef108..322cdd0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -225,7 +225,7 @@ public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) storage.getEntryLogger()
.getEntryLogManager();
entryLogManager.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
- long leastUnflushedLogId = storage.entryLogger.getLeastUnflushedLogId();
+ long leastUnflushedLogId = storage.getEntryLogger().getLeastUnflushedLogId();
long currentLogId = entryLogManager.getCurrentLogId();
log.info("Least unflushed entry log : current = {}, leastUnflushed = {}", currentLogId, leastUnflushedLogId);
@@ -244,8 +244,8 @@ public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
assertEquals(0, storage.memTable.kvmap.size());
assertTrue(
"current log " + currentLogId + " contains entries added from memtable should be forced to disk"
- + " but least unflushed log is " + storage.entryLogger.getLeastUnflushedLogId(),
- storage.entryLogger.getLeastUnflushedLogId() > currentLogId);
+ + " but least unflushed log is " + storage.getEntryLogger().getLeastUnflushedLogId(),
+ storage.getEntryLogger().getLeastUnflushedLogId() > currentLogId);
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java
new file mode 100644
index 0000000..5454a70
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java
@@ -0,0 +1,469 @@
+package org.apache.bookkeeper.proto;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.nio.channels.FileChannel;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.Journal;
+import org.apache.bookkeeper.bookie.SlowBufferedChannel;
+import org.apache.bookkeeper.bookie.SlowInterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.SlowSortedLedgerStorage;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Tests for backpressure handling on the server side.
+ */
+// PowerMock usage is problematic here due to https://github.com/powermock/powermock/issues/822
+public class BookieBackpressureTest extends BookKeeperClusterTestCase
+ implements AddCallback, ReadCallback, ReadLastConfirmedCallback {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BookieBackpressureTest.class);
+
+ byte[] ledgerPassword = "aaa".getBytes();
+
+ final byte[] data = new byte[8 * 1024];
+
+ // test related constants
+ static final int NUM_ENTRIES_TO_WRITE = 200;
+ static final int ENTRIES_IN_MEMTABLE = 2;
+ static final int MAX_PENDING = 2 * ENTRIES_IN_MEMTABLE + 1;
+ static final int NUM_OF_LEDGERS = 2 * MAX_PENDING;
+
+ DigestType digestType;
+
+ long getDelay;
+ long addDelay;
+ long flushDelay;
+
+ public BookieBackpressureTest() {
+ super(1);
+ this.digestType = DigestType.CRC32;
+
+ baseClientConf.setAddEntryTimeout(100);
+ baseClientConf.setAddEntryQuorumTimeout(100);
+ baseClientConf.setReadEntryTimeout(100);
+ }
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ getDelay = 0;
+ addDelay = 0;
+ flushDelay = 0;
+ }
+
+ class SyncObj {
+ long lastConfirmed;
+ volatile int counter;
+ boolean value;
+ AtomicInteger rc = new AtomicInteger(BKException.Code.OK);
+ Enumeration<LedgerEntry> ls = null;
+
+ public SyncObj() {
+ counter = 0;
+ lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
+ value = false;
+ }
+
+ void setReturnCode(int rc) {
+ this.rc.compareAndSet(BKException.Code.OK, rc);
+ }
+
+ void setLedgerEntries(Enumeration<LedgerEntry> ls) {
+ this.ls = ls;
+ }
+ }
+
+ private void mockJournal(Bookie bookie, long getDelay, long addDelay, long flushDelay) throws Exception {
+ if (getDelay <= 0 && addDelay <= 0 && flushDelay <= 0) {
+ return;
+ }
+
+ List<Journal> journals = getJournals(bookie);
+ for (int i = 0; i < journals.size(); i++) {
+ Journal mock = spy(journals.get(i));
+ when(mock.getBufferedChannelBuilder()).thenReturn((FileChannel fc, int capacity) -> {
+ SlowBufferedChannel sbc = new SlowBufferedChannel(fc, capacity);
+ sbc.setAddDelay(addDelay);
+ sbc.setGetDelay(getDelay);
+ sbc.setFlushDelay(flushDelay);
+ return sbc;
+ });
+
+ journals.set(i, mock);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private List<Journal> getJournals(Bookie bookie) throws NoSuchFieldException, IllegalAccessException {
+ Field f = bookie.getClass().getDeclaredField("journals");
+ f.setAccessible(true);
+
+ return (List<Journal>) f.get(bookie);
+ }
+
+ @Test
+ public void testWriteNoBackpressureSlowJournal() throws Exception {
+ //disable backpressure for writes
+ bsConfs.get(0).setMaxAddsInProgressLimit(0);
+ addDelay = 1;
+
+ doWritesNoBackpressure(0);
+ }
+
+ @Test
+ public void testWriteNoBackpressureSlowJournalFlush() throws Exception {
+ //disable backpressure for writes
+ bsConfs.get(0).setMaxAddsInProgressLimit(0);
+ // to increase frequency of flushes
+ bsConfs.get(0).setJournalAdaptiveGroupWrites(false);
+ flushDelay = 1;
+
+ doWritesNoBackpressure(0);
+ }
+
+ @Test
+ public void testWriteWithBackpressureSlowJournal() throws Exception {
+ //enable backpressure with MAX_PENDING writes in progress
+ bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
+ // slow down journal adds, mirroring testWriteNoBackpressureSlowJournal; slow flushes are covered by the *Flush test
+ addDelay = 1;
+ doWritesWithBackpressure(0);
+ }
+
+
+ @Test
+ public void testWriteWithBackpressureSlowJournalFlush() throws Exception {
+ //enable backpressure with MAX_PENDING writes in progress
+ bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
+ // to increase frequency of flushes
+ bsConfs.get(0).setJournalAdaptiveGroupWrites(false);
+ flushDelay = 1;
+
+ doWritesWithBackpressure(0);
+ }
+
+ @Test
+ public void testWriteNoBackpressureSlowInterleavedStorage() throws Exception {
+ //disable backpressure for writes
+ bsConfs.get(0).setMaxAddsInProgressLimit(0);
+ bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+ bsConfs.get(0).setWriteBufferBytes(data.length);
+
+ bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
+
+ doWritesNoBackpressure(0);
+ }
+
+ @Test
+ public void testWriteWithBackpressureSlowInterleavedStorage() throws Exception {
+ //enable backpressure with MAX_PENDING writes in progress
+ bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
+ bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+ bsConfs.get(0).setWriteBufferBytes(data.length);
+
+ bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
+
+ doWritesWithBackpressure(0);
+ }
+
+ @Test
+ public void testWriteNoBackpressureSlowInterleavedStorageFlush() throws Exception {
+ //disable backpressure for writes
+ bsConfs.get(0).setMaxAddsInProgressLimit(0);
+ bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+ bsConfs.get(0).setWriteBufferBytes(data.length);
+
+ bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
+
+ doWritesNoBackpressure(0);
+ }
+
+ @Test
+ public void testWriteWithBackpressureSlowInterleavedStorageFlush() throws Exception {
+ //enable backpressure with MAX_PENDING writes in progress
+ bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
+ bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+ bsConfs.get(0).setWriteBufferBytes(data.length);
+
+ bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
+
+ doWritesWithBackpressure(0);
+ }
+
+ @Test
+ public void testWriteNoBackpressureSortedStorage() throws Exception {
+ //disable backpressure for writes
+ bsConfs.get(0).setMaxAddsInProgressLimit(0);
+ bsConfs.get(0).setLedgerStorageClass(SlowSortedLedgerStorage.class.getName());
+ bsConfs.get(0).setWriteBufferBytes(data.length);
+
+ // one for memtable being flushed, one for the part accepting the data
+ assertTrue("for the test, memtable should not keep more entries than allowed",
+ ENTRIES_IN_MEMTABLE * 2 <= MAX_PENDING);
+ bsConfs.get(0).setSkipListSizeLimit(data.length * ENTRIES_IN_MEMTABLE - 1);
+ bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
+ bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
+
+ doWritesNoBackpressure(0);
+ }
+
+ @Test
+ public void testWriteWithBackpressureSortedStorage() throws Exception {
+ //enable backpressure with MAX_PENDING writes in progress
+ bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
+ bsConfs.get(0).setLedgerStorageClass(SlowSortedLedgerStorage.class.getName());
+ bsConfs.get(0).setWriteBufferBytes(data.length);
+
+ // one for memtable being flushed, one for the part accepting the data
+ assertTrue("for the test, memtable should not keep more entries than allowed",
+ ENTRIES_IN_MEMTABLE * 2 <= MAX_PENDING);
+ bsConfs.get(0).setSkipListSizeLimit(data.length * ENTRIES_IN_MEMTABLE - 1);
+ bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
+ bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
+
+ doWritesWithBackpressure(0);
+ }
+
+ @Test
+ public void testReadsNoBackpressure() throws Exception {
+ //disable backpressure for reads
+ bsConfs.get(0).setMaxReadsInProgressLimit(0);
+ bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+ bsConfs.get(0).setWriteBufferBytes(data.length);
+
+ bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_GET_DELAY, "1");
+
+ final BookieRequestProcessor brp = generateDataAndDoReads(0);
+
+ Assert.assertThat("reads in progress should exceed MAX_PENDING",
+ brp.maxReadsInProgressCount(), Matchers.greaterThan(MAX_PENDING));
+ }
+
+ @Test
+ public void testReadsWithBackpressure() throws Exception {
+ //enable backpressure for reads
+ bsConfs.get(0).setMaxReadsInProgressLimit(MAX_PENDING);
+ bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+ bsConfs.get(0).setWriteBufferBytes(data.length);
+
+ bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_GET_DELAY, "1");
+
+ final BookieRequestProcessor brp = generateDataAndDoReads(0);
+
+ Assert.assertThat("reads in progress should NOT exceed MAX_PENDING ",
+ brp.maxReadsInProgressCount(), Matchers.lessThanOrEqualTo(MAX_PENDING));
+ }
+
+ private BookieRequestProcessor generateDataAndDoReads(final int bkId) throws Exception {
+ BookieServer bks = bs.get(bkId);
+ bks.shutdown();
+ bks = new BookieServer(bsConfs.get(bkId));
+ mockJournal(bks.bookie, getDelay, addDelay, flushDelay);
+ bks.start();
+ bs.set(bkId, bks);
+
+ LOG.info("creating ledgers");
+ // Create ledgers
+ final int numEntriesForReads = 10;
+ LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS];
+ for (int i = 0; i < NUM_OF_LEDGERS; i++) {
+ lhs[i] = bkc.createLedger(1, 1, digestType, ledgerPassword);
+ LOG.info("created ledger ID: {}", lhs[i].getId());
+ }
+
+ LOG.info("generating data for reads");
+ final CountDownLatch writesCompleteLatch = new CountDownLatch(numEntriesForReads * NUM_OF_LEDGERS);
+ for (int i = 0; i < numEntriesForReads; i++) {
+ for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) {
+ lhs[ledger].asyncAddEntry(data, (rc2, lh, entryId, ctx) -> writesCompleteLatch.countDown(), null);
+ }
+ }
+ writesCompleteLatch.await();
+
+ LOG.info("issue bunch of async reads");
+ final CountDownLatch readsCompleteLatch = new CountDownLatch(numEntriesForReads * NUM_OF_LEDGERS);
+ for (int i = 0; i < numEntriesForReads; i++) {
+ for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) {
+ lhs[ledger].asyncReadEntries(i, i, (rc, lh, seq, ctx) -> readsCompleteLatch.countDown(), null);
+ }
+ }
+ readsCompleteLatch.await();
+ LOG.info("reads finished");
+
+ return bks.getBookieRequestProcessor();
+ }
+
+ // here we expect that backpressure is disabled and number of writes in progress
+ // will exceed the limit
+ private void doWritesNoBackpressure(final int bkId) throws Exception {
+ BookieServer bks = bs.get(bkId);
+ bks.shutdown();
+ bks = new BookieServer(bsConfs.get(bkId));
+ mockJournal(bks.bookie, getDelay, addDelay, flushDelay);
+ bks.start();
+ bs.set(bkId, bks);
+
+ LOG.info("Creating ledgers");
+ LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS];
+ for (int i = 0; i < NUM_OF_LEDGERS; i++) {
+ lhs[i] = bkc.createLedger(1, 1, digestType, ledgerPassword);
+ LOG.info("created ledger ID: {}", lhs[i].getId());
+ }
+
+ final CountDownLatch completeLatch = new CountDownLatch(NUM_ENTRIES_TO_WRITE * NUM_OF_LEDGERS);
+
+ LOG.info("submitting writes");
+ for (int i = 0; i < NUM_ENTRIES_TO_WRITE; i++) {
+ for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) {
+ lhs[ledger].asyncAddEntry(data, (rc2, lh, entryId, ctx) -> completeLatch.countDown(), null);
+ }
+ }
+
+ boolean exceededLimit = false;
+ BookieRequestProcessor brp = bks.getBookieRequestProcessor();
+ while (!completeLatch.await(1, TimeUnit.MILLISECONDS)) {
+ int val = brp.maxAddsInProgressCount();
+ if (val > MAX_PENDING) {
+ exceededLimit = true;
+ break;
+ }
+ LOG.info("Waiting until all writes succeeded or maxAddsInProgressCount {} > MAX_PENDING {}",
+ val, MAX_PENDING);
+ }
+
+ assertTrue("expected to exceed number of pending writes", exceededLimit);
+
+ for (int i = 0; i < NUM_OF_LEDGERS; i++) {
+ lhs[i].close();
+ }
+ }
+
+ // here we expect that backpressure is enabled and number of writes in progress
+ // will never exceed the limit
+ private void doWritesWithBackpressure(final int bkId) throws Exception {
+ BookieServer bks = bs.get(bkId);
+ bks.shutdown();
+ bks = new BookieServer(bsConfs.get(bkId));
+ mockJournal(bks.bookie, getDelay, addDelay, flushDelay);
+ bks.start();
+ bs.set(bkId, bks);
+
+ LOG.info("Creating ledgers");
+ LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS];
+ for (int i = 0; i < NUM_OF_LEDGERS; i++) {
+ lhs[i] = bkc.createLedger(1, 1, digestType, ledgerPassword);
+ LOG.info("created ledger ID: {}", lhs[i].getId());
+ }
+
+ final CountDownLatch completeLatch = new CountDownLatch(NUM_ENTRIES_TO_WRITE * NUM_OF_LEDGERS);
+ final AtomicInteger rc = new AtomicInteger(BKException.Code.OK);
+
+ LOG.info("submitting writes");
+ for (int i = 0; i < NUM_ENTRIES_TO_WRITE; i++) {
+ for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) {
+ lhs[ledger].asyncAddEntry(data, (rc2, lh, entryId, ctx) -> {
+ rc.compareAndSet(BKException.Code.OK, rc2);
+ completeLatch.countDown();
+ }, null);
+ }
+ }
+
+ LOG.info("test submitted all writes");
+ BookieRequestProcessor brp = bks.getBookieRequestProcessor();
+ while (!completeLatch.await(1, TimeUnit.MILLISECONDS)) {
+ int val = brp.maxAddsInProgressCount();
+ assertTrue("writes in progress should not exceed limit, got " + val, val <= MAX_PENDING);
+ LOG.info("Waiting for all writes to succeed, left {} of {}",
+ completeLatch.getCount(), NUM_ENTRIES_TO_WRITE * NUM_OF_LEDGERS);
+ }
+
+ if (rc.get() != BKException.Code.OK) {
+ throw BKException.create(rc.get());
+ }
+
+ for (int i = 0; i < NUM_OF_LEDGERS; i++) {
+ lhs[i].close();
+ }
+ }
+
+
+ @Override
+ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+ SyncObj sync = (SyncObj) ctx;
+ sync.setReturnCode(rc);
+ synchronized (sync) {
+ sync.counter++;
+ sync.notify();
+ }
+ }
+
+ @Override
+ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+ SyncObj sync = (SyncObj) ctx;
+ sync.setLedgerEntries(seq);
+ sync.setReturnCode(rc);
+ synchronized (sync) {
+ sync.value = true;
+ sync.notify();
+ }
+ }
+
+ @Override
+ public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
+ SyncObj sync = (SyncObj) ctx;
+ sync.setReturnCode(rc);
+ synchronized (sync) {
+ sync.lastConfirmed = lastConfirmed;
+ sync.notify();
+ }
+ }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
index 90a51c8..37d4647 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
@@ -74,6 +74,7 @@ public class ForceLedgerProcessorV3Test {
bookie = mock(Bookie.class);
requestProcessor = mock(BookieRequestProcessor.class);
when(requestProcessor.getBookie()).thenReturn(bookie);
+ when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
when(requestProcessor.getForceLedgerStats())
.thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("force_ledger"));
when(requestProcessor.getForceLedgerRequestStats())
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
index 354fcaf..4ebe01c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
@@ -47,6 +47,8 @@ import org.junit.Test;
*/
public class TestBookieRequestProcessor {
+ final BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class);
+
@Test
public void testConstructLongPollThreads() throws Exception {
// long poll threads == read threads
@@ -134,7 +136,7 @@ public class TestBookieRequestProcessor {
.setBody(ByteString.copyFrom("entrydata".getBytes())).build();
Request request = Request.newBuilder().setHeader(header).setAddRequest(addRequest).build();
- WriteEntryProcessorV3 writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, null);
+ WriteEntryProcessorV3 writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, requestProcessor);
String toString = writeEntryProcessorV3.toString();
assertFalse("writeEntryProcessorV3's toString should have filtered out body", toString.contains("body"));
assertFalse("writeEntryProcessorV3's toString should have filtered out masterKey",
@@ -152,7 +154,7 @@ public class TestBookieRequestProcessor {
.setBody(ByteString.copyFrom("entrydata".getBytes())).setFlag(Flag.RECOVERY_ADD).setWriteFlags(0)
.build();
request = Request.newBuilder().setHeader(header).setAddRequest(addRequest).build();
- writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, null);
+ writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, requestProcessor);
toString = writeEntryProcessorV3.toString();
assertFalse("writeEntryProcessorV3's toString should have filtered out body", toString.contains("body"));
assertFalse("writeEntryProcessorV3's toString should have filtered out masterKey",
@@ -188,7 +190,7 @@ public class TestBookieRequestProcessor {
.setMasterKey(ByteString.copyFrom("masterKey".getBytes()))
.setBody(ByteString.copyFrom("entrydata".getBytes())).build();
request = Request.newBuilder().setHeader(header).setWriteLacRequest(writeLacRequest).build();
- WriteLacProcessorV3 writeLacProcessorV3 = new WriteLacProcessorV3(request, null, null);
+ WriteLacProcessorV3 writeLacProcessorV3 = new WriteLacProcessorV3(request, null, requestProcessor);
toString = writeLacProcessorV3.toString();
assertFalse("writeLacProcessorV3's toString should have filtered out body", toString.contains("body"));
assertFalse("writeLacProcessorV3's toString should have filtered out masterKey",
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
index 8f54ddb..df7b153 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
@@ -79,10 +79,13 @@ public class WriteEntryProcessorV3Test {
bookie = mock(Bookie.class);
requestProcessor = mock(BookieRequestProcessor.class);
when(requestProcessor.getBookie()).thenReturn(bookie);
+ when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
when(requestProcessor.getAddEntryStats())
.thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_entry"));
when(requestProcessor.getAddRequestStats())
.thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_requests"));
+ when(requestProcessor.getChannelWriteStats())
+ .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("CHANNEL_WRITE"));
processor = new WriteEntryProcessorV3(
request,
channel,
@@ -242,4 +245,41 @@ public class WriteEntryProcessorV3Test {
assertEquals(StatusCode.EOK, response.getStatus());
}
+ @Test
+ public void testWritesWithClientNotAcceptingReponses() throws Exception {
+ when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(5L);
+
+ doAnswer(invocationOnMock -> {
+ Channel ch = invocationOnMock.getArgument(0);
+ ch.close();
+ return null;
+ }).when(requestProcessor).handleNonWritableChannel(any());
+
+ when(channel.isWritable()).thenReturn(false);
+
+ when(bookie.isReadOnly()).thenReturn(false);
+ when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+ when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class));
+ doAnswer(invocationOnMock -> {
+ WriteCallback wc = invocationOnMock.getArgument(2);
+
+ wc.writeComplete(
+ 0,
+ request.getAddRequest().getLedgerId(),
+ request.getAddRequest().getEntryId(),
+ null,
+ null);
+ return null;
+ }).when(bookie).addEntry(
+ any(ByteBuf.class), eq(false), any(WriteCallback.class), same(channel), eq(new byte[0]));
+
+ processor.run();
+
+ verify(bookie, times(1))
+ .addEntry(any(ByteBuf.class), eq(false), any(WriteCallback.class), same(channel), eq(new byte[0]));
+ verify(requestProcessor, times(1)).handleNonWritableChannel(channel);
+ verify(channel, times(0)).writeAndFlush(any(Response.class));
+ verify(channel, times(1)).close();
+ }
+
}
diff --git a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
index 7bed0b7..83a39f4 100644
--- a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
+++ b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
@@ -62,6 +62,12 @@
<Bug pattern="EI_EXPOSE_REP" />
</Match>
<Match>
+ <!-- https://github.com/spotbugs/spotbugs/issues/259 -->
+ <Class name="org.apache.bookkeeper.bookie.InterleavedLedgerStorage" />
+ <Method name="getLastAddConfirmed" />
+ <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" />
+ </Match>
+ <Match>
<Class name="org.apache.bookkeeper.auth.AuthToken" />
<Method name="getData" />
<Bug pattern="EI_EXPOSE_REP" />
--
To stop receiving notification emails like this one, please contact
ayegorov@apache.org.