You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/10/23 20:19:17 UTC
[bookkeeper] branch master updated: Pool the V2 keys
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 70e4bbe Pool the V2 keys
70e4bbe is described below
commit 70e4bbe437f81f2b226480d7c20999737f686787
Author: Ivan Kelly <iv...@midokura.com>
AuthorDate: Mon Oct 23 13:19:10 2017 -0700
Pool the V2 keys
To generate less garbage in the fast path.
Author: Ivan Kelly <iv...@midokura.com>
Author: Matteo Merli <mm...@yahoo-inc.com>
Reviewers: Sijie Guo <si...@apache.org>
This closes #648 from ivankelly/pool-keys
---
.../org/apache/bookkeeper/proto/BookieClient.java | 16 +--
.../bookkeeper/proto/PerChannelBookieClient.java | 113 ++++++++++++---------
2 files changed, 73 insertions(+), 56 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index d763f57..c604b16 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -147,7 +147,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
authProviderFactory, registry, pcbcPool, shFactory);
}
- private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr, Object key) {
+ private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
PerChannelBookieClientPool clientPool = channels.get(addr);
if (null == clientPool) {
closeLock.readLock().lock();
@@ -180,7 +180,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
final long lac, final ByteBuf toSend, final WriteLacCallback cb, final Object ctx) {
closeLock.readLock().lock();
try {
- final PerChannelBookieClientPool client = lookupClient(addr, lac);
+ final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, addr, ctx);
@@ -246,7 +246,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
final int options) {
closeLock.readLock().lock();
try {
- final PerChannelBookieClientPool client = lookupClient(addr, entryId);
+ final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, entryId, addr, cb, ctx);
@@ -300,7 +300,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
final Object ctx) {
closeLock.readLock().lock();
try {
- final PerChannelBookieClientPool client = lookupClient(addr, entryId);
+ final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
completeRead(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, entryId, null, cb, ctx);
@@ -325,7 +325,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
public void readLac(final BookieSocketAddress addr, final long ledgerId, final ReadLacCallback cb, final Object ctx) {
closeLock.readLock().lock();
try {
- final PerChannelBookieClientPool client = lookupClient(addr, BookieProtocol.LAST_ADD_CONFIRMED);
+ final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), ledgerId, null, null, ctx);
return;
@@ -359,7 +359,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
final ReadEntryCallback cb, final Object ctx) {
closeLock.readLock().lock();
try {
- final PerChannelBookieClientPool client = lookupClient(addr, entryId);
+ final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, entryId, null, ctx);
@@ -392,7 +392,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
final Object ctx) {
closeLock.readLock().lock();
try {
- final PerChannelBookieClientPool client = lookupClient(addr, entryId);
+ final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
completeRead(BKException.Code.BookieHandleNotAvailableException,
ledgerId, entryId, null, cb, ctx);
@@ -418,7 +418,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
public void getBookieInfo(final BookieSocketAddress addr, final long requested, final GetBookieInfoCallback cb, final Object ctx) {
closeLock.readLock().lock();
try {
- final PerChannelBookieClientPool client = lookupClient(addr, BookkeeperProtocol.OperationType.GET_BOOKIE_INFO);
+ final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException), new BookieInfo(), ctx);
return;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 6eda724..c544701 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -51,6 +51,8 @@ import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.HashedWheelTimer;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
@@ -529,7 +531,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
Object request = null;
CompletionKey completionKey = null;
if (useV2WireProtocol) {
- completionKey = new V2CompletionKey(ledgerId, entryId, OperationType.ADD_ENTRY);
+ completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY);
request = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
(short) options, masterKey, toSend);
} else {
@@ -580,7 +582,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
Object request = null;
CompletionKey completionKey = null;
if (useV2WireProtocol) {
- completionKey = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
+ completionKey = acquireV2Key(ledgerId, entryId, OperationType.READ_ENTRY);
request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
BookieProtocol.FLAG_DO_FENCING, masterKey);
} else {
@@ -605,12 +607,13 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
.build();
}
+ CompletionValue completion = new ReadCompletion(completionKey,
+ cb, ctx,
+ ledgerId, entryId);
if (completionObjects.putIfAbsent(
- completionKey, new ReadCompletion(completionKey,
- cb, ctx,
- ledgerId, entryId)) != null) {
+ completionKey, completion) != null) {
// We cannot have more than 1 pending read on the same ledger/entry in the v2 protocol
- cb.readEntryComplete(BKException.Code.BookieHandleNotAvailableException, ledgerId, entryId, null, ctx);
+ completion.errorOut(BKException.Code.BookieHandleNotAvailableException);
return;
}
@@ -623,7 +626,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
if (useV2WireProtocol) {
request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
ledgerId, 0, (short) 0);
- completionKey = new V2CompletionKey(ledgerId, 0, OperationType.READ_LAC);
+ completionKey = acquireV2Key(ledgerId, 0, OperationType.READ_LAC);
} else {
final long txnId = getTxnId();
completionKey = new V3CompletionKey(txnId, OperationType.READ_LAC);
@@ -681,7 +684,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
if (useV2WireProtocol) {
request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
ledgerId, entryId, (short) 0);
- completionKey = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
+ completionKey = acquireV2Key(ledgerId, entryId, OperationType.READ_ENTRY);
} else {
final long txnId = getTxnId();
completionKey = new V3CompletionKey(txnId, OperationType.READ_ENTRY);
@@ -726,13 +729,14 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
.build();
}
+ CompletionValue completion = new ReadCompletion(completionKey, cb,
+ ctx, ledgerId, entryId);
CompletionValue existingValue = completionObjects.putIfAbsent(
- completionKey, new ReadCompletion(completionKey, cb,
- ctx, ledgerId, entryId));
+ completionKey, completion);
if (existingValue != null) {
// There's a pending read request on same ledger/entry. This is not supported in V2 protocol
LOG.warn("Failing concurrent request to read at ledger: {} entry: {}", ledgerId, entryId);
- cb.readEntryComplete(BKException.Code.UnexpectedConditionException, ledgerId, entryId, null, ctx);
+ completion.errorOut(BKException.Code.UnexpectedConditionException);
return;
}
@@ -832,28 +836,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
try{
- channel.writeAndFlush(request)
- .addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future)
- throws Exception {
- if (future.isSuccess()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully wrote request {} to {}",
- requestToString(request),
- channel.remoteAddress());
- }
- } else {
- if (!(future.cause()
- instanceof ClosedChannelException)) {
- LOG.warn("Writing request {} to {} failed : ",
- requestToString(request),
- channel, future.cause());
- }
- errorOut(key);
- }
- }
- });
+ channel.writeAndFlush(request, channel.voidPromise());
} catch(Throwable e) {
LOG.warn("Operation {} failed", requestToString(request), e);
errorOut(key);
@@ -1009,7 +992,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
final OperationType operationType = getOperationType(response.getOpCode());
final StatusCode status = getStatusCodeFromErrorCode(response.errorCode);
- final CompletionValue completionValue = completionObjects.remove(new V2CompletionKey(ledgerId, entryId, operationType));
+ final CompletionKey key = acquireV2Key(ledgerId, entryId, operationType);
+ final CompletionValue completionValue = completionObjects.remove(key);
+ key.release();
if (null == completionValue) {
// Unexpected response, so log it. The txnId should have been present.
@@ -1286,7 +1271,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
class WriteLacCompletion extends CompletionValue {
final WriteLacCallback cb;
- public WriteLacCompletion(CompletionKey key,
+ public WriteLacCompletion(final CompletionKey key,
final WriteLacCallback originalCallback,
final Object originalCtx,
final long ledgerId) {
@@ -1302,6 +1287,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
cancelTimeoutAndLogOp(rc);
originalCallback.writeLacComplete(rc, ledgerId,
addr, originalCtx);
+ key.release();
}
};
}
@@ -1335,7 +1321,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
class ReadLacCompletion extends CompletionValue {
final ReadLacCallback cb;
- public ReadLacCompletion(CompletionKey key,
+ public ReadLacCompletion(final CompletionKey key,
ReadLacCallback originalCallback,
final Object ctx, final long ledgerId) {
super("ReadLAC", ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
@@ -1350,6 +1336,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
cancelTimeoutAndLogOp(rc);
originalCallback.readLacComplete(
rc, ledgerId, lacBuffer, lastEntryBuffer, ctx);
+ key.release();
}
};
}
@@ -1392,7 +1379,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
class ReadCompletion extends CompletionValue {
final ReadEntryCallback cb;
- public ReadCompletion(CompletionKey key,
+ public ReadCompletion(final CompletionKey key,
final ReadEntryCallback originalCallback,
final Object originalCtx,
long ledgerId, final long entryId) {
@@ -1409,6 +1396,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
originalCallback.readEntryComplete(rc,
ledgerId, entryId,
buffer, originalCtx);
+ key.release();
}
};
}
@@ -1494,7 +1482,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
class StartTLSCompletion extends CompletionValue {
final StartTLSCallback cb;
- public StartTLSCompletion(CompletionKey key) {
+ public StartTLSCompletion(final CompletionKey key) {
super("StartTLS", null, -1, -1,
startTLSOpLogger, startTLSTimeoutOpLogger,
scheduleTimeout(key, startTLSTimeout));
@@ -1502,6 +1490,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
@Override
public void startTLSComplete(int rc, Object ctx) {
cancelTimeoutAndLogOp(rc);
+ key.release();
}
};
}
@@ -1543,7 +1532,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
class GetBookieInfoCompletion extends CompletionValue {
final GetBookieInfoCallback cb;
- public GetBookieInfoCompletion(CompletionKey key,
+ public GetBookieInfoCompletion(final CompletionKey key,
final GetBookieInfoCallback origCallback,
final Object origCtx) {
super("GetBookieInfo", origCtx, 0L, 0L,
@@ -1555,6 +1544,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
Object ctx) {
cancelTimeoutAndLogOp(rc);
origCallback.getBookieInfoComplete(rc, bInfo, origCtx);
+ key.release();
}
};
}
@@ -1594,7 +1584,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
class AddCompletion extends CompletionValue {
final WriteCallback cb;
- public AddCompletion(CompletionKey key,
+ public AddCompletion(final CompletionKey key,
final WriteCallback originalCallback,
final Object originalCtx,
final long ledgerId, final long entryId) {
@@ -1609,6 +1599,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
cancelTimeoutAndLogOp(rc);
originalCallback.writeComplete(rc, ledgerId, entryId,
addr, originalCtx);
+ key.release();
}
};
}
@@ -1693,7 +1684,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
abstract class CompletionKey implements TimerTask {
final long txnId;
- final OperationType operationType;
+ OperationType operationType;
CompletionKey(long txnId,
OperationType operationType) {
@@ -1711,6 +1702,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
completion.timeout();
}
}
+
+ public void release() {}
}
@@ -1756,16 +1749,34 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
return txnIdGenerator.incrementAndGet();
}
+ private final Recycler<V2CompletionKey> V2_KEY_RECYCLER = new Recycler<V2CompletionKey>() {
+ protected V2CompletionKey newObject(
+ Recycler.Handle<V2CompletionKey> handle) {
+ return new V2CompletionKey(handle);
+ }
+ };
+
+ V2CompletionKey acquireV2Key(long ledgerId, long entryId,
+ OperationType operationType) {
+ V2CompletionKey key = V2_KEY_RECYCLER.get();
+ key.reset(ledgerId, entryId, operationType);
+ return key;
+ }
+
private class V2CompletionKey extends CompletionKey {
- final long ledgerId;
- final long entryId;
+ private final Handle<V2CompletionKey> recyclerHandle;
+ long ledgerId;
+ long entryId;
+ private V2CompletionKey(Handle<V2CompletionKey> handle) {
+ super(-1, null);
+ this.recyclerHandle = handle;
+ }
- public V2CompletionKey(long ledgerId, long entryId, OperationType operationType) {
- super(0, operationType);
+ void reset(long ledgerId, long entryId, OperationType operationType) {
this.ledgerId = ledgerId;
this.entryId = entryId;
-
+ this.operationType = operationType;
}
@Override
@@ -1774,18 +1785,24 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
return false;
}
V2CompletionKey that = (V2CompletionKey) object;
- return this.entryId == that.entryId && this.ledgerId == that.ledgerId;
+ return this.entryId == that.entryId
+ && this.ledgerId == that.ledgerId;
}
@Override
public int hashCode() {
- return Objects.hash(ledgerId, entryId);
+ return Long.hashCode(ledgerId) * 31 + Long.hashCode(entryId);
}
@Override
public String toString() {
return String.format("%d:%d %s", ledgerId, entryId, operationType);
}
+
+ @Override
+ public void release() {
+ recyclerHandle.recycle(this);
+ }
}
public class ConnectionFutureListener implements ChannelFutureListener {
--
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>'].