You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2013/03/18 00:31:21 UTC
svn commit: r1457592 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/
bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/
Author: sijie
Date: Sun Mar 17 23:31:20 2013
New Revision: 1457592
URL: http://svn.apache.org/r1457592
Log:
BOOKKEEPER-576: Bookie client should use netty Decoder/Encoder (ivank via sijie)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1457592&r1=1457591&r2=1457592&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sun Mar 17 23:31:20 2013
@@ -44,6 +44,8 @@ Trunk (unreleased changes)
BOOKKEEPER-574: Extend the bookkeeper shell to get a list of available bookies (ivank via umamahesh)
+ BOOKKEEPER-576: Bookie client should use netty Decoder/Encoder (ivank via sijie)
+
Release 4.2.0 - 2013-01-14
Non-backward compatible changes:
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java?rev=1457592&r1=1457591&r2=1457592&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java Sun Mar 17 23:31:20 2013
@@ -141,8 +141,8 @@ class BookieNettyServer {
new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4));
pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
- pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.Decoder());
- pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.Encoder());
+ pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder());
+ pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder());
pipeline.addLast("bookieRequestHandler", new BookieRequestHandler(conf, bookie,
allChannels));
return pipeline;
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java?rev=1457592&r1=1457591&r2=1457592&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java Sun Mar 17 23:31:20 2013
@@ -35,7 +35,45 @@ import org.slf4j.LoggerFactory;
public class BookieProtoEncoding {
static Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class);
- public static class Decoder extends OneToOneDecoder {
+ public static class RequestEncoder extends OneToOneEncoder {
+ @Override
+ public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+ throws Exception {
+ if (!(msg instanceof BookieProtocol.Request)) {
+ return msg;
+ }
+ BookieProtocol.Request r = (BookieProtocol.Request)msg;
+ if (r instanceof BookieProtocol.AddRequest) {
+ BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest)r;
+ int totalHeaderSize = 4 // for the header
+ + BookieProtocol.MASTER_KEY_LENGTH; // for the master key
+ ChannelBuffer buf = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
+ buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt());
+ buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
+ return ChannelBuffers.wrappedBuffer(buf, ar.getData());
+ } else {
+ assert(r instanceof BookieProtocol.ReadRequest);
+ int totalHeaderSize = 4 // for request type
+ + 8 // for ledgerId
+ + 8; // for entryId
+ if (r.hasMasterKey()) {
+ totalHeaderSize += BookieProtocol.MASTER_KEY_LENGTH;
+ }
+
+ ChannelBuffer buf = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
+ buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt());
+ buf.writeLong(r.getLedgerId());
+ buf.writeLong(r.getEntryId());
+ if (r.hasMasterKey()) {
+ buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
+ }
+
+ return buf;
+ }
+ }
+ }
+
+ public static class RequestDecoder extends OneToOneDecoder {
@Override
public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
throws Exception {
@@ -61,11 +99,11 @@ public class BookieProtoEncoding {
packet.readBytes(masterKey, 0, BookieProtocol.MASTER_KEY_LENGTH);
ChannelBuffer bb = packet.duplicate();
+
ledgerId = bb.readLong();
entryId = bb.readLong();
-
return new BookieProtocol.AddRequest(h.getVersion(), ledgerId, entryId,
- flags, masterKey, packet.toByteBuffer().slice());
+ flags, masterKey, packet.slice());
case BookieProtocol.READENTRY:
ledgerId = packet.readLong();
entryId = packet.readLong();
@@ -83,7 +121,7 @@ public class BookieProtoEncoding {
}
}
- public static class Encoder extends OneToOneEncoder {
+ public static class ResponseEncoder extends OneToOneEncoder {
@Override
public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
throws Exception {
@@ -100,13 +138,15 @@ public class BookieProtoEncoding {
buf.writeLong(r.getEntryId());
ServerStats.getInstance().incrementPacketsSent();
-
if (msg instanceof BookieProtocol.ReadResponse) {
BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse)r;
- return ChannelBuffers.wrappedBuffer(buf,
- ChannelBuffers.wrappedBuffer(rr.getData()));
- } else if ((msg instanceof BookieProtocol.AddResponse)
- || (msg instanceof BookieProtocol.ErrorResponse)) {
+ if (rr.hasData()) {
+ return ChannelBuffers.wrappedBuffer(buf,
+ ChannelBuffers.wrappedBuffer(rr.getData()));
+ } else {
+ return buf;
+ }
+ } else if (msg instanceof BookieProtocol.AddResponse) {
return buf;
} else {
LOG.error("Cannot encode unknown response type {}", msg.getClass().getName());
@@ -115,4 +155,40 @@ public class BookieProtoEncoding {
}
}
+ public static class ResponseDecoder extends OneToOneDecoder {
+ @Override
+ public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
+ throws Exception {
+ if (!(msg instanceof ChannelBuffer)) {
+ return msg;
+ }
+
+ final ChannelBuffer buffer = (ChannelBuffer)msg;
+ final int rc;
+ final long ledgerId, entryId;
+ final PacketHeader header;
+
+ header = PacketHeader.fromInt(buffer.readInt());
+ rc = buffer.readInt();
+ ledgerId = buffer.readLong();
+ entryId = buffer.readLong();
+
+ switch (header.getOpCode()) {
+ case BookieProtocol.ADDENTRY:
+ return new BookieProtocol.AddResponse(header.getVersion(), rc, ledgerId, entryId);
+ case BookieProtocol.READENTRY:
+ if (rc == BookieProtocol.EOK) {
+ return new BookieProtocol.ReadResponse(header.getVersion(), rc,
+ ledgerId, entryId, buffer.slice());
+ } else {
+ return new BookieProtocol.ReadResponse(header.getVersion(), rc,
+ ledgerId, entryId);
+ }
+ default:
+ LOG.error("Unexpected response of type {} received from {}",
+ header.getOpCode(), channel.getRemoteAddress());
+ return msg;
+ }
+ }
+ }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java?rev=1457592&r1=1457591&r2=1457592&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java Sun Mar 17 23:31:20 2013
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.proto;
*
*/
+import org.jboss.netty.buffer.ChannelBuffer;
import java.nio.ByteBuffer;
/**
@@ -216,6 +217,10 @@ public interface BookieProtocol {
return entryId;
}
+ short getFlags() {
+ return flags;
+ }
+
boolean hasMasterKey() {
return masterKey != null;
}
@@ -232,18 +237,22 @@ public interface BookieProtocol {
}
static class AddRequest extends Request {
- final ByteBuffer data;
+ final ChannelBuffer data;
AddRequest(byte protocolVersion, long ledgerId, long entryId,
- short flags, byte[] masterKey, ByteBuffer data) {
+ short flags, byte[] masterKey, ChannelBuffer data) {
super(protocolVersion, ADDENTRY, ledgerId, entryId, flags, masterKey);
this.data = data;
}
- ByteBuffer getData() {
+ ChannelBuffer getData() {
return data;
}
+ ByteBuffer getDataAsByteBuffer() {
+ return data.toByteBuffer().slice();
+ }
+
boolean isRecoveryAdd() {
return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD;
}
@@ -302,33 +311,36 @@ public interface BookieProtocol {
@Override
public String toString() {
- return String.format("Op(%d)[Ledger:%d,Entry:%d]", opCode, ledgerId, entryId);
+ return String.format("Op(%d)[Ledger:%d,Entry:%d,errorCode=%d]",
+ opCode, ledgerId, entryId, errorCode);
}
}
static class ReadResponse extends Response {
- final ByteBuffer data;
+ final ChannelBuffer data;
- ReadResponse(byte protocolVersion, long ledgerId, long entryId, ByteBuffer data) {
- super(protocolVersion, READENTRY, EOK, ledgerId, entryId);
+ ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) {
+ super(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
+ this.data = null;
+ }
+
+ ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, ChannelBuffer data) {
+ super(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
this.data = data;
}
- ByteBuffer getData() {
- return data;
+ boolean hasData() {
+ return data != null;
}
- }
- static class AddResponse extends Response {
- AddResponse(byte protocolVersion, long ledgerId, long entryId) {
- super(protocolVersion, ADDENTRY, EOK, ledgerId, entryId);
+ ChannelBuffer getData() {
+ return data;
}
}
- static class ErrorResponse extends Response {
- ErrorResponse(byte protocolVersion, byte opCode, int errorCode,
- long ledgerId, long entryId) {
- super(protocolVersion, opCode, errorCode, ledgerId, entryId);
+ static class AddResponse extends Response {
+ AddResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) {
+ super(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId);
}
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java?rev=1457592&r1=1457591&r2=1457592&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java Sun Mar 17 23:31:20 2013
@@ -159,10 +159,11 @@ class BookieRequestHandler extends Simpl
int rc = BookieProtocol.EOK;
try {
if (add.isRecoveryAdd()) {
- bookie.recoveryAddEntry(add.getData(), this, new AddCtx(c, add),
+ bookie.recoveryAddEntry(add.getDataAsByteBuffer(), this, new AddCtx(c, add),
add.getMasterKey());
} else {
- bookie.addEntry(add.getData(), this, new AddCtx(c, add), add.getMasterKey());
+ bookie.addEntry(add.getDataAsByteBuffer(),
+ this, new AddCtx(c, add), add.getMasterKey());
}
} catch (IOException e) {
LOG.error("Error writing " + add, e);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1457592&r1=1457591&r2=1457592&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Sun Mar 17 23:31:20 2013
@@ -52,6 +52,7 @@ import org.jboss.netty.channel.SimpleCha
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
@@ -227,27 +228,13 @@ public class PerChannelBookieClient exte
*/
void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, WriteCallback cb,
Object ctx, final int options) {
+ BookieProtocol.AddRequest r = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
+ ledgerId, entryId, (short)options, masterKey, toSend);
final int entrySize = toSend.readableBytes();
-
final CompletionKey completionKey = new CompletionKey(ledgerId, entryId);
-
addCompletions.put(completionKey, new AddCompletion(cb, entrySize, ctx));
-
- int totalHeaderSize = 4 // for the length of the packet
- + 4 // for the type of request
- + BookieProtocol.MASTER_KEY_LENGTH; // for the master key
-
- try{
- ChannelBuffer header = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
-
- header.writeInt(totalHeaderSize - 4 + entrySize);
- header.writeInt(new PacketHeader(BookieProtocol.CURRENT_PROTOCOL_VERSION,
- BookieProtocol.ADDENTRY, (short)options).toInt());
- header.writeBytes(masterKey, 0, BookieProtocol.MASTER_KEY_LENGTH);
-
- ChannelBuffer wrappedBuffer = ChannelBuffers.wrappedBuffer(header, toSend);
-
- ChannelFuture future = channel.write(wrappedBuffer);
+ try {
+ ChannelFuture future = channel.write(r);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@@ -274,64 +261,48 @@ public class PerChannelBookieClient exte
final CompletionKey key = new CompletionKey(ledgerId, entryId);
readCompletions.put(key, new ReadCompletion(cb, ctx));
- int totalHeaderSize = 4 // for the length of the packet
- + 4 // for request type
- + 8 // for ledgerId
- + 8 // for entryId
- + BookieProtocol.MASTER_KEY_LENGTH; // for masterKey
-
- ChannelBuffer tmpEntry = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
- tmpEntry.writeInt(totalHeaderSize - 4);
-
- tmpEntry.writeInt(new PacketHeader(BookieProtocol.CURRENT_PROTOCOL_VERSION,
- BookieProtocol.READENTRY,
- BookieProtocol.FLAG_DO_FENCING).toInt());
- tmpEntry.writeLong(ledgerId);
- tmpEntry.writeLong(entryId);
- tmpEntry.writeBytes(masterKey, 0, BookieProtocol.MASTER_KEY_LENGTH);
+ final BookieProtocol.ReadRequest r = new BookieProtocol.ReadRequest(
+ BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
+ BookieProtocol.FLAG_DO_FENCING, masterKey);
- ChannelFuture future = channel.write(tmpEntry);
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully wrote request for reading entry: " + entryId + " ledger-id: "
- + ledgerId + " bookie: " + channel.getRemoteAddress());
+ try {
+ ChannelFuture future = channel.write(r);
+ future.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Successfully wrote request {} to {}",
+ r, channel.getRemoteAddress());
+ }
+ } else {
+ errorOutReadKey(key);
+ }
}
- } else {
- errorOutReadKey(key);
- }
- }
- });
+ });
+ } catch(Throwable e) {
+ LOG.warn("Read entry operation " + r + " failed", e);
+ errorOutReadKey(key);
+ }
}
public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx) {
final CompletionKey key = new CompletionKey(ledgerId, entryId);
readCompletions.put(key, new ReadCompletion(cb, ctx));
- int totalHeaderSize = 4 // for the length of the packet
- + 4 // for request type
- + 8 // for ledgerId
- + 8; // for entryId
+ final BookieProtocol.ReadRequest r = new BookieProtocol.ReadRequest(
+ BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
+ BookieProtocol.FLAG_NONE);
try{
- ChannelBuffer tmpEntry = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
- tmpEntry.writeInt(totalHeaderSize - 4);
-
- tmpEntry.writeInt(new PacketHeader(BookieProtocol.CURRENT_PROTOCOL_VERSION,
- BookieProtocol.READENTRY, BookieProtocol.FLAG_NONE).toInt());
- tmpEntry.writeLong(ledgerId);
- tmpEntry.writeLong(entryId);
-
- ChannelFuture future = channel.write(tmpEntry);
+ ChannelFuture future = channel.write(r);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully wrote request for reading entry: " + entryId + " ledger-id: "
- + ledgerId + " bookie: " + channel.getRemoteAddress());
+ LOG.debug("Successfully wrote request {} to {}",
+ r, channel.getRemoteAddress());
}
} else {
errorOutReadKey(key);
@@ -339,7 +310,7 @@ public class PerChannelBookieClient exte
}
});
} catch(Throwable e) {
- LOG.warn("Read entry operation failed", e);
+ LOG.warn("Read entry operation " + r + " failed", e);
errorOutReadKey(key);
}
}
@@ -460,6 +431,11 @@ public class PerChannelBookieClient exte
pipeline.addLast("readTimeout", new ReadTimeoutHandler(readTimeoutTimer,
conf.getReadTimeout()));
pipeline.addLast("lengthbasedframedecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
+ pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+ pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder());
+ pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.ResponseDecoder());
+
+
pipeline.addLast("mainhandler", this);
return pipeline;
}
@@ -527,54 +503,41 @@ public class PerChannelBookieClient exte
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- if (!(e.getMessage() instanceof ChannelBuffer)) {
+ if (!(e.getMessage() instanceof BookieProtocol.Response)) {
ctx.sendUpstream(e);
return;
}
+ final BookieProtocol.Response r = (BookieProtocol.Response)e.getMessage();
- final ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
- final int rc;
- final long ledgerId, entryId;
- final PacketHeader header;
-
- try {
- header = PacketHeader.fromInt(buffer.readInt());
- rc = buffer.readInt();
- ledgerId = buffer.readLong();
- entryId = buffer.readLong();
- } catch (IndexOutOfBoundsException ex) {
- LOG.error("Unparseable response from bookie: " + addr, ex);
- return;
- }
-
- executor.submitOrdered(ledgerId, new SafeRunnable() {
+ executor.submitOrdered(r.getLedgerId(), new SafeRunnable() {
@Override
public void safeRun() {
- switch (header.getOpCode()) {
+ switch (r.getOpCode()) {
case BookieProtocol.ADDENTRY:
- handleAddResponse(ledgerId, entryId, rc);
+ BookieProtocol.AddResponse a = (BookieProtocol.AddResponse)r;
+ handleAddResponse(a);
break;
case BookieProtocol.READENTRY:
- handleReadResponse(ledgerId, entryId, rc, buffer);
+ BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse)r;
+ handleReadResponse(rr);
break;
default:
- LOG.error("Unexpected response, type: " + header.getOpCode()
- + " received from bookie: " + addr + " , ignoring");
+ LOG.error("Unexpected response, type: {}", r);
}
}
});
}
- void handleAddResponse(long ledgerId, long entryId, int rc) {
+ void handleAddResponse(BookieProtocol.AddResponse a) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
- + entryId + " rc: " + rc);
+ LOG.debug("Got response for add request from bookie: {} for ledger: {}", addr, a);
}
// convert to BKException code because thats what the uppper
// layers expect. This is UGLY, there should just be one set of
// error codes.
- switch (rc) {
+ int rc = BKException.Code.WriteException;
+ switch (a.getErrorCode()) {
case BookieProtocol.EOK:
rc = BKException.Code.OK;
break;
@@ -591,50 +554,53 @@ public class PerChannelBookieClient exte
rc = BKException.Code.WriteOnReadOnlyBookieException;
break;
default:
- LOG.error("Add for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + addr
- + " with code: " + rc);
+ LOG.error("Add failed {}", a);
rc = BKException.Code.WriteException;
break;
}
AddCompletion ac;
- ac = addCompletions.remove(new CompletionKey(ledgerId, entryId));
+ ac = addCompletions.remove(new CompletionKey(a.getLedgerId(),
+ a.getEntryId()));
if (ac == null) {
- LOG.error("Unexpected add response received from bookie: " + addr + " for ledger: " + ledgerId
- + ", entry: " + entryId + " , ignoring");
+ LOG.error("Unexpected add response from bookie {} for {}", addr, a);
return;
}
- // totalBytesOutstanding.addAndGet(-ac.size);
-
- ac.cb.writeComplete(rc, ledgerId, entryId, addr, ac.ctx);
-
+ ac.cb.writeComplete(rc, a.getLedgerId(), a.getEntryId(), addr, ac.ctx);
}
- void handleReadResponse(long ledgerId, long entryId, int rc, ChannelBuffer buffer) {
+ void handleReadResponse(BookieProtocol.ReadResponse rr) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Got response for read request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
- + entryId + " rc: " + rc + " entry length: " + buffer.readableBytes());
+ LOG.debug("Got response for read request {} entry length: {}",
+ rr, rr.getData().readableBytes());
}
// convert to BKException code because thats what the uppper
// layers expect. This is UGLY, there should just be one set of
// error codes.
- if (rc == BookieProtocol.EOK) {
+ int rc = BKException.Code.ReadException;
+ switch (rr.getErrorCode()) {
+ case BookieProtocol.EOK:
rc = BKException.Code.OK;
- } else if (rc == BookieProtocol.ENOENTRY || rc == BookieProtocol.ENOLEDGER) {
+ break;
+ case BookieProtocol.ENOENTRY:
+ case BookieProtocol.ENOLEDGER:
rc = BKException.Code.NoSuchEntryException;
- } else if (rc == BookieProtocol.EBADVERSION) {
+ break;
+ case BookieProtocol.EBADVERSION:
rc = BKException.Code.ProtocolVersionException;
- } else if (rc == BookieProtocol.EUA) {
+ break;
+ case BookieProtocol.EUA:
rc = BKException.Code.UnauthorizedAccessException;
- } else {
- LOG.error("Read for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + addr
- + " with code: " + rc);
+ break;
+ default:
+ LOG.error("Read error for {}", rr);
rc = BKException.Code.ReadException;
+ break;
}
- CompletionKey key = new CompletionKey(ledgerId, entryId);
+ CompletionKey key = new CompletionKey(rr.getLedgerId(), rr.getEntryId());
ReadCompletion readCompletion = readCompletions.remove(key);
if (readCompletion == null) {
@@ -644,16 +610,17 @@ public class PerChannelBookieClient exte
* different entry id.
*/
- readCompletion = readCompletions.remove(new CompletionKey(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED));
+ readCompletion = readCompletions.remove(new CompletionKey(rr.getLedgerId(),
+ BookieProtocol.LAST_ADD_CONFIRMED));
}
if (readCompletion == null) {
- LOG.error("Unexpected read response received from bookie: " + addr + " for ledger: " + ledgerId
- + ", entry: " + entryId + " , ignoring");
+ LOG.error("Unexpected read response received from bookie: {} for {}", addr, rr);
return;
}
- readCompletion.cb.readEntryComplete(rc, ledgerId, entryId, buffer.slice(), readCompletion.ctx);
+ readCompletion.cb.readEntryComplete(rc, rr.getLedgerId(), rr.getEntryId(),
+ rr.getData(), readCompletion.ctx);
}
/**
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java?rev=1457592&r1=1457591&r2=1457592&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java Sun Mar 17 23:31:20 2013
@@ -21,20 +21,27 @@
package org.apache.bookkeeper.proto;
import java.nio.ByteBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
class ResponseBuilder {
static BookieProtocol.Response buildErrorResponse(int errorCode, BookieProtocol.Request r) {
- return new BookieProtocol.ErrorResponse(r.getProtocolVersion(), r.getOpCode(),
- errorCode, r.getLedgerId(), r.getEntryId());
+ if (r.getOpCode() == BookieProtocol.ADDENTRY) {
+ return new BookieProtocol.AddResponse(r.getProtocolVersion(), errorCode,
+ r.getLedgerId(), r.getEntryId());
+ } else {
+ assert(r.getOpCode() == BookieProtocol.READENTRY);
+ return new BookieProtocol.ReadResponse(r.getProtocolVersion(), errorCode,
+ r.getLedgerId(), r.getEntryId());
+ }
}
static BookieProtocol.Response buildAddResponse(BookieProtocol.Request r) {
- return new BookieProtocol.AddResponse(r.getProtocolVersion(), r.getLedgerId(),
+ return new BookieProtocol.AddResponse(r.getProtocolVersion(), BookieProtocol.EOK, r.getLedgerId(),
r.getEntryId());
}
static BookieProtocol.Response buildReadResponse(ByteBuffer data, BookieProtocol.Request r) {
- return new BookieProtocol.ReadResponse(r.getProtocolVersion(),
- r.getLedgerId(), r.getEntryId(), data);
+ return new BookieProtocol.ReadResponse(r.getProtocolVersion(), BookieProtocol.EOK,
+ r.getLedgerId(), r.getEntryId(), ChannelBuffers.wrappedBuffer(data));
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java?rev=1457592&r1=1457591&r2=1457592&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java Sun Mar 17 23:31:20 2013
@@ -58,7 +58,7 @@ public class TestProtoVersions {
base.tearDown();
}
- private void testVersion(int version, int expectedresult) throws Exception {
+ private void testVersion(byte version, int expectedresult) throws Exception {
PerChannelBookieClient bc = new PerChannelBookieClient(base.executor, base.channelFactory,
new InetSocketAddress(InetAddress.getLocalHost(), base.port), new AtomicLong(0));
final AtomicInteger outerrc = new AtomicInteger(-1);
@@ -83,20 +83,9 @@ public class TestProtoVersions {
bc.readCompletions.put(bc.newCompletionKey(1, 1),
new PerChannelBookieClient.ReadCompletion(cb, this));
- int totalHeaderSize = 4 // for the length of the packet
- + 4 // for request type
- + 8 // for ledgerId
- + 8; // for entryId
-
- // This will need to updated if the protocol for read changes
- ChannelBuffer tmpEntry = bc.channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
- tmpEntry.writeInt(totalHeaderSize - 4);
- tmpEntry.writeInt(new BookieProtocol.PacketHeader((byte)version, BookieProtocol.READENTRY, (short)0).toInt());
- tmpEntry.writeLong(1);
- tmpEntry.writeLong(1);
-
+ BookieProtocol.ReadRequest req = new BookieProtocol.ReadRequest(version, 1L, 1L, (short)0);
- bc.channel.write(tmpEntry).awaitUninterruptibly();
+ bc.channel.write(req).awaitUninterruptibly();
readLatch.await(5, TimeUnit.SECONDS);
assertEquals("Expected result differs", expectedresult, outerrc.get());
@@ -105,9 +94,9 @@ public class TestProtoVersions {
@Test(timeout=60000)
public void testVersions() throws Exception {
- testVersion(BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION-1, BKException.Code.ProtocolVersionException);
+ testVersion((byte)(BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION-1), BKException.Code.ProtocolVersionException);
testVersion(BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION, BKException.Code.NoSuchEntryException);
testVersion(BookieProtocol.CURRENT_PROTOCOL_VERSION, BKException.Code.NoSuchEntryException);
- testVersion(BookieProtocol.CURRENT_PROTOCOL_VERSION+1, BKException.Code.ProtocolVersionException);
+ testVersion((byte)(BookieProtocol.CURRENT_PROTOCOL_VERSION+1), BKException.Code.ProtocolVersionException);
}
-}
\ No newline at end of file
+}