You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/04/04 01:22:11 UTC
bookkeeper git commit: BOOKKEEPER-1018: Revert ": Allow client to select older V2 protocol (no…
Repository: bookkeeper
Updated Branches:
refs/heads/master 9001e300c -> f30f60889
BOOKKEEPER-1018: Revert ": Allow client to select older V2 protocol (no…
… protobuf)"
This reverts commit 9001e300ce0d5d2655d437e3eaa52f91487caed6.
I broke trunk - not exactly sure how - I will fix it and put up the PR again. For now I'm reverting the commit.
Author: Govind Menon <go...@gmail.com>
Reviewers: Sijie Guo <si...@apache.org>
Closes #124 from govind-menon/BOOKKEEPER-1018-Revert
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/f30f6088
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/f30f6088
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/f30f6088
Branch: refs/heads/master
Commit: f30f60889e0810a47797daf4107c0d9bc2ee998c
Parents: 9001e30
Author: Govind Menon <go...@gmail.com>
Authored: Mon Apr 3 18:22:07 2017 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Mon Apr 3 18:22:07 2017 -0700
----------------------------------------------------------------------
.../bookkeeper/conf/ClientConfiguration.java | 23 +-
.../apache/bookkeeper/proto/AuthHandler.java | 8 -
.../apache/bookkeeper/proto/BookieProtocol.java | 3 -
.../proto/PerChannelBookieClient.java | 436 ++++++-------------
4 files changed, 134 insertions(+), 336 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/f30f6088/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 2b75e9e..ee137c0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -57,7 +57,7 @@ public class ClientConfiguration extends AbstractConfiguration {
protected final static String CLIENT_WRITEBUFFER_HIGH_WATER_MARK = "clientWriteBufferHighWaterMark";
protected final static String CLIENT_CONNECT_TIMEOUT_MILLIS = "clientConnectTimeoutMillis";
protected final static String NUM_CHANNELS_PER_BOOKIE = "numChannelsPerBookie";
- protected final static String USE_V2_WIRE_PROTOCOL = "useV2WireProtocol";
+
// Read Parameters
protected final static String READ_TIMEOUT = "readTimeout";
protected final static String SPECULATIVE_READ_TIMEOUT = "speculativeReadTimeout";
@@ -436,27 +436,6 @@ public class ClientConfiguration extends AbstractConfiguration {
}
/**
- * Use older Bookkeeper wire protocol (no protobuf)
- *
- * @return whether or not to use older Bookkeeper wire protocol (no protobuf)
- */
- public boolean getUseV2WireProtocol() {
- return getBoolean(USE_V2_WIRE_PROTOCOL, false);
- }
-
- /**
- * Set whether or not to use older Bookkeeper wire protocol (no protobuf)
- *
- * @param useV2WireProtocol
- * whether or not to use older Bookkeeper wire protocol (no protobuf)
- * @return client configuration.
- */
- public ClientConfiguration setUseV2WireProtocol(boolean useV2WireProtocol) {
- setProperty(USE_V2_WIRE_PROTOCOL, useV2WireProtocol);
- return this;
- }
-
- /**
* Get zookeeper servers to connect
*
* @return zookeeper servers
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/f30f6088/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index d2608e7..75dced5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -322,14 +322,6 @@ class AuthHandler {
} else {
waitingForAuth.add(e);
}
- } else if (e.getMessage() instanceof BookieProtocol.Request) {
- // let auth messages through, queue the rest
- BookieProtocol.Request req = (BookieProtocol.Request)e.getMessage();
- if (BookkeeperProtocol.OperationType.AUTH.getNumber() == req.getOpCode()) {
- super.writeRequested(ctx, e);
- } else {
- waitingForAuth.add(e);
- }
} // else just drop
}
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/f30f6088/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 1191d3c..2ce5ed8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -140,9 +140,6 @@ public interface BookieProtocol {
* by the auth providers themselves.
*/
public static final byte AUTH = 3;
- public static final byte READ_LAC = 4;
- public static final byte WRITE_LAC = 5;
- public static final byte GET_BOOKIE_INFO = 6;
/**
* The error code that indicates success
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/f30f6088/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index a24bb1e..f6e9e8f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -21,10 +21,8 @@ import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Collections;
-import java.util.Collection;
import java.util.Queue;
import java.util.Set;
-import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -100,7 +98,7 @@ import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistry;
import java.net.SocketAddress;
-
+import java.util.Collection;
import org.apache.bookkeeper.auth.BookKeeperPrincipal;
import org.jboss.netty.channel.ChannelFactory;
import org.apache.bookkeeper.client.ClientConnectionPeer;
@@ -149,8 +147,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
private final OpStatsLogger getBookieInfoOpLogger;
private final OpStatsLogger getBookieInfoTimeoutOpLogger;
- private final boolean useV2WireProtocol;
-
/**
* The following member variables do not need to be concurrent, or volatile
* because they are always updated under a lock
@@ -206,7 +202,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
this.addEntryTimeout = conf.getAddEntryTimeout();
this.readEntryTimeout = conf.getReadEntryTimeout();
this.getBookieInfoTimeout = conf.getBookieInfoTimeout();
- this.useV2WireProtocol = conf.getUseV2WireProtocol();
this.authProviderFactory = authProviderFactory;
this.extRegistry = extRegistry;
@@ -474,45 +469,33 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
*/
void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, WriteCallback cb,
Object ctx, final int options) {
- Object request = null;
- CompletionKey completion = null;
- if (useV2WireProtocol) {
- completion = new V2CompletionKey(ledgerId, entryId, OperationType.ADD_ENTRY);
- request = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
- (short) options, masterKey, toSend);
-
+ final long txnId = getTxnId();
+ final int entrySize = toSend.readableBytes();
+ final CompletionKey completionKey = new CompletionKey(txnId, OperationType.ADD_ENTRY);
+ completionObjects.put(completionKey,
+ new AddCompletion(this, addEntryOpLogger, cb, ctx, ledgerId, entryId,
+ scheduleTimeout(completionKey, addEntryTimeout)));
- } else {
- final long txnId = getTxnId();
- completion = new CompletionKey(txnId, OperationType.ADD_ENTRY);
- // Build the request and calculate the total size to be included in the packet.
- BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
- .setVersion(ProtocolVersion.VERSION_THREE)
- .setOperation(OperationType.ADD_ENTRY)
- .setTxnId(txnId);
-
- AddRequest.Builder addBuilder = AddRequest.newBuilder()
- .setLedgerId(ledgerId)
- .setEntryId(entryId)
- .setMasterKey(ByteString.copyFrom(masterKey))
- .setBody(ByteString.copyFrom(toSend.toByteBuffer()));
-
- if (((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) {
- addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
- }
- request = Request.newBuilder()
- .setHeader(headerBuilder)
- .setAddRequest(addBuilder)
- .build();
- }
+ // Build the request and calculate the total size to be included in the packet.
+ BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.ADD_ENTRY)
+ .setTxnId(txnId);
- final Object addRequest = request;
- final CompletionKey completionKey = completion;
+ AddRequest.Builder addBuilder = AddRequest.newBuilder()
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId)
+ .setMasterKey(ByteString.copyFrom(masterKey))
+ .setBody(ByteString.copyFrom(toSend.toByteBuffer()));
- completionObjects.put(completionKey, new AddCompletion(this,
- addEntryOpLogger, cb, ctx, ledgerId, entryId, scheduleTimeout(completion, addEntryTimeout)));
+ if (((short)options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) {
+ addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
+ }
- final int entrySize = toSend.readableBytes();
+ final Request addRequest = Request.newBuilder()
+ .setHeader(headerBuilder)
+ .setAddRequest(addBuilder)
+ .build();
final Channel c = channel;
if (c == null) {
@@ -548,44 +531,28 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
public void readEntryAndFenceLedger(final long ledgerId, byte[] masterKey,
final long entryId,
ReadEntryCallback cb, Object ctx) {
- Object request = null;
- CompletionKey completion = null;
- if (useV2WireProtocol) {
- completion = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
- request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
- BookieProtocol.FLAG_DO_FENCING, masterKey);
- } else {
- final long txnId = getTxnId();
- final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY);
- completionObjects.put(completionKey,
- new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId,
- scheduleTimeout(completionKey, readEntryTimeout)));
-
- // Build the request and calculate the total size to be included in the packet.
- BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
- .setVersion(ProtocolVersion.VERSION_THREE)
- .setOperation(OperationType.READ_ENTRY)
- .setTxnId(txnId);
-
- ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
- .setLedgerId(ledgerId)
- .setEntryId(entryId)
- .setMasterKey(ByteString.copyFrom(masterKey))
- .setFlag(ReadRequest.Flag.FENCE_LEDGER);
-
- final Request readRequest = Request.newBuilder()
- .setHeader(headerBuilder)
- .setReadRequest(readBuilder)
- .build();
- }
-
- final CompletionKey completionKey = completion;
- if (completionObjects.putIfAbsent(completionKey, new ReadCompletion(this, readEntryOpLogger, cb,
- ctx, ledgerId, entryId, scheduleTimeout(completionKey, readEntryTimeout))) != null) {
- // We cannot have more than 1 pending read on the same ledger/entry in the v2 protocol
- cb.readEntryComplete(BKException.Code.BookieHandleNotAvailableException, ledgerId, entryId, null, ctx);
- return;
- }
+ final long txnId = getTxnId();
+ final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY);
+ completionObjects.put(completionKey,
+ new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId,
+ scheduleTimeout(completionKey, readEntryTimeout)));
+
+ // Build the request and calculate the total size to be included in the packet.
+ BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.READ_ENTRY)
+ .setTxnId(txnId);
+
+ ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId)
+ .setMasterKey(ByteString.copyFrom(masterKey))
+ .setFlag(ReadRequest.Flag.FENCE_LEDGER);
+
+ final Request readRequest = Request.newBuilder()
+ .setHeader(headerBuilder)
+ .setReadRequest(readBuilder)
+ .build();
final Channel c = channel;
if (c == null) {
@@ -593,7 +560,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
return;
}
- final Object readRequest = request;
try {
ChannelFuture future = c.write(readRequest);
future.addListener(new ChannelFutureListener() {
@@ -620,34 +586,22 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
}
public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
- Object request = null;
- CompletionKey completion = null;
- if (useV2WireProtocol) {
- request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
- ledgerId, (long) 0, (short) 0);
- completion = new V2CompletionKey(ledgerId, (long) 0, OperationType.READ_LAC);
- } else {
- final long txnId = getTxnId();
- final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_LAC);
-
- // Build the request and calculate the total size to be included in the packet.
- BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
- .setVersion(ProtocolVersion.VERSION_THREE)
- .setOperation(OperationType.READ_LAC)
- .setTxnId(txnId);
- ReadLacRequest.Builder readLacBuilder = ReadLacRequest.newBuilder()
- .setLedgerId(ledgerId);
- request = Request.newBuilder()
- .setHeader(headerBuilder)
- .setReadLacRequest(readLacBuilder)
- .build();
- }
- final Object readLacRequest = request;
- final CompletionKey completionKey = completion;
-
+ final long txnId = getTxnId();
+ final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_LAC);
completionObjects.put(completionKey,
new ReadLacCompletion(readLacOpLogger, cb, ctx, ledgerId,
scheduleTimeout(completionKey, readEntryTimeout)));
+ // Build the request and calculate the total size to be included in the packet.
+ BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.READ_LAC)
+ .setTxnId(txnId);
+ ReadLacRequest.Builder readLacBuilder = ReadLacRequest.newBuilder()
+ .setLedgerId(ledgerId);
+ final Request readLacRequest = Request.newBuilder()
+ .setHeader(headerBuilder)
+ .setReadLacRequest(readLacBuilder)
+ .build();
final Channel c = channel;
if (c == null) {
errorOutReadLacKey(completionKey);
@@ -678,37 +632,27 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
}
public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx) {
- Object request = null;
- CompletionKey completion = null;
- if (useV2WireProtocol) {
- request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
- ledgerId, entryId, (short) 0);
- completion = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
- } else {
- final long txnId = getTxnId();
- completion = new CompletionKey(txnId, OperationType.READ_ENTRY);
+ final long txnId = getTxnId();
+ final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY);
+ completionObjects.put(completionKey,
+ new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId,
+ scheduleTimeout(completionKey, readEntryTimeout)));
- // Build the request and calculate the total size to be included in the packet.
- BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
- .setVersion(ProtocolVersion.VERSION_THREE)
- .setOperation(OperationType.READ_ENTRY)
- .setTxnId(txnId);
+ // Build the request and calculate the total size to be included in the packet.
+ BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.READ_ENTRY)
+ .setTxnId(txnId);
- ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
- .setLedgerId(ledgerId)
- .setEntryId(entryId);
+ ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId);
- final Request readRequest = Request.newBuilder()
- .setHeader(headerBuilder)
- .setReadRequest(readBuilder)
- .build();
- }
- final Object readRequest = request;
- final CompletionKey completionKey = completion;
+ final Request readRequest = Request.newBuilder()
+ .setHeader(headerBuilder)
+ .setReadRequest(readBuilder)
+ .build();
- completionObjects.put(completionKey,
- new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId,
- scheduleTimeout(completionKey, readEntryTimeout)));
final Channel c = channel;
if (c == null) {
errorOutReadKey(completionKey);
@@ -859,7 +803,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
}
void errorOutReadKey(final CompletionKey key, final int rc) {
- LOG.info("Removing completion key: {}", key);
final ReadCompletion readCompletion = (ReadCompletion)completionObjects.remove(key);
if (null == readCompletion) {
return;
@@ -892,7 +835,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
}
void errorOutWriteLacKey(final CompletionKey key, final int rc) {
- LOG.info("Removing completion key: {}", key);
final WriteLacCompletion writeLacCompletion = (WriteLacCompletion)completionObjects.remove(key);
if (null == writeLacCompletion) {
return;
@@ -917,7 +859,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
}
void errorOutReadLacKey(final CompletionKey key, final int rc) {
- LOG.info("Removing completion key: {}", key);
final ReadLacCompletion readLacCompletion = (ReadLacCompletion)completionObjects.remove(key);
if (null == readLacCompletion) {
return;
@@ -1117,101 +1058,12 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-
- if (e.getMessage() instanceof BookieProtocol.Response) {
- BookieProtocol.Response response = (BookieProtocol.Response) e.getMessage();
- readV2Response(response);
- } else if (e.getMessage() instanceof Response) {
- Response response = (Response) e.getMessage();
- readV3Response(response);
- } else {
+ if (!(e.getMessage() instanceof Response)) {
ctx.sendUpstream(e);
+ return;
}
- }
-
- private void readV2Response(final BookieProtocol.Response response) {
- final long ledgerId = response.ledgerId;
- final long entryId = response.entryId;
-
- final OperationType operationType = getOperationType(response.getOpCode());
- final StatusCode status = getStatusCodeFromErrorCode(response.errorCode);
-
- final CompletionValue completionValue = completionObjects.remove(new V2CompletionKey(ledgerId, entryId, operationType));
-
- if (null == completionValue) {
- // Unexpected response, so log it. The txnId should have been present.
- if (LOG.isDebugEnabled()) {
- LOG.debug("Unexpected response received from bookie : " + addr + " for type : " + operationType
- + " and ledger:entry : " + ledgerId + ":" + entryId);
- }
- } else {
- long orderingKey = completionValue.ledgerId;
-
- executor.submitOrdered(orderingKey, new SafeRunnable() {
- @Override
- public void safeRun() {
- switch (operationType) {
- case ADD_ENTRY: {
- handleAddResponse(ledgerId, entryId, status, completionValue);
- }
- case READ_ENTRY: {
- BookieProtocol.ReadResponse readResponse = (BookieProtocol.ReadResponse) response;
- handleReadResponse(ledgerId, entryId, status, readResponse.getData(), completionValue);
- }
- default:
- LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring", operationType, addr);
- break;
- }
- }
- });
- }
- }
- private StatusCode getStatusCodeFromErrorCode(int errorCode) {
- switch (errorCode) {
- case BookieProtocol.EOK:
- return StatusCode.EOK;
- case BookieProtocol.ENOLEDGER:
- return StatusCode.ENOLEDGER;
- case BookieProtocol.ENOENTRY:
- return StatusCode.ENOENTRY;
- case BookieProtocol.EBADREQ:
- return StatusCode.EBADREQ;
- case BookieProtocol.EIO:
- return StatusCode.EIO;
- case BookieProtocol.EUA:
- return StatusCode.EUA;
- case BookieProtocol.EBADVERSION:
- return StatusCode.EBADVERSION;
- case BookieProtocol.EFENCED:
- return StatusCode.EFENCED;
- case BookieProtocol.EREADONLY:
- return StatusCode.EREADONLY;
- default:
- throw new IllegalArgumentException("Invalid error code: " + errorCode);
- }
- }
-
- private OperationType getOperationType(byte opCode) {
- switch (opCode) {
- case BookieProtocol.ADDENTRY:
- return OperationType.ADD_ENTRY;
- case BookieProtocol.READENTRY:
- return OperationType.READ_ENTRY;
- case BookieProtocol.AUTH:
- return OperationType.AUTH;
- case BookieProtocol.READ_LAC:
- return OperationType.READ_LAC;
- case BookieProtocol.WRITE_LAC:
- return OperationType.WRITE_LAC;
- case BookieProtocol.GET_BOOKIE_INFO:
- return OperationType.GET_BOOKIE_INFO;
- default:
- throw new IllegalArgumentException("Invalid operation type");
- }
- }
-
- private void readV3Response(final Response response) {
+ final Response response = (Response) e.getMessage();
final BKPacketHeader header = response.getHeader();
final CompletionValue completionValue = completionObjects.remove(newCompletionKey(header.getTxnId(),
@@ -1230,51 +1082,21 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
public void safeRun() {
OperationType type = header.getOperation();
switch (type) {
- case ADD_ENTRY: {
- AddResponse addResponse = response.getAddResponse();
- StatusCode status = response.getStatus() == StatusCode.EOK ? addResponse.getStatus() : response.getStatus();
- handleAddResponse(addResponse.getLedgerId(), addResponse.getEntryId(), status, completionValue);
+ case ADD_ENTRY:
+ handleAddResponse(response, completionValue);
break;
- }
- case READ_ENTRY: {
- ReadResponse readResponse = response.getReadResponse();
- StatusCode status = response.getStatus() == StatusCode.EOK ? readResponse.getStatus() : response.getStatus();
- ChannelBuffer buffer = ChannelBuffers.buffer(0);
- if (readResponse.hasBody()) {
- buffer = ChannelBuffers.copiedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
- }
- handleReadResponse(readResponse.getLedgerId(), readResponse.getEntryId(), status, buffer, completionValue);
+ case READ_ENTRY:
+ handleReadResponse(response, completionValue);
break;
- }
- case WRITE_LAC: {
- WriteLacResponse writeLacResponse = response.getWriteLacResponse();
- StatusCode status = response.getStatus() == StatusCode.EOK ? writeLacResponse.getStatus() : response.getStatus();
- handleWriteLacResponse(writeLacResponse.getLedgerId(), status, completionValue);
+ case WRITE_LAC:
+ handleWriteLacResponse(response.getWriteLacResponse(), completionValue);
break;
- }
- case READ_LAC: {
- ReadLacResponse readLacResponse = response.getReadLacResponse();
- ChannelBuffer lacBuffer = ChannelBuffers.buffer(0);
- ChannelBuffer lastEntryBuffer = ChannelBuffers.buffer(0);
- StatusCode status = response.getStatus() == StatusCode.EOK ? readLacResponse.getStatus() : response.getStatus();
- // Thread.dumpStack();
-
- if (readLacResponse.hasLacBody()) {
- lacBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
- }
-
- if (readLacResponse.hasLastEntryBody()) {
- lastEntryBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
- }
- handleReadLacResponse(readLacResponse.getLedgerId(), status, lacBuffer, lastEntryBuffer, completionValue);
+ case READ_LAC:
+ handleReadLacResponse(response.getReadLacResponse(), completionValue);
break;
- }
- case GET_BOOKIE_INFO: {
- GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse();
- StatusCode status = response.getStatus() == StatusCode.EOK ? getBookieInfoResponse.getStatus() : response.getStatus();
- handleGetBookieInfoResponse(getBookieInfoResponse.getFreeDiskSpace(), getBookieInfoResponse.getTotalDiskCapacity(), status, completionValue);
+ case GET_BOOKIE_INFO:
+ handleGetBookieInfoResponse(response, completionValue);
break;
- }
default:
LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring",
type, addr);
@@ -1292,10 +1114,13 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
}
}
- void handleWriteLacResponse(long ledgerId, StatusCode status, CompletionValue completionValue) {
+ void handleWriteLacResponse(WriteLacResponse writeLacResponse, CompletionValue completionValue) {
// The completion value should always be an instance of an WriteLacCompletion object when we reach here.
WriteLacCompletion plc = (WriteLacCompletion)completionValue;
+ long ledgerId = writeLacResponse.getLedgerId();
+ StatusCode status = writeLacResponse.getStatus();
+
LOG.debug("Got response for writeLac request from bookie: " + addr + " for ledger: " + ledgerId + " rc: " + status);
// convert to BKException code
@@ -1308,9 +1133,14 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
plc.cb.writeLacComplete(rcToRet, ledgerId, addr, plc.ctx);
}
- void handleAddResponse(long ledgerId, long entryId, StatusCode status, CompletionValue completionValue) {
+ void handleAddResponse(Response response, CompletionValue completionValue) {
// The completion value should always be an instance of an AddCompletion object when we reach here.
AddCompletion ac = (AddCompletion)completionValue;
+ AddResponse addResponse = response.getAddResponse();
+
+ long ledgerId = addResponse.getLedgerId();
+ long entryId = addResponse.getEntryId();
+ StatusCode status = response.getStatus() == StatusCode.EOK ? addResponse.getStatus() : response.getStatus();
if (LOG.isDebugEnabled()) {
LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
@@ -1330,10 +1160,25 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx);
}
- void handleReadLacResponse(long ledgerId, StatusCode status, ChannelBuffer lacBuffer, ChannelBuffer lastEntryBuffer, CompletionValue completionValue) {
+ void handleReadLacResponse(ReadLacResponse readLacResponse, CompletionValue completionValue) {
// The completion value should always be an instance of an WriteLacCompletion object when we reach here.
ReadLacCompletion glac = (ReadLacCompletion)completionValue;
+ long ledgerId = readLacResponse.getLedgerId();
+ StatusCode status = readLacResponse.getStatus();
+ ChannelBuffer lacBuffer = ChannelBuffers.buffer(0);
+ ChannelBuffer lastEntryBuffer = ChannelBuffers.buffer(0);
+
+ // Thread.dumpStack();
+
+ if (readLacResponse.hasLacBody()) {
+ lacBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
+ }
+
+ if (readLacResponse.hasLastEntryBody()) {
+ lastEntryBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
+ }
+
LOG.debug("Got response for readLac request from bookie: " + addr + " for ledger: " + ledgerId + " rc: " + status);
// convert to BKException code
Integer rcToRet = statusCodeToExceptionCode(status);
@@ -1345,10 +1190,20 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
glac.cb.readLacComplete(rcToRet, ledgerId, lacBuffer.slice(), lastEntryBuffer.slice(), glac.ctx);
}
- void handleReadResponse(long ledgerId, long entryId, StatusCode status, ChannelBuffer buffer, CompletionValue completionValue) {
+ void handleReadResponse(Response response, CompletionValue completionValue) {
// The completion value should always be an instance of a ReadCompletion object when we reach here.
ReadCompletion rc = (ReadCompletion)completionValue;
+ ReadResponse readResponse = response.getReadResponse();
+
+ long ledgerId = readResponse.getLedgerId();
+ long entryId = readResponse.getEntryId();
+ StatusCode status = response.getStatus() == StatusCode.EOK ? readResponse.getStatus() : response.getStatus();
+ ChannelBuffer buffer = ChannelBuffers.buffer(0);
+
+ if (readResponse.hasBody()) {
+ buffer = ChannelBuffers.copiedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Got response for read request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
@@ -1367,9 +1222,15 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
rc.cb.readEntryComplete(rcToRet, ledgerId, entryId, buffer.slice(), rc.ctx);
}
- void handleGetBookieInfoResponse(long freeDiskSpace, long totalDiskCapacity, StatusCode status, CompletionValue completionValue) {
+ void handleGetBookieInfoResponse(Response response, CompletionValue completionValue) {
// The completion value should always be an instance of a GetBookieInfoCompletion object when we reach here.
GetBookieInfoCompletion rc = (GetBookieInfoCompletion)completionValue;
+ GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse();
+
+ long freeDiskSpace = getBookieInfoResponse.hasFreeDiskSpace() ? getBookieInfoResponse.getFreeDiskSpace() : 0L;
+ long totalDiskCapacity = getBookieInfoResponse.hasTotalDiskCapacity() ? getBookieInfoResponse.getTotalDiskCapacity() : 0L;
+
+ StatusCode status = response.getStatus() == StatusCode.EOK ? getBookieInfoResponse.getStatus() : response.getStatus();
if (LOG.isDebugEnabled()) {
LOG.debug("Got response for read metadata request from bookie: {} rc {}", addr, rc);
@@ -1695,35 +1556,4 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
return txnIdGenerator.incrementAndGet();
}
- private class V2CompletionKey extends CompletionKey {
- final long ledgerId;
- final long entryId;
-
-
- public V2CompletionKey(long ledgerId, long entryId, OperationType operationType) {
- super(0, operationType);
- this.ledgerId = ledgerId;
- this.entryId = entryId;
-
- }
-
- @Override
- public boolean equals(Object object) {
- if (!(object instanceof V2CompletionKey)) {
- return false;
- }
- V2CompletionKey that = (V2CompletionKey) object;
- return this.entryId == that.entryId && this.ledgerId == that.ledgerId;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(ledgerId, entryId);
- }
-
- @Override
- public String toString() {
- return String.format("%d:%d %s", ledgerId, entryId, operationType);
- }
- }
}