You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2017/10/24 10:59:00 UTC
[bookkeeper] branch master updated: Pool AddCompletions
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 4968438 Pool AddCompletions
4968438 is described below
commit 496843893dfb91c237cbd5ae1f245f90304ea34b
Author: Ivan Kelly <iv...@midokura.com>
AuthorDate: Tue Oct 24 12:58:43 2017 +0200
Pool AddCompletions
This avoids generating garbage in the fast path.
Author: Ivan Kelly <iv...@midokura.com>
Reviewers: Enrico Olivelli <eo...@apache.org>
This closes #649 from ivankelly/pool-completions
---
.../bookkeeper/proto/PerChannelBookieClient.java | 92 ++++++++++++++--------
1 file changed, 60 insertions(+), 32 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index c544701..ffa030b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -562,8 +562,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
completionObjects.put(completionKey,
- new AddCompletion(completionKey,
- cb, ctx, ledgerId, entryId));
+ acquireAddCompletion(completionKey,
+ cb, ctx, ledgerId, entryId));
final Channel c = channel;
if (c == null) {
// usually checked in writeAndFlush, but we have extra check
@@ -1162,14 +1162,14 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
// visible for testing
abstract class CompletionValue {
- final Object ctx;
- protected final long ledgerId;
- protected final long entryId;
- private final long startTime;
private final OpStatsLogger opLogger;
private final OpStatsLogger timeoutOpLogger;
- protected final Timeout timeout;
private final String operationName;
+ protected Object ctx;
+ protected long ledgerId;
+ protected long entryId;
+ protected long startTime;
+ protected Timeout timeout;
public CompletionValue(String operationName,
Object ctx,
@@ -1192,8 +1192,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
void cancelTimeoutAndLogOp(int rc) {
- if (null != timeout) {
- timeout.cancel();
+ Timeout t = timeout;
+ if (null != t) {
+ t.cancel();
}
if (rc != BKException.Code.OK) {
@@ -1580,28 +1581,55 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
}
+ private final Recycler<AddCompletion> ADD_COMPLETION_RECYCLER = new Recycler<AddCompletion>() {
+ protected AddCompletion newObject(Recycler.Handle<AddCompletion> handle) {
+ return new AddCompletion(handle);
+ }
+ };
+
+ AddCompletion acquireAddCompletion(final CompletionKey key,
+ final WriteCallback originalCallback,
+ final Object originalCtx,
+ final long ledgerId, final long entryId) {
+ AddCompletion completion = ADD_COMPLETION_RECYCLER.get();
+ completion.reset(key, originalCallback, originalCtx, ledgerId, entryId);
+ return completion;
+ }
+
// visible for testing
- class AddCompletion extends CompletionValue {
- final WriteCallback cb;
-
- public AddCompletion(final CompletionKey key,
- final WriteCallback originalCallback,
- final Object originalCtx,
- final long ledgerId, final long entryId) {
- super("Add", originalCtx, ledgerId, entryId,
- addEntryOpLogger, addTimeoutOpLogger,
- scheduleTimeout(key, addEntryTimeout));
- this.cb = new WriteCallback() {
- @Override
- public void writeComplete(int rc, long ledgerId, long entryId,
- BookieSocketAddress addr,
- Object ctx) {
- cancelTimeoutAndLogOp(rc);
- originalCallback.writeComplete(rc, ledgerId, entryId,
- addr, originalCtx);
- key.release();
- }
- };
+ class AddCompletion extends CompletionValue implements WriteCallback {
+ final Recycler.Handle<AddCompletion> handle;
+
+ CompletionKey key = null;
+ WriteCallback originalCallback = null;
+
+ AddCompletion(Recycler.Handle<AddCompletion> handle) {
+ super("Add", null, -1, -1,
+ addEntryOpLogger, addTimeoutOpLogger, null);
+ this.handle = handle;
+ }
+
+ void reset(final CompletionKey key,
+ final WriteCallback originalCallback,
+ final Object originalCtx,
+ final long ledgerId, final long entryId) {
+ this.key = key;
+ this.originalCallback = originalCallback;
+ this.ctx = originalCtx;
+ this.ledgerId = ledgerId;
+ this.entryId = entryId;
+ this.startTime = MathUtils.nowInNano();
+ this.timeout = scheduleTimeout(key, addEntryTimeout);
+ }
+
+ @Override
+ public void writeComplete(int rc, long ledgerId, long entryId,
+ BookieSocketAddress addr,
+ Object ctx) {
+ cancelTimeoutAndLogOp(rc);
+ originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx);
+ key.release();
+ handle.recycle(this);
}
@Override
@@ -1612,7 +1640,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
@Override
public void errorOut(final int rc) {
errorOutAndRunCallback(
- () -> cb.writeComplete(rc, ledgerId, entryId, addr, ctx));
+ () -> writeComplete(rc, ledgerId, entryId, addr, ctx));
}
@Override
@@ -1638,7 +1666,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
BKException.Code.WriteException,
"ledger", ledgerId,
"entry", entryId);
- cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+ writeComplete(rc, ledgerId, entryId, addr, ctx);
}
}
--
To stop receiving notification emails like this one, please contact
"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>.