You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by iv...@apache.org on 2018/08/14 09:56:23 UTC
[bookkeeper] branch master updated: Make bookie client an interface
This is an automated email from the ASF dual-hosted git repository.
ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 36d8590 Make bookie client an interface
36d8590 is described below
commit 36d8590e42f8f34f79568dad7494c38c28adecb4
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Tue Aug 14 11:56:15 2018 +0200
Make bookie client an interface
So that it can be mocked easily for testing.
This patch also contains a simple mock.
Author: Ivan Kelly <iv...@apache.org>
Reviewers: Sijie Guo <si...@apache.org>
This closes #1595 from ivankelly/bookie-client-iface
---
.../apache/bookkeeper/benchmark/BenchBookie.java | 3 +-
.../org/apache/bookkeeper/bookie/BookieShell.java | 3 +-
.../org/apache/bookkeeper/client/BookKeeper.java | 5 +-
.../org/apache/bookkeeper/client/LedgerHandle.java | 12 +-
.../org/apache/bookkeeper/proto/BookieClient.java | 713 +++++----------------
.../{BookieClient.java => BookieClientImpl.java} | 37 +-
.../apache/bookkeeper/client/SlowBookieTest.java | 3 +-
.../client/TestGetBookieInfoTimeout.java | 3 +-
.../apache/bookkeeper/proto/MockBookieClient.java | 288 +++++++++
.../apache/bookkeeper/test/BookieClientTest.java | 13 +-
10 files changed, 503 insertions(+), 577 deletions(-)
diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index e1c5979..216aff9 100644
--- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -36,6 +36,7 @@ import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -174,7 +175,7 @@ public class BenchBookie {
new DefaultThreadFactory("BookKeeperClientScheduler"));
ClientConfiguration conf = new ClientConfiguration();
- BookieClient bc = new BookieClient(conf, eventLoop, executor, scheduler, NullStatsLogger.INSTANCE);
+ BookieClient bc = new BookieClientImpl(conf, eventLoop, executor, scheduler, NullStatsLogger.INSTANCE);
LatencyCallback lc = new LatencyCallback();
ThroughputCallback tc = new ThroughputCallback();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 5b4eff8..4434cfc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -100,6 +100,7 @@ import org.apache.bookkeeper.meta.UnderreplicatedLedger;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
@@ -829,7 +830,7 @@ public class BookieShell implements Tool {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("BookKeeperClientSchedulerPool"));
- BookieClient bookieClient = new BookieClient(conf, eventLoopGroup, executor,
+ BookieClient bookieClient = new BookieClientImpl(conf, eventLoopGroup, executor,
scheduler, NullStatsLogger.INSTANCE);
LongStream.range(firstEntry, lastEntry).forEach(entryId -> {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 09d2526..1d909b6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -74,6 +74,7 @@ import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.DataFormats;
import org.apache.bookkeeper.stats.Counter;
@@ -507,8 +508,8 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
this.readLACSpeculativeRequestPolicy = Optional.<SpeculativeRequestExecutionPolicy>absent();
}
// initialize bookie client
- this.bookieClient = new BookieClient(conf, this.eventLoopGroup, this.mainWorkerPool,
- scheduler, statsLogger);
+ this.bookieClient = new BookieClientImpl(conf, this.eventLoopGroup, this.mainWorkerPool,
+ scheduler, statsLogger);
this.bookieWatcher = new BookieWatcherImpl(
conf, this.placementPolicy, metadataDriver.getRegistrationClient(),
this.statsLogger.scope(WATCHER_SCOPE));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index a646651..535584b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -80,7 +80,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.TimedGenericCallback;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
-import org.apache.bookkeeper.proto.PerChannelBookieClientPool;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.proto.checksum.MacDigestManager;
import org.apache.bookkeeper.stats.Counter;
@@ -99,8 +98,6 @@ import org.slf4j.LoggerFactory;
public class LedgerHandle implements WriteHandle {
static final Logger LOG = LoggerFactory.getLogger(LedgerHandle.class);
- static final long PENDINGREQ_NOTWRITABLE_MASK = 0x01L << 62;
-
final byte[] ledgerKey;
private LedgerMetadata metadata;
final BookKeeper bk;
@@ -225,14 +222,7 @@ public class LedgerHandle implements WriteHandle {
@Override
public long getBookiePendingRequests(BookieSocketAddress bookieSocketAddress) {
- PerChannelBookieClientPool pcbcPool = bk.bookieClient.lookupClient(bookieSocketAddress);
- if (pcbcPool == null) {
- return 0;
- } else if (pcbcPool.isWritable(ledgerId)) {
- return pcbcPool.getNumPendingCompletionRequests();
- } else {
- return pcbcPool.getNumPendingCompletionRequests() | PENDINGREQ_NOTWRITABLE_MASK;
- }
+ return bk.bookieClient.getNumPendingRequests(bookieSocketAddress, ledgerId);
}
};
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 197483d..85a4ef9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -20,579 +20,208 @@
*/
package org.apache.bookkeeper.proto;
-import static com.google.common.base.Charsets.UTF_8;
-import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
-
-import com.google.common.collect.Lists;
-import com.google.protobuf.ExtensionRegistry;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
-import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
-import org.apache.bookkeeper.auth.ClientAuthProvider;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.api.WriteFlag;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.common.util.SafeRunnable;
-import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.tls.SecurityException;
-import org.apache.bookkeeper.tls.SecurityHandlerFactory;
import org.apache.bookkeeper.util.ByteBufList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * Implements the client-side part of the BookKeeper protocol.
- *
+ * Low level client for talking to bookies.
*/
-public class BookieClient implements PerChannelBookieClientFactory {
- static final Logger LOG = LoggerFactory.getLogger(BookieClient.class);
-
- // This is global state that should be across all BookieClients
- AtomicLong totalBytesOutstanding = new AtomicLong();
-
- OrderedExecutor executor;
- ScheduledExecutorService scheduler;
- ScheduledFuture<?> timeoutFuture;
-
- EventLoopGroup eventLoopGroup;
- final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool> channels =
- new ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool>();
-
- private final ClientAuthProvider.Factory authProviderFactory;
- private final ExtensionRegistry registry;
-
- private final ClientConfiguration conf;
- private volatile boolean closed;
- private final ReentrantReadWriteLock closeLock;
- private final StatsLogger statsLogger;
- private final int numConnectionsPerBookie;
-
- private final long bookieErrorThresholdPerInterval;
-
- public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
- OrderedExecutor executor, ScheduledExecutorService scheduler,
- StatsLogger statsLogger) throws IOException {
- this.conf = conf;
- this.eventLoopGroup = eventLoopGroup;
- this.executor = executor;
- this.closed = false;
- this.closeLock = new ReentrantReadWriteLock();
-
- this.registry = ExtensionRegistry.newInstance();
- this.authProviderFactory = AuthProviderFactoryFactory.newClientAuthProviderFactory(conf);
-
- this.statsLogger = statsLogger;
- this.numConnectionsPerBookie = conf.getNumChannelsPerBookie();
- this.bookieErrorThresholdPerInterval = conf.getBookieErrorThresholdPerInterval();
-
- this.scheduler = scheduler;
- if (conf.getAddEntryTimeout() > 0 || conf.getReadEntryTimeout() > 0) {
- SafeRunnable monitor = safeRun(() -> {
- monitorPendingOperations();
- });
- this.timeoutFuture = this.scheduler.scheduleAtFixedRate(monitor,
- conf.getTimeoutMonitorIntervalSec(),
- conf.getTimeoutMonitorIntervalSec(),
- TimeUnit.SECONDS);
- }
- }
-
- private int getRc(int rc) {
- if (BKException.Code.OK == rc) {
- return rc;
- } else {
- if (closed) {
- return BKException.Code.ClientClosedException;
- } else {
- return rc;
- }
- }
- }
-
- public List<BookieSocketAddress> getFaultyBookies() {
- List<BookieSocketAddress> faultyBookies = Lists.newArrayList();
- for (PerChannelBookieClientPool channelPool : channels.values()) {
- if (channelPool instanceof DefaultPerChannelBookieClientPool) {
- DefaultPerChannelBookieClientPool pool = (DefaultPerChannelBookieClientPool) channelPool;
- if (pool.errorCounter.getAndSet(0) >= bookieErrorThresholdPerInterval) {
- faultyBookies.add(pool.address);
- }
- }
- }
- return faultyBookies;
- }
-
- public boolean isWritable(BookieSocketAddress address, long key) {
- final PerChannelBookieClientPool pcbcPool = lookupClient(address);
- // if null, let the write initiate connect of fail with whatever error it produces
- return pcbcPool == null || pcbcPool.isWritable(key);
- }
-
- @Override
- public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool,
- SecurityHandlerFactory shFactory) throws SecurityException {
- return new PerChannelBookieClient(conf, executor, eventLoopGroup, address, statsLogger,
- authProviderFactory, registry, pcbcPool, shFactory);
- }
-
- public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
- PerChannelBookieClientPool clientPool = channels.get(addr);
- if (null == clientPool) {
- closeLock.readLock().lock();
- try {
- if (closed) {
- return null;
- }
- PerChannelBookieClientPool newClientPool =
- new DefaultPerChannelBookieClientPool(conf, this, addr, numConnectionsPerBookie);
- PerChannelBookieClientPool oldClientPool = channels.putIfAbsent(addr, newClientPool);
- if (null == oldClientPool) {
- clientPool = newClientPool;
- // initialize the pool only after we put the pool into the map
- clientPool.intialize();
- } else {
- clientPool = oldClientPool;
- newClientPool.close(false);
- }
- } catch (SecurityException e) {
- LOG.error("Security Exception in creating new default PCBC pool: ", e);
- return null;
- } finally {
- closeLock.readLock().unlock();
- }
- }
- return clientPool;
- }
-
- public void forceLedger(final BookieSocketAddress addr, final long ledgerId,
- final ForceLedgerCallback cb, final Object ctx) {
- final PerChannelBookieClientPool client = lookupClient(addr);
- if (client == null) {
- cb.forceLedgerComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
- ledgerId, addr, ctx);
- return;
- }
-
- client.obtain((rc, pcbc) -> {
- if (rc != BKException.Code.OK) {
- try {
- executor.executeOrdered(ledgerId, safeRun(() -> {
- cb.forceLedgerComplete(rc, ledgerId, addr, ctx);
- }));
- } catch (RejectedExecutionException re) {
- cb.forceLedgerComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
- }
- } else {
- pcbc.forceLedger(ledgerId, cb, ctx);
- }
- }, ledgerId);
- }
-
- public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
- final long lac, final ByteBufList toSend, final WriteLacCallback cb, final Object ctx) {
- final PerChannelBookieClientPool client = lookupClient(addr);
- if (client == null) {
- cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
- ledgerId, addr, ctx);
- return;
- }
-
- toSend.retain();
- client.obtain((rc, pcbc) -> {
- if (rc != BKException.Code.OK) {
- try {
- executor.executeOrdered(ledgerId, safeRun(() -> {
- cb.writeLacComplete(rc, ledgerId, addr, ctx);
- }));
- } catch (RejectedExecutionException re) {
- cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
- }
- } else {
- pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
- }
-
- toSend.release();
- }, ledgerId);
- }
-
- private void completeAdd(final int rc,
- final long ledgerId,
- final long entryId,
- final BookieSocketAddress addr,
- final WriteCallback cb,
- final Object ctx) {
- try {
- executor.executeOrdered(ledgerId, new SafeRunnable() {
- @Override
- public void safeRun() {
- cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
- }
- @Override
- public String toString() {
- return String.format("CompleteWrite(ledgerId=%d, entryId=%d, addr=%s)", ledgerId, entryId, addr);
- }
- });
- } catch (RejectedExecutionException ree) {
- cb.writeComplete(getRc(BKException.Code.InterruptedException), ledgerId, entryId, addr, ctx);
- }
- }
-
- public void addEntry(final BookieSocketAddress addr,
- final long ledgerId,
- final byte[] masterKey,
- final long entryId,
- final ByteBufList toSend,
- final WriteCallback cb,
- final Object ctx,
- final int options,
- final boolean allowFastFail,
- final EnumSet<WriteFlag> writeFlags) {
- final PerChannelBookieClientPool client = lookupClient(addr);
- if (client == null) {
- completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
- ledgerId, entryId, addr, cb, ctx);
- return;
- }
-
- // Retain the buffer, since the connection could be obtained after
- // the PendingApp might have already failed
- toSend.retain();
-
- client.obtain(ChannelReadyForAddEntryCallback.create(
- this, toSend, ledgerId, entryId, addr,
- ctx, cb, options, masterKey, allowFastFail, writeFlags),
- ledgerId);
- }
-
- private void completeRead(final int rc,
- final long ledgerId,
- final long entryId,
- final ByteBuf entry,
- final ReadEntryCallback cb,
- final Object ctx) {
- try {
- executor.executeOrdered(ledgerId, new SafeRunnable() {
- @Override
- public void safeRun() {
- cb.readEntryComplete(rc, ledgerId, entryId, entry, ctx);
- }
- });
- } catch (RejectedExecutionException ree) {
- cb.readEntryComplete(getRc(BKException.Code.InterruptedException),
- ledgerId, entryId, entry, ctx);
- }
- }
-
- private static class ChannelReadyForAddEntryCallback
- implements GenericCallback<PerChannelBookieClient> {
- private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
-
- private BookieClient bookieClient;
- private ByteBufList toSend;
- private long ledgerId;
- private long entryId;
- private BookieSocketAddress addr;
- private Object ctx;
- private WriteCallback cb;
- private int options;
- private byte[] masterKey;
- private boolean allowFastFail;
- private EnumSet<WriteFlag> writeFlags;
-
- static ChannelReadyForAddEntryCallback create(
- BookieClient bookieClient, ByteBufList toSend, long ledgerId,
- long entryId, BookieSocketAddress addr, Object ctx,
- WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail,
- EnumSet<WriteFlag> writeFlags) {
- ChannelReadyForAddEntryCallback callback = RECYCLER.get();
- callback.bookieClient = bookieClient;
- callback.toSend = toSend;
- callback.ledgerId = ledgerId;
- callback.entryId = entryId;
- callback.addr = addr;
- callback.ctx = ctx;
- callback.cb = cb;
- callback.options = options;
- callback.masterKey = masterKey;
- callback.allowFastFail = allowFastFail;
- callback.writeFlags = writeFlags;
- return callback;
- }
-
- @Override
- public void operationComplete(final int rc,
- PerChannelBookieClient pcbc) {
- if (rc != BKException.Code.OK) {
- bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
- } else {
- pcbc.addEntry(ledgerId, masterKey, entryId,
- toSend, cb, ctx, options, allowFastFail, writeFlags);
- }
-
- toSend.release();
- recycle();
- }
-
- private ChannelReadyForAddEntryCallback(
- Handle<ChannelReadyForAddEntryCallback> recyclerHandle) {
- this.recyclerHandle = recyclerHandle;
- }
+public interface BookieClient {
+ long PENDINGREQ_NOTWRITABLE_MASK = 0x01L << 62;
- private static final Recycler<ChannelReadyForAddEntryCallback> RECYCLER =
- new Recycler<ChannelReadyForAddEntryCallback>() {
- protected ChannelReadyForAddEntryCallback newObject(
- Recycler.Handle<ChannelReadyForAddEntryCallback> recyclerHandle) {
- return new ChannelReadyForAddEntryCallback(recyclerHandle);
- }
- };
-
- public void recycle() {
- bookieClient = null;
- toSend = null;
- ledgerId = -1;
- entryId = -1;
- addr = null;
- ctx = null;
- cb = null;
- options = -1;
- masterKey = null;
- allowFastFail = false;
- writeFlags = null;
- recyclerHandle.recycle(this);
- }
- }
-
- public void readLac(final BookieSocketAddress addr, final long ledgerId, final ReadLacCallback cb,
- final Object ctx) {
- final PerChannelBookieClientPool client = lookupClient(addr);
- if (client == null) {
- cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), ledgerId, null, null,
- ctx);
- return;
- }
- client.obtain((rc, pcbc) -> {
- if (rc != BKException.Code.OK) {
- try {
- executor.executeOrdered(ledgerId, safeRun(() -> {
- cb.readLacComplete(rc, ledgerId, null, null, ctx);
- }));
- } catch (RejectedExecutionException re) {
- cb.readLacComplete(getRc(BKException.Code.InterruptedException),
- ledgerId, null, null, ctx);
- }
- } else {
- pcbc.readLac(ledgerId, cb, ctx);
- }
- }, ledgerId);
- }
-
- public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId,
- ReadEntryCallback cb, Object ctx, int flags) {
- readEntry(addr, ledgerId, entryId, cb, ctx, flags, null);
- }
-
- public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
- final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey) {
- readEntry(addr, ledgerId, entryId, cb, ctx, flags, masterKey, false);
- }
-
- public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
- final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey,
- final boolean allowFastFail) {
- final PerChannelBookieClientPool client = lookupClient(addr);
- if (client == null) {
- cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
- ledgerId, entryId, null, ctx);
- return;
- }
+ /**
+ * Get the list of bookies which have exhibited more error responses
+ * than a configured threshold.
+ *
+ * @return the list of faulty bookies
+ */
+ List<BookieSocketAddress> getFaultyBookies();
- client.obtain((rc, pcbc) -> {
- if (rc != BKException.Code.OK) {
- completeRead(rc, ledgerId, entryId, null, cb, ctx);
- } else {
- pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey, allowFastFail);
- }
- }, ledgerId);
- }
+ /**
+ * Check whether the channel used to write to a bookie is writable.
+ * A channel becomes non-writable when its buffer becomes full, and will stay
+ * non-writable until some of the buffer is cleared.
+ *
+ * <p>This can be used to apply backpressure. If a channel is not writable,
+ * requests will end up queuing.
+ *
+ * <p>As we use pooling, there may be multiple channels per bookie, so
+ * we also pass the ledger ID to check the writability of the correct
+ * channel.
+ *
+ * <p>This has nothing to do with the bookie read-only status.
+ *
+ * @param address the address of the bookie
+ * @param ledgerId the ledger we wish to send a request to
+ *
+ */
+ boolean isWritable(BookieSocketAddress address, long ledgerId);
+ /**
+ * Get the number of outstanding requests on the channel used to connect
+ * to a bookie at {@code address} for a ledger with {@code ledgerId}.
+ * It is necessary to specify the ledgerId as there may be multiple
+ * channels for a single bookie if pooling is in use.
+ * If the bookie is not {@link #isWritable(BookieSocketAddress,long) writable},
+ * then the {@link #PENDINGREQ_NOTWRITABLE_MASK} will be bitwise OR'd with
+ * the returned value.
+ *
+ * @param address the address of the bookie
+ * @param ledgerId the ledger whose channel we wish to query
+ * @return the number of requests currently outstanding
+ */
+ long getNumPendingRequests(BookieSocketAddress address, long ledgerId);
- public void readEntryWaitForLACUpdate(final BookieSocketAddress addr,
- final long ledgerId,
- final long entryId,
- final long previousLAC,
- final long timeOutInMillis,
- final boolean piggyBackEntry,
- final ReadEntryCallback cb,
- final Object ctx) {
- final PerChannelBookieClientPool client = lookupClient(addr);
- if (client == null) {
- completeRead(BKException.Code.BookieHandleNotAvailableException,
- ledgerId, entryId, null, cb, ctx);
- return;
- }
+ /**
+ * Send a force request to the server. When complete all entries which have
+ * been written for {@code ledgerId} to this bookie will be persisted on disk.
+ * This is for use with {@link org.apache.bookkeeper.client.api.WriteFlag#DEFERRED_SYNC}.
+ *
+ * @param address the address of the bookie
+ * @param ledgerId the ledger whose entries we want persisted
+ * @param cb the callback notified when the request completes
+ * @param ctx a context object passed to the callback on completion
+ */
+ void forceLedger(BookieSocketAddress address, long ledgerId,
+ ForceLedgerCallback cb, Object ctx);
- client.obtain((rc, pcbc) -> {
- if (rc != BKException.Code.OK) {
- completeRead(rc, ledgerId, entryId, null, cb, ctx);
- } else {
- pcbc.readEntryWaitForLACUpdate(ledgerId, entryId, previousLAC, timeOutInMillis, piggyBackEntry, cb,
- ctx);
- }
- }, ledgerId);
- }
+ /**
+ * Read the last add confirmed for ledger {@code ledgerId} from the bookie at
+ * {@code address}.
+ *
+ * @param address the address of the bookie
+ * @param ledgerId the ledger whose last add confirmed we wish to know
+ * @param cb the callback notified when the request completes
+ * @param ctx a context object passed to the callback on completion
+ */
+ void readLac(BookieSocketAddress address, long ledgerId, ReadLacCallback cb, Object ctx);
- public void getBookieInfo(final BookieSocketAddress addr, final long requested, final GetBookieInfoCallback cb,
- final Object ctx) {
- final PerChannelBookieClientPool client = lookupClient(addr);
- if (client == null) {
- cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException), new BookieInfo(),
- ctx);
- return;
- }
- client.obtain((rc, pcbc) -> {
- if (rc != BKException.Code.OK) {
- try {
- executor.submit(safeRun(() -> {
- cb.getBookieInfoComplete(rc, new BookieInfo(), ctx);
- }));
- } catch (RejectedExecutionException re) {
- cb.getBookieInfoComplete(getRc(BKException.Code.InterruptedException),
- new BookieInfo(), ctx);
- }
- } else {
- pcbc.getBookieInfo(requested, cb, ctx);
- }
- }, requested);
- }
+ /**
+ * Explicitly write the last add confirmed for ledger {@code ledgerId} to the bookie at
+ * {@code address}.
+ *
+ * @param address the address of the bookie
+ * @param ledgerId the ledger whose last add confirmed we wish to write
+ * @param masterKey the master key of the ledger
+ * @param lac the last add confirmed we wish to write
+ * @param toSend a buffer also containing the lac, along with a digest
+ * @param cb the callback notified when the request completes
+ * @param ctx a context object passed to the callback on completion
+ */
+ void writeLac(BookieSocketAddress address, long ledgerId, byte[] masterKey,
+ long lac, ByteBufList toSend, WriteLacCallback cb, Object ctx);
- private void monitorPendingOperations() {
- for (PerChannelBookieClientPool clientPool : channels.values()) {
- clientPool.checkTimeoutOnPendingOperations();
- }
- }
+ /**
+ * Add an entry for ledger {@code ledgerId} on the bookie at address {@code address}.
+ *
+ * @param address the address of the bookie
+ * @param ledgerId the ledger to which we wish to add the entry
+ * @param entryId the id of the entry we wish to add
+ * @param toSend a buffer containing the entry and its digest
+ * @param cb the callback notified when the request completes
+ * @param ctx a context object passed to the callback on completion
+ * @param options a bit mask of flags from BookieProtocol.FLAG_*
+ * {@link org.apache.bookkeeper.proto.BookieProtocol}
+ * @param allowFastFail fail the add immediately if the channel is non-writable
+ * {@link #isWritable(BookieSocketAddress,long)}
+ * @param writeFlags a set of write flags
+ * {@link org.apache.bookkeeper.client.api.WriteFlag}
+ */
+ void addEntry(BookieSocketAddress address, long ledgerId, byte[] masterKey,
+ long entryId, ByteBufList toSend, WriteCallback cb, Object ctx,
+ int options, boolean allowFastFail, EnumSet<WriteFlag> writeFlags);
- public boolean isClosed() {
- return closed;
+ /**
+ * Read entry with a null masterkey, disallowing failfast.
+ * @see #readEntry(BookieSocketAddress,long,long,ReadEntryCallback,Object,int,byte[],boolean)
+ */
+ default void readEntry(BookieSocketAddress address, long ledgerId, long entryId,
+ ReadEntryCallback cb, Object ctx, int flags) {
+ readEntry(address, ledgerId, entryId, cb, ctx, flags, null);
}
- public void close() {
- closeLock.writeLock().lock();
- try {
- closed = true;
- for (PerChannelBookieClientPool pool : channels.values()) {
- pool.close(true);
- }
- channels.clear();
- authProviderFactory.close();
-
- if (timeoutFuture != null) {
- timeoutFuture.cancel(false);
- }
- } finally {
- closeLock.writeLock().unlock();
- }
+ /**
+ * Read entry, disallowing failfast.
+ * @see #readEntry(BookieSocketAddress,long,long,ReadEntryCallback,Object,int,byte[],boolean)
+ */
+ default void readEntry(BookieSocketAddress address, long ledgerId, long entryId,
+ ReadEntryCallback cb, Object ctx, int flags, byte[] masterKey) {
+ readEntry(address, ledgerId, entryId, cb, ctx, flags, masterKey, false);
}
- private static class Counter {
- int i;
- int total;
-
- synchronized void inc() {
- i++;
- total++;
- }
-
- synchronized void dec() {
- i--;
- notifyAll();
- }
-
- synchronized void wait(int limit) throws InterruptedException {
- while (i > limit) {
- wait();
- }
- }
+ /**
+ * Read an entry from bookie at address {@code address}.
+ *
+ * @param address address of the bookie to read from
+ * @param ledgerId id of the ledger the entry belongs to
+ * @param entryId id of the entry we wish to read
+ * @param cb the callback notified when the request completes
+ * @param ctx a context object passed to the callback on completion
+ * @param flags a bit mask of flags from BookieProtocol.FLAG_*
+ * {@link org.apache.bookkeeper.proto.BookieProtocol}
+ * @param masterKey the master key of the ledger being read from. This is only required
+ * if the FLAG_DO_FENCING is specified.
+ * @param allowFastFail fail the read immediately if the channel is non-writable
+ * {@link #isWritable(BookieSocketAddress,long)}
+ */
+ void readEntry(BookieSocketAddress address, long ledgerId, long entryId,
+ ReadEntryCallback cb, Object ctx, int flags, byte[] masterKey,
+ boolean allowFastFail);
- synchronized int total() {
- return total;
- }
- }
+ /**
+ * Send a long poll request to bookie, waiting for the last add confirmed
+ * to be updated. The client can also request that the full entry is returned
+ * with the new last add confirmed.
+ *
+ * @param address address of bookie to send the long poll request to
+ * @param ledgerId ledger whose last add confirmed we are interested in
+ * @param entryId the id of the entry we expect to read
+ * @param previousLAC the previous lac value
+ * @param timeOutInMillis number of millis to wait for LAC update
+ * @param piggyBackEntry whether to read the requested entry when LAC is updated
+ * @param cb the callback notified when the request completes
+ * @param ctx a context object passed to the callback on completion
+ */
+ void readEntryWaitForLACUpdate(BookieSocketAddress address,
+ long ledgerId,
+ long entryId,
+ long previousLAC,
+ long timeOutInMillis,
+ boolean piggyBackEntry,
+ ReadEntryCallback cb,
+ Object ctx);
/**
- * @param args
- * @throws IOException
- * @throws NumberFormatException
- * @throws InterruptedException
+ * Read information about the bookie, from the bookie.
+ *
+ * @param address the address of the bookie to request information from
+ * @param requested a bitset specifying which pieces of information to request
+ * {@link org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest}
+ * @param cb the callback notified when the request completes
+ * @param ctx a context object passed to the callback on completion
+ *
+ * @see org.apache.bookkeeper.client.BookieInfoReader.BookieInfo
*/
- public static void main(String[] args) throws NumberFormatException, IOException, InterruptedException {
- if (args.length != 3) {
- System.err.println("USAGE: BookieClient bookieHost port ledger#");
- return;
- }
- WriteCallback cb = new WriteCallback() {
+ void getBookieInfo(BookieSocketAddress address, long requested,
+ GetBookieInfoCallback cb, Object ctx);
- public void writeComplete(int rc, long ledger, long entry, BookieSocketAddress addr, Object ctx) {
- Counter counter = (Counter) ctx;
- counter.dec();
- if (rc != 0) {
- System.out.println("rc = " + rc + " for " + entry + "@" + ledger);
- }
- }
- };
- Counter counter = new Counter();
- byte hello[] = "hello".getBytes(UTF_8);
- long ledger = Long.parseLong(args[2]);
- EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
- OrderedExecutor executor = OrderedExecutor.newBuilder()
- .name("BookieClientWorker")
- .numThreads(1)
- .build();
- ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
- new DefaultThreadFactory("BookKeeperClientScheduler"));
- BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor,
- scheduler, NullStatsLogger.INSTANCE);
- BookieSocketAddress addr = new BookieSocketAddress(args[0], Integer.parseInt(args[1]));
+ /**
+ * @return whether bookie client object has been closed
+ */
+ boolean isClosed();
- for (int i = 0; i < 100000; i++) {
- counter.inc();
- bc.addEntry(addr, ledger, new byte[0], i,
- ByteBufList.get(Unpooled.wrappedBuffer(hello)), cb, counter, 0, false,
- WriteFlag.NONE);
- }
- counter.wait(0);
- System.out.println("Total = " + counter.total());
- scheduler.shutdown();
- eventLoopGroup.shutdownGracefully();
- executor.shutdown();
- }
+ /**
+ * Close the bookie client object.
+ */
+ void close();
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
similarity index 95%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
copy to bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index 197483d..50dd85f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -43,7 +43,6 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
@@ -74,12 +73,9 @@ import org.slf4j.LoggerFactory;
* Implements the client-side part of the BookKeeper protocol.
*
*/
-public class BookieClient implements PerChannelBookieClientFactory {
+public class BookieClientImpl implements BookieClient, PerChannelBookieClientFactory {
static final Logger LOG = LoggerFactory.getLogger(BookieClient.class);
- // This is global state that should be across all BookieClients
- AtomicLong totalBytesOutstanding = new AtomicLong();
-
OrderedExecutor executor;
ScheduledExecutorService scheduler;
ScheduledFuture<?> timeoutFuture;
@@ -99,9 +95,9 @@ public class BookieClient implements PerChannelBookieClientFactory {
private final long bookieErrorThresholdPerInterval;
- public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
- OrderedExecutor executor, ScheduledExecutorService scheduler,
- StatsLogger statsLogger) throws IOException {
+ public BookieClientImpl(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
+ OrderedExecutor executor, ScheduledExecutorService scheduler,
+ StatsLogger statsLogger) throws IOException {
this.conf = conf;
this.eventLoopGroup = eventLoopGroup;
this.executor = executor;
@@ -139,6 +135,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
}
}
+ @Override
public List<BookieSocketAddress> getFaultyBookies() {
List<BookieSocketAddress> faultyBookies = Lists.newArrayList();
for (PerChannelBookieClientPool channelPool : channels.values()) {
@@ -152,6 +149,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
return faultyBookies;
}
+ @Override
public boolean isWritable(BookieSocketAddress address, long key) {
final PerChannelBookieClientPool pcbcPool = lookupClient(address);
// if null, let the write initiate connect of fail with whatever error it produces
@@ -159,6 +157,18 @@ public class BookieClient implements PerChannelBookieClientFactory {
}
@Override
+    public long getNumPendingRequests(BookieSocketAddress address, long ledgerId) {
+        final PerChannelBookieClientPool pool = lookupClient(address);
+        // Without a pool for this address there is nothing to report as pending.
+        if (pool == null) {
+            return 0;
+        }
+        final boolean writable = pool.isWritable(ledgerId);
+        final long pending = pool.getNumPendingCompletionRequests();
+        // For a non-writable pool the count is OR-ed with PENDINGREQ_NOTWRITABLE_MASK so that
+        // the caller can tell the two states apart from the returned value alone.
+        return writable ? pending : (pending | PENDINGREQ_NOTWRITABLE_MASK);
+    }
+
+ @Override
public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool,
SecurityHandlerFactory shFactory) throws SecurityException {
return new PerChannelBookieClient(conf, executor, eventLoopGroup, address, statsLogger,
@@ -194,6 +204,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
return clientPool;
}
+ @Override
public void forceLedger(final BookieSocketAddress addr, final long ledgerId,
final ForceLedgerCallback cb, final Object ctx) {
final PerChannelBookieClientPool client = lookupClient(addr);
@@ -218,6 +229,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
}, ledgerId);
}
+ @Override
public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
final long lac, final ByteBufList toSend, final WriteLacCallback cb, final Object ctx) {
final PerChannelBookieClientPool client = lookupClient(addr);
@@ -267,6 +279,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
}
}
+ @Override
public void addEntry(final BookieSocketAddress addr,
final long ledgerId,
final byte[] masterKey,
@@ -317,7 +330,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
implements GenericCallback<PerChannelBookieClient> {
private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
- private BookieClient bookieClient;
+ private BookieClientImpl bookieClient;
private ByteBufList toSend;
private long ledgerId;
private long entryId;
@@ -330,7 +343,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
private EnumSet<WriteFlag> writeFlags;
static ChannelReadyForAddEntryCallback create(
- BookieClient bookieClient, ByteBufList toSend, long ledgerId,
+ BookieClientImpl bookieClient, ByteBufList toSend, long ledgerId,
long entryId, BookieSocketAddress addr, Object ctx,
WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail,
EnumSet<WriteFlag> writeFlags) {
@@ -579,8 +592,8 @@ public class BookieClient implements PerChannelBookieClientFactory {
.build();
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("BookKeeperClientScheduler"));
- BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor,
- scheduler, NullStatsLogger.INSTANCE);
+ BookieClientImpl bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, executor,
+ scheduler, NullStatsLogger.INSTANCE);
BookieSocketAddress addr = new BookieSocketAddress(args[0], Integer.parseInt(args[1]));
for (int i = 0; i < 100000; i++) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
index 7c6f9a0..71a9b11 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Test;
@@ -296,7 +297,7 @@ public class SlowBookieTest extends BookKeeperClusterTestCase {
private void setTargetChannelState(BookKeeper bkc, BookieSocketAddress address,
long key, boolean state) throws Exception {
- bkc.getBookieClient().lookupClient(address).obtain((rc, pcbc) -> {
+ ((BookieClientImpl) bkc.getBookieClient()).lookupClient(address).obtain((rc, pcbc) -> {
pcbc.setWritable(state);
}, key);
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
index a91dffa..6291413 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
@@ -38,6 +38,7 @@ import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol;
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -107,7 +108,7 @@ public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase {
// try to get bookie info from the sleeping bookie. It should fail with timeout error
BookieSocketAddress addr = new BookieSocketAddress(bookieToSleep.getSocketAddress().getHostString(),
bookieToSleep.getPort());
- BookieClient bc = new BookieClient(cConf, eventLoopGroup, executor, scheduler, NullStatsLogger.INSTANCE);
+ BookieClient bc = new BookieClientImpl(cConf, eventLoopGroup, executor, scheduler, NullStatsLogger.INSTANCE);
long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
| BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
new file mode 100644
index 0000000..79cc5ba
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
@@ -0,0 +1,288 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.apache.bookkeeper.util.SafeRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mock implementation of BookieClient.
+ *
+ * <p>Entries written through {@code addEntry} are copied into memory, keyed by bookie address and
+ * ledger id, and served back by {@code readEntry}. Tests can make a bookie answer entry reads and
+ * writes with an error ({@code errorBookies}/{@code removeErrors}) or hold writes back until they
+ * are explicitly released ({@code stallBookie}/{@code releaseStalledBookie}).
+ *
+ * <p>All other requests (force ledger, LAC read/write, long-poll read, bookie info) complete with
+ * {@code BKException.Code.IllegalOpException}.
+ */
+public class MockBookieClient implements BookieClient {
+    static final Logger LOG = LoggerFactory.getLogger(MockBookieClient.class);
+
+    // Executor on which every callback of this mock is invoked.
+    final OrderedExecutor executor;
+    // bookie address -> (ledger id -> entries stored for that ledger on that bookie)
+    final ConcurrentHashMap<BookieSocketAddress, ConcurrentHashMap<Long, LedgerData>> data = new ConcurrentHashMap<>();
+    // Bookies whose entry reads and writes are completed with an error code.
+    final Set<BookieSocketAddress> errorBookies =
+        Collections.newSetFromMap(new ConcurrentHashMap<BookieSocketAddress, Boolean>());
+
+    // Within this class the two maps below are only touched while holding the monitor of "this".
+    // Bookies whose writes are currently being held back.
+    final Map<BookieSocketAddress, Boolean> stalledBookies = new HashMap<>();
+    // Writes held back per stalled bookie; each consumer is handed the release result code.
+    final Map<BookieSocketAddress, List<Consumer<Integer>>> stalledRequests = new HashMap<>();
+
+    /**
+     * Create a mock client.
+     *
+     * @param executor executor used to run all request callbacks
+     */
+    public MockBookieClient(OrderedExecutor executor) {
+        this.executor = executor;
+    }
+
+    /**
+     * Start holding back writes to the given bookie. Writes issued from now on are queued
+     * until {@link #releaseStalledBookie(BookieSocketAddress, int)} is called.
+     *
+     * @param bookie the bookie to stall
+     */
+    public void stallBookie(BookieSocketAddress bookie) {
+        synchronized (this) {
+            stalledBookies.put(bookie, true);
+        }
+    }
+
+    /**
+     * Stop stalling the given bookie and complete every write queued for it.
+     *
+     * <p>With {@code BKException.Code.OK} the queued writes are executed normally; with any
+     * other code they are completed with that code without storing the entry. Calling this
+     * for a bookie that has no queued writes is a no-op apart from clearing the stall flag.
+     *
+     * @param bookie the bookie to release
+     * @param rc result code handed to each queued write
+     */
+    public void releaseStalledBookie(BookieSocketAddress bookie, int rc) {
+        synchronized (this) {
+            stalledBookies.remove(bookie);
+            // remove() yields null when nothing was queued while the bookie was stalled
+            // (or when it was never stalled at all); there is nothing to complete then.
+            List<Consumer<Integer>> pending = stalledRequests.remove(bookie);
+            if (pending != null) {
+                pending.forEach((r) -> r.accept(rc));
+            }
+        }
+    }
+
+    /**
+     * Make entry reads and writes against the given bookies fail.
+     *
+     * @param bookies bookies to mark as erroring
+     */
+    public void errorBookies(BookieSocketAddress... bookies) {
+        for (BookieSocketAddress b : bookies) {
+            errorBookies.add(b);
+        }
+    }
+
+    /**
+     * Undo {@link #errorBookies(BookieSocketAddress...)} for the given bookies.
+     *
+     * @param bookies bookies that should no longer error
+     */
+    public void removeErrors(BookieSocketAddress... bookies) {
+        for (BookieSocketAddress b : bookies) {
+            errorBookies.remove(b);
+        }
+    }
+
+    /**
+     * The mock never reports a faulty bookie.
+     *
+     * @return an empty list
+     */
+    @Override
+    public List<BookieSocketAddress> getFaultyBookies() {
+        return Collections.emptyList();
+    }
+
+    /**
+     * The mock treats every bookie as writable.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean isWritable(BookieSocketAddress address, long ledgerId) {
+        return true;
+    }
+
+    /**
+     * The mock does not track pending requests.
+     *
+     * @return always {@code 0}
+     */
+    @Override
+    public long getNumPendingRequests(BookieSocketAddress address, long ledgerId) {
+        return 0;
+    }
+
+    /**
+     * Not supported by the mock: the callback is completed on the executor with
+     * {@code BKException.Code.IllegalOpException}.
+     */
+    @Override
+    public void forceLedger(BookieSocketAddress addr, long ledgerId,
+                            ForceLedgerCallback cb, Object ctx) {
+        executor.executeOrdered(ledgerId,
+                safeRun(() -> {
+                        cb.forceLedgerComplete(BKException.Code.IllegalOpException,
+                                               ledgerId, addr, ctx);
+                    }));
+    }
+
+    /**
+     * Not supported by the mock: the callback is completed on the executor with
+     * {@code BKException.Code.IllegalOpException}. {@code toSend} is left untouched.
+     */
+    @Override
+    public void writeLac(BookieSocketAddress addr, long ledgerId, byte[] masterKey,
+                         long lac, ByteBufList toSend, WriteLacCallback cb, Object ctx) {
+        executor.executeOrdered(ledgerId,
+                safeRun(() -> {
+                        cb.writeLacComplete(BKException.Code.IllegalOpException,
+                                            ledgerId, addr, ctx);
+                    }));
+    }
+
+    /**
+     * Store a copy of the entry for the given bookie and ledger and complete the callback.
+     *
+     * <p>If the bookie is marked as erroring, the callback gets
+     * {@code BKException.Code.WriteException} and nothing is stored. If the bookie is stalled,
+     * the write is queued until the bookie is released. {@code masterKey}, {@code options},
+     * {@code allowFastFail} and {@code writeFlags} are ignored.
+     *
+     * <p>The mock retains {@code toSend} for the duration of the request and releases that
+     * reference once the callback has been invoked, on every completion path.
+     */
+    @Override
+    public void addEntry(BookieSocketAddress addr, long ledgerId, byte[] masterKey,
+                         long entryId, ByteBufList toSend, WriteCallback cb, Object ctx,
+                         int options, boolean allowFastFail, EnumSet<WriteFlag> writeFlags) {
+        SafeRunnable write = safeRun(() -> {
+                try {
+                    LOG.info("[{};L{}] write entry {}", addr, ledgerId, entryId);
+                    if (errorBookies.contains(addr)) {
+                        LOG.warn("[{};L{}] erroring write {}", addr, ledgerId, entryId);
+                        cb.writeComplete(BKException.Code.WriteException, ledgerId, entryId, addr, ctx);
+                        return;
+                    }
+                    LedgerData ledger = getBookieData(addr).computeIfAbsent(ledgerId, LedgerData::new);
+                    ledger.addEntry(entryId, copyData(toSend));
+                    cb.writeComplete(BKException.Code.OK, ledgerId, entryId, addr, ctx);
+                } finally {
+                    // Balances the retain() below, including on the error path above.
+                    toSend.release();
+                }
+            });
+
+        // Keep the payload alive until the asynchronous task (or stalled request) has run.
+        toSend.retain();
+        synchronized (this) {
+            if (stalledBookies.getOrDefault(addr, false)) {
+                LOG.info("[{};{};{}] Stalling write {}", addr, ledgerId, System.identityHashCode(write), entryId);
+                stalledRequests.computeIfAbsent(addr, (key) -> new ArrayList<>())
+                    .add((rc) -> {
+                            LOG.info("[{};{};{}] Unstalled write {}",
+                                     addr, ledgerId, System.identityHashCode(write), entryId);
+                            if (rc == BKException.Code.OK) {
+                                executor.executeOrdered(ledgerId, write);
+                            } else {
+                                // Released with an error: report it without storing the entry.
+                                executor.executeOrdered(
+                                        ledgerId, safeRun(() -> {
+                                                cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+                                                toSend.release();
+                                            }));
+                            }
+                        });
+            } else {
+                executor.executeOrdered(ledgerId, write);
+            }
+        }
+    }
+
+    /**
+     * Not supported by the mock: the callback is completed on the executor with
+     * {@code BKException.Code.IllegalOpException} and null buffers.
+     */
+    @Override
+    public void readLac(BookieSocketAddress addr, long ledgerId, ReadLacCallback cb, Object ctx) {
+        executor.executeOrdered(ledgerId,
+                safeRun(() -> {
+                        cb.readLacComplete(BKException.Code.IllegalOpException,
+                                           ledgerId, null, null, ctx);
+                    }));
+    }
+
+    /**
+     * Read back an entry previously stored through {@code addEntry} for this bookie.
+     *
+     * <p>The callback is completed with {@code ReadException} if the bookie is marked as
+     * erroring, {@code NoSuchLedgerExistsException} if the bookie holds nothing for the ledger,
+     * {@code NoSuchEntryException} if the entry is missing, and {@code OK} with the entry data
+     * otherwise. {@code flags}, {@code masterKey} and {@code allowFastFail} are ignored.
+     */
+    @Override
+    public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId,
+                          ReadEntryCallback cb, Object ctx, int flags, byte[] masterKey,
+                          boolean allowFastFail) {
+        executor.executeOrdered(ledgerId,
+                safeRun(() -> {
+                        LOG.info("[{};L{}] read entry {}", addr, ledgerId, entryId);
+                        if (errorBookies.contains(addr)) {
+                            LOG.warn("[{};L{}] erroring read {}", addr, ledgerId, entryId);
+                            cb.readEntryComplete(BKException.Code.ReadException, ledgerId, entryId, null, ctx);
+                            return;
+                        }
+
+                        LedgerData ledger = getBookieData(addr).get(ledgerId);
+                        if (ledger == null) {
+                            LOG.warn("[{};L{}] ledger not found", addr, ledgerId);
+                            cb.readEntryComplete(BKException.Code.NoSuchLedgerExistsException,
+                                                 ledgerId, entryId, null, ctx);
+                            return;
+                        }
+
+                        ByteBuf entry = ledger.getEntry(entryId);
+                        if (entry == null) {
+                            LOG.warn("[{};L{}] entry({}) not found", addr, ledgerId, entryId);
+                            cb.readEntryComplete(BKException.Code.NoSuchEntryException,
+                                                 ledgerId, entryId, null, ctx);
+                            return;
+                        }
+
+                        // slice() gives the caller independent reader/writer indices over the
+                        // stored bytes without copying them.
+                        // NOTE(review): the slice shares the stored buffer's reference count;
+                        // confirm callers do not release it, or later reads of this entry break.
+                        cb.readEntryComplete(BKException.Code.OK,
+                                             ledgerId, entryId, entry.slice(), ctx);
+                    }));
+    }
+
+    /**
+     * Not supported by the mock: the callback is completed on the executor with
+     * {@code BKException.Code.IllegalOpException} and a null buffer.
+     */
+    @Override
+    public void readEntryWaitForLACUpdate(BookieSocketAddress addr,
+                                          long ledgerId,
+                                          long entryId,
+                                          long previousLAC,
+                                          long timeOutInMillis,
+                                          boolean piggyBackEntry,
+                                          ReadEntryCallback cb,
+                                          Object ctx) {
+        executor.executeOrdered(ledgerId,
+                safeRun(() -> {
+                        cb.readEntryComplete(BKException.Code.IllegalOpException,
+                                             ledgerId, entryId, null, ctx);
+                    }));
+    }
+
+    /**
+     * Not supported by the mock: the callback is completed on the executor (keyed by the
+     * bookie address) with {@code BKException.Code.IllegalOpException} and a null result.
+     */
+    @Override
+    public void getBookieInfo(BookieSocketAddress addr, long requested,
+                              GetBookieInfoCallback cb, Object ctx) {
+        executor.executeOrdered(addr,
+                safeRun(() -> {
+                        cb.getBookieInfoComplete(BKException.Code.IllegalOpException,
+                                                 null, ctx);
+                    }));
+    }
+
+    /**
+     * The mock is never considered closed.
+     *
+     * @return always {@code false}
+     */
+    @Override
+    public boolean isClosed() {
+        return false;
+    }
+
+    /**
+     * No-op: the mock holds no resources of its own to close.
+     */
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Get the per-ledger store of the given bookie, creating it on first use.
+     */
+    private ConcurrentHashMap<Long, LedgerData> getBookieData(BookieSocketAddress addr) {
+        return data.computeIfAbsent(addr, (key) -> new ConcurrentHashMap<>());
+    }
+
+    /**
+     * Copy all readable bytes of the list into a single new unpooled buffer.
+     */
+    private static ByteBuf copyData(ByteBufList list) {
+        ByteBuf buf = Unpooled.buffer(list.readableBytes());
+        for (int i = 0; i < list.size(); i++) {
+            // Read through a slice so the reader index of the caller's buffer is not moved.
+            buf.writeBytes(list.getBuffer(i).slice());
+        }
+        return buf;
+    }
+
+    /**
+     * Entries of one ledger on one bookie, sorted by entry id.
+     */
+    private static class LedgerData {
+        final long ledgerId;
+        private final TreeMap<Long, ByteBuf> entries = new TreeMap<>();
+        LedgerData(long ledgerId) {
+            this.ledgerId = ledgerId;
+        }
+
+        /** Store the entry, replacing any previous content for the same entry id. */
+        void addEntry(long entryId, ByteBuf entry) {
+            entries.put(entryId, entry);
+        }
+
+        /**
+         * Look up an entry.
+         *
+         * @param entryId entry id, or {@code BookieProtocol.LAST_ADD_CONFIRMED} to get the
+         *                entry with the highest id currently stored
+         * @return the stored buffer, or {@code null} if there is no such entry
+         */
+        ByteBuf getEntry(long entryId) {
+            if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
+                Map.Entry<Long, ByteBuf> lastEntry = entries.lastEntry();
+                if (lastEntry != null) {
+                    return lastEntry.getValue();
+                } else {
+                    return null;
+                }
+            } else {
+                return entries.get(entryId);
+            }
+        }
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 7ff6386..c6cc72b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -47,6 +47,7 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
@@ -156,8 +157,8 @@ public class BookieClientTest {
BookieSocketAddress addr = bs.getLocalAddress();
ResultStruct arc = new ResultStruct();
- BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor,
- scheduler, NullStatsLogger.INSTANCE);
+ BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, executor,
+ scheduler, NullStatsLogger.INSTANCE);
ByteBufList bb = createByteBuffer(1, 1, 1);
bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
synchronized (arc) {
@@ -257,8 +258,8 @@ public class BookieClientTest {
public void testNoLedger() throws Exception {
ResultStruct arc = new ResultStruct();
BookieSocketAddress addr = bs.getLocalAddress();
- BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor,
- scheduler, NullStatsLogger.INSTANCE);
+ BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, executor,
+ scheduler, NullStatsLogger.INSTANCE);
synchronized (arc) {
bc.readEntry(addr, 2, 13, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
@@ -269,8 +270,8 @@ public class BookieClientTest {
@Test
public void testGetBookieInfo() throws IOException, InterruptedException {
BookieSocketAddress addr = bs.getLocalAddress();
- BookieClient bc = new BookieClient(new ClientConfiguration(), new NioEventLoopGroup(), executor,
- scheduler, NullStatsLogger.INSTANCE);
+ BookieClient bc = new BookieClientImpl(new ClientConfiguration(), new NioEventLoopGroup(), executor,
+ scheduler, NullStatsLogger.INSTANCE);
long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
| BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;