You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/10/23 20:19:17 UTC

[bookkeeper] branch master updated: Pool the V2 keys

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 70e4bbe  Pool the V2 keys
70e4bbe is described below

commit 70e4bbe437f81f2b226480d7c20999737f686787
Author: Ivan Kelly <iv...@midokura.com>
AuthorDate: Mon Oct 23 13:19:10 2017 -0700

    Pool the V2 keys
    
    To generate less garbage in the fast path.
    
    Author: Ivan Kelly <iv...@midokura.com>
    Author: Matteo Merli <mm...@yahoo-inc.com>
    
    Reviewers: Sijie Guo <si...@apache.org>
    
    This closes #648 from ivankelly/pool-keys
---
 .../org/apache/bookkeeper/proto/BookieClient.java  |  16 +--
 .../bookkeeper/proto/PerChannelBookieClient.java   | 113 ++++++++++++---------
 2 files changed, 73 insertions(+), 56 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index d763f57..c604b16 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -147,7 +147,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
                 authProviderFactory, registry, pcbcPool, shFactory);
     }
 
-    private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr, Object key) {
+    private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
         PerChannelBookieClientPool clientPool = channels.get(addr);
         if (null == clientPool) {
             closeLock.readLock().lock();
@@ -180,7 +180,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
             final long lac, final ByteBuf toSend, final WriteLacCallback cb, final Object ctx) {
         closeLock.readLock().lock();
         try {
-            final PerChannelBookieClientPool client = lookupClient(addr, lac);
+            final PerChannelBookieClientPool client = lookupClient(addr);
             if (client == null) {
                 cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
                                   ledgerId, addr, ctx);
@@ -246,7 +246,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
                          final int options) {
         closeLock.readLock().lock();
         try {
-            final PerChannelBookieClientPool client = lookupClient(addr, entryId);
+            final PerChannelBookieClientPool client = lookupClient(addr);
             if (client == null) {
                 completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
                             ledgerId, entryId, addr, cb, ctx);
@@ -300,7 +300,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
                                         final Object ctx) {
         closeLock.readLock().lock();
         try {
-            final PerChannelBookieClientPool client = lookupClient(addr, entryId);
+            final PerChannelBookieClientPool client = lookupClient(addr);
             if (client == null) {
                 completeRead(getRc(BKException.Code.BookieHandleNotAvailableException),
                              ledgerId, entryId, null, cb, ctx);
@@ -325,7 +325,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
     public void readLac(final BookieSocketAddress addr, final long ledgerId, final ReadLacCallback cb, final Object ctx) {
         closeLock.readLock().lock();
         try {
-            final PerChannelBookieClientPool client = lookupClient(addr, BookieProtocol.LAST_ADD_CONFIRMED);
+            final PerChannelBookieClientPool client = lookupClient(addr);
             if (client == null) {
                 cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), ledgerId, null, null, ctx);
                 return;
@@ -359,7 +359,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
                           final ReadEntryCallback cb, final Object ctx) {
         closeLock.readLock().lock();
         try {
-            final PerChannelBookieClientPool client = lookupClient(addr, entryId);
+            final PerChannelBookieClientPool client = lookupClient(addr);
             if (client == null) {
                 cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
                                      ledgerId, entryId, null, ctx);
@@ -392,7 +392,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
                                           final Object ctx) {
         closeLock.readLock().lock();
         try {
-            final PerChannelBookieClientPool client = lookupClient(addr, entryId);
+            final PerChannelBookieClientPool client = lookupClient(addr);
             if (client == null) {
                 completeRead(BKException.Code.BookieHandleNotAvailableException,
                         ledgerId, entryId, null, cb, ctx);
@@ -418,7 +418,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
     public void getBookieInfo(final BookieSocketAddress addr, final long requested, final GetBookieInfoCallback cb, final Object ctx) {
         closeLock.readLock().lock();
         try {
-            final PerChannelBookieClientPool client = lookupClient(addr, BookkeeperProtocol.OperationType.GET_BOOKIE_INFO);
+            final PerChannelBookieClientPool client = lookupClient(addr);
             if (client == null) {
                 cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException), new BookieInfo(), ctx);
                 return;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 6eda724..c544701 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -51,6 +51,8 @@ import io.netty.handler.codec.LengthFieldPrepender;
 import io.netty.handler.codec.TooLongFrameException;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.HashedWheelTimer;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import io.netty.util.concurrent.Future;
@@ -529,7 +531,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         Object request = null;
         CompletionKey completionKey = null;
         if (useV2WireProtocol) {
-            completionKey = new V2CompletionKey(ledgerId, entryId, OperationType.ADD_ENTRY);
+            completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY);
             request = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
                     (short) options, masterKey, toSend);
         } else {
@@ -580,7 +582,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         Object request = null;
         CompletionKey completionKey = null;
         if (useV2WireProtocol) {
-            completionKey = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
+            completionKey = acquireV2Key(ledgerId, entryId, OperationType.READ_ENTRY);
             request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
                     BookieProtocol.FLAG_DO_FENCING, masterKey);
         } else {
@@ -605,12 +607,13 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                     .build();
         }
 
+        CompletionValue completion = new ReadCompletion(completionKey,
+                                                        cb, ctx,
+                                                        ledgerId, entryId);
         if (completionObjects.putIfAbsent(
-                    completionKey, new ReadCompletion(completionKey,
-                                                      cb, ctx,
-                                                      ledgerId, entryId)) != null) {
+                    completionKey, completion) != null) {
             // We cannot have more than 1 pending read on the same ledger/entry in the v2 protocol
-            cb.readEntryComplete(BKException.Code.BookieHandleNotAvailableException, ledgerId, entryId, null, ctx);
+            completion.errorOut(BKException.Code.BookieHandleNotAvailableException);
             return;
         }
 
@@ -623,7 +626,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         if (useV2WireProtocol) {
             request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
                     ledgerId, 0, (short) 0);
-            completionKey = new V2CompletionKey(ledgerId, 0, OperationType.READ_LAC);
+            completionKey = acquireV2Key(ledgerId, 0, OperationType.READ_LAC);
         } else {
             final long txnId = getTxnId();
             completionKey = new V3CompletionKey(txnId, OperationType.READ_LAC);
@@ -681,7 +684,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         if (useV2WireProtocol) {
             request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
                     ledgerId, entryId, (short) 0);
-            completionKey = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
+            completionKey = acquireV2Key(ledgerId, entryId, OperationType.READ_ENTRY);
         } else {
             final long txnId = getTxnId();
             completionKey = new V3CompletionKey(txnId, OperationType.READ_ENTRY);
@@ -726,13 +729,14 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                     .build();
         }
 
+        CompletionValue completion = new ReadCompletion(completionKey, cb,
+                                                        ctx, ledgerId, entryId);
         CompletionValue existingValue = completionObjects.putIfAbsent(
-                completionKey, new ReadCompletion(completionKey, cb,
-                                                  ctx, ledgerId, entryId));
+                completionKey, completion);
         if (existingValue != null) {
             // There's a pending read request on same ledger/entry. This is not supported in V2 protocol
             LOG.warn("Failing concurrent request to read at ledger: {} entry: {}", ledgerId, entryId);
-            cb.readEntryComplete(BKException.Code.UnexpectedConditionException, ledgerId, entryId, null, ctx);
+            completion.errorOut(BKException.Code.UnexpectedConditionException);
             return;
         }
 
@@ -832,28 +836,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         }
 
         try{
-            channel.writeAndFlush(request)
-                .addListener(new ChannelFutureListener() {
-                        @Override
-                        public void operationComplete(ChannelFuture future)
-                                throws Exception {
-                            if (future.isSuccess()) {
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("Successfully wrote request {} to {}",
-                                              requestToString(request),
-                                              channel.remoteAddress());
-                                }
-                            } else {
-                                if (!(future.cause()
-                                      instanceof ClosedChannelException)) {
-                                    LOG.warn("Writing request {} to {} failed : ",
-                                             requestToString(request),
-                                             channel, future.cause());
-                                }
-                                errorOut(key);
-                            }
-                        }
-                    });
+            channel.writeAndFlush(request, channel.voidPromise());
         } catch(Throwable e) {
             LOG.warn("Operation {} failed", requestToString(request), e);
             errorOut(key);
@@ -1009,7 +992,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         final OperationType operationType = getOperationType(response.getOpCode());
         final StatusCode status = getStatusCodeFromErrorCode(response.errorCode);
 
-        final CompletionValue completionValue = completionObjects.remove(new V2CompletionKey(ledgerId, entryId, operationType));
+        final CompletionKey key = acquireV2Key(ledgerId, entryId, operationType);
+        final CompletionValue completionValue = completionObjects.remove(key);
+        key.release();
 
         if (null == completionValue) {
             // Unexpected response, so log it. The txnId should have been present.
@@ -1286,7 +1271,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     class WriteLacCompletion extends CompletionValue {
         final WriteLacCallback cb;
 
-        public WriteLacCompletion(CompletionKey key,
+        public WriteLacCompletion(final CompletionKey key,
                                   final WriteLacCallback originalCallback,
                                   final Object originalCtx,
                                   final long ledgerId) {
@@ -1302,6 +1287,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                         cancelTimeoutAndLogOp(rc);
                         originalCallback.writeLacComplete(rc, ledgerId,
                                                           addr, originalCtx);
+                        key.release();
                     }
                 };
         }
@@ -1335,7 +1321,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     class ReadLacCompletion extends CompletionValue {
         final ReadLacCallback cb;
 
-        public ReadLacCompletion(CompletionKey key,
+        public ReadLacCompletion(final CompletionKey key,
                                  ReadLacCallback originalCallback,
                                  final Object ctx, final long ledgerId) {
             super("ReadLAC", ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
@@ -1350,6 +1336,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                         cancelTimeoutAndLogOp(rc);
                         originalCallback.readLacComplete(
                                 rc, ledgerId, lacBuffer, lastEntryBuffer, ctx);
+                        key.release();
                     }
                 };
         }
@@ -1392,7 +1379,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     class ReadCompletion extends CompletionValue {
         final ReadEntryCallback cb;
 
-        public ReadCompletion(CompletionKey key,
+        public ReadCompletion(final CompletionKey key,
                               final ReadEntryCallback originalCallback,
                               final Object originalCtx,
                               long ledgerId, final long entryId) {
@@ -1409,6 +1396,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                         originalCallback.readEntryComplete(rc,
                                                            ledgerId, entryId,
                                                            buffer, originalCtx);
+                        key.release();
                     }
                 };
         }
@@ -1494,7 +1482,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     class StartTLSCompletion extends CompletionValue {
         final StartTLSCallback cb;
 
-        public StartTLSCompletion(CompletionKey key) {
+        public StartTLSCompletion(final CompletionKey key) {
             super("StartTLS", null, -1, -1,
                   startTLSOpLogger, startTLSTimeoutOpLogger,
                   scheduleTimeout(key, startTLSTimeout));
@@ -1502,6 +1490,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 @Override
                 public void startTLSComplete(int rc, Object ctx) {
                     cancelTimeoutAndLogOp(rc);
+                    key.release();
                 }
             };
         }
@@ -1543,7 +1532,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     class GetBookieInfoCompletion extends CompletionValue {
         final GetBookieInfoCallback cb;
 
-        public GetBookieInfoCompletion(CompletionKey key,
+        public GetBookieInfoCompletion(final CompletionKey key,
                                        final GetBookieInfoCallback origCallback,
                                        final Object origCtx) {
             super("GetBookieInfo", origCtx, 0L, 0L,
@@ -1555,6 +1544,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                                                   Object ctx) {
                     cancelTimeoutAndLogOp(rc);
                     origCallback.getBookieInfoComplete(rc, bInfo, origCtx);
+                    key.release();
                 }
             };
         }
@@ -1594,7 +1584,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     class AddCompletion extends CompletionValue {
         final WriteCallback cb;
 
-        public AddCompletion(CompletionKey key,
+        public AddCompletion(final CompletionKey key,
                              final WriteCallback originalCallback,
                              final Object originalCtx,
                              final long ledgerId, final long entryId) {
@@ -1609,6 +1599,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                     cancelTimeoutAndLogOp(rc);
                     originalCallback.writeComplete(rc, ledgerId, entryId,
                                                    addr, originalCtx);
+                    key.release();
                 }
             };
         }
@@ -1693,7 +1684,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
 
     abstract class CompletionKey implements TimerTask {
         final long txnId;
-        final OperationType operationType;
+        OperationType operationType;
 
         CompletionKey(long txnId,
                       OperationType operationType) {
@@ -1711,6 +1702,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 completion.timeout();
             }
         }
+
+        public void release() {}
     }
 
 
@@ -1756,16 +1749,34 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         return txnIdGenerator.incrementAndGet();
     }
 
+    private final Recycler<V2CompletionKey> V2_KEY_RECYCLER = new Recycler<V2CompletionKey>() {
+            protected V2CompletionKey newObject(
+                    Recycler.Handle<V2CompletionKey> handle) {
+                return new V2CompletionKey(handle);
+            }
+        };
+
+    V2CompletionKey acquireV2Key(long ledgerId, long entryId,
+                             OperationType operationType) {
+        V2CompletionKey key = V2_KEY_RECYCLER.get();
+        key.reset(ledgerId, entryId, operationType);
+        return key;
+    }
+
     private class V2CompletionKey extends CompletionKey {
-        final long ledgerId;
-        final long entryId;
+        private final Handle<V2CompletionKey> recyclerHandle;
+        long ledgerId;
+        long entryId;
 
+        private V2CompletionKey(Handle<V2CompletionKey> handle) {
+            super(-1, null);
+            this.recyclerHandle = handle;
+        }
 
-        public V2CompletionKey(long ledgerId, long entryId, OperationType operationType) {
-            super(0, operationType);
+        void reset(long ledgerId, long entryId, OperationType operationType) {
             this.ledgerId = ledgerId;
             this.entryId = entryId;
-
+            this.operationType = operationType;
         }
 
         @Override
@@ -1774,18 +1785,24 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 return  false;
             }
             V2CompletionKey that = (V2CompletionKey) object;
-            return  this.entryId == that.entryId && this.ledgerId == that.ledgerId;
+            return this.entryId == that.entryId
+                && this.ledgerId == that.ledgerId;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(ledgerId, entryId);
+            return Long.hashCode(ledgerId) * 31 + Long.hashCode(entryId);
         }
 
         @Override
         public String toString() {
             return String.format("%d:%d %s", ledgerId, entryId, operationType);
         }
+
+        @Override
+        public void release() {
+            recyclerHandle.recycle(this);
+        }
     }
 
     public class ConnectionFutureListener implements ChannelFutureListener {

-- 
To stop receiving notification emails like this one, please contact
commits@bookkeeper.apache.org.