You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2017/10/10 09:28:54 UTC
[bookkeeper] branch master updated: ISSUE #525: Refactor PerChannelBookieClient
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 79f359f ISSUE #525: Refactor PerChannelBookieClient
79f359f is described below
commit 79f359ff9b77cfce23d52d7a99715768497ee00f
Author: Ivan Kelly <iv...@ivankelly.net>
AuthorDate: Tue Oct 10 11:28:39 2017 +0200
ISSUE #525: Refactor PerChannelBookieClient
There's a lot of duplicate code in PerChannelBookieClient, particularly in the completions. It makes it more difficult to add anything to the class.
Concretely, the patch:
- Factors out common code from PCBC completions
- Makes erroring out a member of completion classes
- Refactors out writing and flushing messages, so all RPCs use the same code path
- Moves timeout handling into CompletionValue
- Moves response handling into completions
- Moves logging and status conversion into a common method
Author: Ivan Kelly <iv...@ivankelly.net>
Reviewers: Enrico Olivelli <eo...@apache.org>, Sijie Guo <si...@apache.org>
This closes #526 from ivankelly/refactor-pcbc, closes #525
---
.../bookkeeper/client/BookKeeperClientStats.java | 1 +
.../bookkeeper/proto/PerChannelBookieClient.java | 1304 ++++++++------------
2 files changed, 514 insertions(+), 791 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index a0389c1..091b920 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -63,4 +63,5 @@ public interface BookKeeperClientStats {
public final static String CHANNEL_TIMEOUT_READ_LAC = "TIMEOUT_READ_LAC";
public final static String TIMEOUT_GET_BOOKIE_INFO = "TIMEOUT_GET_BOOKIE_INFO";
public final static String CHANNEL_START_TLS_OP = "START_TLS";
+ public final static String CHANNEL_TIMEOUT_START_TLS_OP = "TIMEOUT_START_TLS";
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 20f601c..dcc58f2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.proto;
import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
import com.google.common.collect.Sets;
+import com.google.common.base.Joiner;
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistry;
import io.netty.bootstrap.Bootstrap;
@@ -170,6 +171,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
private final OpStatsLogger getBookieInfoOpLogger;
private final OpStatsLogger getBookieInfoTimeoutOpLogger;
private final OpStatsLogger startTLSOpLogger;
+ private final OpStatsLogger startTLSTimeoutOpLogger;
private final boolean useV2WireProtocol;
@@ -269,6 +271,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
readLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ_LAC);
getBookieInfoTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO);
startTLSOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_START_TLS_OP);
+ startTLSTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_START_TLS_OP);
this.pcbcPool = pcbcPool;
@@ -477,10 +480,12 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteBuf toSend, WriteLacCallback cb,
Object ctx) {
final long txnId = getTxnId();
- final CompletionKey completionKey = new V3CompletionKey(txnId, OperationType.WRITE_LAC);
+ final CompletionKey completionKey = new V3CompletionKey(txnId,
+ OperationType.WRITE_LAC);
// writeLac is mostly like addEntry hence uses addEntryTimeout
completionObjects.put(completionKey,
- new WriteLacCompletion(writeLacOpLogger, cb, ctx, lac, scheduleTimeout(completionKey, addEntryTimeout)));
+ new WriteLacCompletion(completionKey, cb,
+ ctx, lac));
// Build the request
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
@@ -497,35 +502,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
.setHeader(headerBuilder)
.setWriteLacRequest(writeLacBuilder)
.build();
-
- final Channel c = channel;
- if (c == null) {
- errorOutWriteLacKey(completionKey);
- return;
- }
- try {
- ChannelFuture future = c.writeAndFlush(writeLacRequest);
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully wrote request for writeLac LedgerId: {} bookie: {}",
- ledgerId, c.remoteAddress());
- }
- } else {
- if (!(future.cause() instanceof ClosedChannelException)) {
- LOG.warn("Writing Lac(lid={} to channel {} failed : ",
- new Object[] { ledgerId, c, future.cause() });
- }
- errorOutWriteLacKey(completionKey);
- }
- }
- });
- } catch (Throwable e) {
- LOG.warn("writeLac operation failed", e);
- errorOutWriteLacKey(completionKey);
- }
+ writeAndFlush(channel, completionKey, writeLacRequest);
}
/**
@@ -550,14 +527,14 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf toSend, WriteCallback cb,
Object ctx, final int options) {
Object request = null;
- CompletionKey completion = null;
+ CompletionKey completionKey = null;
if (useV2WireProtocol) {
- completion = new V2CompletionKey(ledgerId, entryId, OperationType.ADD_ENTRY);
+ completionKey = new V2CompletionKey(ledgerId, entryId, OperationType.ADD_ENTRY);
request = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
(short) options, masterKey, toSend);
} else {
final long txnId = getTxnId();
- completion = new V3CompletionKey(txnId, OperationType.ADD_ENTRY);
+ completionKey = new V3CompletionKey(txnId, OperationType.ADD_ENTRY);
// Build the request and calculate the total size to be included in the packet.
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
.setVersion(ProtocolVersion.VERSION_THREE)
@@ -582,43 +559,18 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
.build();
}
- final Object addRequest = request;
- final CompletionKey completionKey = completion;
-
- completionObjects.put(completionKey, new AddCompletion(this,
- addEntryOpLogger, cb, ctx, ledgerId, entryId, scheduleTimeout(completion, addEntryTimeout)));
-
- final int entrySize = toSend.readableBytes();
-
+ completionObjects.put(completionKey,
+ new AddCompletion(completionKey,
+ cb, ctx, ledgerId, entryId));
final Channel c = channel;
if (c == null) {
- errorOutAddKey(completionKey);
+ // usually checked in writeAndFlush, but we have extra check
+ // because we need to release toSend.
+ errorOut(completionKey);
toSend.release();
return;
- }
- try {
- ChannelFuture future = c.writeAndFlush(addRequest);
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully wrote request for adding entry: " + entryId + " ledger-id: " + ledgerId
- + " bookie: " + c.remoteAddress() + " entry length: " + entrySize);
- }
- // totalBytesOutstanding.addAndGet(entrySize);
- } else {
- if (!(future.cause() instanceof ClosedChannelException)) {
- LOG.warn("Writing addEntry(lid={}, eid={}) to channel {} failed : ",
- new Object[] { ledgerId, entryId, c, future.cause() });
- }
- errorOutAddKey(completionKey);
- }
- }
- });
- } catch (Throwable e) {
- LOG.warn("Add entry operation failed", e);
- errorOutAddKey(completionKey);
+ } else {
+ writeAndFlush(c, completionKey, request);
}
}
@@ -626,14 +578,14 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
final long entryId,
ReadEntryCallback cb, Object ctx) {
Object request = null;
- CompletionKey completion = null;
+ CompletionKey completionKey = null;
if (useV2WireProtocol) {
- completion = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
+ completionKey = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
BookieProtocol.FLAG_DO_FENCING, masterKey);
} else {
final long txnId = getTxnId();
- completion = new V3CompletionKey(txnId, OperationType.READ_ENTRY);
+ completionKey = new V3CompletionKey(txnId, OperationType.READ_ENTRY);
// Build the request and calculate the total size to be included in the packet.
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
@@ -653,56 +605,28 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
.build();
}
- final CompletionKey completionKey = completion;
- if (completionObjects.putIfAbsent(completionKey, new ReadCompletion(this, readEntryOpLogger, cb,
- ctx, ledgerId, entryId, scheduleTimeout(completionKey, readEntryTimeout))) != null) {
+ if (completionObjects.putIfAbsent(
+ completionKey, new ReadCompletion(completionKey,
+ cb, ctx,
+ ledgerId, entryId)) != null) {
// We cannot have more than 1 pending read on the same ledger/entry in the v2 protocol
cb.readEntryComplete(BKException.Code.BookieHandleNotAvailableException, ledgerId, entryId, null, ctx);
return;
}
- final Channel c = channel;
- if (c == null) {
- errorOutReadKey(completionKey);
- return;
- }
-
- final Object readRequest = request;
- try {
- ChannelFuture future = c.writeAndFlush(readRequest);
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully wrote request {} to {}",
- readRequest, c.remoteAddress());
- }
- } else {
- if (!(future.cause() instanceof ClosedChannelException)) {
- LOG.warn("Writing readEntryAndFenceLedger(lid={}, eid={}) to channel {} failed : ",
- new Object[] { ledgerId, entryId, c, future.cause() });
- }
- errorOutReadKey(completionKey);
- }
- }
- });
- } catch(Throwable e) {
- LOG.warn("Read entry operation {} failed", completionKey, e);
- errorOutReadKey(completionKey);
- }
+ writeAndFlush(channel, completionKey, request);
}
public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
Object request = null;
- CompletionKey completion = null;
+ CompletionKey completionKey = null;
if (useV2WireProtocol) {
request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
ledgerId, 0, (short) 0);
- completion = new V2CompletionKey(ledgerId, 0, OperationType.READ_LAC);
+ completionKey = new V2CompletionKey(ledgerId, 0, OperationType.READ_LAC);
} else {
final long txnId = getTxnId();
- completion = new V3CompletionKey(txnId, OperationType.READ_LAC);
+ completionKey = new V3CompletionKey(txnId, OperationType.READ_LAC);
// Build the request and calculate the total size to be included in the packet.
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
@@ -716,40 +640,10 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
.setReadLacRequest(readLacBuilder)
.build();
}
- final Object readLacRequest = request;
- final CompletionKey completionKey = completion;
-
completionObjects.put(completionKey,
- new ReadLacCompletion(readLacOpLogger, cb, ctx, ledgerId,
- scheduleTimeout(completionKey, readEntryTimeout)));
- final Channel c = channel;
- if (c == null) {
- errorOutReadLacKey(completionKey);
- return;
- }
-
- try {
- ChannelFuture future = c.writeAndFlush(readLacRequest);
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Succssfully wrote request {} to {}", readLacRequest, c.remoteAddress());
- }
- } else {
- if (!(future.cause() instanceof ClosedChannelException)) {
- LOG.warn("Writing readLac(lid = {}) to channel {} failed : ",
- new Object[] { ledgerId, c, future.cause() });
- }
- errorOutReadLacKey(completionKey);
- }
- }
- });
- } catch(Throwable e) {
- LOG.warn("Read LAC operation {} failed", readLacRequest, e);
- errorOutReadLacKey(completionKey);
- }
+ new ReadLacCompletion(completionKey, cb,
+ ctx, ledgerId));
+ writeAndFlush(channel, completionKey, request);
}
/**
@@ -783,14 +677,14 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
final ReadEntryCallback cb,
final Object ctx) {
Object request = null;
- CompletionKey completion = null;
+ CompletionKey completionKey = null;
if (useV2WireProtocol) {
request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
ledgerId, entryId, (short) 0);
- completion = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
+ completionKey = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
} else {
final long txnId = getTxnId();
- completion = new V3CompletionKey(txnId, OperationType.READ_ENTRY);
+ completionKey = new V3CompletionKey(txnId, OperationType.READ_ENTRY);
// Build the request and calculate the total size to be included in the packet.
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
@@ -831,49 +725,19 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
.setReadRequest(readBuilder)
.build();
}
- final Object readRequest = request;
- final CompletionKey completionKey = completion;
completionObjects.put(completionKey,
- new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId,
- scheduleTimeout(completionKey, readEntryTimeout)));
- final Channel c = channel;
- if (c == null) {
- errorOutReadKey(completionKey);
- return;
- }
-
- try{
- ChannelFuture future = c.writeAndFlush(readRequest);
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully wrote request {} to {}",
- readRequest, c.remoteAddress());
- }
- } else {
- if (!(future.cause() instanceof ClosedChannelException)) {
- LOG.warn("Writing readEntry(lid={}, eid={}) to channel {} failed : ",
- new Object[] { ledgerId, entryId, c, future.cause() });
- }
- errorOutReadKey(completionKey);
- }
- }
- });
- } catch(Throwable e) {
- LOG.warn("Read entry operation {} failed", readRequest, e);
- errorOutReadKey(completionKey);
- }
+ new ReadCompletion(completionKey, cb,
+ ctx, ledgerId, entryId));
+ writeAndFlush(channel, completionKey, request);
}
public void getBookieInfo(final long requested, GetBookieInfoCallback cb, Object ctx) {
final long txnId = getTxnId();
final CompletionKey completionKey = new V3CompletionKey(txnId, OperationType.GET_BOOKIE_INFO);
completionObjects.put(completionKey,
- new GetBookieInfoCompletion(this, getBookieInfoOpLogger, cb, ctx,
- scheduleTimeout(completionKey, getBookieInfoTimeout)));
+ new GetBookieInfoCompletion(
+ completionKey, cb, ctx));
// Build the request and calculate the total size to be included in the packet.
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
@@ -889,35 +753,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
.setGetBookieInfoRequest(getBookieInfoBuilder)
.build();
- final Channel c = channel;
- if (c == null) {
- errorOutGetBookieInfoKey(completionKey);
- return;
- }
-
- try{
- ChannelFuture future = c.writeAndFlush(getBookieInfoRequest);
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully wrote request {} to {}",
- getBookieInfoRequest, c.remoteAddress());
- }
- } else {
- if (!(future.cause() instanceof ClosedChannelException)) {
- LOG.warn("Writing GetBookieInfoRequest(flags={}) to channel {} failed : ",
- new Object[] { requested, c, future.cause() });
- }
- errorOutGetBookieInfoKey(completionKey);
- }
- }
- });
- } catch(Throwable e) {
- LOG.warn("Get metadata operation {} failed", getBookieInfoRequest, e);
- errorOutGetBookieInfoKey(completionKey);
- }
+ writeAndFlush(channel, completionKey, getBookieInfoRequest);
}
/**
@@ -980,163 +816,72 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
return c.close();
}
- void errorStartTLS(int rc) {
- failTLS(rc);
- }
-
- void errorOutReadKey(final CompletionKey key) {
- errorOutReadKey(key, BKException.Code.BookieHandleNotAvailableException);
- }
-
- void errorOutReadKey(final CompletionKey key, final int rc) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removing completion key: {}", key);
- }
- final ReadCompletion readCompletion = (ReadCompletion)completionObjects.remove(key);
- if (null == readCompletion) {
+ private void writeAndFlush(final Channel channel,
+ final CompletionKey key,
+ final Object request) {
+ if (channel == null) {
+ errorOut(key);
return;
}
- executor.submitOrdered(readCompletion.ledgerId, new SafeRunnable() {
- @Override
- public void safeRun() {
- String bAddress = "null";
- Channel c = channel;
- if (c != null && c.remoteAddress() != null) {
- bAddress = c.remoteAddress().toString();
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Could not write request for reading entry: {} ledger-id: {} bookie: {} rc: {}",
- new Object[] { readCompletion.entryId, readCompletion.ledgerId, bAddress, rc });
- }
-
- readCompletion.cb.readEntryComplete(rc, readCompletion.ledgerId, readCompletion.entryId,
- null, readCompletion.ctx);
- }
-
- @Override
- public String toString() {
- return String.format("ErrorOutReadKey(%s)", key);
- }
- });
+ try{
+ channel.writeAndFlush(request)
+ .addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future)
+ throws Exception {
+ if (future.isSuccess()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Successfully wrote request {} to {}",
+ requestToString(request),
+ channel.remoteAddress());
+ }
+ } else {
+ if (!(future.cause()
+ instanceof ClosedChannelException)) {
+ LOG.warn("Writing request {} to {} failed : ",
+ requestToString(request),
+ channel, future.cause());
+ }
+ errorOut(key);
+ }
+ }
+ });
+ } catch(Throwable e) {
+ LOG.warn("Operation {} failed", requestToString(request), e);
+ errorOut(key);
+ }
}
- void errorOutWriteLacKey(final CompletionKey key) {
- errorOutWriteLacKey(key, BKException.Code.BookieHandleNotAvailableException);
+ private static String requestToString(Object request) {
+ if (request instanceof BookkeeperProtocol.Request) {
+ BookkeeperProtocol.BKPacketHeader header
+ = ((BookkeeperProtocol.Request)request).getHeader();
+ return String.format("Req(txnId=%d,op=%s,version=%s)",
+ header.getTxnId(), header.getOperation(),
+ header.getVersion());
+ } else {
+ return request.toString();
+ }
}
-
- void errorOutWriteLacKey(final CompletionKey key, final int rc) {
+ void errorOut(final CompletionKey key) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing completion key: {}", key);
}
- final WriteLacCompletion writeLacCompletion = (WriteLacCompletion)completionObjects.remove(key);
- if (null == writeLacCompletion) {
- return;
+ CompletionValue completion = completionObjects.remove(key);
+ if (completion != null) {
+ completion.errorOut();
}
- executor.submitOrdered(writeLacCompletion.ledgerId, new SafeRunnable() {
- @Override
- public void safeRun() {
- String bAddress = "null";
- Channel c = channel;
- if (c != null) {
- bAddress = c.remoteAddress().toString();
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Could not write request writeLac for ledgerId: {} bookie: {}",
- writeLacCompletion.ledgerId, bAddress);
- }
- writeLacCompletion.cb.writeLacComplete(rc, writeLacCompletion.ledgerId, addr, writeLacCompletion.ctx);
- }
- });
}
- void errorOutReadLacKey(final CompletionKey key) {
- errorOutReadLacKey(key, BKException.Code.BookieHandleNotAvailableException);
- }
-
- void errorOutReadLacKey(final CompletionKey key, final int rc) {
+ void errorOut(final CompletionKey key, final int rc) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing completion key: {}", key);
}
- final ReadLacCompletion readLacCompletion = (ReadLacCompletion)completionObjects.remove(key);
- if (null == readLacCompletion) {
- return;
- }
- executor.submitOrdered(readLacCompletion.ledgerId, new SafeRunnable() {
- @Override
- public void safeRun() {
- String bAddress = "null";
- Channel c = channel;
- if (c != null) {
- bAddress = c.remoteAddress().toString();
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Could not write request readLac for ledgerId: {} bookie: {}", readLacCompletion.ledgerId,
- bAddress);
- }
- readLacCompletion.cb.readLacComplete(rc, readLacCompletion.ledgerId, null, null, readLacCompletion.ctx);
- }
- });
- }
-
- void errorOutAddKey(final CompletionKey key) {
- errorOutAddKey(key, BKException.Code.BookieHandleNotAvailableException);
- }
-
- void errorOutAddKey(final CompletionKey key, final int rc) {
- final AddCompletion addCompletion = (AddCompletion)completionObjects.remove(key);
- if (null == addCompletion) {
- return;
- }
- executor.submitOrdered(addCompletion.ledgerId, new SafeRunnable() {
- @Override
- public void safeRun() {
- String bAddress = "null";
- Channel c = channel;
- if (c != null && c.remoteAddress() != null) {
- bAddress = c.remoteAddress().toString();
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Could not write request for adding entry: {} ledger-id: {} bookie: {} rc: {}",
- new Object[] { addCompletion.entryId, addCompletion.ledgerId, bAddress, rc });
- }
-
- addCompletion.cb.writeComplete(rc, addCompletion.ledgerId, addCompletion.entryId,
- addr, addCompletion.ctx);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Invoked callback method: {}", addCompletion.entryId);
- }
- }
-
- @Override
- public String toString() {
- return String.format("ErrorOutAddKey(%s)", key);
- }
- });
- }
-
- void errorOutGetBookieInfoKey(final CompletionKey key) {
- errorOutGetBookieInfoKey(key, BKException.Code.BookieHandleNotAvailableException);
- }
-
- void errorOutGetBookieInfoKey(final CompletionKey key, final int rc) {
- final GetBookieInfoCompletion getBookieInfoCompletion = (GetBookieInfoCompletion)completionObjects.remove(key);
- if (null == getBookieInfoCompletion) {
- return;
+ CompletionValue completion = completionObjects.remove(key);
+ if (completion != null) {
+ completion.errorOut(rc);
}
- executor.submit(new SafeRunnable() {
- @Override
- public void safeRun() {
- String bAddress = "null";
- Channel c = channel;
- if (c != null) {
- bAddress = c.remoteAddress().toString();
- }
- LOG.debug("Could not write getBookieInfo request for bookie: {}", new Object[] {bAddress});
- getBookieInfoCompletion.cb.getBookieInfoComplete(rc, new BookieInfo(), getBookieInfoCompletion.ctx);
- }
- });
}
/**
@@ -1155,22 +900,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
// successfully removes the key from the map is the one responsible for
// calling the application callback.
for (CompletionKey key : completionObjects.keySet()) {
- switch (key.operationType) {
- case ADD_ENTRY:
- errorOutAddKey(key, rc);
- break;
- case READ_ENTRY:
- errorOutReadKey(key, rc);
- break;
- case GET_BOOKIE_INFO:
- errorOutGetBookieInfoKey(key, rc);
- break;
- case START_TLS:
- errorStartTLS(rc);
- break;
- default:
- break;
- }
+ errorOut(key, rc);
}
}
@@ -1261,7 +991,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
Response response = (Response) msg;
readV3Response(response);
} else {
- ctx.fireChannelRead(msg);
+ ctx.fireChannelRead(msg);
}
}
@@ -1284,28 +1014,12 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
long orderingKey = completionValue.ledgerId;
executor.submitOrdered(orderingKey, new SafeRunnable() {
- @Override
- public void safeRun() {
- switch (operationType) {
- case ADD_ENTRY: {
- handleAddResponse(ledgerId, entryId, status, completionValue);
- break;
- }
- case READ_ENTRY: {
- BookieProtocol.ReadResponse readResponse = (BookieProtocol.ReadResponse) response;
- ByteBuf data = null;
- if (readResponse.hasData()) {
- data = readResponse.getData();
- }
- handleReadResponse(ledgerId, entryId, status, data, INVALID_ENTRY_ID, -1L, completionValue);
- break;
- }
- default:
- LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring", operationType, addr);
- break;
+ @Override
+ public void safeRun() {
+ completionValue.handleV2Response(ledgerId, entryId,
+ status, response);
}
- }
- });
+ });
}
}
@@ -1370,71 +1084,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
executor.submitOrdered(orderingKey, new SafeRunnable() {
@Override
public void safeRun() {
- OperationType type = header.getOperation();
- switch (type) {
- case ADD_ENTRY: {
- AddResponse addResponse = response.getAddResponse();
- StatusCode status = response.getStatus() == StatusCode.EOK ? addResponse.getStatus() : response.getStatus();
- handleAddResponse(addResponse.getLedgerId(), addResponse.getEntryId(), status, completionValue);
- break;
- }
- case READ_ENTRY: {
- ReadResponse readResponse = response.getReadResponse();
- StatusCode status = response.getStatus() == StatusCode.EOK ? readResponse.getStatus() : response.getStatus();
- ByteBuf buffer = Unpooled.EMPTY_BUFFER;
- if (readResponse.hasBody()) {
- buffer = Unpooled.wrappedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
- }
- long maxLAC = INVALID_ENTRY_ID;
- if (readResponse.hasMaxLAC()) {
- maxLAC = readResponse.getMaxLAC();
- }
- long lacUpdateTimestamp = -1L;
- if (readResponse.hasLacUpdateTimestamp()) {
- lacUpdateTimestamp = readResponse.getLacUpdateTimestamp();
- }
- handleReadResponse(readResponse.getLedgerId(), readResponse.getEntryId(), status, buffer, maxLAC, lacUpdateTimestamp, completionValue);
- break;
- }
- case WRITE_LAC: {
- WriteLacResponse writeLacResponse = response.getWriteLacResponse();
- StatusCode status = response.getStatus() == StatusCode.EOK ? writeLacResponse.getStatus() : response.getStatus();
- handleWriteLacResponse(writeLacResponse.getLedgerId(), status, completionValue);
- break;
- }
- case READ_LAC: {
- ReadLacResponse readLacResponse = response.getReadLacResponse();
- ByteBuf lacBuffer = Unpooled.EMPTY_BUFFER;
- ByteBuf lastEntryBuffer = Unpooled.EMPTY_BUFFER;
- StatusCode status = response.getStatus() == StatusCode.EOK ? readLacResponse.getStatus() : response.getStatus();
- // Thread.dumpStack();
-
- if (readLacResponse.hasLacBody()) {
- lacBuffer = Unpooled.wrappedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
- }
-
- if (readLacResponse.hasLastEntryBody()) {
- lastEntryBuffer = Unpooled.wrappedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
- }
- handleReadLacResponse(readLacResponse.getLedgerId(), status, lacBuffer, lastEntryBuffer, completionValue);
- break;
- }
- case GET_BOOKIE_INFO: {
- GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse();
- StatusCode status = response.getStatus() == StatusCode.EOK ? getBookieInfoResponse.getStatus() : response.getStatus();
- handleGetBookieInfoResponse(getBookieInfoResponse.getFreeDiskSpace(), getBookieInfoResponse.getTotalDiskCapacity(), status, completionValue);
- break;
- }
- case START_TLS: {
- StatusCode status = response.getStatus();
- handleStartTLSResponse(status, completionValue);
- break;
- }
- default:
- LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring",
- type, addr);
- break;
- }
+ completionValue.handleV3Response(response);
}
@Override
@@ -1447,37 +1097,12 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
}
- void handleStartTLSResponse(StatusCode status, CompletionValue completionValue) {
- StartTLSCompletion tlsCompletion = (StartTLSCompletion) completionValue;
-
- // convert to BKException code because thats what the upper
- // layers expect. This is UGLY, there should just be one set of
- // error codes.
- Integer rcToRet = statusCodeToExceptionCode(status);
- if (null == rcToRet) {
- LOG.error("START_TLS failed on bookie:{}", addr);
- rcToRet = BKException.Code.SecurityException;
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received START_TLS response from {} rc: {}", addr, rcToRet);
- }
- }
-
- // Cancel START_TLS request timeout
- tlsCompletion.cb.startTLSComplete(rcToRet, tlsCompletion.ctx);
-
- if (state != ConnectionState.START_TLS) {
- LOG.error("Connection state changed before TLS response received");
- failTLS(BKException.Code.BookieHandleNotAvailableException);
- } else if (status != StatusCode.EOK) {
- LOG.error("Client received error {} during TLS negotiation", status);
- failTLS(BKException.Code.SecurityException);
- } else {
- // create TLS handler
- PerChannelBookieClient parentObj = PerChannelBookieClient.this;
- SslHandler handler = parentObj.shFactory.newTLSHandler();
- channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(), handler);
- handler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
+ void initTLSHandshake() {
+ // create TLS handler
+ PerChannelBookieClient parentObj = PerChannelBookieClient.this;
+ SslHandler handler = parentObj.shFactory.newTLSHandler();
+ channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(), handler);
+ handler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
int rc;
@@ -1536,361 +1161,487 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
}
});
- }
}
- void handleWriteLacResponse(long ledgerId, StatusCode status, CompletionValue completionValue) {
- // The completion value should always be an instance of an WriteLacCompletion object when we reach here.
- WriteLacCompletion plc = (WriteLacCompletion)completionValue;
+ /**
+ * Boiler-plate wrapper classes follow
+ *
+ */
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got response for writeLac request from bookie: " + addr + " for ledger: " + ledgerId + " rc: "
- + status);
+ // visible for testing
+ abstract class CompletionValue {
+ final Object ctx;
+ protected final long ledgerId;
+ protected final long entryId;
+ private final long startTime;
+ private final OpStatsLogger opLogger;
+ private final OpStatsLogger timeoutOpLogger;
+ protected final Timeout timeout;
+ private final String operationName;
+
+ public CompletionValue(String operationName,
+ Object ctx,
+ long ledgerId, long entryId,
+ OpStatsLogger opLogger,
+ OpStatsLogger timeoutOpLogger,
+ Timeout timeout) {
+ this.operationName = operationName;
+ this.ctx = ctx;
+ this.ledgerId = ledgerId;
+ this.entryId = entryId;
+ this.startTime = MathUtils.nowInNano();
+ this.opLogger = opLogger;
+ this.timeoutOpLogger = timeoutOpLogger;
+ this.timeout = timeout;
}
- // convert to BKException code
- Integer rcToRet = statusCodeToExceptionCode(status);
- if (null == rcToRet) {
- LOG.error("writeLac for ledger: " + ledgerId + " failed on bookie: " + addr
- + " with code:" + status);
- rcToRet = BKException.Code.WriteException;
+ private long latency() {
+ return MathUtils.elapsedNanos(startTime);
}
- plc.cb.writeLacComplete(rcToRet, ledgerId, addr, plc.ctx);
- }
- void handleAddResponse(long ledgerId, long entryId, StatusCode status, CompletionValue completionValue) {
- // The completion value should always be an instance of an AddCompletion object when we reach here.
- AddCompletion ac = (AddCompletion)completionValue;
+ void cancelTimeoutAndLogOp(int rc) {
+ if (null != timeout) {
+ timeout.cancel();
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
- + entryId + " rc: " + status);
- }
- // convert to BKException code because thats what the upper
- // layers expect. This is UGLY, there should just be one set of
- // error codes.
- Integer rcToRet = statusCodeToExceptionCode(status);
- if (null == rcToRet) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Add for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + addr
- + " with code:" + status);
+ if (rc != BKException.Code.OK) {
+ opLogger.registerFailedEvent(latency(), TimeUnit.NANOSECONDS);
+ } else {
+ opLogger.registerSuccessfulEvent(latency(), TimeUnit.NANOSECONDS);
}
- rcToRet = BKException.Code.WriteException;
- }
- ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx);
- }
- void handleReadLacResponse(long ledgerId, StatusCode status, ByteBuf lacBuffer, ByteBuf lastEntryBuffer, CompletionValue completionValue) {
- // The completion value should always be an instance of an WriteLacCompletion object when we reach here.
- ReadLacCompletion glac = (ReadLacCompletion)completionValue;
+ if (rc != BKException.Code.OK
+ && !expectedBkOperationErrors.contains(rc)) {
+ recordError();
+ }
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got response for readLac request from bookie: " + addr + " for ledger: " + ledgerId + " rc: "
- + status);
+ void timeout() {
+ errorOut(BKException.Code.TimeoutException);
+ timeoutOpLogger.registerSuccessfulEvent(latency(),
+ TimeUnit.NANOSECONDS);
}
- // convert to BKException code
- Integer rcToRet = statusCodeToExceptionCode(status);
- if (null == rcToRet) {
+
+ protected int logAndConvertStatus(StatusCode status, int defaultStatus,
+ Object... extraInfo) {
if (LOG.isDebugEnabled()) {
- LOG.debug("readLac for ledger: " + ledgerId + " failed on bookie: " + addr + " with code:" + status);
+ LOG.debug("Got {} response from bookie:{} rc:{}, {}",
+ operationName, addr, status,
+ Joiner.on(":").join(extraInfo));
}
- rcToRet = BKException.Code.ReadException;
- }
- glac.cb.readLacComplete(rcToRet, ledgerId, lacBuffer.slice(), lastEntryBuffer.slice(), glac.ctx);
- }
-
- void handleReadResponse(long ledgerId,
- long entryId,
- StatusCode status,
- ByteBuf buffer,
- long maxLAC, // max known lac piggy-back from bookies
- long lacUpdateTimestamp, // the timestamp when the lac is updated.
- CompletionValue completionValue) {
- // The completion value should always be an instance of a ReadCompletion object when we reach here.
- ReadCompletion rc = (ReadCompletion)completionValue;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got response for read request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
- + entryId + " rc: " + rc + " entry length: " + buffer.readableBytes());
+ // convert to BKException code
+ Integer rcToRet = statusCodeToExceptionCode(status);
+ if (null == rcToRet) {
+ LOG.error("{} for failed on bookie {} code {}",
+ operationName, addr, status);
+ return defaultStatus;
+ } else {
+ return rcToRet;
+ }
}
- // convert to BKException code because thats what the uppper
- // layers expect. This is UGLY, there should just be one set of
- // error codes.
- Integer rcToRet = statusCodeToExceptionCode(status);
- if (null == rcToRet) {
- LOG.error("Read entry for ledger:{}, entry:{} failed on bookie:{} with code:{}",
- new Object[] { ledgerId, entryId, addr, status });
- rcToRet = BKException.Code.ReadException;
- }
- if(buffer != null) {
- buffer = buffer.slice();
- }
- if (maxLAC > INVALID_ENTRY_ID && (rc.ctx instanceof ReadEntryCallbackCtx)) {
- ((ReadEntryCallbackCtx) rc.ctx).setLastAddConfirmed(maxLAC);
- }
- if (lacUpdateTimestamp > -1L && (rc.ctx instanceof ReadLastConfirmedAndEntryContext)) {
- ((ReadLastConfirmedAndEntryContext) rc.ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
- }
- rc.cb.readEntryComplete(rcToRet, ledgerId, entryId, buffer, rc.ctx);
- }
- void handleGetBookieInfoResponse(long freeDiskSpace, long totalDiskCapacity, StatusCode status, CompletionValue completionValue) {
- // The completion value should always be an instance of a GetBookieInfoCompletion object when we reach here.
- GetBookieInfoCompletion rc = (GetBookieInfoCompletion)completionValue;
+ public abstract void errorOut();
+ public abstract void errorOut(final int rc);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got response for read metadata request from bookie: {} rc {}", addr, rc);
+ protected void errorOutAndRunCallback(final Runnable callback) {
+ executor.submitOrdered(ledgerId,
+ new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ String bAddress = "null";
+ Channel c = channel;
+ if (c != null && c.remoteAddress() != null) {
+ bAddress = c.remoteAddress().toString();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Could not write {} request writeLac to bookie {} for ledger {}, entry {}",
+ operationName, bAddress,
+ ledgerId, entryId);
+ }
+ callback.run();
+ }
+ });
}
- // convert to BKException code because thats what the upper
- // layers expect. This is UGLY, there should just be one set of
- // error codes.
- Integer rcToRet = statusCodeToExceptionCode(status);
- if (null == rcToRet) {
- LOG.error("Read metadata failed on bookie:{} with code:{}",
- new Object[] { addr, status });
- rcToRet = BKException.Code.ReadException;
+ public void handleV2Response(
+ long ledgerId, long entryId, StatusCode status,
+ BookieProtocol.Response response) {
+ LOG.warn("Unhandled V2 response {}", response);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Response received from bookie info read: freeDiskSpace=" + freeDiskSpace + " totalDiskSpace:"
- + totalDiskCapacity);
- }
- rc.cb.getBookieInfoComplete(rcToRet, new BookieInfo(totalDiskCapacity, freeDiskSpace), rc.ctx);
+ public abstract void handleV3Response(
+ BookkeeperProtocol.Response response);
}
- /**
- * Boiler-plate wrapper classes follow
- *
- */
-
// visible for testing
- static abstract class CompletionValue {
- final Object ctx;
- protected final long ledgerId;
- protected final long entryId;
- protected final Timeout timeout;
+ class WriteLacCompletion extends CompletionValue {
+ final WriteLacCallback cb;
- public CompletionValue(Object ctx, long ledgerId, long entryId,
- Timeout timeout) {
- this.ctx = ctx;
- this.ledgerId = ledgerId;
- this.entryId = entryId;
- this.timeout = timeout;
+ public WriteLacCompletion(CompletionKey key,
+ final WriteLacCallback originalCallback,
+ final Object originalCtx,
+ final long ledgerId) {
+ super("WriteLAC",
+ originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
+ writeLacOpLogger, writeLacTimeoutOpLogger,
+ scheduleTimeout(key, addEntryTimeout));
+ this.cb = new WriteLacCallback() {
+ @Override
+ public void writeLacComplete(int rc, long ledgerId,
+ BookieSocketAddress addr,
+ Object ctx) {
+ cancelTimeoutAndLogOp(rc);
+ originalCallback.writeLacComplete(rc, ledgerId,
+ addr, originalCtx);
+ }
+ };
}
- void cancelTimeout() {
- if (null != timeout) {
- timeout.cancel();
- }
+ @Override
+ public void errorOut() {
+ errorOut(BKException.Code.BookieHandleNotAvailableException);
}
- }
- // visible for testing
- static class WriteLacCompletion extends CompletionValue {
- final WriteLacCallback cb;
-
- public WriteLacCompletion(WriteLacCallback cb, Object ctx, long ledgerId) {
- this(null, cb, ctx, ledgerId, null);
+ @Override
+ public void errorOut(final int rc) {
+ errorOutAndRunCallback(
+ () -> cb.writeLacComplete(rc, ledgerId, addr, ctx));
}
- public WriteLacCompletion(final OpStatsLogger writeLacOpLogger, final WriteLacCallback originalCallback,
- final Object originalCtx, final long ledgerId, final Timeout timeout) {
- super(originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, timeout);
- final long startTime = MathUtils.nowInNano();
- this.cb = null == writeLacOpLogger ? originalCallback : new WriteLacCallback() {
- @Override
- public void writeLacComplete(int rc, long ledgerId, BookieSocketAddress addr, Object ctx) {
- cancelTimeout();
- long latency = MathUtils.elapsedNanos(startTime);
- if (rc != BKException.Code.OK) {
- writeLacOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
- } else {
- writeLacOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
- }
- originalCallback.writeLacComplete(rc, ledgerId, addr, originalCtx);
- }
- };
+ @Override
+ public void handleV3Response(BookkeeperProtocol.Response response) {
+ WriteLacResponse writeLacResponse = response.getWriteLacResponse();
+ StatusCode status = response.getStatus() == StatusCode.EOK ?
+ writeLacResponse.getStatus() : response.getStatus();
+ long ledgerId = writeLacResponse.getLedgerId();
+ int rc = logAndConvertStatus(status,
+ BKException.Code.WriteException,
+ "ledger", ledgerId);
+ cb.writeLacComplete(rc, ledgerId, addr, ctx);
}
}
// visible for testing
- static class ReadLacCompletion extends CompletionValue {
+ class ReadLacCompletion extends CompletionValue {
final ReadLacCallback cb;
- public ReadLacCompletion(ReadLacCallback cb, Object ctx, long ledgerId) {
- this (null, cb, ctx, ledgerId, null);
+ public ReadLacCompletion(CompletionKey key,
+ ReadLacCallback originalCallback,
+ final Object ctx, final long ledgerId) {
+ super("ReadLAC", ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
+ readLacOpLogger, readLacTimeoutOpLogger,
+ scheduleTimeout(key, readEntryTimeout));
+ this.cb = new ReadLacCallback() {
+ @Override
+ public void readLacComplete(int rc, long ledgerId,
+ ByteBuf lacBuffer,
+ ByteBuf lastEntryBuffer,
+ Object ctx) {
+ cancelTimeoutAndLogOp(rc);
+ originalCallback.readLacComplete(
+ rc, ledgerId, lacBuffer, lastEntryBuffer, ctx);
+ }
+ };
}
- public ReadLacCompletion(final OpStatsLogger readLacOpLogger, final ReadLacCallback originalCallback,
- final Object ctx, final long ledgerId, final Timeout timeout) {
- super(ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, timeout);
- final long startTime = MathUtils.nowInNano();
- this.cb = null == readLacOpLogger ? originalCallback : new ReadLacCallback() {
- @Override
- public void readLacComplete(int rc, long ledgerId, ByteBuf lacBuffer, ByteBuf lastEntryBuffer,
- Object ctx) {
- cancelTimeout();
- long latency = MathUtils.elapsedNanos(startTime);
- if (rc != BKException.Code.OK) {
- readLacOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
- } else {
- readLacOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
- }
- originalCallback.readLacComplete(rc, ledgerId, lacBuffer, lastEntryBuffer, ctx);
- }
- };
+ @Override
+ public void errorOut() {
+ errorOut(BKException.Code.BookieHandleNotAvailableException);
+ }
+
+ @Override
+ public void errorOut(final int rc) {
+ errorOutAndRunCallback(
+ () -> cb.readLacComplete(rc, ledgerId, null, null, ctx));
+ }
+
+ @Override
+ public void handleV3Response(BookkeeperProtocol.Response response) {
+ ReadLacResponse readLacResponse = response.getReadLacResponse();
+ ByteBuf lacBuffer = Unpooled.EMPTY_BUFFER;
+ ByteBuf lastEntryBuffer = Unpooled.EMPTY_BUFFER;
+ StatusCode status = response.getStatus() == StatusCode.EOK ? readLacResponse.getStatus() : response.getStatus();
+
+ if (readLacResponse.hasLacBody()) {
+ lacBuffer = Unpooled.wrappedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
+ }
+
+ if (readLacResponse.hasLastEntryBody()) {
+ lastEntryBuffer = Unpooled.wrappedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
+ }
+
+ int rc = logAndConvertStatus(status,
+ BKException.Code.ReadException,
+ "ledger", ledgerId);
+ cb.readLacComplete(rc, ledgerId, lacBuffer.slice(),
+ lastEntryBuffer.slice(), ctx);
}
}
// visible for testing
- static class ReadCompletion extends CompletionValue {
+ class ReadCompletion extends CompletionValue {
final ReadEntryCallback cb;
- public ReadCompletion(final PerChannelBookieClient pcbc, ReadEntryCallback cb, Object ctx,
- long ledgerId, long entryId) {
- this(pcbc, null, cb, ctx, ledgerId, entryId, null);
- }
-
- public ReadCompletion(final PerChannelBookieClient pcbc, final OpStatsLogger readEntryOpLogger,
+ public ReadCompletion(CompletionKey key,
final ReadEntryCallback originalCallback,
- final Object originalCtx, final long ledgerId, final long entryId,
- final Timeout timeout) {
- super(originalCtx, ledgerId, entryId, timeout);
- final long startTime = MathUtils.nowInNano();
+ final Object originalCtx,
+ long ledgerId, final long entryId) {
+ super("Read", originalCtx, ledgerId, entryId,
+ readEntryOpLogger, readTimeoutOpLogger,
+ scheduleTimeout(key, readEntryTimeout));
+
this.cb = new ReadEntryCallback() {
- @Override
- public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffer, Object ctx) {
- cancelTimeout();
- if (readEntryOpLogger != null) {
- long latency = MathUtils.elapsedNanos(startTime);
- if (rc != BKException.Code.OK) {
- readEntryOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
- } else {
- readEntryOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
- }
+ @Override
+ public void readEntryComplete(int rc, long ledgerId,
+ long entryId, ByteBuf buffer,
+ Object ctx) {
+ cancelTimeoutAndLogOp(rc);
+ originalCallback.readEntryComplete(rc,
+ ledgerId, entryId,
+ buffer, originalCtx);
}
+ };
+ }
- if (rc != BKException.Code.OK && !expectedBkOperationErrors.contains(rc)) {
- pcbc.recordError();
- }
+ @Override
+ public void errorOut() {
+ errorOut(BKException.Code.BookieHandleNotAvailableException);
+ }
- originalCallback.readEntryComplete(rc, ledgerId, entryId, buffer, originalCtx);
- }
- };
+ @Override
+ public void errorOut(final int rc) {
+ errorOutAndRunCallback(
+ () -> cb.readEntryComplete(rc, ledgerId,
+ entryId, null, ctx));
}
- }
- static class StartTLSCompletion extends CompletionValue {
- final StartTLSCallback cb;
+ @Override
+ public void handleV2Response(long ledgerId, long entryId,
+ StatusCode status,
+ BookieProtocol.Response response) {
+ if (!(response instanceof BookieProtocol.ReadResponse)) {
+ return;
+ }
+ BookieProtocol.ReadResponse readResponse = (BookieProtocol.ReadResponse) response;
+ ByteBuf data = null;
+ if (readResponse.hasData()) {
+ data = readResponse.getData();
+ }
+ handleReadResponse(ledgerId, entryId, status, data,
+ INVALID_ENTRY_ID, -1L);
+ }
- public StartTLSCompletion(final PerChannelBookieClient pcbc, StartTLSCallback cb, Object ctx) {
- this(pcbc, null, cb, ctx, null);
+ @Override
+ public void handleV3Response(BookkeeperProtocol.Response response) {
+ ReadResponse readResponse = response.getReadResponse();
+ StatusCode status = response.getStatus() == StatusCode.EOK
+ ? readResponse.getStatus() : response.getStatus();
+ ByteBuf buffer = Unpooled.EMPTY_BUFFER;
+ if (readResponse.hasBody()) {
+ buffer = Unpooled.wrappedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
+ }
+ long maxLAC = INVALID_ENTRY_ID;
+ if (readResponse.hasMaxLAC()) {
+ maxLAC = readResponse.getMaxLAC();
+ }
+ long lacUpdateTimestamp = -1L;
+ if (readResponse.hasLacUpdateTimestamp()) {
+ lacUpdateTimestamp = readResponse.getLacUpdateTimestamp();
+ }
+ handleReadResponse(readResponse.getLedgerId(),
+ readResponse.getEntryId(),
+ status, buffer, maxLAC, lacUpdateTimestamp);
+ }
+
+ private void handleReadResponse(long ledgerId,
+ long entryId,
+ StatusCode status,
+ ByteBuf buffer,
+ long maxLAC, // max known lac piggy-back from bookies
+ long lacUpdateTimestamp) { // the timestamp when the lac is updated.
+ int readableBytes = buffer == null ? 0 : buffer.readableBytes();
+ int rc = logAndConvertStatus(status,
+ BKException.Code.ReadException,
+ "ledger", ledgerId,
+ "entry", entryId,
+ "entryLength", readableBytes);
+
+ if(buffer != null) {
+ buffer = buffer.slice();
+ }
+ if (maxLAC > INVALID_ENTRY_ID
+ && (ctx instanceof ReadEntryCallbackCtx)) {
+ ((ReadEntryCallbackCtx)ctx).setLastAddConfirmed(maxLAC);
+ }
+ if (lacUpdateTimestamp > -1L
+ && (ctx instanceof ReadLastConfirmedAndEntryContext)) {
+ ((ReadLastConfirmedAndEntryContext)ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
+ }
+ cb.readEntryComplete(rc, ledgerId, entryId, buffer, ctx);
}
+ }
+
+ class StartTLSCompletion extends CompletionValue {
+ final StartTLSCallback cb;
- public StartTLSCompletion(final PerChannelBookieClient pcbc, final OpStatsLogger startTLSOpLogger,
- final StartTLSCallback originalCallback, final Object originalCtx, final Timeout timeout) {
- super(originalCtx, -1, -1, timeout);
- final long startTime = MathUtils.nowInNano();
+ public StartTLSCompletion(CompletionKey key) {
+ super("StartTLS", null, -1, -1,
+ startTLSOpLogger, startTLSTimeoutOpLogger,
+ scheduleTimeout(key, startTLSTimeout));
this.cb = new StartTLSCallback() {
@Override
public void startTLSComplete(int rc, Object ctx) {
- cancelTimeout();
- if (startTLSOpLogger != null) {
- long latency = MathUtils.elapsedNanos(startTime);
- if (rc != BKException.Code.OK) {
- startTLSOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
- } else {
- startTLSOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
- }
- }
-
- if (rc != BKException.Code.OK && !expectedBkOperationErrors.contains(rc)) {
- pcbc.recordError();
- }
-
- if (originalCallback != null) {
- originalCallback.startTLSComplete(rc, originalCtx);
- }
+ cancelTimeoutAndLogOp(rc);
}
};
}
+
+ @Override
+ public void errorOut() {
+ errorOut(BKException.Code.BookieHandleNotAvailableException);
+ }
+
+ @Override
+ public void errorOut(final int rc) {
+ failTLS(rc);
+ }
+
+ @Override
+ public void handleV3Response(BookkeeperProtocol.Response response) {
+ StatusCode status = response.getStatus();
+
+ int rc = logAndConvertStatus(status,
+ BKException.Code.SecurityException);
+
+ // Cancel START_TLS request timeout
+ cb.startTLSComplete(rc, null);
+
+ if (state != ConnectionState.START_TLS) {
+ LOG.error("Connection state changed before TLS response received");
+ failTLS(BKException.Code.BookieHandleNotAvailableException);
+ } else if (status != StatusCode.EOK) {
+ LOG.error("Client received error {} during TLS negotiation", status);
+ failTLS(BKException.Code.SecurityException);
+ } else {
+ initTLSHandshake();
+ }
+ }
+
}
// visible for testing
- static class GetBookieInfoCompletion extends CompletionValue {
+ class GetBookieInfoCompletion extends CompletionValue {
final GetBookieInfoCallback cb;
- public GetBookieInfoCompletion(final PerChannelBookieClient pcbc, GetBookieInfoCallback cb, Object ctx) {
- this(pcbc, null, cb, ctx, null);
+ public GetBookieInfoCompletion(CompletionKey key,
+ final GetBookieInfoCallback origCallback,
+ final Object origCtx) {
+ super("GetBookieInfo", origCtx, 0L, 0L,
+ getBookieInfoOpLogger, getBookieInfoTimeoutOpLogger,
+ scheduleTimeout(key, getBookieInfoTimeout));
+ this.cb = new GetBookieInfoCallback() {
+ @Override
+ public void getBookieInfoComplete(int rc, BookieInfo bInfo,
+ Object ctx) {
+ cancelTimeoutAndLogOp(rc);
+ origCallback.getBookieInfoComplete(rc, bInfo, origCtx);
+ }
+ };
}
- public GetBookieInfoCompletion(final PerChannelBookieClient pcbc, final OpStatsLogger getBookieInfoOpLogger,
- final GetBookieInfoCallback originalCallback,
- final Object originalCtx, final Timeout timeout) {
- super(originalCtx, 0L, 0L, timeout);
- final long startTime = MathUtils.nowInNano();
- this.cb = (null == getBookieInfoOpLogger) ? originalCallback : new GetBookieInfoCallback() {
- @Override
- public void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx) {
- cancelTimeout();
- if (getBookieInfoOpLogger != null) {
- long latency = MathUtils.elapsedNanos(startTime);
- if (rc != BKException.Code.OK) {
- getBookieInfoOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
- } else {
- getBookieInfoOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
- }
- }
+ @Override
+ public void errorOut() {
+ errorOut(BKException.Code.BookieHandleNotAvailableException);
+ }
- if (rc != BKException.Code.OK && !expectedBkOperationErrors.contains(rc)) {
- pcbc.recordError();
- }
+ @Override
+ public void errorOut(final int rc) {
+ errorOutAndRunCallback(
+ () -> cb.getBookieInfoComplete(rc, new BookieInfo(), ctx));
+ }
- originalCallback.getBookieInfoComplete(rc, bInfo, originalCtx);
- }
- };
+ @Override
+ public void handleV3Response(BookkeeperProtocol.Response response) {
+ GetBookieInfoResponse getBookieInfoResponse
+ = response.getGetBookieInfoResponse();
+ StatusCode status = response.getStatus() == StatusCode.EOK
+ ? getBookieInfoResponse.getStatus() : response.getStatus();
+
+ long freeDiskSpace = getBookieInfoResponse.getFreeDiskSpace();
+ long totalDiskSpace = getBookieInfoResponse.getTotalDiskCapacity();
+
+ int rc = logAndConvertStatus(status,
+ BKException.Code.ReadException,
+ "freeDisk", freeDiskSpace,
+ "totalDisk", totalDiskSpace);
+ cb.getBookieInfoComplete(rc,
+ new BookieInfo(totalDiskSpace,
+ freeDiskSpace), ctx);
}
}
// visible for testing
- static class AddCompletion extends CompletionValue {
+ class AddCompletion extends CompletionValue {
final WriteCallback cb;
- public AddCompletion(final PerChannelBookieClient pcbc, WriteCallback cb, Object ctx,
- long ledgerId, long entryId) {
- this(pcbc, null, cb, ctx, ledgerId, entryId, null);
- }
-
- public AddCompletion(final PerChannelBookieClient pcbc, final OpStatsLogger addEntryOpLogger,
+ public AddCompletion(CompletionKey key,
final WriteCallback originalCallback,
- final Object originalCtx, final long ledgerId, final long entryId,
- final Timeout timeout) {
- super(originalCtx, ledgerId, entryId, timeout);
- final long startTime = MathUtils.nowInNano();
- this.cb = null == addEntryOpLogger ? originalCallback : new WriteCallback() {
+ final Object originalCtx,
+ final long ledgerId, final long entryId) {
+ super("Add", originalCtx, ledgerId, entryId,
+ addEntryOpLogger, addTimeoutOpLogger,
+ scheduleTimeout(key, addEntryTimeout));
+ this.cb = new WriteCallback() {
@Override
- public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
- cancelTimeout();
- if (pcbc.addEntryOpLogger != null) {
- long latency = MathUtils.elapsedNanos(startTime);
- if (rc != BKException.Code.OK) {
- pcbc.addEntryOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
- } else {
- pcbc.addEntryOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
- }
- }
-
- if (rc != BKException.Code.OK && !expectedBkOperationErrors.contains(rc)) {
- pcbc.recordError();
- }
-
- originalCallback.writeComplete(rc, ledgerId, entryId, addr, originalCtx);
+ public void writeComplete(int rc, long ledgerId, long entryId,
+ BookieSocketAddress addr,
+ Object ctx) {
+ cancelTimeoutAndLogOp(rc);
+ originalCallback.writeComplete(rc, ledgerId, entryId,
+ addr, originalCtx);
}
};
}
+
+ @Override
+ public void errorOut() {
+ errorOut(BKException.Code.BookieHandleNotAvailableException);
+ }
+
+ @Override
+ public void errorOut(final int rc) {
+ errorOutAndRunCallback(
+ () -> cb.writeComplete(rc, ledgerId, entryId, addr, ctx));
+ }
+
+ @Override
+ public void handleV2Response(
+ long ledgerId, long entryId, StatusCode status,
+ BookieProtocol.Response response) {
+ handleResponse(ledgerId, entryId, status);
+ }
+
+ @Override
+ public void handleV3Response(
+ BookkeeperProtocol.Response response) {
+ AddResponse addResponse = response.getAddResponse();
+ StatusCode status = response.getStatus() == StatusCode.EOK
+ ? addResponse.getStatus() : response.getStatus();
+ handleResponse(addResponse.getLedgerId(), addResponse.getEntryId(),
+ status);
+ }
+
+ private void handleResponse(long ledgerId, long entryId,
+ StatusCode status) {
+ int rc = logAndConvertStatus(status,
+ BKException.Code.WriteException,
+ "ledger", ledgerId,
+ "entry", entryId);
+ cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+ }
}
// visable for testing
@@ -1936,16 +1687,11 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
abstract class CompletionKey implements TimerTask {
final long txnId;
final OperationType operationType;
- final long requestAt;
- CompletionKey(long txnId, OperationType operationType) {
+ CompletionKey(long txnId,
+ OperationType operationType) {
this.txnId = txnId;
this.operationType = operationType;
- this.requestAt = MathUtils.nowInNano();
- }
-
- private long elapsedTime() {
- return MathUtils.elapsedNanos(requestAt);
}
@Override
@@ -1953,26 +1699,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
if (timeout.isCancelled()) {
return;
}
- if (OperationType.ADD_ENTRY == operationType) {
- errorOutAddKey(this, BKException.Code.TimeoutException);
- addTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
- } else if (OperationType.READ_ENTRY == operationType) {
- errorOutReadKey(this, BKException.Code.TimeoutException);
- readTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
- } else if (OperationType.WRITE_LAC == operationType) {
- errorOutWriteLacKey(this, BKException.Code.TimeoutException);
- writeLacTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
- } else if (OperationType.READ_LAC == operationType) {
- errorOutReadLacKey(this, BKException.Code.TimeoutException);
- readLacTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
- } else if (OperationType.GET_BOOKIE_INFO == operationType) {
- errorOutGetBookieInfoKey(this, BKException.Code.TimeoutException);
- getBookieInfoTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
- } else if (OperationType.START_TLS == operationType) {
- errorStartTLS(BKException.Code.TimeoutException);
- } else {
- errorOutGetBookieInfoKey(this, BKException.Code.TimeoutException);
- getBookieInfoTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
+ CompletionValue completion = completionObjects.remove(this);
+ if (completion != null) {
+ completion.timeout();
}
}
}
@@ -2051,7 +1780,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
return String.format("%d:%d %s", ledgerId, entryId, operationType);
}
}
-
+
public class ConnectionFutureListener implements ChannelFutureListener {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@@ -2121,8 +1850,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
assert state == ConnectionState.CONNECTING;
final long txnId = getTxnId();
final CompletionKey completionKey = new V3CompletionKey(txnId, OperationType.START_TLS);
- completionObjects.put(completionKey, new StartTLSCompletion(this, startTLSOpLogger, null, null,
- scheduleTimeout(completionKey, startTLSTimeout)));
+ completionObjects.put(completionKey,
+ new StartTLSCompletion(completionKey));
BookkeeperProtocol.Request.Builder h = BookkeeperProtocol.Request.newBuilder();
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
.setVersion(ProtocolVersion.VERSION_THREE)
@@ -2131,16 +1860,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
h.setHeader(headerBuilder.build());
h.setStartTLSRequest(BookkeeperProtocol.StartTLSRequest.newBuilder().build());
state = ConnectionState.START_TLS;
- channel.writeAndFlush(h.build()).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- LOG.error("Failed to send START_TLS request");
- failTLS(BKException.Code.SecurityException);
- }
- }
- });
+ writeAndFlush(channel, completionKey, h.build());
}
+
private void failTLS(int rc) {
LOG.error("TLS failure on: {}, rc: {}", channel, rc);
Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps;
--
To stop receiving notification emails like this one, please contact
"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>.