You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by iv...@apache.org on 2018/03/02 09:08:54 UTC
[bookkeeper] branch master updated: Refactored ReadResponse ref
count handling
This is an automated email from the ASF dual-hosted git repository.
ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 8d048ab Refactored ReadResponse ref count handling
8d048ab is described below
commit 8d048abce486c63d428041f77ee9a506756f4d1e
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Fri Mar 2 10:08:48 2018 +0100
Refactored ReadResponse ref count handling
This contains a number of changes.
- V2 bookie protocol
- Add retain and release methods to all responses. For read response
it handles the data buffer.
- ReadResponses always have a buffer now, even if empty.
- Server side
- In the v2 read handler, releasing of the buffer in the case of
error is left to the very end.
- Client side
- Per channel bookie clients own the buffer for read responses.
If a ReadCallback want it to live past the lifetime of the call
it must call retain.
This change was originally e8643140 in the yahoo-4.3 branch.
Author: Ivan Kelly <iv...@apache.org>
Reviewers: Matteo Merli <mm...@apache.org>, Sijie Guo <si...@apache.org>
This closes #1221 from ivankelly/yahoo-bp-11
---
.../apache/bookkeeper/client/PendingReadOp.java | 5 ++--
.../client/ReadLastConfirmedAndEntryOp.java | 4 +++-
.../bookkeeper/client/ReadLastConfirmedOp.java | 3 ---
.../bookkeeper/proto/BookieProtoEncoding.java | 8 ++-----
.../apache/bookkeeper/proto/BookieProtocol.java | 28 ++++++++++++++--------
.../bookkeeper/proto/PerChannelBookieClient.java | 15 ++++--------
.../bookkeeper/proto/ReadEntryProcessor.java | 9 -------
7 files changed, 31 insertions(+), 41 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 7f2860b..d596eee 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -139,7 +139,6 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
} catch (BKDigestMatchException e) {
readOpDmCounter.inc();
logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException);
- buffer.release();
return false;
}
@@ -154,7 +153,6 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
writeSet.recycle();
return true;
} else {
- buffer.release();
return false;
}
}
@@ -589,12 +587,15 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
heardFromHosts.add(rctx.to);
heardFromHostsBitSet.set(rctx.bookieIndex, true);
+ buffer.retain();
if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) {
if (!isRecoveryRead) {
// do not advance LastAddConfirmed for recovery reads
lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L);
}
submitCallback(BKException.Code.OK);
+ } else {
+ buffer.release();
}
if (numPendingEntries < 0) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index a8bc84f..5327639 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -121,7 +121,6 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
content = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
} catch (BKException.BKDigestMatchException e) {
logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException);
- buffer.release();
return false;
}
@@ -555,6 +554,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
hasValidResponse = true;
if (entryId != BookieProtocol.LAST_ADD_CONFIRMED) {
+ buffer.retain();
if (request.complete(rCtx.getBookieIndex(), bookie, buffer, entryId)) {
// callback immediately
if (rCtx.getLacUpdateTimestamp().isPresent()) {
@@ -568,6 +568,8 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
submitCallback(BKException.Code.OK);
requestComplete.set(true);
heardFromHostsBitSet.set(rCtx.getBookieIndex(), true);
+ } else {
+ buffer.release();
}
} else {
emptyResponsesFromHostsBitSet.set(rCtx.getBookieIndex(), true);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
index 32d7ffe..2cb6152 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
@@ -18,7 +18,6 @@
package org.apache.bookkeeper.client;
import io.netty.buffer.ByteBuf;
-import io.netty.util.ReferenceCountUtil;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
import org.apache.bookkeeper.proto.BookieProtocol;
@@ -101,8 +100,6 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
}
}
- ReferenceCountUtil.release(buffer);
-
if (rc == BKException.Code.NoSuchLedgerExistsException || rc == BKException.Code.NoSuchEntryException) {
// this still counts as a valid response, e.g., if the client crashed without writing any entry
heardValidResponse = true;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 4594ef1..a5b2141 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -292,12 +292,8 @@ public class BookieProtoEncoding {
ledgerId = buffer.readLong();
entryId = buffer.readLong();
- if (rc == BookieProtocol.EOK) {
- return new BookieProtocol.ReadResponse(version, rc,
- ledgerId, entryId, buffer.retainedSlice());
- } else {
- return new BookieProtocol.ReadResponse(version, rc, ledgerId, entryId);
- }
+ return new BookieProtocol.ReadResponse(
+ version, rc, ledgerId, entryId, buffer.retainedSlice());
case BookieProtocol.AUTH:
ByteBufInputStream bufStream = new ByteBufInputStream(buffer);
BookkeeperProtocol.AuthMessage.Builder builder = BookkeeperProtocol.AuthMessage.newBuilder();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 9ae6316..86c93e1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -428,7 +428,14 @@ public interface BookieProtocol {
opCode, ledgerId, entryId, errorCode);
}
- abstract void recycle();
+ void retain() {
+ }
+
+ void release() {
+ }
+
+ void recycle() {
+ }
}
/**
@@ -438,8 +445,7 @@ public interface BookieProtocol {
final ByteBuf data;
ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) {
- init(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
- this.data = Unpooled.EMPTY_BUFFER;
+ this(protocolVersion, errorCode, ledgerId, entryId, Unpooled.EMPTY_BUFFER);
}
ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, ByteBuf data) {
@@ -455,7 +461,14 @@ public interface BookieProtocol {
return data;
}
- void recycle() {
+ @Override
+ public void retain() {
+ data.retain();
+ }
+
+ @Override
+ public void release() {
+ data.release();
}
}
@@ -480,6 +493,7 @@ public interface BookieProtocol {
}
};
+ @Override
public void recycle() {
recyclerHandle.recycle(this);
}
@@ -493,9 +507,6 @@ public interface BookieProtocol {
long ledgerId, long entryId) {
init(protocolVersion, opCode, errorCode, ledgerId, entryId);
}
-
- void recycle() {
- }
}
/**
@@ -512,9 +523,6 @@ public interface BookieProtocol {
AuthMessage getAuthMessage() {
return authMessage;
}
-
- void recycle() {
- }
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 7735b98..5adc7cf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -1105,6 +1105,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
LOG.debug("Unexpected response received from bookie : " + addr + " for type : " + operationType
+ " and ledger:entry : " + response.ledgerId + ":" + response.entryId);
}
+ response.release();
} else {
long orderingKey = completionValue.ledgerId;
executor.submitOrdered(orderingKey,
@@ -1582,11 +1583,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
return;
}
BookieProtocol.ReadResponse readResponse = (BookieProtocol.ReadResponse) response;
- ByteBuf data = null;
- if (readResponse.hasData()) {
- data = readResponse.getData();
- }
- handleReadResponse(ledgerId, entryId, status, data,
+ handleReadResponse(ledgerId, entryId, status, readResponse.getData(),
INVALID_ENTRY_ID, -1L);
}
@@ -1611,6 +1608,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
handleReadResponse(readResponse.getLedgerId(),
readResponse.getEntryId(),
status, buffer, maxLAC, lacUpdateTimestamp);
+ buffer.release(); // meaningless using unpooled, but client may expect to hold the last reference
}
private void handleReadResponse(long ledgerId,
@@ -1619,23 +1617,20 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
ByteBuf buffer,
long maxLAC, // max known lac piggy-back from bookies
long lacUpdateTimestamp) { // the timestamp when the lac is updated.
- int readableBytes = buffer == null ? 0 : buffer.readableBytes();
+ int readableBytes = buffer.readableBytes();
int rc = logAndConvertStatus(status,
BKException.Code.ReadException,
"ledger", ledgerId,
"entry", entryId,
"entryLength", readableBytes);
- if (buffer != null) {
- buffer = buffer.slice();
- }
if (maxLAC > INVALID_ENTRY_ID && (ctx instanceof ReadEntryCallbackCtx)) {
((ReadEntryCallbackCtx) ctx).setLastAddConfirmed(maxLAC);
}
if (lacUpdateTimestamp > -1L && (ctx instanceof ReadLastConfirmedAndEntryContext)) {
((ReadLastConfirmedAndEntryContext) ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
}
- cb.readEntryComplete(rc, ledgerId, entryId, buffer, ctx);
+ cb.readEntryComplete(rc, ledgerId, entryId, buffer.slice(), ctx);
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index 2d4c540..edeb8a6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -84,8 +84,6 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
if (null == fenced || !fenced) {
// if failed to fence, fail the read request to make it retry.
errorCode = BookieProtocol.EIO;
- data.release();
- data = null;
} else {
errorCode = BookieProtocol.EOK;
}
@@ -93,18 +91,12 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
Thread.currentThread().interrupt();
LOG.error("Interrupting fence read entry {}", request, ie);
errorCode = BookieProtocol.EIO;
- data.release();
- data = null;
} catch (ExecutionException ee) {
LOG.error("Failed to fence read entry {}", request, ee);
errorCode = BookieProtocol.EIO;
- data.release();
- data = null;
} catch (TimeoutException te) {
LOG.error("Timeout to fence read entry {}", request, te);
errorCode = BookieProtocol.EIO;
- data.release();
- data = null;
}
} else {
errorCode = BookieProtocol.EOK;
@@ -141,7 +133,6 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
TimeUnit.NANOSECONDS);
sendResponse(errorCode, ResponseBuilder.buildReadResponse(data, request),
requestProcessor.readRequestStats);
-
} else {
ReferenceCountUtil.release(data);
--
To stop receiving notification emails like this one, please contact
ivank@apache.org.