You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/04/03 19:31:51 UTC
bookkeeper git commit: BOOKKEEPER-1018: Allow client to select older
V2 protocol (no protobuf)
Repository: bookkeeper
Updated Branches:
refs/heads/master 9836c87dc -> 9001e300c
BOOKKEEPER-1018: Allow client to select older V2 protocol (no protobuf)
Originally done by Matteo Merli (merlimat). Tagging sijie and eolivelli for review.
Author: Govind Menon <go...@gmail.com>
Reviewers: Sijie Guo <si...@apache.org>
Closes #120 from govind-menon/BOOKKEEPER-1018
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/9001e300
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/9001e300
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/9001e300
Branch: refs/heads/master
Commit: 9001e300ce0d5d2655d437e3eaa52f91487caed6
Parents: 9836c87
Author: Govind Menon <go...@gmail.com>
Authored: Mon Apr 3 12:31:47 2017 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Mon Apr 3 12:31:47 2017 -0700
----------------------------------------------------------------------
.../bookkeeper/conf/ClientConfiguration.java | 23 +-
.../apache/bookkeeper/proto/AuthHandler.java | 8 +
.../apache/bookkeeper/proto/BookieProtocol.java | 3 +
.../proto/PerChannelBookieClient.java | 436 +++++++++++++------
4 files changed, 336 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9001e300/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index ee137c0..2b75e9e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -57,7 +57,7 @@ public class ClientConfiguration extends AbstractConfiguration {
protected final static String CLIENT_WRITEBUFFER_HIGH_WATER_MARK = "clientWriteBufferHighWaterMark";
protected final static String CLIENT_CONNECT_TIMEOUT_MILLIS = "clientConnectTimeoutMillis";
protected final static String NUM_CHANNELS_PER_BOOKIE = "numChannelsPerBookie";
-
+ protected final static String USE_V2_WIRE_PROTOCOL = "useV2WireProtocol";
// Read Parameters
protected final static String READ_TIMEOUT = "readTimeout";
protected final static String SPECULATIVE_READ_TIMEOUT = "speculativeReadTimeout";
@@ -436,6 +436,27 @@ public class ClientConfiguration extends AbstractConfiguration {
}
/**
+ * Use older Bookkeeper wire protocol (no protobuf)
+ *
+ * @return whether or not to use older Bookkeeper wire protocol (no protobuf)
+ */
+ public boolean getUseV2WireProtocol() {
+ return getBoolean(USE_V2_WIRE_PROTOCOL, false);
+ }
+
+ /**
+ * Set whether or not to use older Bookkeeper wire protocol (no protobuf)
+ *
+ * @param useV2WireProtocol
+ * whether or not to use older Bookkeeper wire protocol (no protobuf)
+ * @return client configuration.
+ */
+ public ClientConfiguration setUseV2WireProtocol(boolean useV2WireProtocol) {
+ setProperty(USE_V2_WIRE_PROTOCOL, useV2WireProtocol);
+ return this;
+ }
+
+ /**
* Get zookeeper servers to connect
*
* @return zookeeper servers
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9001e300/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index 75dced5..d2608e7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -322,6 +322,14 @@ class AuthHandler {
} else {
waitingForAuth.add(e);
}
+ } else if (e.getMessage() instanceof BookieProtocol.Request) {
+ // let auth messages through, queue the rest
+ BookieProtocol.Request req = (BookieProtocol.Request)e.getMessage();
+ if (BookkeeperProtocol.OperationType.AUTH.getNumber() == req.getOpCode()) {
+ super.writeRequested(ctx, e);
+ } else {
+ waitingForAuth.add(e);
+ }
} // else just drop
}
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9001e300/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 2ce5ed8..1191d3c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -140,6 +140,9 @@ public interface BookieProtocol {
* by the auth providers themselves.
*/
public static final byte AUTH = 3;
+ public static final byte READ_LAC = 4;
+ public static final byte WRITE_LAC = 5;
+ public static final byte GET_BOOKIE_INFO = 6;
/**
* The error code that indicates success
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9001e300/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index f6e9e8f..a24bb1e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -21,8 +21,10 @@ import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Collections;
+import java.util.Collection;
import java.util.Queue;
import java.util.Set;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -98,7 +100,7 @@ import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistry;
import java.net.SocketAddress;
-import java.util.Collection;
+
import org.apache.bookkeeper.auth.BookKeeperPrincipal;
import org.jboss.netty.channel.ChannelFactory;
import org.apache.bookkeeper.client.ClientConnectionPeer;
@@ -147,6 +149,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
private final OpStatsLogger getBookieInfoOpLogger;
private final OpStatsLogger getBookieInfoTimeoutOpLogger;
+ private final boolean useV2WireProtocol;
+
/**
* The following member variables do not need to be concurrent, or volatile
* because they are always updated under a lock
@@ -202,6 +206,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
this.addEntryTimeout = conf.getAddEntryTimeout();
this.readEntryTimeout = conf.getReadEntryTimeout();
this.getBookieInfoTimeout = conf.getBookieInfoTimeout();
+ this.useV2WireProtocol = conf.getUseV2WireProtocol();
this.authProviderFactory = authProviderFactory;
this.extRegistry = extRegistry;
@@ -469,33 +474,45 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
*/
void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, WriteCallback cb,
Object ctx, final int options) {
- final long txnId = getTxnId();
- final int entrySize = toSend.readableBytes();
- final CompletionKey completionKey = new CompletionKey(txnId, OperationType.ADD_ENTRY);
- completionObjects.put(completionKey,
- new AddCompletion(this, addEntryOpLogger, cb, ctx, ledgerId, entryId,
- scheduleTimeout(completionKey, addEntryTimeout)));
-
- // Build the request and calculate the total size to be included in the packet.
- BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
- .setVersion(ProtocolVersion.VERSION_THREE)
- .setOperation(OperationType.ADD_ENTRY)
- .setTxnId(txnId);
+ Object request = null;
+ CompletionKey completion = null;
+ if (useV2WireProtocol) {
+ completion = new V2CompletionKey(ledgerId, entryId, OperationType.ADD_ENTRY);
+ request = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
+ (short) options, masterKey, toSend);
- AddRequest.Builder addBuilder = AddRequest.newBuilder()
- .setLedgerId(ledgerId)
- .setEntryId(entryId)
- .setMasterKey(ByteString.copyFrom(masterKey))
- .setBody(ByteString.copyFrom(toSend.toByteBuffer()));
- if (((short)options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) {
- addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
+ } else {
+ final long txnId = getTxnId();
+ completion = new CompletionKey(txnId, OperationType.ADD_ENTRY);
+ // Build the request and calculate the total size to be included in the packet.
+ BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.ADD_ENTRY)
+ .setTxnId(txnId);
+
+ AddRequest.Builder addBuilder = AddRequest.newBuilder()
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId)
+ .setMasterKey(ByteString.copyFrom(masterKey))
+ .setBody(ByteString.copyFrom(toSend.toByteBuffer()));
+
+ if (((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) {
+ addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
+ }
+ request = Request.newBuilder()
+ .setHeader(headerBuilder)
+ .setAddRequest(addBuilder)
+ .build();
}
- final Request addRequest = Request.newBuilder()
- .setHeader(headerBuilder)
- .setAddRequest(addBuilder)
- .build();
+ final Object addRequest = request;
+ final CompletionKey completionKey = completion;
+
+ completionObjects.put(completionKey, new AddCompletion(this,
+ addEntryOpLogger, cb, ctx, ledgerId, entryId, scheduleTimeout(completion, addEntryTimeout)));
+
+ final int entrySize = toSend.readableBytes();
final Channel c = channel;
if (c == null) {
@@ -531,28 +548,44 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
public void readEntryAndFenceLedger(final long ledgerId, byte[] masterKey,
final long entryId,
ReadEntryCallback cb, Object ctx) {
- final long txnId = getTxnId();
- final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY);
- completionObjects.put(completionKey,
- new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId,
- scheduleTimeout(completionKey, readEntryTimeout)));
-
- // Build the request and calculate the total size to be included in the packet.
- BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
- .setVersion(ProtocolVersion.VERSION_THREE)
- .setOperation(OperationType.READ_ENTRY)
- .setTxnId(txnId);
-
- ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
- .setLedgerId(ledgerId)
- .setEntryId(entryId)
- .setMasterKey(ByteString.copyFrom(masterKey))
- .setFlag(ReadRequest.Flag.FENCE_LEDGER);
-
- final Request readRequest = Request.newBuilder()
- .setHeader(headerBuilder)
- .setReadRequest(readBuilder)
- .build();
+ Object request = null;
+ CompletionKey completion = null;
+ if (useV2WireProtocol) {
+ completion = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
+ request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
+ BookieProtocol.FLAG_DO_FENCING, masterKey);
+ } else {
+ final long txnId = getTxnId();
+ final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY);
+ completionObjects.put(completionKey,
+ new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId,
+ scheduleTimeout(completionKey, readEntryTimeout)));
+
+ // Build the request and calculate the total size to be included in the packet.
+ BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.READ_ENTRY)
+ .setTxnId(txnId);
+
+ ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId)
+ .setMasterKey(ByteString.copyFrom(masterKey))
+ .setFlag(ReadRequest.Flag.FENCE_LEDGER);
+
+ final Request readRequest = Request.newBuilder()
+ .setHeader(headerBuilder)
+ .setReadRequest(readBuilder)
+ .build();
+ }
+
+ final CompletionKey completionKey = completion;
+ if (completionObjects.putIfAbsent(completionKey, new ReadCompletion(this, readEntryOpLogger, cb,
+ ctx, ledgerId, entryId, scheduleTimeout(completionKey, readEntryTimeout))) != null) {
+ // We cannot have more than 1 pending read on the same ledger/entry in the v2 protocol
+ cb.readEntryComplete(BKException.Code.BookieHandleNotAvailableException, ledgerId, entryId, null, ctx);
+ return;
+ }
final Channel c = channel;
if (c == null) {
@@ -560,6 +593,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
return;
}
+ final Object readRequest = request;
try {
ChannelFuture future = c.write(readRequest);
future.addListener(new ChannelFutureListener() {
@@ -586,22 +620,34 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
}
public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
- final long txnId = getTxnId();
- final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_LAC);
+ Object request = null;
+ CompletionKey completion = null;
+ if (useV2WireProtocol) {
+ request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
+ ledgerId, (long) 0, (short) 0);
+ completion = new V2CompletionKey(ledgerId, (long) 0, OperationType.READ_LAC);
+ } else {
+ final long txnId = getTxnId();
+ final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_LAC);
+
+ // Build the request and calculate the total size to be included in the packet.
+ BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.READ_LAC)
+ .setTxnId(txnId);
+ ReadLacRequest.Builder readLacBuilder = ReadLacRequest.newBuilder()
+ .setLedgerId(ledgerId);
+ request = Request.newBuilder()
+ .setHeader(headerBuilder)
+ .setReadLacRequest(readLacBuilder)
+ .build();
+ }
+ final Object readLacRequest = request;
+ final CompletionKey completionKey = completion;
+
completionObjects.put(completionKey,
new ReadLacCompletion(readLacOpLogger, cb, ctx, ledgerId,
scheduleTimeout(completionKey, readEntryTimeout)));
- // Build the request and calculate the total size to be included in the packet.
- BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
- .setVersion(ProtocolVersion.VERSION_THREE)
- .setOperation(OperationType.READ_LAC)
- .setTxnId(txnId);
- ReadLacRequest.Builder readLacBuilder = ReadLacRequest.newBuilder()
- .setLedgerId(ledgerId);
- final Request readLacRequest = Request.newBuilder()
- .setHeader(headerBuilder)
- .setReadLacRequest(readLacBuilder)
- .build();
final Channel c = channel;
if (c == null) {
errorOutReadLacKey(completionKey);
@@ -632,27 +678,37 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
}
public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx) {
- final long txnId = getTxnId();
- final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY);
- completionObjects.put(completionKey,
- new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId,
- scheduleTimeout(completionKey, readEntryTimeout)));
+ Object request = null;
+ CompletionKey completion = null;
+ if (useV2WireProtocol) {
+ request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
+ ledgerId, entryId, (short) 0);
+ completion = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
+ } else {
+ final long txnId = getTxnId();
+ completion = new CompletionKey(txnId, OperationType.READ_ENTRY);
- // Build the request and calculate the total size to be included in the packet.
- BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
- .setVersion(ProtocolVersion.VERSION_THREE)
- .setOperation(OperationType.READ_ENTRY)
- .setTxnId(txnId);
+ // Build the request and calculate the total size to be included in the packet.
+ BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.READ_ENTRY)
+ .setTxnId(txnId);
- ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
- .setLedgerId(ledgerId)
- .setEntryId(entryId);
+ ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId);
- final Request readRequest = Request.newBuilder()
- .setHeader(headerBuilder)
- .setReadRequest(readBuilder)
- .build();
+ final Request readRequest = Request.newBuilder()
+ .setHeader(headerBuilder)
+ .setReadRequest(readBuilder)
+ .build();
+ }
+ final Object readRequest = request;
+ final CompletionKey completionKey = completion;
+ completionObjects.put(completionKey,
+ new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId,
+ scheduleTimeout(completionKey, readEntryTimeout)));
final Channel c = channel;
if (c == null) {
errorOutReadKey(completionKey);
@@ -803,6 +859,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
}
void errorOutReadKey(final CompletionKey key, final int rc) {
+ LOG.info("Removing completion key: {}", key);
final ReadCompletion readCompletion = (ReadCompletion)completionObjects.remove(key);
if (null == readCompletion) {
return;
@@ -835,6 +892,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
}
void errorOutWriteLacKey(final CompletionKey key, final int rc) {
+ LOG.info("Removing completion key: {}", key);
final WriteLacCompletion writeLacCompletion = (WriteLacCompletion)completionObjects.remove(key);
if (null == writeLacCompletion) {
return;
@@ -859,6 +917,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
}
void errorOutReadLacKey(final CompletionKey key, final int rc) {
+ LOG.info("Removing completion key: {}", key);
final ReadLacCompletion readLacCompletion = (ReadLacCompletion)completionObjects.remove(key);
if (null == readLacCompletion) {
return;
@@ -1058,12 +1117,101 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- if (!(e.getMessage() instanceof Response)) {
+
+ if (e.getMessage() instanceof BookieProtocol.Response) {
+ BookieProtocol.Response response = (BookieProtocol.Response) e.getMessage();
+ readV2Response(response);
+ } else if (e.getMessage() instanceof Response) {
+ Response response = (Response) e.getMessage();
+ readV3Response(response);
+ } else {
ctx.sendUpstream(e);
- return;
}
+ }
+
+ private void readV2Response(final BookieProtocol.Response response) {
+ final long ledgerId = response.ledgerId;
+ final long entryId = response.entryId;
+
+ final OperationType operationType = getOperationType(response.getOpCode());
+ final StatusCode status = getStatusCodeFromErrorCode(response.errorCode);
+
+ final CompletionValue completionValue = completionObjects.remove(new V2CompletionKey(ledgerId, entryId, operationType));
- final Response response = (Response) e.getMessage();
+ if (null == completionValue) {
+ // Unexpected response, so log it. The txnId should have been present.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unexpected response received from bookie : " + addr + " for type : " + operationType
+ + " and ledger:entry : " + ledgerId + ":" + entryId);
+ }
+ } else {
+ long orderingKey = completionValue.ledgerId;
+
+ executor.submitOrdered(orderingKey, new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ switch (operationType) {
+ case ADD_ENTRY: {
+ handleAddResponse(ledgerId, entryId, status, completionValue);
+ }
+ case READ_ENTRY: {
+ BookieProtocol.ReadResponse readResponse = (BookieProtocol.ReadResponse) response;
+ handleReadResponse(ledgerId, entryId, status, readResponse.getData(), completionValue);
+ }
+ default:
+ LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring", operationType, addr);
+ break;
+ }
+ }
+ });
+ }
+ }
+
+ private StatusCode getStatusCodeFromErrorCode(int errorCode) {
+ switch (errorCode) {
+ case BookieProtocol.EOK:
+ return StatusCode.EOK;
+ case BookieProtocol.ENOLEDGER:
+ return StatusCode.ENOLEDGER;
+ case BookieProtocol.ENOENTRY:
+ return StatusCode.ENOENTRY;
+ case BookieProtocol.EBADREQ:
+ return StatusCode.EBADREQ;
+ case BookieProtocol.EIO:
+ return StatusCode.EIO;
+ case BookieProtocol.EUA:
+ return StatusCode.EUA;
+ case BookieProtocol.EBADVERSION:
+ return StatusCode.EBADVERSION;
+ case BookieProtocol.EFENCED:
+ return StatusCode.EFENCED;
+ case BookieProtocol.EREADONLY:
+ return StatusCode.EREADONLY;
+ default:
+ throw new IllegalArgumentException("Invalid error code: " + errorCode);
+ }
+ }
+
+ private OperationType getOperationType(byte opCode) {
+ switch (opCode) {
+ case BookieProtocol.ADDENTRY:
+ return OperationType.ADD_ENTRY;
+ case BookieProtocol.READENTRY:
+ return OperationType.READ_ENTRY;
+ case BookieProtocol.AUTH:
+ return OperationType.AUTH;
+ case BookieProtocol.READ_LAC:
+ return OperationType.READ_LAC;
+ case BookieProtocol.WRITE_LAC:
+ return OperationType.WRITE_LAC;
+ case BookieProtocol.GET_BOOKIE_INFO:
+ return OperationType.GET_BOOKIE_INFO;
+ default:
+ throw new IllegalArgumentException("Invalid operation type");
+ }
+ }
+
+ private void readV3Response(final Response response) {
final BKPacketHeader header = response.getHeader();
final CompletionValue completionValue = completionObjects.remove(newCompletionKey(header.getTxnId(),
@@ -1082,21 +1230,51 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
public void safeRun() {
OperationType type = header.getOperation();
switch (type) {
- case ADD_ENTRY:
- handleAddResponse(response, completionValue);
+ case ADD_ENTRY: {
+ AddResponse addResponse = response.getAddResponse();
+ StatusCode status = response.getStatus() == StatusCode.EOK ? addResponse.getStatus() : response.getStatus();
+ handleAddResponse(addResponse.getLedgerId(), addResponse.getEntryId(), status, completionValue);
break;
- case READ_ENTRY:
- handleReadResponse(response, completionValue);
+ }
+ case READ_ENTRY: {
+ ReadResponse readResponse = response.getReadResponse();
+ StatusCode status = response.getStatus() == StatusCode.EOK ? readResponse.getStatus() : response.getStatus();
+ ChannelBuffer buffer = ChannelBuffers.buffer(0);
+ if (readResponse.hasBody()) {
+ buffer = ChannelBuffers.copiedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
+ }
+ handleReadResponse(readResponse.getLedgerId(), readResponse.getEntryId(), status, buffer, completionValue);
break;
- case WRITE_LAC:
- handleWriteLacResponse(response.getWriteLacResponse(), completionValue);
+ }
+ case WRITE_LAC: {
+ WriteLacResponse writeLacResponse = response.getWriteLacResponse();
+ StatusCode status = response.getStatus() == StatusCode.EOK ? writeLacResponse.getStatus() : response.getStatus();
+ handleWriteLacResponse(writeLacResponse.getLedgerId(), status, completionValue);
break;
- case READ_LAC:
- handleReadLacResponse(response.getReadLacResponse(), completionValue);
+ }
+ case READ_LAC: {
+ ReadLacResponse readLacResponse = response.getReadLacResponse();
+ ChannelBuffer lacBuffer = ChannelBuffers.buffer(0);
+ ChannelBuffer lastEntryBuffer = ChannelBuffers.buffer(0);
+ StatusCode status = response.getStatus() == StatusCode.EOK ? readLacResponse.getStatus() : response.getStatus();
+ // Thread.dumpStack();
+
+ if (readLacResponse.hasLacBody()) {
+ lacBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
+ }
+
+ if (readLacResponse.hasLastEntryBody()) {
+ lastEntryBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
+ }
+ handleReadLacResponse(readLacResponse.getLedgerId(), status, lacBuffer, lastEntryBuffer, completionValue);
break;
- case GET_BOOKIE_INFO:
- handleGetBookieInfoResponse(response, completionValue);
+ }
+ case GET_BOOKIE_INFO: {
+ GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse();
+ StatusCode status = response.getStatus() == StatusCode.EOK ? getBookieInfoResponse.getStatus() : response.getStatus();
+ handleGetBookieInfoResponse(getBookieInfoResponse.getFreeDiskSpace(), getBookieInfoResponse.getTotalDiskCapacity(), status, completionValue);
break;
+ }
default:
LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring",
type, addr);
@@ -1114,13 +1292,10 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
}
}
- void handleWriteLacResponse(WriteLacResponse writeLacResponse, CompletionValue completionValue) {
+ void handleWriteLacResponse(long ledgerId, StatusCode status, CompletionValue completionValue) {
// The completion value should always be an instance of an WriteLacCompletion object when we reach here.
WriteLacCompletion plc = (WriteLacCompletion)completionValue;
- long ledgerId = writeLacResponse.getLedgerId();
- StatusCode status = writeLacResponse.getStatus();
-
LOG.debug("Got response for writeLac request from bookie: " + addr + " for ledger: " + ledgerId + " rc: " + status);
// convert to BKException code
@@ -1133,14 +1308,9 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
plc.cb.writeLacComplete(rcToRet, ledgerId, addr, plc.ctx);
}
- void handleAddResponse(Response response, CompletionValue completionValue) {
+ void handleAddResponse(long ledgerId, long entryId, StatusCode status, CompletionValue completionValue) {
// The completion value should always be an instance of an AddCompletion object when we reach here.
AddCompletion ac = (AddCompletion)completionValue;
- AddResponse addResponse = response.getAddResponse();
-
- long ledgerId = addResponse.getLedgerId();
- long entryId = addResponse.getEntryId();
- StatusCode status = response.getStatus() == StatusCode.EOK ? addResponse.getStatus() : response.getStatus();
if (LOG.isDebugEnabled()) {
LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
@@ -1160,25 +1330,10 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx);
}
- void handleReadLacResponse(ReadLacResponse readLacResponse, CompletionValue completionValue) {
+ void handleReadLacResponse(long ledgerId, StatusCode status, ChannelBuffer lacBuffer, ChannelBuffer lastEntryBuffer, CompletionValue completionValue) {
// The completion value should always be an instance of an WriteLacCompletion object when we reach here.
ReadLacCompletion glac = (ReadLacCompletion)completionValue;
- long ledgerId = readLacResponse.getLedgerId();
- StatusCode status = readLacResponse.getStatus();
- ChannelBuffer lacBuffer = ChannelBuffers.buffer(0);
- ChannelBuffer lastEntryBuffer = ChannelBuffers.buffer(0);
-
- // Thread.dumpStack();
-
- if (readLacResponse.hasLacBody()) {
- lacBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
- }
-
- if (readLacResponse.hasLastEntryBody()) {
- lastEntryBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
- }
-
LOG.debug("Got response for readLac request from bookie: " + addr + " for ledger: " + ledgerId + " rc: " + status);
// convert to BKException code
Integer rcToRet = statusCodeToExceptionCode(status);
@@ -1190,20 +1345,10 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
glac.cb.readLacComplete(rcToRet, ledgerId, lacBuffer.slice(), lastEntryBuffer.slice(), glac.ctx);
}
- void handleReadResponse(Response response, CompletionValue completionValue) {
+ void handleReadResponse(long ledgerId, long entryId, StatusCode status, ChannelBuffer buffer, CompletionValue completionValue) {
// The completion value should always be an instance of a ReadCompletion object when we reach here.
ReadCompletion rc = (ReadCompletion)completionValue;
- ReadResponse readResponse = response.getReadResponse();
-
- long ledgerId = readResponse.getLedgerId();
- long entryId = readResponse.getEntryId();
- StatusCode status = response.getStatus() == StatusCode.EOK ? readResponse.getStatus() : response.getStatus();
- ChannelBuffer buffer = ChannelBuffers.buffer(0);
-
- if (readResponse.hasBody()) {
- buffer = ChannelBuffers.copiedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
- }
if (LOG.isDebugEnabled()) {
LOG.debug("Got response for read request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
@@ -1222,15 +1367,9 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
rc.cb.readEntryComplete(rcToRet, ledgerId, entryId, buffer.slice(), rc.ctx);
}
- void handleGetBookieInfoResponse(Response response, CompletionValue completionValue) {
+ void handleGetBookieInfoResponse(long freeDiskSpace, long totalDiskCapacity, StatusCode status, CompletionValue completionValue) {
// The completion value should always be an instance of a GetBookieInfoCompletion object when we reach here.
GetBookieInfoCompletion rc = (GetBookieInfoCompletion)completionValue;
- GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse();
-
- long freeDiskSpace = getBookieInfoResponse.hasFreeDiskSpace() ? getBookieInfoResponse.getFreeDiskSpace() : 0L;
- long totalDiskCapacity = getBookieInfoResponse.hasTotalDiskCapacity() ? getBookieInfoResponse.getTotalDiskCapacity() : 0L;
-
- StatusCode status = response.getStatus() == StatusCode.EOK ? getBookieInfoResponse.getStatus() : response.getStatus();
if (LOG.isDebugEnabled()) {
LOG.debug("Got response for read metadata request from bookie: {} rc {}", addr, rc);
@@ -1556,4 +1695,35 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
return txnIdGenerator.incrementAndGet();
}
+ private class V2CompletionKey extends CompletionKey {
+ final long ledgerId;
+ final long entryId;
+
+
+ public V2CompletionKey(long ledgerId, long entryId, OperationType operationType) {
+ super(0, operationType);
+ this.ledgerId = ledgerId;
+ this.entryId = entryId;
+
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (!(object instanceof V2CompletionKey)) {
+ return false;
+ }
+ V2CompletionKey that = (V2CompletionKey) object;
+ return this.entryId == that.entryId && this.ledgerId == that.ledgerId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(ledgerId, entryId);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%d:%d %s", ledgerId, entryId, operationType);
+ }
+ }
}