You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2023/03/07 17:50:08 UTC

[bookkeeper] branch master updated: Make read entry request recyclable (#3842)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 09365dfdd8 Make read entry request recyclable (#3842)
09365dfdd8 is described below

commit 09365dfdd8aa20c603d7a134d7333b907ce50d24
Author: Penghui Li <pe...@apache.org>
AuthorDate: Wed Mar 8 01:50:00 2023 +0800

    Make read entry request recyclable (#3842)
    
    * Make read entry request recyclable
    
    * Move recycle to finally block
    
    * Fix test and comments
    
    * Fix test
---
 .../bookkeeper/proto/BookieProtoEncoding.java      |  6 ++--
 .../apache/bookkeeper/proto/BookieProtocol.java    | 35 ++++++++++++++++++++--
 .../bookkeeper/proto/PerChannelBookieClient.java   |  5 ++--
 .../bookkeeper/proto/ReadEntryProcessor.java       |  1 +
 .../bookkeeper/proto/ReadEntryProcessorTest.java   |  6 ++--
 .../bookkeeper/proto/TestBackwardCompatCMS42.java  |  5 ++--
 6 files changed, 44 insertions(+), 14 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 5f886dbe0d..8cf40d2b8e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -142,7 +142,7 @@ public class BookieProtoEncoding {
                 if (r.hasMasterKey()) {
                     buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
                 }
-
+                r.recycle();
                 return buf;
             } else if (r instanceof BookieProtocol.AuthRequest) {
                 BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthRequest) r).getAuthMessage();
@@ -193,9 +193,9 @@ public class BookieProtoEncoding {
                 if ((flags & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING
                     && version >= 2) {
                     byte[] masterKey = readMasterKey(packet);
-                    return new BookieProtocol.ReadRequest(version, ledgerId, entryId, flags, masterKey);
+                    return BookieProtocol.ReadRequest.create(version, ledgerId, entryId, flags, masterKey);
                 } else {
-                    return new BookieProtocol.ReadRequest(version, ledgerId, entryId, flags, null);
+                    return BookieProtocol.ReadRequest.create(version, ledgerId, entryId, flags, null);
                 }
             case BookieProtocol.AUTH:
                 BookkeeperProtocol.AuthMessage.Builder builder = BookkeeperProtocol.AuthMessage.newBuilder();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 89654449aa..86c3ed5469 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -362,14 +362,43 @@ public interface BookieProtocol {
      * A Request that reads data.
      */
     class ReadRequest extends Request {
-        ReadRequest(byte protocolVersion, long ledgerId, long entryId,
-                    short flags, byte[] masterKey) {
-            init(protocolVersion, READENTRY, ledgerId, entryId, flags, masterKey);
+
+        static ReadRequest create(byte protocolVersion, long ledgerId, long entryId,
+                  short flags, byte[] masterKey) {
+            ReadRequest read = RECYCLER.get();
+            read.protocolVersion = protocolVersion;
+            read.opCode = READENTRY;
+            read.ledgerId = ledgerId;
+            read.entryId = entryId;
+            read.flags = flags;
+            read.masterKey = masterKey;
+            return read;
         }
 
         boolean isFencing() {
             return (flags & FLAG_DO_FENCING) == FLAG_DO_FENCING;
         }
+
+        private final Handle<ReadRequest> recyclerHandle;
+
+        private ReadRequest(Handle<ReadRequest> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        private static final Recycler<ReadRequest> RECYCLER = new Recycler<ReadRequest>() {
+            @Override
+            protected ReadRequest newObject(Handle<ReadRequest> handle) {
+                return new ReadRequest(handle);
+            }
+        };
+
+        @Override
+        public void recycle() {
+            ledgerId = -1;
+            entryId = -1;
+            masterKey = null;
+            recyclerHandle.recycle(this);
+        }
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 47d5303bb3..4d08faad20 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -849,7 +849,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         Object request = null;
         CompletionKey completionKey = null;
         if (useV2WireProtocol) {
-            request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
+            request = BookieProtocol.ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION,
                                                      ledgerId, 0, (short) 0, null);
             completionKey = acquireV2Key(ledgerId, 0, OperationType.READ_LAC);
         } else {
@@ -933,7 +933,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         Object request = null;
         CompletionKey completionKey = null;
         if (useV2WireProtocol) {
-            request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
+            request = BookieProtocol.ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION,
                     ledgerId, entryId, (short) flags, masterKey);
             completionKey = acquireV2Key(ledgerId, entryId, OperationType.READ_ENTRY);
         } else {
@@ -1168,7 +1168,6 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                     nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
                 }
             });
-
             channel.writeAndFlush(request, promise);
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index 6935ca8be6..c44216a028 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -189,6 +189,7 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
     }
 
     private void recycle() {
+        request.recycle();
         super.reset();
         this.recyclerHandle.recycle(this);
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
index 91e100d809..251f900c09 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
@@ -107,7 +107,7 @@ public class ReadEntryProcessorTest {
 
         ExecutorService service = Executors.newCachedThreadPool();
         long ledgerId = System.currentTimeMillis();
-        ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
+        ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
                 1, BookieProtocol.FLAG_DO_FENCING, new byte[]{});
         ReadEntryProcessor processor = ReadEntryProcessor.create(
                 request, requestHandler, requestProcessor, service, true);
@@ -150,7 +150,7 @@ public class ReadEntryProcessorTest {
         }).when(channel).writeAndFlush(any(Response.class));
 
         long ledgerId = System.currentTimeMillis();
-        ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
+        ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
                 1, BookieProtocol.FLAG_DO_FENCING, new byte[]{});
         ReadEntryProcessor processor = ReadEntryProcessor.create(request, requestHandler, requestProcessor, null, true);
         fenceResult.complete(result);
@@ -180,7 +180,7 @@ public class ReadEntryProcessorTest {
         }).when(channel).writeAndFlush(any(Response.class));
 
         long ledgerId = System.currentTimeMillis();
-        ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
+        ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
                 1, (short) 0, new byte[]{});
         ReadEntryProcessor processor = ReadEntryProcessor.create(request, requestHandler, requestProcessor, null, true);
         processor.run();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
index 23997348e1..1edd74c1fc 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
@@ -161,8 +161,9 @@ public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase {
 
         }
 
-        client.sendRequest(new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
-                                           1L, 1L, (short) 0, null));
+        ReadRequest read = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION,
+                1L, 1L, (short) 0, null);
+        client.sendRequest(read);
         Response response = client.takeResponse();
         assertEquals("Should have failed",
                      response.getErrorCode(), BookieProtocol.EUA);