You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by iv...@apache.org on 2018/03/02 09:08:54 UTC

[bookkeeper] branch master updated: Refactored ReadResponse ref count handling

This is an automated email from the ASF dual-hosted git repository.

ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d048ab  Refactored ReadResponse ref count handling
8d048ab is described below

commit 8d048abce486c63d428041f77ee9a506756f4d1e
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Fri Mar 2 10:08:48 2018 +0100

    Refactored ReadResponse ref count handling
    
    This contains a number of changes.
    
    - V2 bookie protocol
      - Add retain and release methods to all responses. For read response
        it handles the data buffer.
      - ReadResponses always have a buffer now, even if empty.
    
    - Server side
      - In the v2 read handler, releasing of the buffer in the case of
        error is left to the very end.
    
    - Client side
      - Per channel bookie clients own the buffer for read responses.
        If a ReadCallback want it to live past the lifetime of the call
        it must call retain.
    
    This change was originally e8643140 in the yahoo-4.3 branch.
    
    Author: Ivan Kelly <iv...@apache.org>
    
    Reviewers: Matteo Merli <mm...@apache.org>, Sijie Guo <si...@apache.org>
    
    This closes #1221 from ivankelly/yahoo-bp-11
---
 .../apache/bookkeeper/client/PendingReadOp.java    |  5 ++--
 .../client/ReadLastConfirmedAndEntryOp.java        |  4 +++-
 .../bookkeeper/client/ReadLastConfirmedOp.java     |  3 ---
 .../bookkeeper/proto/BookieProtoEncoding.java      |  8 ++-----
 .../apache/bookkeeper/proto/BookieProtocol.java    | 28 ++++++++++++++--------
 .../bookkeeper/proto/PerChannelBookieClient.java   | 15 ++++--------
 .../bookkeeper/proto/ReadEntryProcessor.java       |  9 -------
 7 files changed, 31 insertions(+), 41 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 7f2860b..d596eee 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -139,7 +139,6 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
             } catch (BKDigestMatchException e) {
                 readOpDmCounter.inc();
                 logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException);
-                buffer.release();
                 return false;
             }
 
@@ -154,7 +153,6 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
                 writeSet.recycle();
                 return true;
             } else {
-                buffer.release();
                 return false;
             }
         }
@@ -589,12 +587,15 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
         heardFromHosts.add(rctx.to);
         heardFromHostsBitSet.set(rctx.bookieIndex, true);
 
+        buffer.retain();
         if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) {
             if (!isRecoveryRead) {
                 // do not advance LastAddConfirmed for recovery reads
                 lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L);
             }
             submitCallback(BKException.Code.OK);
+        } else {
+            buffer.release();
         }
 
         if (numPendingEntries < 0) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index a8bc84f..5327639 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -121,7 +121,6 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
                 content = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
             } catch (BKException.BKDigestMatchException e) {
                 logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException);
-                buffer.release();
                 return false;
             }
 
@@ -555,6 +554,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
             hasValidResponse = true;
 
             if (entryId != BookieProtocol.LAST_ADD_CONFIRMED) {
+                buffer.retain();
                 if (request.complete(rCtx.getBookieIndex(), bookie, buffer, entryId)) {
                     // callback immediately
                     if (rCtx.getLacUpdateTimestamp().isPresent()) {
@@ -568,6 +568,8 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
                     submitCallback(BKException.Code.OK);
                     requestComplete.set(true);
                     heardFromHostsBitSet.set(rCtx.getBookieIndex(), true);
+                } else {
+                    buffer.release();
                 }
             } else {
                 emptyResponsesFromHostsBitSet.set(rCtx.getBookieIndex(), true);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
index 32d7ffe..2cb6152 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
@@ -18,7 +18,6 @@
 package org.apache.bookkeeper.client;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.util.ReferenceCountUtil;
 
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
 import org.apache.bookkeeper.proto.BookieProtocol;
@@ -101,8 +100,6 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
             }
         }
 
-        ReferenceCountUtil.release(buffer);
-
         if (rc == BKException.Code.NoSuchLedgerExistsException || rc == BKException.Code.NoSuchEntryException) {
             // this still counts as a valid response, e.g., if the client crashed without writing any entry
             heardValidResponse = true;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 4594ef1..a5b2141 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -292,12 +292,8 @@ public class BookieProtoEncoding {
                 ledgerId = buffer.readLong();
                 entryId = buffer.readLong();
 
-                if (rc == BookieProtocol.EOK) {
-                    return new BookieProtocol.ReadResponse(version, rc,
-                                                           ledgerId, entryId, buffer.retainedSlice());
-                } else {
-                    return new BookieProtocol.ReadResponse(version, rc, ledgerId, entryId);
-                }
+                return new BookieProtocol.ReadResponse(
+                        version, rc, ledgerId, entryId, buffer.retainedSlice());
             case BookieProtocol.AUTH:
                 ByteBufInputStream bufStream = new ByteBufInputStream(buffer);
                 BookkeeperProtocol.AuthMessage.Builder builder = BookkeeperProtocol.AuthMessage.newBuilder();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 9ae6316..86c93e1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -428,7 +428,14 @@ public interface BookieProtocol {
                                  opCode, ledgerId, entryId, errorCode);
         }
 
-        abstract void recycle();
+        void retain() {
+        }
+
+        void release() {
+        }
+
+        void recycle() {
+        }
     }
 
     /**
@@ -438,8 +445,7 @@ public interface BookieProtocol {
         final ByteBuf data;
 
         ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) {
-            init(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
-            this.data = Unpooled.EMPTY_BUFFER;
+            this(protocolVersion, errorCode, ledgerId, entryId, Unpooled.EMPTY_BUFFER);
         }
 
         ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, ByteBuf data) {
@@ -455,7 +461,14 @@ public interface BookieProtocol {
             return data;
         }
 
-        void recycle() {
+        @Override
+        public void retain() {
+            data.retain();
+        }
+
+        @Override
+        public void release() {
+            data.release();
         }
     }
 
@@ -480,6 +493,7 @@ public interface BookieProtocol {
             }
         };
 
+        @Override
         public void recycle() {
             recyclerHandle.recycle(this);
         }
@@ -493,9 +507,6 @@ public interface BookieProtocol {
                       long ledgerId, long entryId) {
             init(protocolVersion, opCode, errorCode, ledgerId, entryId);
         }
-
-        void recycle() {
-        }
     }
 
     /**
@@ -512,9 +523,6 @@ public interface BookieProtocol {
         AuthMessage getAuthMessage() {
             return authMessage;
         }
-
-        void recycle() {
-        }
     }
 
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 7735b98..5adc7cf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -1105,6 +1105,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 LOG.debug("Unexpected response received from bookie : " + addr + " for type : " + operationType
                         + " and ledger:entry : " + response.ledgerId + ":" + response.entryId);
             }
+            response.release();
         } else {
             long orderingKey = completionValue.ledgerId;
             executor.submitOrdered(orderingKey,
@@ -1582,11 +1583,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 return;
             }
             BookieProtocol.ReadResponse readResponse = (BookieProtocol.ReadResponse) response;
-            ByteBuf data = null;
-            if (readResponse.hasData()) {
-                data = readResponse.getData();
-            }
-            handleReadResponse(ledgerId, entryId, status, data,
+            handleReadResponse(ledgerId, entryId, status, readResponse.getData(),
                                INVALID_ENTRY_ID, -1L);
         }
 
@@ -1611,6 +1608,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             handleReadResponse(readResponse.getLedgerId(),
                                readResponse.getEntryId(),
                                status, buffer, maxLAC, lacUpdateTimestamp);
+            buffer.release(); // meaningless using unpooled, but client may expect to hold the last reference
         }
 
         private void handleReadResponse(long ledgerId,
@@ -1619,23 +1617,20 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                                         ByteBuf buffer,
                                         long maxLAC, // max known lac piggy-back from bookies
                                         long lacUpdateTimestamp) { // the timestamp when the lac is updated.
-            int readableBytes = buffer == null ? 0 : buffer.readableBytes();
+            int readableBytes = buffer.readableBytes();
             int rc = logAndConvertStatus(status,
                                          BKException.Code.ReadException,
                                          "ledger", ledgerId,
                                          "entry", entryId,
                                          "entryLength", readableBytes);
 
-            if (buffer != null) {
-                buffer = buffer.slice();
-            }
             if (maxLAC > INVALID_ENTRY_ID && (ctx instanceof ReadEntryCallbackCtx)) {
                 ((ReadEntryCallbackCtx) ctx).setLastAddConfirmed(maxLAC);
             }
             if (lacUpdateTimestamp > -1L && (ctx instanceof ReadLastConfirmedAndEntryContext)) {
                 ((ReadLastConfirmedAndEntryContext) ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
             }
-            cb.readEntryComplete(rc, ledgerId, entryId, buffer, ctx);
+            cb.readEntryComplete(rc, ledgerId, entryId, buffer.slice(), ctx);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index 2d4c540..edeb8a6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -84,8 +84,6 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
                     if (null == fenced || !fenced) {
                         // if failed to fence, fail the read request to make it retry.
                         errorCode = BookieProtocol.EIO;
-                        data.release();
-                        data = null;
                     } else {
                         errorCode = BookieProtocol.EOK;
                     }
@@ -93,18 +91,12 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
                     Thread.currentThread().interrupt();
                     LOG.error("Interrupting fence read entry {}", request, ie);
                     errorCode = BookieProtocol.EIO;
-                    data.release();
-                    data = null;
                 } catch (ExecutionException ee) {
                     LOG.error("Failed to fence read entry {}", request, ee);
                     errorCode = BookieProtocol.EIO;
-                    data.release();
-                    data = null;
                 } catch (TimeoutException te) {
                     LOG.error("Timeout to fence read entry {}", request, te);
                     errorCode = BookieProtocol.EIO;
-                    data.release();
-                    data = null;
                 }
             } else {
                 errorCode = BookieProtocol.EOK;
@@ -141,7 +133,6 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
                     TimeUnit.NANOSECONDS);
             sendResponse(errorCode, ResponseBuilder.buildReadResponse(data, request),
                          requestProcessor.readRequestStats);
-
         } else {
             ReferenceCountUtil.release(data);
 

-- 
To stop receiving notification emails like this one, please contact
ivank@apache.org.