You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2017/10/24 10:59:00 UTC

[bookkeeper] branch master updated: Pool AddCompletions

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 4968438  Pool AddCompletions
4968438 is described below

commit 496843893dfb91c237cbd5ae1f245f90304ea34b
Author: Ivan Kelly <iv...@midokura.com>
AuthorDate: Tue Oct 24 12:58:43 2017 +0200

    Pool AddCompletions
    
    Reuse AddCompletion objects from a pool to avoid generating garbage in the fast path.
    
    Author: Ivan Kelly <iv...@midokura.com>
    
    Reviewers: Enrico Olivelli <eo...@apache.org>
    
    This closes #649 from ivankelly/pool-completions
---
 .../bookkeeper/proto/PerChannelBookieClient.java   | 92 ++++++++++++++--------
 1 file changed, 60 insertions(+), 32 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index c544701..ffa030b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -562,8 +562,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         }
 
         completionObjects.put(completionKey,
-                              new AddCompletion(completionKey,
-                                                cb, ctx, ledgerId, entryId));
+                              acquireAddCompletion(completionKey,
+                                                   cb, ctx, ledgerId, entryId));
         final Channel c = channel;
         if (c == null) {
             // usually checked in writeAndFlush, but we have extra check
@@ -1162,14 +1162,14 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
 
     // visible for testing
     abstract class CompletionValue {
-        final Object ctx;
-        protected final long ledgerId;
-        protected final long entryId;
-        private final long startTime;
         private final OpStatsLogger opLogger;
         private final OpStatsLogger timeoutOpLogger;
-        protected final Timeout timeout;
         private final String operationName;
+        protected Object ctx;
+        protected long ledgerId;
+        protected long entryId;
+        protected long startTime;
+        protected Timeout timeout;
 
         public CompletionValue(String operationName,
                                Object ctx,
@@ -1192,8 +1192,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         }
 
         void cancelTimeoutAndLogOp(int rc) {
-            if (null != timeout) {
-                timeout.cancel();
+            Timeout t = timeout;
+            if (null != t) {
+                t.cancel();
             }
 
             if (rc != BKException.Code.OK) {
@@ -1580,28 +1581,55 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         }
     }
 
+    private final Recycler<AddCompletion> ADD_COMPLETION_RECYCLER = new Recycler<AddCompletion>() {
+            protected AddCompletion newObject(Recycler.Handle<AddCompletion> handle) {
+                return new AddCompletion(handle);
+            }
+        };
+
+    AddCompletion acquireAddCompletion(final CompletionKey key,
+                                       final WriteCallback originalCallback,
+                                       final Object originalCtx,
+                                       final long ledgerId, final long entryId) {
+        AddCompletion completion = ADD_COMPLETION_RECYCLER.get();
+        completion.reset(key, originalCallback, originalCtx, ledgerId, entryId);
+        return completion;
+    }
+
     // visible for testing
-    class AddCompletion extends CompletionValue {
-        final WriteCallback cb;
-
-        public AddCompletion(final CompletionKey key,
-                             final WriteCallback originalCallback,
-                             final Object originalCtx,
-                             final long ledgerId, final long entryId) {
-            super("Add", originalCtx, ledgerId, entryId,
-                  addEntryOpLogger, addTimeoutOpLogger,
-                  scheduleTimeout(key, addEntryTimeout));
-            this.cb = new WriteCallback() {
-                @Override
-                public void writeComplete(int rc, long ledgerId, long entryId,
-                                          BookieSocketAddress addr,
-                                          Object ctx) {
-                    cancelTimeoutAndLogOp(rc);
-                    originalCallback.writeComplete(rc, ledgerId, entryId,
-                                                   addr, originalCtx);
-                    key.release();
-                }
-            };
+    class AddCompletion extends CompletionValue implements WriteCallback {
+        final Recycler.Handle<AddCompletion> handle;
+
+        CompletionKey key = null;
+        WriteCallback originalCallback = null;
+
+        AddCompletion(Recycler.Handle<AddCompletion> handle) {
+            super("Add", null, -1, -1,
+                  addEntryOpLogger, addTimeoutOpLogger, null);
+            this.handle = handle;
+        }
+
+        void reset(final CompletionKey key,
+                   final WriteCallback originalCallback,
+                   final Object originalCtx,
+                   final long ledgerId, final long entryId) {
+            this.key = key;
+            this.originalCallback = originalCallback;
+            this.ctx = originalCtx;
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.startTime = MathUtils.nowInNano();
+            this.timeout = scheduleTimeout(key, addEntryTimeout);
+        }
+
+        @Override
+        public void writeComplete(int rc, long ledgerId, long entryId,
+                                  BookieSocketAddress addr,
+                                  Object ctx) {
+            cancelTimeoutAndLogOp(rc);
+            originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx);
+            key.release();
+            handle.recycle(this);
         }
 
         @Override
@@ -1612,7 +1640,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         @Override
         public void errorOut(final int rc) {
             errorOutAndRunCallback(
-                    () -> cb.writeComplete(rc, ledgerId, entryId, addr, ctx));
+                    () -> writeComplete(rc, ledgerId, entryId, addr, ctx));
         }
 
         @Override
@@ -1638,7 +1666,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                                          BKException.Code.WriteException,
                                          "ledger", ledgerId,
                                          "entry", entryId);
-            cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+            writeComplete(rc, ledgerId, entryId, addr, ctx);
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>.