You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by iv...@apache.org on 2017/10/30 19:49:41 UTC
[bookkeeper] branch master updated: Recycle instances of callback
to obtain a PerChannelBookie client
This is an automated email from the ASF dual-hosted git repository.
ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 1c7d103 Recycle instances of callback to obtain a PerChannelBookie client
1c7d103 is described below
commit 1c7d103431c15335cc6d6eae0e9c2197b58c1ad3
Author: Matteo Merli <mm...@yahoo-inc.com>
AuthorDate: Mon Oct 30 20:49:34 2017 +0100
Recycle instances of callback to obtain a PerChannelBookie client
This change was originally commit e2e77863 in the yahoo-4.3 branch.
Author: Matteo Merli <mm...@yahoo-inc.com>
Reviewers: Sijie Guo <si...@apache.org>
This closes #679 from ivankelly/yahoo-bp-3
---
.../org/apache/bookkeeper/proto/BookieClient.java | 86 ++++++++++++++++++----
1 file changed, 72 insertions(+), 14 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index c604b16..ae7f806 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -60,6 +60,8 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
@@ -253,26 +255,19 @@ public class BookieClient implements PerChannelBookieClientFactory {
return;
}
- // Retain the buffer, since the connection could be obtained after the PendingApp might have already
- // failed
+ // Retain the buffer, since the connection could be obtained after
+ // the PendingAddOp might have already failed
toSend.retain();
- client.obtain(new GenericCallback<PerChannelBookieClient>() {
- @Override
- public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
- if (rc != BKException.Code.OK) {
- completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
- } else {
- pcbc.addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options);
- }
- toSend.release();
- }
- }, ledgerId);
+ client.obtain(ChannelReadyForAddEntryCallback.create(
+ this, toSend, ledgerId, entryId, addr,
+ ctx, cb, options, masterKey),
+ ledgerId);
} finally {
closeLock.readLock().unlock();
}
}
-
+
private void completeRead(final int rc,
final long ledgerId,
final long entryId,
@@ -292,6 +287,69 @@ public class BookieClient implements PerChannelBookieClientFactory {
}
}
+    /**
+     * Pooled callback invoked once a {@link PerChannelBookieClient} has been
+     * obtained (or has failed to be obtained) for an add-entry request.
+     *
+     * <p>Instances are taken from a Netty {@link Recycler} via
+     * {@link #create} and hand themselves back to the pool at the end of
+     * {@link #operationComplete}. An instance must therefore not be touched
+     * by the caller after it has been passed to {@code obtain}.
+     *
+     * <p>The callback releases {@code toSend} exactly once when it completes;
+     * the caller is expected to have retained the buffer before handing it
+     * over.
+     */
+    private static class ChannelReadyForAddEntryCallback
+            implements GenericCallback<PerChannelBookieClient> {
+
+        private static final Recycler<ChannelReadyForAddEntryCallback> RECYCLER =
+                new Recycler<ChannelReadyForAddEntryCallback>() {
+                    @Override
+                    protected ChannelReadyForAddEntryCallback newObject(
+                            Recycler.Handle<ChannelReadyForAddEntryCallback> recyclerHandle) {
+                        return new ChannelReadyForAddEntryCallback(recyclerHandle);
+                    }
+                };
+
+        private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
+
+        // Per-request state; populated by create() and cleared by recycle().
+        private BookieClient bookieClient;
+        private ByteBuf toSend;
+        private long ledgerId;
+        private long entryId;
+        private BookieSocketAddress addr;
+        private Object ctx;
+        private WriteCallback cb;
+        private int options;
+        private byte[] masterKey;
+
+        private ChannelReadyForAddEntryCallback(
+                Handle<ChannelReadyForAddEntryCallback> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        /**
+         * Fetches an instance from the pool and fills it with the state of
+         * one add-entry request.
+         *
+         * @param bookieClient client used to report a failed add
+         * @param toSend payload buffer; released by this callback on completion
+         * @param ledgerId ledger the entry belongs to
+         * @param entryId id of the entry being added
+         * @param addr address of the target bookie
+         * @param ctx opaque context passed through to {@code cb}
+         * @param cb write callback to notify
+         * @param options add-entry options forwarded to the channel client
+         * @param masterKey ledger master key forwarded to the channel client
+         * @return a populated callback ready to be passed to {@code obtain}
+         */
+        static ChannelReadyForAddEntryCallback create(
+                BookieClient bookieClient, ByteBuf toSend, long ledgerId,
+                long entryId, BookieSocketAddress addr, Object ctx,
+                WriteCallback cb, int options, byte[] masterKey) {
+            ChannelReadyForAddEntryCallback callback = RECYCLER.get();
+            callback.bookieClient = bookieClient;
+            callback.toSend = toSend;
+            callback.ledgerId = ledgerId;
+            callback.entryId = entryId;
+            callback.addr = addr;
+            callback.ctx = ctx;
+            callback.cb = cb;
+            callback.options = options;
+            callback.masterKey = masterKey;
+            return callback;
+        }
+
+        /**
+         * Completes the add with {@code rc} when no channel client could be
+         * obtained, otherwise forwards the add to {@code pcbc}. In both cases
+         * the buffer is released and this instance goes back to the pool.
+         *
+         * @param rc result code of obtaining the channel client
+         * @param pcbc the channel client; only used when {@code rc} is OK
+         */
+        @Override
+        public void operationComplete(final int rc,
+                                      PerChannelBookieClient pcbc) {
+            try {
+                if (rc != BKException.Code.OK) {
+                    bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
+                } else {
+                    pcbc.addEntry(ledgerId, masterKey, entryId,
+                                  toSend, cb, ctx, options);
+                }
+            } finally {
+                // Run even if the calls above throw, so that the buffer
+                // reference owned by this callback is not leaked and the
+                // instance is still returned to the pool.
+                toSend.release();
+                recycle();
+            }
+        }
+
+        /**
+         * Clears all per-request state and returns this instance to the pool.
+         * Dropping the references keeps an idle pooled instance from pinning
+         * the last request's buffer, callback, context and key in memory.
+         */
+        public void recycle() {
+            bookieClient = null;
+            toSend = null;
+            ledgerId = -1;
+            entryId = -1;
+            addr = null;
+            ctx = null;
+            cb = null;
+            options = 0;
+            masterKey = null;
+            recyclerHandle.recycle(this);
+        }
+    }
+
+
public void readEntryAndFenceLedger(final BookieSocketAddress addr,
final long ledgerId,
final byte[] masterKey,
--
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>'].