You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ay...@apache.org on 2018/04/27 03:12:50 UTC
[bookkeeper] branch master updated: ISSUE #1086 (@bug W-4146427@)
Client-side backpressure in netty (Fixes:
io.netty.util.internal.OutOfDirectMemoryError under continuous heavy load)
(#1088)
This is an automated email from the ASF dual-hosted git repository.
ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 9c3e8c9 ISSUE #1086 (@bug W-4146427@) Client-side backpressure in netty (Fixes: io.netty.util.internal.OutOfDirectMemoryError under continuous heavy load) (#1088)
9c3e8c9 is described below
commit 9c3e8c9f835781507ac833473abc39e821bdbf36
Author: Andrey Yegorov <dl...@users.noreply.github.com>
AuthorDate: Thu Apr 26 20:12:47 2018 -0700
ISSUE #1086 (@bug W-4146427@) Client-side backpressure in netty (Fixes: io.netty.util.internal.OutOfDirectMemoryError under continuous heavy load) (#1088)
* ISSUE #1086 (@bug W-4146427@) Client-side backpressure in netty (Fixes: io.netty.util.internal.OutOfDirectMemoryError under continuous heavy load)
---
.../bookkeeper/client/BookKeeperClientStats.java | 1 +
.../org/apache/bookkeeper/client/LedgerHandle.java | 120 +++++++++++++++++
.../apache/bookkeeper/client/LedgerHandleAdv.java | 5 +
.../org/apache/bookkeeper/client/PendingAddOp.java | 11 +-
.../apache/bookkeeper/client/PendingReadOp.java | 8 ++
.../bookkeeper/conf/ClientConfiguration.java | 33 ++++-
.../org/apache/bookkeeper/proto/BookieClient.java | 35 ++++-
.../proto/DefaultPerChannelBookieClientPool.java | 18 ++-
.../bookkeeper/proto/PerChannelBookieClient.java | 79 +++++++++--
.../proto/PerChannelBookieClientPool.java | 11 ++
.../bookkeeper/client/MockBookKeeperTestCase.java | 38 ++++--
.../apache/bookkeeper/client/SlowBookieTest.java | 150 ++++++++++++++++++++-
.../proto/TestPerChannelBookieClient.java | 3 +-
13 files changed, 478 insertions(+), 34 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 7d0fd5d..83e6421 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -71,6 +71,7 @@ public interface BookKeeperClientStats {
String CHANNEL_TIMEOUT_START_TLS_OP = "TIMEOUT_START_TLS";
String NETTY_EXCEPTION_CNT = "NETTY_EXCEPTION_CNT";
+ String CLIENT_CHANNEL_WRITE_WAIT = "CLIENT_CHANNEL_WRITE_WAIT";
String CLIENT_CONNECT_TIMER = "CLIENT_CONNECT_TIMER";
String ADD_OP_OUTSTANDING = "ADD_OP_OUTSTANDING";
String READ_OP_OUTSTANDING = "READ_OP_OUTSTANDING";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 3c53a03..f72ab85 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -72,6 +72,7 @@ import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
@@ -83,6 +84,7 @@ import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.proto.checksum.MacDigestManager;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.OrderedGenericCallback;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.commons.collections4.IteratorUtils;
@@ -114,6 +116,8 @@ public class LedgerHandle implements WriteHandle {
final EnumSet<WriteFlag> writeFlags;
ScheduledFuture<?> timeoutFuture = null;
+ final long waitForWriteSetMs;
+
/**
* Invalid entry id. This value is returned from methods which
* should return an entry id but there is no valid entry available.
@@ -135,6 +139,7 @@ public class LedgerHandle implements WriteHandle {
final Counter ensembleChangeCounter;
final Counter lacUpdateHitsCounter;
final Counter lacUpdateMissesCounter;
+ private final OpStatsLogger clientChannelWriteWaitStats;
// This empty master key is used when an empty password is provided which is the hash of an empty string
private static final byte[] emptyLedgerKey;
@@ -155,6 +160,7 @@ public class LedgerHandle implements WriteHandle {
this.enableParallelRecoveryRead = bk.getConf().getEnableParallelRecoveryRead();
this.recoveryReadBatchSize = bk.getConf().getRecoveryReadBatchSize();
this.writeFlags = writeFlags;
+ this.waitForWriteSetMs = bk.getConf().getWaitTimeoutOnBackpressureMillis();
if (metadata.isClosed()) {
lastAddConfirmed = lastAddPushed = metadata.getLastEntryId();
@@ -206,6 +212,8 @@ public class LedgerHandle implements WriteHandle {
ensembleChangeCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.ENSEMBLE_CHANGES);
lacUpdateHitsCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_HITS);
lacUpdateMissesCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_MISSES);
+ clientChannelWriteWaitStats = bk.getStatsLogger()
+ .getOpStatsLogger(BookKeeperClientStats.CLIENT_CHANNEL_WRITE_WAIT);
bk.getStatsLogger().registerGauge(BookKeeperClientStats.PENDING_ADDS,
new Gauge<Integer>() {
@Override
@@ -842,6 +850,27 @@ public class LedgerHandle implements WriteHandle {
boolean isRecoveryRead) {
PendingReadOp op = new PendingReadOp(this, bk.getScheduler(), firstEntry, lastEntry, isRecoveryRead);
if (!bk.isClosed()) {
+ // Waiting on the first one.
+ // This is not very helpful if there are multiple ensembles or if bookie goes into unresponsive
+ // state later after N requests sent.
+ // Unfortunately it seems that alternatives are:
+ // - send reads one-by-one (up to the app)
+ // - rework LedgerHandle to send requests one-by-one (maybe later, potential perf impact)
+ // - block worker pool (not good)
+ // Even with this implementation one should be more concerned about OOME when all read responses arrive
+ // or about overloading bookies with these requests than about submission of many small requests.
+ // Naturally one of the solutions would be to submit smaller batches and in this case
+ // current implementation will prevent next batch from starting when bookie is
+ // unresponsive thus helpful enough.
+ DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(firstEntry);
+ try {
+ if (!waitForWritable(ws, firstEntry, ws.size() - 1, waitForWriteSetMs)) {
+ op.allowFailFastOnUnwritableChannel();
+ }
+ } finally {
+ ws.recycle();
+ }
+
bk.getMainWorkerPool().executeOrdered(ledgerId, op);
} else {
op.future().completeExceptionally(BKException.create(ClientClosedException));
@@ -1066,6 +1095,86 @@ public class LedgerHandle implements WriteHandle {
doAsyncAddEntry(op);
}
+    /**
+     * Checks whether enough bookies of the given write set currently have a writable channel.
+     *
+     * @param writeSet indices (into the current ensemble) of the bookies an entry is sent to
+     * @param key key forwarded to the bookie client to select the channel to check
+     * @param allowedNonWritableCount how many bookies of the write set may be non-writable;
+     *        negative values are treated as 0
+     * @return true if no more than {@code allowedNonWritableCount} bookies of the write set
+     *         are non-writable, false otherwise
+     */
+    private boolean isWritesetWritable(DistributionSchedule.WriteSet writeSet,
+                                       long key, int allowedNonWritableCount) {
+        if (allowedNonWritableCount < 0) {
+            allowedNonWritableCount = 0;
+        }
+
+        final int sz = writeSet.size();
+        final int requiredWritable = sz - allowedNonWritableCount;
+
+        int nonWritableCount = 0;
+        for (int i = 0; i < sz; i++) {
+            // The write set holds ensemble indices: position i of the write set is not
+            // necessarily position i of the ensemble, so translate before the lookup.
+            final int bookieIndex = writeSet.get(i);
+            if (!bk.getBookieClient().isWritable(metadata.currentEnsemble.get(bookieIndex), key)) {
+                nonWritableCount++;
+                // Fail only once the allowance is exceeded, not when it is merely reached.
+                if (nonWritableCount > allowedNonWritableCount) {
+                    return false;
+                }
+            } else {
+                // Positions 0..i have been checked, so (i + 1 - nonWritableCount) are writable.
+                final int knownWritable = i + 1 - nonWritableCount;
+                if (knownWritable >= requiredWritable) {
+                    return true;
+                }
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Blocks the calling thread until enough channels of the write set are writable or the
+     * timeout expires, and records the time spent in the channel-write-wait stats.
+     *
+     * @param writeSet indices (into the current ensemble) of the bookies an entry is sent to
+     * @param key key forwarded to the bookie client to select the channel to check
+     * @param allowedNonWritableCount how many bookies of the write set may be non-writable
+     * @param durationMs maximum time to wait in milliseconds; a negative value disables the
+     *        check (returns true immediately), 0 checks once without waiting
+     * @return true if the write set is writable (or the check is disabled), false if it is
+     *         still not writable after the timeout or after the wait was interrupted
+     */
+    protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet, long key,
+                                      int allowedNonWritableCount, long durationMs) {
+        if (durationMs < 0) {
+            // Negative timeout: the backpressure wait is disabled.
+            return true;
+        }
+
+        final long startTime = MathUtils.nowInNano();
+        boolean success = isWritesetWritable(writeSet, key, allowedNonWritableCount);
+
+        if (!success && durationMs > 0) {
+            int backoff = 1;
+            final int maxBackoff = 4;
+            final long deadline = startTime + TimeUnit.MILLISECONDS.toNanos(durationMs);
+
+            // Re-evaluate success on every iteration so that a write set which becomes
+            // writable while waiting is reported as success instead of a stale failure.
+            while (!(success = isWritesetWritable(writeSet, key, allowedNonWritableCount))) {
+                final long remainingNanos = deadline - MathUtils.nowInNano();
+                if (remainingNanos <= 0) {
+                    // Timed out; success is false at this point.
+                    break;
+                }
+                // Cap the sleep by the time left until the deadline, but sleep at least
+                // 1 ms so the loop does not busy-spin.
+                final long remainingMs = Math.max(1, TimeUnit.NANOSECONDS.toMillis(remainingNanos));
+                final long sleepMs = Math.min(backoff, remainingMs);
+
+                try {
+                    TimeUnit.MILLISECONDS.sleep(sleepMs);
+                } catch (InterruptedException e) {
+                    // Preserve the interrupt status and report the current state.
+                    Thread.currentThread().interrupt();
+                    success = isWritesetWritable(writeSet, key, allowedNonWritableCount);
+                    break;
+                }
+                if (backoff <= maxBackoff) {
+                    backoff++;
+                }
+            }
+            if (backoff > 1) {
+                LOG.info("Spent {} ms waiting for {} writable channels",
+                        MathUtils.elapsedMSec(startTime),
+                        writeSet.size() - allowedNonWritableCount);
+            }
+        }
+
+        if (success) {
+            clientChannelWriteWaitStats.registerSuccessfulEvent(
+                    MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+        } else {
+            clientChannelWriteWaitStats.registerFailedEvent(
+                    MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+        }
+        return success;
+    }
+
protected void doAsyncAddEntry(final PendingAddOp op) {
if (throttler != null) {
throttler.acquire();
@@ -1110,6 +1219,15 @@ public class LedgerHandle implements WriteHandle {
return;
}
+ DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(op.getEntryId());
+ try {
+ if (!waitForWritable(ws, op.getEntryId(), 0, waitForWriteSetMs)) {
+ op.allowFailFastOnUnwritableChannel();
+ }
+ } finally {
+ ws.recycle();
+ }
+
try {
bk.getMainWorkerPool().executeOrdered(ledgerId, op);
} catch (RejectedExecutionException e) {
@@ -1155,6 +1273,7 @@ public class LedgerHandle implements WriteHandle {
cb.readLastConfirmedComplete(BKException.Code.OK, lastEntryId, ctx);
return;
}
+
ReadLastConfirmedOp.LastConfirmedDataCallback innercb = new ReadLastConfirmedOp.LastConfirmedDataCallback() {
@Override
public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
@@ -1166,6 +1285,7 @@ public class LedgerHandle implements WriteHandle {
}
}
};
+
new ReadLastConfirmedOp(this, innercb).initiate();
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 0eaf0b5..f67e1d6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -237,6 +237,11 @@ public class LedgerHandleAdv extends LedgerHandle implements WriteAdvHandle {
return;
}
+ if (!waitForWritable(distributionSchedule.getWriteSet(op.getEntryId()),
+ op.getEntryId(), 0, waitForWriteSetMs)) {
+ op.allowFailFastOnUnwritableChannel();
+ }
+
try {
bk.getMainWorkerPool().executeOrdered(ledgerId, op);
} catch (RejectedExecutionException e) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index eb95766..61037f0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -80,6 +80,8 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
boolean callbackTriggered;
boolean hasRun;
+ boolean allowFailFast = false;
+
static PendingAddOp create(LedgerHandle lh, ByteBuf payload, AddCallbackWithLatency cb, Object ctx) {
PendingAddOp op = RECYCLER.get();
op.lh = lh;
@@ -100,6 +102,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
op.callbackTriggered = false;
op.hasRun = false;
op.requestTimeNanos = Long.MAX_VALUE;
+ op.allowFailFast = false;
op.qwcLatency = 0;
return op;
}
@@ -113,6 +116,11 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
return this;
}
+ /**
+ * Sets the allowFailFast flag, which is passed along with each addEntry request sent by this op.
+ *
+ * @return this op, for chaining
+ */
+ PendingAddOp allowFailFastOnUnwritableChannel() {
+ allowFailFast = true;
+ return this;
+ }
+
void setEntryId(long entryId) {
this.entryId = entryId;
}
@@ -129,7 +137,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE;
lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId, lh.ledgerKey,
- entryId, toSend, this, bookieIndex, flags);
+ entryId, toSend, this, bookieIndex, flags, allowFailFast);
++pendingWriteRequests;
}
@@ -444,6 +452,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
pendingWriteRequests = 0;
callbackTriggered = false;
hasRun = false;
+ allowFailFast = false;
recyclerHandle.recycle(this);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index a41207d..3852078 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -81,6 +81,7 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
boolean parallelRead = false;
final AtomicBoolean complete = new AtomicBoolean(false);
+ boolean allowFailFast = false;
abstract class LedgerEntryRequest implements SpeculativeRequestExecutor, AutoCloseable {
@@ -391,6 +392,8 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
return null;
}
+ // ToDo: pick replica with writable PCBC. ISSUE #1239
+ // https://github.com/apache/bookkeeper/issues/1239
int replica = nextReplicaIndexToReadFrom;
int bookieIndex = writeSet.get(nextReplicaIndexToReadFrom);
nextReplicaIndexToReadFrom++;
@@ -473,6 +476,7 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
this.endEntryId = endEntryId;
this.scheduler = scheduler;
this.isRecoveryRead = isRecoveryRead;
+ this.allowFailFast = false;
numPendingEntries = endEntryId - startEntryId + 1;
requiredBookiesMissingEntryForRecovery = getLedgerMetadata().getWriteQuorumSize()
- getLedgerMetadata().getAckQuorumSize() + 1;
@@ -504,6 +508,10 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
return this;
}
+ /**
+ * Sets the allowFailFast flag on this read op.
+ * NOTE(review): no use of the flag by the read requests is visible in this diff; confirm where it is consumed.
+ */
+ void allowFailFastOnUnwritableChannel() {
+ allowFailFast = true;
+ }
+
public void submit() {
lh.bk.getMainWorkerPool().executeOrdered(lh.ledgerId, this);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 64fbbe6..7b64d1b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -128,6 +128,7 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
protected static final String PCBC_TIMEOUT_TIMER_NUM_TICKS = "pcbcTimeoutTimerNumTicks";
protected static final String TIMEOUT_TIMER_TICK_DURATION_MS = "timeoutTimerTickDurationMs";
protected static final String TIMEOUT_TIMER_NUM_TICKS = "timeoutTimerNumTicks";
+ protected static final String WAIT_TIMEOUT_ON_BACKPRESSURE = "waitTimeoutOnBackpressureMs";
// Bookie health check settings
protected static final String BOOKIE_HEALTH_CHECK_ENABLED = "bookieHealthCheckEnabled";
@@ -418,7 +419,7 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
* @return netty channel write buffer low water mark.
*/
public int getClientWriteBufferLowWaterMark() {
- return getInt(CLIENT_WRITEBUFFER_LOW_WATER_MARK, 32 * 1024);
+ return getInt(CLIENT_WRITEBUFFER_LOW_WATER_MARK, 384 * 1024);
}
/**
@@ -439,7 +440,7 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
* @return netty channel write buffer high water mark.
*/
public int getClientWriteBufferHighWaterMark() {
- return getInt(CLIENT_WRITEBUFFER_HIGH_WATER_MARK, 64 * 1024);
+ return getInt(CLIENT_WRITEBUFFER_HIGH_WATER_MARK, 512 * 1024);
}
/**
@@ -800,6 +801,34 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
}
/**
+ * Get the timeout, in milliseconds, controlling the wait on request send in case of
+ * unresponsive bookie(s) (e.g. a bookie in a long GC pause).
+ *
+ * @return timeout value in milliseconds:
+ * a negative value disables the feature,
+ * 0 allows the request to fail immediately without waiting.
+ * Default is -1 (disabled).
+ */
+ public long getWaitTimeoutOnBackpressureMillis() {
+ return getLong(WAIT_TIMEOUT_ON_BACKPRESSURE, -1);
+ }
+
+ /**
+ * Set the timeout, in milliseconds, controlling the wait on request send in case of
+ * unresponsive bookie(s) (e.g. a bookie in a long GC pause).
+ *
+ * @param value
+ * timeout in milliseconds:
+ * a negative value disables the feature,
+ * 0 allows the request to fail immediately without waiting.
+ * Default is -1 (disabled).
+ * @return client configuration.
+ */
+ public ClientConfiguration setWaitTimeoutOnBackpressureMillis(long value) {
+ setProperty(WAIT_TIMEOUT_ON_BACKPRESSURE, value);
+ return this;
+ }
+
+ /**
* Get the number of worker threads. This is the number of
* worker threads used by bookkeeper client to submit operations.
*
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index c160b20..31c12fc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -149,6 +149,12 @@ public class BookieClient implements PerChannelBookieClientFactory {
return faultyBookies;
}
+ /**
+ * Tells whether the client selected by {@code key} in the pool for the given bookie is writable.
+ *
+ * @param address bookie address used to look up the client pool
+ * @param key key passed to the pool to select the client to check
+ * @return true if the lookup returns no pool, otherwise whatever the pool reports for {@code key}
+ */
+ public boolean isWritable(BookieSocketAddress address, long key) {
+ final PerChannelBookieClientPool pcbcPool = lookupClient(address);
+ // if null, let the write initiate connect or fail with whatever error it produces
+ return pcbcPool == null || pcbcPool.isWritable(key);
+ }
+
@Override
public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool,
SecurityHandlerFactory shFactory) throws SecurityException {
@@ -242,6 +248,18 @@ public class BookieClient implements PerChannelBookieClientFactory {
final WriteCallback cb,
final Object ctx,
final int options) {
+ addEntry(addr, ledgerId, masterKey, entryId, toSend, cb, ctx, options, false);
+ }
+
+ public void addEntry(final BookieSocketAddress addr,
+ final long ledgerId,
+ final byte[] masterKey,
+ final long entryId,
+ final ByteBufList toSend,
+ final WriteCallback cb,
+ final Object ctx,
+ final int options,
+ final boolean allowFastFail) {
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
@@ -255,7 +273,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
client.obtain(ChannelReadyForAddEntryCallback.create(
this, toSend, ledgerId, entryId, addr,
- ctx, cb, options, masterKey),
+ ctx, cb, options, masterKey, allowFastFail),
ledgerId);
}
@@ -291,11 +309,12 @@ public class BookieClient implements PerChannelBookieClientFactory {
private WriteCallback cb;
private int options;
private byte[] masterKey;
+ private boolean allowFastFail;
static ChannelReadyForAddEntryCallback create(
BookieClient bookieClient, ByteBufList toSend, long ledgerId,
long entryId, BookieSocketAddress addr, Object ctx,
- WriteCallback cb, int options, byte[] masterKey) {
+ WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail) {
ChannelReadyForAddEntryCallback callback = RECYCLER.get();
callback.bookieClient = bookieClient;
callback.toSend = toSend;
@@ -306,6 +325,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
callback.cb = cb;
callback.options = options;
callback.masterKey = masterKey;
+ callback.allowFastFail = allowFastFail;
return callback;
}
@@ -316,7 +336,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
} else {
pcbc.addEntry(ledgerId, masterKey, entryId,
- toSend, cb, ctx, options);
+ toSend, cb, ctx, options, allowFastFail);
}
toSend.release();
@@ -346,6 +366,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
cb = null;
options = -1;
masterKey = null;
+ allowFastFail = false;
recyclerHandle.recycle(this);
}
@@ -382,6 +403,12 @@ public class BookieClient implements PerChannelBookieClientFactory {
public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey) {
+ readEntry(addr, ledgerId, entryId, cb, ctx, flags, masterKey, false);
+ }
+
+ public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
+ final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey,
+ final boolean allowFastFail) {
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
@@ -393,7 +420,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
if (rc != BKException.Code.OK) {
completeRead(rc, ledgerId, entryId, null, cb, ctx);
} else {
- pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey);
+ pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey, allowFastFail);
}
}, ledgerId);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
index e57b8a7..04471d5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
@@ -84,14 +84,22 @@ class DefaultPerChannelBookieClientPool implements PerChannelBookieClientPool,
}
}
- @Override
- public void obtain(GenericCallback<PerChannelBookieClient> callback, long key) {
+ // Selects the client for the given key: the only client when the pool has a single one,
+ // otherwise the client at index signSafeMod(key, clients.length).
+ private PerChannelBookieClient getClient(long key) {
if (1 == clients.length) {
- clients[0].connectIfNeededAndDoOp(callback);
- return;
+ return clients[0];
}
int idx = MathUtils.signSafeMod(key, clients.length);
- clients[idx].connectIfNeededAndDoOp(callback);
+ return clients[idx];
+ }
+
+ // Runs the callback against the client selected for the key, connecting first if needed.
+ @Override
+ public void obtain(GenericCallback<PerChannelBookieClient> callback, long key) {
+ getClient(key).connectIfNeededAndDoOp(callback);
+ }
+
+ // Reports the writable flag of the client selected for the key.
+ @Override
+ public boolean isWritable(long key) {
+ return getClient(key).isWritable();
}
@Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 10fc253..f6ebbfa 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -214,6 +214,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
private final ClientAuthProvider.Factory authProviderFactory;
private final ExtensionRegistry extRegistry;
private final SecurityHandlerFactory shFactory;
+ private volatile boolean isWritable = true;
public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup,
BookieSocketAddress addr) throws SecurityException {
@@ -337,7 +338,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
public void disconnect() {
// work on a local copy of the channel reference
Channel c = channel;
if (c != null) {
- c.close();
+ // once the close completes, reset the writable flag to true
+ c.close().addListener(x -> makeWritable());
}
LOG.info("authplugin disconnected channel {}", channel);
}
@@ -457,7 +458,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
ChannelFuture future = bootstrap.connect(bookieAddr);
future.addListener(new ConnectionFutureListener(startTime));
-
+ future.addListener(x -> makeWritable());
return future;
}
@@ -466,6 +467,22 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
close();
}
+ /**
+ * Returns the cached writable flag of this client.
+ *
+ * @return boolean, true if PCBC is writable
+ */
+ public boolean isWritable() {
+ return isWritable;
+ }
+
+ /**
+ * Sets the cached writable flag of this client.
+ *
+ * @param val new value of the flag
+ */
+ public void setWritable(boolean val) {
+ isWritable = val;
+ }
+
+ // Resets the writable flag to true.
+ private void makeWritable() {
+ setWritable(true);
+ }
+
void connectIfNeededAndDoOp(GenericCallback<PerChannelBookieClient> op) {
boolean completeOpNow = false;
int opRc = BKException.Code.OK;
@@ -567,6 +584,11 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
*/
void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBufList toSend, WriteCallback cb,
Object ctx, final int options) {
+ addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options, false);
+ }
+
+ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBufList toSend, WriteCallback cb,
+ Object ctx, final int options, boolean allowFastFail) {
Object request = null;
CompletionKey completionKey = null;
if (useV2WireProtocol) {
@@ -623,7 +645,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
toSend.release();
return;
} else {
- writeAndFlush(c, completionKey, request);
+ // addEntry times out on backpressure
+ writeAndFlush(c, completionKey, request, allowFastFail);
}
}
@@ -667,7 +690,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
ReadEntryCallback cb,
Object ctx) {
readEntryInternal(ledgerId, entryId, previousLAC, timeOutInMillis,
- piggyBackEntry, cb, ctx, (short) 0, null);
+ piggyBackEntry, cb, ctx, (short) 0, null, false);
}
/**
@@ -678,9 +701,10 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
ReadEntryCallback cb,
Object ctx,
int flags,
- byte[] masterKey) {
+ byte[] masterKey,
+ boolean allowFastFail) {
readEntryInternal(ledgerId, entryId, null, null, false,
- cb, ctx, (short) flags, masterKey);
+ cb, ctx, (short) flags, masterKey, allowFastFail);
}
private void readEntryInternal(final long ledgerId,
@@ -691,7 +715,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
final ReadEntryCallback cb,
final Object ctx,
int flags,
- byte[] masterKey) {
+ byte[] masterKey,
+ boolean allowFastFail) {
Object request = null;
CompletionKey completionKey = null;
if (useV2WireProtocol) {
@@ -765,7 +790,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
}
- writeAndFlush(channel, completionKey, request);
+ writeAndFlush(channel, completionKey, request, allowFastFail);
}
public void getBookieInfo(final long requested, GetBookieInfoCallback cb, Object ctx) {
@@ -867,6 +892,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
toClose = channel;
channel = null;
+ makeWritable();
}
if (toClose != null) {
ChannelFuture cf = closeChannel(toClose);
@@ -880,18 +906,48 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
if (LOG.isDebugEnabled()) {
LOG.debug("Closing channel {}", c);
}
+ return c.close().addListener(x -> makeWritable());
+ }
- return c.close();
+ /**
+ * Handler callback for channel writability changes. Resets the writable flag to true when there is
+ * no current channel or the current channel reports writable; the flag is never set to false here.
+ */
+ @Override
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+ final Channel c = channel;
+ if (c == null || c.isWritable()) {
+ makeWritable();
+ }
+ // forward the event to the parent handler implementation
+ super.channelWritabilityChanged(ctx);
}
// Convenience overload: writes the request with fail-fast on an unwritable channel turned off.
private void writeAndFlush(final Channel channel,
final CompletionKey key,
final Object request) {
+ writeAndFlush(channel, key, request, false);
+ }
+
+ private void writeAndFlush(final Channel channel,
+ final CompletionKey key,
+ final Object request,
+ final boolean allowFastFail) {
if (channel == null) {
+ LOG.warn("Operation {} failed: channel == null", requestToString(request));
errorOut(key);
return;
}
+ final boolean isChannelWritable = channel.isWritable();
+ if (isWritable != isChannelWritable) {
+ // isWritable is volatile so simple "isWritable = channel.isWritable()" would be slower
+ isWritable = isChannelWritable;
+ }
+
+ if (allowFastFail && !isWritable) {
+ LOG.warn("Operation {} failed: TooManyRequestsException",
+ requestToString(request));
+
+ errorOut(key, BKException.Code.TooManyRequestsException);
+ return;
+ }
+
try {
final long startTime = MathUtils.nowInNano();
@@ -1321,6 +1377,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
pendingOps = new ArrayDeque<>();
}
+ makeWritable();
+
for (GenericCallback<PerChannelBookieClient> pendingOp : oldPendingOps) {
pendingOp.operationComplete(rc, PerChannelBookieClient.this);
}
@@ -2035,6 +2093,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
rc = BKException.Code.OK;
channel = future.channel();
if (shFactory != null) {
+ makeWritable();
initiateTLS();
return;
} else {
@@ -2088,6 +2147,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
for (GenericCallback<PerChannelBookieClient> pendingOp : oldPendingOps) {
pendingOp.operationComplete(rc, PerChannelBookieClient.this);
}
+
+ makeWritable();
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
index 97a6a26..aa7a5e9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
@@ -41,6 +41,17 @@ public interface PerChannelBookieClientPool {
void obtain(GenericCallback<PerChannelBookieClient> callback, long key);
/**
+ * Returns status of a client.
+ * It is suggested to delay/throttle requests to this channel if isWritable is false.
+ *
+ * @param key the key identifying which client of the pool to check
+ * @return true if the client is writable; this default implementation always returns true
+ */
+ default boolean isWritable(long key) {
+ return true;
+ }
+
+ /**
* record any read/write error on {@link PerChannelBookieClientPool}.
*/
void recordError();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index f7ff436..433591c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -19,6 +19,7 @@ package org.apache.bookkeeper.client;
import static org.apache.bookkeeper.client.api.BKException.Code.NoBookieAvailableException;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -68,6 +69,7 @@ import org.junit.After;
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.Stubber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -159,6 +161,7 @@ public abstract class MockBookKeeperTestCase {
when(bk.getLedgerManager()).thenReturn(ledgerManager);
when(bk.getLedgerIdGenerator()).thenReturn(ledgerIdGenerator);
when(bk.getReturnRc(anyInt())).thenAnswer(invocationOnMock -> invocationOnMock.getArgument(0));
+ when(bookieClient.isWritable(any(), anyLong())).thenReturn(true);
setupLedgerIdGenerator();
setupCreateLedgerMetadata();
@@ -186,6 +189,7 @@ public abstract class MockBookKeeperTestCase {
when(bk.getCreateOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
when(bk.getRecoverAddCountLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
when(bk.getRecoverReadCountLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
+ when(bk.getAddOpUrCounter()).thenReturn(nullStatsLogger.getCounter("mock"));
return nullStatsLogger;
}
@@ -396,7 +400,7 @@ public abstract class MockBookKeeperTestCase {
@SuppressWarnings("unchecked")
protected void setupBookieClientReadEntry() {
- Answer<Void> answer = invokation -> {
+ final Stubber stub = doAnswer(invokation -> {
Object[] args = invokation.getArguments();
BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) args[0];
long ledgerId = (Long) args[1];
@@ -439,13 +443,19 @@ public abstract class MockBookKeeperTestCase {
}
});
return null;
- };
- doAnswer(answer).when(bookieClient).readEntry(any(), anyLong(), anyLong(),
+ });
+
+ stub.when(bookieClient).readEntry(any(), anyLong(), anyLong(),
any(BookkeeperInternalCallbacks.ReadEntryCallback.class),
any(), anyInt());
- doAnswer(answer).when(bookieClient).readEntry(any(), anyLong(), anyLong(),
+
+ stub.when(bookieClient).readEntry(any(), anyLong(), anyLong(),
any(BookkeeperInternalCallbacks.ReadEntryCallback.class),
any(), anyInt(), any());
+
+ stub.when(bookieClient).readEntry(any(), anyLong(), anyLong(),
+ any(BookkeeperInternalCallbacks.ReadEntryCallback.class),
+ any(), anyInt(), any(), anyBoolean());
}
private byte[] extractEntryPayload(long ledgerId, long entryId, ByteBufList toSend)
@@ -468,7 +478,7 @@ public abstract class MockBookKeeperTestCase {
@SuppressWarnings("unchecked")
protected void setupBookieClientAddEntry() {
- doAnswer(invokation -> {
+ final Stubber stub = doAnswer(invokation -> {
Object[] args = invokation.getArguments();
BookkeeperInternalCallbacks.WriteCallback callback = (BookkeeperInternalCallbacks.WriteCallback) args[5];
BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) args[0];
@@ -506,11 +516,19 @@ public abstract class MockBookKeeperTestCase {
}
});
return null;
- }).when(bookieClient).addEntry(any(BookieSocketAddress.class),
- anyLong(), any(byte[].class),
- anyLong(), any(ByteBufList.class),
- any(BookkeeperInternalCallbacks.WriteCallback.class),
- any(), anyInt());
+ });
+
+ stub.when(bookieClient).addEntry(any(BookieSocketAddress.class),
+ anyLong(), any(byte[].class),
+ anyLong(), any(ByteBufList.class),
+ any(BookkeeperInternalCallbacks.WriteCallback.class),
+ any(), anyInt());
+
+ stub.when(bookieClient).addEntry(any(BookieSocketAddress.class),
+ anyLong(), any(byte[].class),
+ anyLong(), any(ByteBufList.class),
+ any(BookkeeperInternalCallbacks.WriteCallback.class),
+ any(), anyInt(), anyBoolean());
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
index 9cf22f3..7c6f9a0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.util.List;
import java.util.Set;
@@ -46,6 +47,8 @@ import org.slf4j.LoggerFactory;
public class SlowBookieTest extends BookKeeperClusterTestCase {
private static final Logger LOG = LoggerFactory.getLogger(SlowBookieTest.class);
+ final byte[] entry = "Test Entry".getBytes();
+
public SlowBookieTest() {
super(4);
baseConf.setNumAddWorkerThreads(0);
@@ -111,7 +114,6 @@ public class SlowBookieTest extends BookKeeperClusterTestCase {
final LedgerHandle lh = bkc.createLedger(4, 3, 2, BookKeeper.DigestType.CRC32, pwd);
final AtomicBoolean finished = new AtomicBoolean(false);
final AtomicBoolean failTest = new AtomicBoolean(false);
- final byte[] entry = "Test Entry".getBytes();
Thread t = new Thread() {
public void run() {
try {
@@ -126,10 +128,13 @@ public class SlowBookieTest extends BookKeeperClusterTestCase {
};
t.start();
final CountDownLatch b0latch = new CountDownLatch(1);
+
startNewBookie();
sleepBookie(getBookie(0), b0latch);
+
Thread.sleep(10000);
b0latch.countDown();
+
finished.set(true);
t.join();
@@ -155,6 +160,148 @@ public class SlowBookieTest extends BookKeeperClusterTestCase {
}
@Test
+    public void testSlowBookieAndBackpressureOn() throws Exception {
+        // High watermark is one byte short of an entry and the backpressure wait is 5000 ms.
+        final ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setReadTimeout(5)
+            .setAddEntryTimeout(1)
+            .setAddEntryQuorumTimeout(1)
+            .setNumChannelsPerBookie(1)
+            .setZkServers(zkUtil.getZooKeeperConnectString())
+            .setClientWriteBufferLowWaterMark(1)
+            .setClientWriteBufferHighWaterMark(entry.length - 1)
+            .setWaitTimeoutOnBackpressureMillis(5000);
+
+        final LedgerHandle reopened = doBackpressureTest(entry, clientConf,
+                /* expectWriteError */ false, /* expectFailedTest */ false, 2000);
+        // No write error and no writer-thread failure expected; last confirmed id must stay below 5.
+        assertTrue(reopened.readLastConfirmed() < 5);
+    }
+
+    /** Slow bookies, high watermark of 2 and a 0 ms backpressure wait: a write error is expected. */
+    @Test
+    public void testSlowBookieAndFastFailOn() throws Exception {
+        final ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setReadTimeout(5)
+            .setAddEntryTimeout(1)
+            .setAddEntryQuorumTimeout(1)
+            .setNumChannelsPerBookie(1)
+            .setZkServers(zkUtil.getZooKeeperConnectString())
+            .setClientWriteBufferLowWaterMark(1)
+            .setClientWriteBufferHighWaterMark(2)
+            .setWaitTimeoutOnBackpressureMillis(0);
+
+        final LedgerHandle reopened = doBackpressureTest(entry, clientConf,
+                /* expectWriteError */ true, /* expectFailedTest */ false, 1000);
+        // Reads the last confirmed entry id through the reopened handle; it must stay below 5.
+        assertTrue(reopened.readLastConfirmed() < 5);
+    }
+
+    /** Slow bookies with the backpressure wait set to -1: no write error, last confirmed id above 90. */
+    @Test
+    public void testSlowBookieAndNoBackpressure() throws Exception {
+        final ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setReadTimeout(5)
+            .setAddEntryTimeout(1)
+            .setAddEntryQuorumTimeout(1)
+            .setNumChannelsPerBookie(1)
+            .setZkServers(zkUtil.getZooKeeperConnectString())
+            .setClientWriteBufferLowWaterMark(1)
+            .setClientWriteBufferHighWaterMark(entry.length - 1)
+            .setWaitTimeoutOnBackpressureMillis(-1);
+
+        final LedgerHandle reopened = doBackpressureTest(entry, clientConf,
+                /* expectWriteError */ false, /* expectFailedTest */ false, 4000);
+
+        // NOTE(review): -1 presumably disables the backpressure wait - confirm in ClientConfiguration.
+        assertTrue(reopened.readLastConfirmed() > 90);
+    }
+
+    /**
+     * Writes entries while bookies 0 and 1 are put to sleep and their channels are flagged
+     * non-writable, then wakes them up, reopens the ledger and checks it for missing fragments.
+     * @param entry payload of every add
+     * @param conf client configuration under test
+     * @param expectWriteError whether an add callback is expected to report a non-OK code
+     * @param expectFailedTest whether the writer thread is expected to throw
+     * @param sleepInMillis how long the writer runs before the bookies are woken up
+     * @return handle of the reopened ledger
+     */
+    private LedgerHandle doBackpressureTest(byte[] entry, ClientConfiguration conf,
+            boolean expectWriteError, boolean expectFailedTest, long sleepInMillis) throws Exception {
+        BookKeeper bkc = new BookKeeper(conf);
+        byte[] pwd = new byte[] {};
+        final LedgerHandle lh = bkc.createLedger(4, 3, 1, BookKeeper.DigestType.CRC32, pwd);
+        lh.addEntry(entry);
+        final AtomicBoolean finished = new AtomicBoolean(false);
+        final AtomicBoolean failTest = new AtomicBoolean(false);
+        final AtomicBoolean writeError = new AtomicBoolean(false);
+        Thread t = new Thread(() -> {
+            try {
+                int count = 0;
+                while (!finished.get()) {
+                    lh.asyncAddEntry(entry, (rc, lh1, entryId, ctx) -> {
+                        if (rc != BKException.Code.OK) {
+                            finished.set(true);
+                            writeError.set(true);
+                        }
+                    }, null);
+                    if (++count > 100) {
+                        finished.set(true);
+                    }
+                }
+            } catch (Exception e) {
+                LOG.error("Exception in add entry thread", e);
+                failTest.set(true);
+            }
+        });
+        final CountDownLatch b0latch = new CountDownLatch(1);
+        final CountDownLatch b0latch2 = new CountDownLatch(1);
+        sleepBookie(getBookie(0), b0latch);
+        sleepBookie(getBookie(1), b0latch2);
+        setTargetChannelState(bkc, getBookie(0), 0, false);
+        setTargetChannelState(bkc, getBookie(1), 0, false);
+
+        t.start();
+        Thread.sleep(sleepInMillis);
+        finished.set(true);
+
+        b0latch.countDown();
+        b0latch2.countDown();
+        setTargetChannelState(bkc, getBookie(0), 0, true);
+        setTargetChannelState(bkc, getBookie(1), 0, true);
+        t.join();
+        assertEquals("write error", expectWriteError, writeError.get());
+        assertEquals("test failure", expectFailedTest, failTest.get());
+        lh.close();
+
+        LedgerHandle lh2 = bkc.openLedger(lh.getId(), BookKeeper.DigestType.CRC32, pwd);
+        LedgerChecker lc = new LedgerChecker(bkc);
+        final CountDownLatch checklatch = new CountDownLatch(1);
+        final AtomicInteger numFragments = new AtomicInteger(-1);
+        lc.checkLedger(lh2, (rc, fragments) -> {
+            LOG.debug("Checked ledgers returned {} {}", rc, fragments);
+            if (rc == BKException.Code.OK) {
+                numFragments.set(fragments.size());
+            } else {
+                // numFragments stays at -1, so the assertion below fails
+                LOG.error("Checked ledgers returned {} {}", rc, fragments);
+            }
+            checklatch.countDown();
+        });
+        checklatch.await();
+        assertEquals("There should be no missing fragments", 0, numFragments.get());
+        return lh2;
+    }
+
+    // Test hook: sets the writable flag on the client obtained for 'key' from lookupClient(address).
+    // The rc passed to the callback is not checked.
+    private void setTargetChannelState(BookKeeper bkc, BookieSocketAddress address,
+                                       long key, boolean state) throws Exception {
+        bkc.getBookieClient().lookupClient(address).obtain((rc, client) -> client.setWritable(state), key);
+    }
+
+ @Test
public void testManyBookieFailureWithSlowBookies() throws Exception {
ClientConfiguration conf = new ClientConfiguration();
conf.setReadTimeout(5)
@@ -166,7 +313,6 @@ public class SlowBookieTest extends BookKeeperClusterTestCase {
final LedgerHandle lh = bkc.createLedger(4, 3, 1, BookKeeper.DigestType.CRC32, pwd);
final AtomicBoolean finished = new AtomicBoolean(false);
final AtomicBoolean failTest = new AtomicBoolean(false);
- final byte[] entry = "Test Entry".getBytes();
Thread t = new Thread() {
public void run() {
try {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
index d96dce1..7dc6277 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
@@ -277,7 +277,8 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
return;
}
- client.readEntry(1, 1, cb, null, BookieProtocol.FLAG_DO_FENCING, "00000111112222233333".getBytes());
+ client.readEntry(1, 1, cb, null, BookieProtocol.FLAG_DO_FENCING,
+ "00000111112222233333".getBytes(), false);
}
});
--
To stop receiving notification emails like this one, please contact
ayegorov@apache.org.