You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2017/11/02 06:55:25 UTC

[GitHub] ivankelly closed pull request #665: Recycle AddRequest/AddResponse objects

ivankelly closed pull request #665: Recycle AddRequest/AddResponse objects
URL: https://github.com/apache/bookkeeper/pull/665
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff once the request has been merged, so the full diff is reproduced below:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 28324b1b4..0ad9a9d35 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -175,7 +175,6 @@ protected void doAsyncAddEntry(final PendingAddOp op) {
             throttler.acquire();
         }
 
-        final long currentLength;
         boolean wasClosed = false;
         synchronized (this) {
             // synchronized on this to ensure that
@@ -183,12 +182,11 @@ protected void doAsyncAddEntry(final PendingAddOp op) {
             // updating lastAddPushed
             if (metadata.isClosed()) {
                 wasClosed = true;
-                currentLength = 0;
             } else {
-                currentLength = addToLength(op.payload.readableBytes());
+                long currentLength = addToLength(op.payload.readableBytes());
+                op.setLedgerLength(currentLength);
                 pendingAddOps.add(op);
             }
-            op.setLedgerLength(currentLength);
         }
 
         if (wasClosed) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index 409fe4b4a..07807858d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -103,7 +103,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
                 BookieProtocol.Request req = (BookieProtocol.Request) msg;
                 if (req.getOpCode() == BookieProtocol.ADDENTRY) {
                     ctx.channel().writeAndFlush(
-                            new BookieProtocol.AddResponse(
+                            BookieProtocol.AddResponse.create(
                                     req.getProtocolVersion(), BookieProtocol.EUA,
                                     req.getLedgerId(), req.getEntryId()));
                 } else if (req.getOpCode() == BookieProtocol.READENTRY) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 0fece29c5..df7f3b271 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -102,7 +102,9 @@ public Object encode(Object msg, ByteBufAllocator allocator)
                 ByteBuf buf = allocator.buffer(totalHeaderSize);
                 buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), r.getFlags()));
                 buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
-                return DoubleByteBuf.get(buf, ar.getData());
+                ByteBuf data = ar.getData();
+                ar.recycle();
+                return DoubleByteBuf.get(buf, data);
             } else if (r instanceof BookieProtocol.ReadRequest) {
                 int totalHeaderSize = 4 // for request type
                     + 8 // for ledgerId
@@ -155,7 +157,9 @@ public Object decode(ByteBuf packet)
                 // Read ledger and entry id without advancing the reader index
                 ledgerId = packet.getLong(packet.readerIndex());
                 entryId = packet.getLong(packet.readerIndex() + 8);
-                return new BookieProtocol.AddRequest(version, ledgerId, entryId, flags, masterKey, packet.retain());
+                return BookieProtocol.AddRequest.create(
+                        version, ledgerId, entryId, flags,
+                        masterKey, packet.retain());
             }
 
             case BookieProtocol.READENTRY:
@@ -223,29 +227,33 @@ public Object encode(Object msg, ByteBufAllocator allocator)
             buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), (short) 0));
 
             ServerStats.getInstance().incrementPacketsSent();
-            if (msg instanceof BookieProtocol.ReadResponse) {
-                buf.writeInt(r.getErrorCode());
-                buf.writeLong(r.getLedgerId());
-                buf.writeLong(r.getEntryId());
+            try {
+                if (msg instanceof BookieProtocol.ReadResponse) {
+                    buf.writeInt(r.getErrorCode());
+                    buf.writeLong(r.getLedgerId());
+                    buf.writeLong(r.getEntryId());
+
+                    BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse) r;
+                    if (rr.hasData()) {
+                        return DoubleByteBuf.get(buf, rr.getData());
+                    } else {
+                        return buf;
+                    }
+                } else if (msg instanceof BookieProtocol.AddResponse) {
+                    buf.writeInt(r.getErrorCode());
+                    buf.writeLong(r.getLedgerId());
+                    buf.writeLong(r.getEntryId());
 
-                BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse)r;
-                if (rr.hasData()) {
-                    return DoubleByteBuf.get(buf, rr.getData());
-                } else {
                     return buf;
+                } else if (msg instanceof BookieProtocol.AuthResponse) {
+                    BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse) r).getAuthMessage();
+                    return DoubleByteBuf.get(buf, Unpooled.wrappedBuffer(am.toByteArray()));
+                } else {
+                    LOG.error("Cannot encode unknown response type {}", msg.getClass().getName());
+                    return msg;
                 }
-            } else if (msg instanceof BookieProtocol.AddResponse) {
-                buf.writeInt(r.getErrorCode());
-                buf.writeLong(r.getLedgerId());
-                buf.writeLong(r.getEntryId());
-
-                return buf;
-            } else if (msg instanceof BookieProtocol.AuthResponse) {
-                BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse)r).getAuthMessage();
-                return DoubleByteBuf.get(buf, Unpooled.wrappedBuffer(am.toByteArray()));
-            } else {
-                LOG.error("Cannot encode unknown response type {}", msg.getClass().getName());
-                return msg;
+            } finally {
+                r.recycle();
             }
         }
         @Override
@@ -263,7 +271,7 @@ public Object decode(ByteBuf buffer)
                 rc = buffer.readInt();
                 ledgerId = buffer.readLong();
                 entryId = buffer.readLong();
-                return new BookieProtocol.AddResponse(version, rc, ledgerId, entryId);
+                return BookieProtocol.AddResponse.create(version, rc, ledgerId, entryId);
             case BookieProtocol.READENTRY:
                 rc = buffer.readInt();
                 ledgerId = buffer.readLong();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 6fc91e51e..2c7a82835 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -23,6 +23,9 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.ReferenceCounted;
 
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
 
@@ -40,7 +43,7 @@
     public static final byte LOWEST_COMPAT_PROTOCOL_VERSION = 0;
 
     /**
-     * Current version of the protocol, which client will use. 
+     * Current version of the protocol, which client will use.
      */
     public static final byte CURRENT_PROTOCOL_VERSION = 2;
 
@@ -62,19 +65,19 @@
      */
     public static final int MASTER_KEY_LENGTH = 20;
 
-    /** 
+    /**
      * The first int of a packet is the header.
      * It contains the version, opCode and flags.
      * The initial versions of BK didn't have this structure
-     * and just had an int representing the opCode as the 
-     * first int. This handles that case also. 
+     * and just had an int representing the opCode as the
+     * first int. This handles that case also.
      */
     final static class PacketHeader {
         public static int toInt(byte version, byte opCode, short flags) {
             if (version == 0) {
                 return (int)opCode;
             } else {
-                return ((version & 0xFF) << 24) 
+                return ((version & 0xFF) << 24)
                     | ((opCode & 0xFF) << 16)
                     | (flags & 0xFFFF);
             }
@@ -177,20 +180,14 @@ public static short getFlags(int packetHeader) {
     public static final short FLAG_RECOVERY_ADD = 0x0002;
 
     static class Request {
-
-        final byte protocolVersion;
-        final byte opCode;
-        final long ledgerId;
-        final long entryId;
-        final short flags;
-        final byte[] masterKey;
-
-        protected Request(byte protocolVersion, byte opCode, long ledgerId,
-                          long entryId, short flags) {
-            this(protocolVersion, opCode, ledgerId, entryId, flags, null);
-        }
-
-        protected Request(byte protocolVersion, byte opCode, long ledgerId,
+        byte protocolVersion;
+        byte opCode;
+        long ledgerId;
+        long entryId;
+        short flags;
+        byte[] masterKey;
+
+        protected void init(byte protocolVersion, byte opCode, long ledgerId,
                           long entryId, short flags, byte[] masterKey) {
             this.protocolVersion = protocolVersion;
             this.opCode = opCode;
@@ -233,15 +230,25 @@ boolean hasMasterKey() {
         public String toString() {
             return String.format("Op(%d)[Ledger:%d,Entry:%d]", opCode, ledgerId, entryId);
         }
+
+        public void recycle() {}
     }
 
     static class AddRequest extends Request {
-        final ByteBuf data;
-
-        public AddRequest(byte protocolVersion, long ledgerId, long entryId,
-                          short flags, byte[] masterKey, ByteBuf data) {
-            super(protocolVersion, ADDENTRY, ledgerId, entryId, flags, masterKey);
-            this.data = data.retain();
+        ByteBuf data;
+
+        static AddRequest create(byte protocolVersion, long ledgerId,
+                                 long entryId, short flags, byte[] masterKey,
+                                 ByteBuf data) {
+            AddRequest add = RECYCLER.get();
+            add.protocolVersion = protocolVersion;
+            add.opCode = ADDENTRY;
+            add.ledgerId = ledgerId;
+            add.entryId = entryId;
+            add.flags = flags;
+            add.masterKey = masterKey;
+            add.data = data.retain();
+            return add;
         }
 
         ByteBuf getData() {
@@ -255,16 +262,36 @@ boolean isRecoveryAdd() {
         void release() {
             data.release();
         }
+
+        private final Handle<AddRequest> recyclerHandle;
+        private AddRequest(Handle<AddRequest> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        private static final Recycler<AddRequest> RECYCLER = new Recycler<AddRequest>() {
+            protected AddRequest newObject(Handle<AddRequest> handle) {
+                return new AddRequest(handle);
+            }
+        };
+
+        @Override
+        public void recycle() {
+            ledgerId = -1;
+            entryId = -1;
+            masterKey = null;
+            data = null;
+            recyclerHandle.recycle(this);
+        }
     }
 
     static class ReadRequest extends Request {
         ReadRequest(byte protocolVersion, long ledgerId, long entryId, short flags) {
-            super(protocolVersion, READENTRY, ledgerId, entryId, flags);
+            init(protocolVersion, READENTRY, ledgerId, entryId, flags, null);
         }
 
         ReadRequest(byte protocolVersion, long ledgerId, long entryId,
                     short flags, byte[] masterKey) {
-            super(protocolVersion, READENTRY, ledgerId, entryId, flags, masterKey);
+            init(protocolVersion, READENTRY, ledgerId, entryId, flags, masterKey);
         }
 
         boolean isFencingRequest() {
@@ -276,7 +303,7 @@ boolean isFencingRequest() {
         final AuthMessage authMessage;
 
         AuthRequest(byte protocolVersion, AuthMessage authMessage) {
-            super(protocolVersion, AUTH, -1, -1, FLAG_NONE, null);
+            init(protocolVersion, AUTH, -1, -1, FLAG_NONE, null);
             this.authMessage = authMessage;
         }
 
@@ -285,14 +312,14 @@ AuthMessage getAuthMessage() {
         }
     }
 
-    static class Response {
-        final byte protocolVersion;
-        final byte opCode;
-        final int errorCode;
-        final long ledgerId;
-        final long entryId;
+    static abstract class Response {
+        byte protocolVersion;
+        byte opCode;
+        int errorCode;
+        long ledgerId;
+        long entryId;
 
-        protected Response(byte protocolVersion, byte opCode,
+        protected void init(byte protocolVersion, byte opCode,
                            int errorCode, long ledgerId, long entryId) {
             this.protocolVersion = protocolVersion;
             this.opCode = opCode;
@@ -326,18 +353,20 @@ public String toString() {
             return String.format("Op(%d)[Ledger:%d,Entry:%d,errorCode=%d]",
                                  opCode, ledgerId, entryId, errorCode);
         }
+
+        abstract void recycle();
     }
 
     static class ReadResponse extends Response {
         final ByteBuf data;
 
         ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) {
-            super(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
+            init(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
             this.data = Unpooled.EMPTY_BUFFER;
         }
 
         ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, ByteBuf data) {
-            super(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
+            init(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
             this.data = data;
         }
 
@@ -348,18 +377,41 @@ boolean hasData() {
         ByteBuf getData() {
             return data;
         }
+
+        void recycle() {
+        }
     }
 
     static class AddResponse extends Response {
-        AddResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) {
-            super(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId);
+        static AddResponse create(byte protocolVersion, int errorCode, long ledgerId, long entryId) {
+            AddResponse response = RECYCLER.get();
+            response.init(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId);
+            return response;
+        }
+
+        private final Handle<AddResponse> recyclerHandle;
+        private AddResponse(Handle<AddResponse> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        private static final Recycler<AddResponse> RECYCLER = new Recycler<AddResponse>() {
+            protected AddResponse newObject(Handle<AddResponse> handle) {
+                return new AddResponse(handle);
+            }
+        };
+
+        public void recycle() {
+            recyclerHandle.recycle(this);
         }
     }
-    
+
     static class ErrorResponse extends Response {
         ErrorResponse(byte protocolVersion, byte opCode, int errorCode,
                       long ledgerId, long entryId) {
-            super(protocolVersion, opCode, errorCode, ledgerId, entryId);
+            init(protocolVersion, opCode, errorCode, ledgerId, entryId);
+        }
+
+        void recycle() {
         }
     }
 
@@ -367,13 +419,16 @@ ByteBuf getData() {
         final AuthMessage authMessage;
 
         AuthResponse(byte protocolVersion, AuthMessage authMessage) {
-            super(protocolVersion, AUTH, EOK, -1, -1);
+            init(protocolVersion, AUTH, EOK, -1, -1);
             this.authMessage = authMessage;
         }
 
         AuthMessage getAuthMessage() {
             return authMessage;
         }
+
+        void recycle() {
+        }
     }
 
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index ffa030b56..0c8f25ad9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -2,6 +2,7 @@
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
+
  * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
@@ -532,11 +533,13 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf
         CompletionKey completionKey = null;
         if (useV2WireProtocol) {
             completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY);
-            request = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
+            request = BookieProtocol.AddRequest.create(
+                    BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
                     (short) options, masterKey, toSend);
         } else {
             final long txnId = getTxnId();
             completionKey = new V3CompletionKey(txnId, OperationType.ADD_ENTRY);
+
             // Build the request and calculate the total size to be included in the packet.
             BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
                     .setVersion(ProtocolVersion.VERSION_THREE)
@@ -854,6 +857,7 @@ private static String requestToString(Object request) {
             return request.toString();
         }
     }
+
     void errorOut(final CompletionKey key) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Removing completion key: {}", key);
@@ -1010,6 +1014,7 @@ private void readV2Response(final BookieProtocol.Response response) {
                     public void safeRun() {
                         completionValue.handleV2Response(ledgerId, entryId,
                                                          status, response);
+                        response.recycle();
                     }
                 });
         }
@@ -1734,7 +1739,6 @@ public void run(Timeout timeout) throws Exception {
         public void release() {}
     }
 
-
     /**
      * Note : Helper functions follow
      */
@@ -1814,7 +1818,8 @@ public boolean equals(Object object) {
             }
             V2CompletionKey that = (V2CompletionKey) object;
             return this.entryId == that.entryId
-                && this.ledgerId == that.ledgerId;
+                && this.ledgerId == that.ledgerId
+                && this.operationType == that.operationType;
         }
 
         @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
index c0be16247..342acd5df 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
@@ -25,7 +25,7 @@
 class ResponseBuilder {
     static BookieProtocol.Response buildErrorResponse(int errorCode, BookieProtocol.Request r) {
         if (r.getOpCode() == BookieProtocol.ADDENTRY) {
-            return new BookieProtocol.AddResponse(r.getProtocolVersion(), errorCode,
+            return BookieProtocol.AddResponse.create(r.getProtocolVersion(), errorCode,
                                                   r.getLedgerId(), r.getEntryId());
         } else {
             assert(r.getOpCode() == BookieProtocol.READENTRY);
@@ -35,7 +35,7 @@
     }
 
     static BookieProtocol.Response buildAddResponse(BookieProtocol.Request r) {
-        return new BookieProtocol.AddResponse(r.getProtocolVersion(), BookieProtocol.EOK, r.getLedgerId(),
+        return BookieProtocol.AddResponse.create(r.getProtocolVersion(), BookieProtocol.EOK, r.getLedgerId(),
                                               r.getEntryId());
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index c4b28406b..416a478e9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.bookkeeper.proto;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
@@ -70,11 +71,12 @@ protected void processPacket() {
 
         startTimeNanos = MathUtils.nowInNano();
         int rc = BookieProtocol.EOK;
+        ByteBuf addData = add.getData();
         try {
             if (add.isRecoveryAdd()) {
-                requestProcessor.bookie.recoveryAddEntry(add.getData(), this, channel, add.getMasterKey());
+                requestProcessor.bookie.recoveryAddEntry(addData, this, channel, add.getMasterKey());
             } else {
-                requestProcessor.bookie.addEntry(add.getData(), this, channel, add.getMasterKey());
+                requestProcessor.bookie.addEntry(addData, this, channel, add.getMasterKey());
             }
         } catch (IOException e) {
             LOG.error("Error writing " + add, e);
@@ -86,7 +88,7 @@ protected void processPacket() {
             LOG.error("Unauthorized access to ledger " + add.getLedgerId(), e);
             rc = BookieProtocol.EUA;
         } finally {
-            add.release();
+            addData.release();
         }
 
         if (rc != BookieProtocol.EOK) {
@@ -95,6 +97,7 @@ protected void processPacket() {
             sendResponse(rc,
                          ResponseBuilder.buildErrorResponse(rc, add),
                          requestProcessor.addRequestStats);
+            add.recycle();
         }
     }
 
@@ -111,6 +114,7 @@ public void writeComplete(int rc, long ledgerId, long entryId,
         sendResponse(rc,
                      ResponseBuilder.buildAddResponse(request),
                      requestProcessor.addRequestStats);
+        request.recycle();
         recycle();
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services