You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2017/12/06 08:28:59 UTC

[GitHub] sijie closed pull request #809: Recycle responses callbacks

sijie closed pull request #809: Recycle responses callbacks
URL: https://github.com/apache/bookkeeper/pull/809
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 183bb7b01..ceac6978a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -346,6 +346,16 @@ protected ChannelReadyForAddEntryCallback newObject(
                 };
 
         public void recycle() {
+            bookieClient = null;
+            toSend = null;
+            ledgerId = -1;
+            entryId = -1;
+            addr = null;
+            ctx = null;
+            cb = null;
+            options = -1;
+            masterKey = null;
+
             recyclerHandle.recycle(this);
         }
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index e6dad782b..9333c1e2a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -1001,13 +1001,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
     }
 
     private void readV2Response(final BookieProtocol.Response response) {
-        final long ledgerId = response.ledgerId;
-        final long entryId = response.entryId;
+        OperationType operationType = getOperationType(response.getOpCode());
+        StatusCode status = getStatusCodeFromErrorCode(response.errorCode);
 
-        final OperationType operationType = getOperationType(response.getOpCode());
-        final StatusCode status = getStatusCodeFromErrorCode(response.errorCode);
-
-        final CompletionKey key = acquireV2Key(ledgerId, entryId, operationType);
+        CompletionKey key = acquireV2Key(response.ledgerId, response.entryId, operationType);
         CompletionValue completionValue = completionObjects.remove(key);
         key.release();
         if (completionValue == null) {
@@ -1024,20 +1021,84 @@ private void readV2Response(final BookieProtocol.Response response) {
             // Unexpected response, so log it. The txnId should have been present.
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Unexpected response received from bookie : " + addr + " for type : " + operationType
-                        + " and ledger:entry : " + ledgerId + ":" + entryId);
+                        + " and ledger:entry : " + response.ledgerId + ":" + response.entryId);
             }
         } else {
             long orderingKey = completionValue.ledgerId;
-            final CompletionValue finalCompletionValue = completionValue;
+            executor.submitOrdered(orderingKey,
+                    ReadV2ResponseCallback.create(completionValue, response.ledgerId, response.entryId,
+                                                  status, response));
+        }
+    }
+
+    private static class ReadV2ResponseCallback extends SafeRunnable {
+        CompletionValue completionValue;
+        long ledgerId;
+        long entryId;
+        StatusCode status;
+        BookieProtocol.Response response;
+
+        static ReadV2ResponseCallback create(CompletionValue completionValue, long ledgerId, long entryId,
+                                             StatusCode status, BookieProtocol.Response response) {
+            ReadV2ResponseCallback callback = RECYCLER.get();
+            callback.completionValue = completionValue;
+            callback.ledgerId = ledgerId;
+            callback.entryId = entryId;
+            callback.status = status;
+            callback.response = response;
+            return callback;
+        }
+
+        @Override
+        public void safeRun() {
+            completionValue.handleV2Response(ledgerId, entryId, status, response);
+            response.recycle();
+            recycle();
+        }
+
+        void recycle() {
+            completionValue = null;
+            ledgerId = -1;
+            entryId = -1;
+            status = null;
+            response = null;
+            recyclerHandle.recycle(this);
+        }
+
+        private final Handle<ReadV2ResponseCallback> recyclerHandle;
 
-            executor.submitOrdered(orderingKey, () -> {
-                    finalCompletionValue.handleV2Response(ledgerId, entryId, status, response);
-                    response.recycle();
-                });
+        private ReadV2ResponseCallback(Handle<ReadV2ResponseCallback> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
         }
+
+        private static final Recycler<ReadV2ResponseCallback> RECYCLER = new Recycler<ReadV2ResponseCallback>() {
+            @Override
+            protected ReadV2ResponseCallback newObject(Handle<ReadV2ResponseCallback> handle) {
+                return new ReadV2ResponseCallback(handle);
+            }
+        };
     }
 
-    private StatusCode getStatusCodeFromErrorCode(int errorCode) {
+    private static OperationType getOperationType(byte opCode) {
+        switch (opCode) {
+            case BookieProtocol.ADDENTRY:
+                return  OperationType.ADD_ENTRY;
+            case BookieProtocol.READENTRY:
+                return OperationType.READ_ENTRY;
+            case BookieProtocol.AUTH:
+                return OperationType.AUTH;
+            case BookieProtocol.READ_LAC:
+                return OperationType.READ_LAC;
+            case BookieProtocol.WRITE_LAC:
+                return OperationType.WRITE_LAC;
+            case BookieProtocol.GET_BOOKIE_INFO:
+                return OperationType.GET_BOOKIE_INFO;
+            default:
+                throw new IllegalArgumentException("Invalid operation type " + opCode);
+        }
+    }
+
+    private static StatusCode getStatusCodeFromErrorCode(int errorCode) {
         switch (errorCode) {
             case BookieProtocol.EOK:
                 return StatusCode.EOK;
@@ -1064,25 +1125,6 @@ private StatusCode getStatusCodeFromErrorCode(int errorCode) {
         }
     }
 
-    private OperationType getOperationType(byte opCode) {
-        switch (opCode) {
-            case BookieProtocol.ADDENTRY:
-                return  OperationType.ADD_ENTRY;
-            case BookieProtocol.READENTRY:
-                return OperationType.READ_ENTRY;
-            case BookieProtocol.AUTH:
-                return OperationType.AUTH;
-            case BookieProtocol.READ_LAC:
-                return OperationType.READ_LAC;
-            case BookieProtocol.WRITE_LAC:
-                return OperationType.WRITE_LAC;
-            case BookieProtocol.GET_BOOKIE_INFO:
-                return OperationType.GET_BOOKIE_INFO;
-            default:
-                throw new IllegalArgumentException("Invalid operation type");
-        }
-    }
-
     private void readV3Response(final Response response) {
         final BKPacketHeader header = response.getHeader();
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services