You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2013/03/18 00:31:21 UTC

svn commit: r1457592 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/

Author: sijie
Date: Sun Mar 17 23:31:20 2013
New Revision: 1457592

URL: http://svn.apache.org/r1457592
Log:
BOOKKEEPER-576: Bookie client should use netty Decoder/Encoder (ivank via sijie)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1457592&r1=1457591&r2=1457592&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sun Mar 17 23:31:20 2013
@@ -44,6 +44,8 @@ Trunk (unreleased changes)
 
       BOOKKEEPER-574: Extend the bookkeeper shell to get a list of available bookies (ivank via umamahesh)
 
+      BOOKKEEPER-576: Bookie client should use netty Decoder/Encoder (ivank via sijie)
+
 Release 4.2.0 - 2013-01-14
 
   Non-backward compatible changes:

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java?rev=1457592&r1=1457591&r2=1457592&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java Sun Mar 17 23:31:20 2013
@@ -141,8 +141,8 @@ class BookieNettyServer {
                              new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4));
             pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
 
-            pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.Decoder());
-            pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.Encoder());
+            pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder());
+            pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder());
             pipeline.addLast("bookieRequestHandler", new BookieRequestHandler(conf, bookie,
                                                                               allChannels));
             return pipeline;

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java?rev=1457592&r1=1457591&r2=1457592&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java Sun Mar 17 23:31:20 2013
@@ -35,7 +35,45 @@ import org.slf4j.LoggerFactory;
 public class BookieProtoEncoding {
     static Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class);
 
-    public static class Decoder extends OneToOneDecoder {
+    public static class RequestEncoder extends OneToOneEncoder {
+        @Override
+        public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+                throws Exception {
+            if (!(msg instanceof BookieProtocol.Request)) {
+                return msg;
+            }
+            BookieProtocol.Request r = (BookieProtocol.Request)msg;
+            if (r instanceof BookieProtocol.AddRequest) {
+                BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest)r;
+                int totalHeaderSize = 4 // for the header
+                    + BookieProtocol.MASTER_KEY_LENGTH; // for the master key
+                ChannelBuffer buf = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
+                buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt());
+                buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
+                return ChannelBuffers.wrappedBuffer(buf, ar.getData());
+            } else {
+                assert(r instanceof BookieProtocol.ReadRequest);
+                int totalHeaderSize = 4 // for request type
+                    + 8 // for ledgerId
+                    + 8; // for entryId
+                if (r.hasMasterKey()) {
+                    totalHeaderSize += BookieProtocol.MASTER_KEY_LENGTH;
+                }
+
+                ChannelBuffer buf = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
+                buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt());
+                buf.writeLong(r.getLedgerId());
+                buf.writeLong(r.getEntryId());
+                if (r.hasMasterKey()) {
+                    buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
+                }
+
+                return buf;
+            }
+        }
+    }
+
+    public static class RequestDecoder extends OneToOneDecoder {
         @Override
         public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
                 throws Exception {
@@ -61,11 +99,11 @@ public class BookieProtoEncoding {
                 packet.readBytes(masterKey, 0, BookieProtocol.MASTER_KEY_LENGTH);
 
                 ChannelBuffer bb = packet.duplicate();
+
                 ledgerId = bb.readLong();
                 entryId = bb.readLong();
-
                 return new BookieProtocol.AddRequest(h.getVersion(), ledgerId, entryId,
-                        flags, masterKey, packet.toByteBuffer().slice());
+                        flags, masterKey, packet.slice());
             case BookieProtocol.READENTRY:
                 ledgerId = packet.readLong();
                 entryId = packet.readLong();
@@ -83,7 +121,7 @@ public class BookieProtoEncoding {
         }
     }
 
-    public static class Encoder extends OneToOneEncoder {
+    public static class ResponseEncoder extends OneToOneEncoder {
         @Override
         public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
                 throws Exception {
@@ -100,13 +138,15 @@ public class BookieProtoEncoding {
             buf.writeLong(r.getEntryId());
 
             ServerStats.getInstance().incrementPacketsSent();
-
             if (msg instanceof BookieProtocol.ReadResponse) {
                 BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse)r;
-                return ChannelBuffers.wrappedBuffer(buf,
-                        ChannelBuffers.wrappedBuffer(rr.getData()));
-            } else if ((msg instanceof BookieProtocol.AddResponse)
-                       || (msg instanceof BookieProtocol.ErrorResponse)) {
+                if (rr.hasData()) {
+                    return ChannelBuffers.wrappedBuffer(buf,
+                            ChannelBuffers.wrappedBuffer(rr.getData()));
+                } else {
+                    return buf;
+                }
+            } else if (msg instanceof BookieProtocol.AddResponse) {
                 return buf;
             } else {
                 LOG.error("Cannot encode unknown response type {}", msg.getClass().getName());
@@ -115,4 +155,40 @@ public class BookieProtoEncoding {
         }
     }
 
+    public static class ResponseDecoder extends OneToOneDecoder {
+        @Override
+        public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
+                throws Exception {
+            if (!(msg instanceof ChannelBuffer)) {
+                return msg;
+            }
+
+            final ChannelBuffer buffer = (ChannelBuffer)msg;
+            final int rc;
+            final long ledgerId, entryId;
+            final PacketHeader header;
+
+            header = PacketHeader.fromInt(buffer.readInt());
+            rc = buffer.readInt();
+            ledgerId = buffer.readLong();
+            entryId = buffer.readLong();
+
+            switch (header.getOpCode()) {
+            case BookieProtocol.ADDENTRY:
+                return new BookieProtocol.AddResponse(header.getVersion(), rc, ledgerId, entryId);
+            case BookieProtocol.READENTRY:
+                if (rc == BookieProtocol.EOK) {
+                    return new BookieProtocol.ReadResponse(header.getVersion(), rc,
+                                                           ledgerId, entryId, buffer.slice());
+                } else {
+                    return new BookieProtocol.ReadResponse(header.getVersion(), rc,
+                                                           ledgerId, entryId);
+                }
+            default:
+                LOG.error("Unexpected response of type {} received from {}",
+                          header.getOpCode(), channel.getRemoteAddress());
+                return msg;
+            }
+        }
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java?rev=1457592&r1=1457591&r2=1457592&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java Sun Mar 17 23:31:20 2013
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.proto;
  *
  */
 
+import org.jboss.netty.buffer.ChannelBuffer;
 import java.nio.ByteBuffer;
 
 /**
@@ -216,6 +217,10 @@ public interface BookieProtocol {
             return entryId;
         }
 
+        short getFlags() {
+            return flags;
+        }
+
         boolean hasMasterKey() {
             return masterKey != null;
         }
@@ -232,18 +237,22 @@ public interface BookieProtocol {
     }
 
     static class AddRequest extends Request {
-        final ByteBuffer data;
+        final ChannelBuffer data;
 
         AddRequest(byte protocolVersion, long ledgerId, long entryId,
-                   short flags, byte[] masterKey, ByteBuffer data) {
+                   short flags, byte[] masterKey, ChannelBuffer data) {
             super(protocolVersion, ADDENTRY, ledgerId, entryId, flags, masterKey);
             this.data = data;
         }
 
-        ByteBuffer getData() {
+        ChannelBuffer getData() {
             return data;
         }
 
+        ByteBuffer getDataAsByteBuffer() {
+            return data.toByteBuffer().slice();
+        }
+
         boolean isRecoveryAdd() {
             return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD;
         }
@@ -302,33 +311,36 @@ public interface BookieProtocol {
 
         @Override
         public String toString() {
-            return String.format("Op(%d)[Ledger:%d,Entry:%d]", opCode, ledgerId, entryId);
+            return String.format("Op(%d)[Ledger:%d,Entry:%d,errorCode=%d]",
+                                 opCode, ledgerId, entryId, errorCode);
         }
     }
 
     static class ReadResponse extends Response {
-        final ByteBuffer data;
+        final ChannelBuffer data;
 
-        ReadResponse(byte protocolVersion, long ledgerId, long entryId, ByteBuffer data) {
-            super(protocolVersion, READENTRY, EOK, ledgerId, entryId);
+        ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) {
+            super(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
+            this.data = null;
+        }
+
+        ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, ChannelBuffer data) {
+            super(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
             this.data = data;
         }
 
-        ByteBuffer getData() {
-            return data;
+        boolean hasData() {
+            return data != null;
         }
-    }
 
-    static class AddResponse extends Response {
-        AddResponse(byte protocolVersion, long ledgerId, long entryId) {
-            super(protocolVersion, ADDENTRY, EOK, ledgerId, entryId);
+        ChannelBuffer getData() {
+            return data;
         }
     }
 
-    static class ErrorResponse extends Response {
-        ErrorResponse(byte protocolVersion, byte opCode, int errorCode,
-                      long ledgerId, long entryId) {
-            super(protocolVersion, opCode, errorCode, ledgerId, entryId);
+    static class AddResponse extends Response {
+        AddResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) {
+            super(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId);
         }
     }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java?rev=1457592&r1=1457591&r2=1457592&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java Sun Mar 17 23:31:20 2013
@@ -159,10 +159,11 @@ class BookieRequestHandler extends Simpl
         int rc = BookieProtocol.EOK;
         try {
             if (add.isRecoveryAdd()) {
-                bookie.recoveryAddEntry(add.getData(), this, new AddCtx(c, add),
+                bookie.recoveryAddEntry(add.getDataAsByteBuffer(), this, new AddCtx(c, add),
                                         add.getMasterKey());
             } else {
-                bookie.addEntry(add.getData(), this, new AddCtx(c, add), add.getMasterKey());
+                bookie.addEntry(add.getDataAsByteBuffer(),
+                                this, new AddCtx(c, add), add.getMasterKey());
             }
         } catch (IOException e) {
             LOG.error("Error writing " + add, e);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1457592&r1=1457591&r2=1457592&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Sun Mar 17 23:31:20 2013
@@ -52,6 +52,7 @@ import org.jboss.netty.channel.SimpleCha
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
 import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
 import org.jboss.netty.handler.codec.frame.TooLongFrameException;
 import org.jboss.netty.handler.timeout.ReadTimeoutException;
 import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
@@ -227,27 +228,13 @@ public class PerChannelBookieClient exte
      */
     void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, WriteCallback cb,
                   Object ctx, final int options) {
+        BookieProtocol.AddRequest r = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
+                ledgerId, entryId, (short)options, masterKey, toSend);
         final int entrySize = toSend.readableBytes();
-
         final CompletionKey completionKey = new CompletionKey(ledgerId, entryId);
-
         addCompletions.put(completionKey, new AddCompletion(cb, entrySize, ctx));
-
-        int totalHeaderSize = 4 // for the length of the packet
-                              + 4 // for the type of request
-                              + BookieProtocol.MASTER_KEY_LENGTH; // for the master key
-
-        try{
-            ChannelBuffer header = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
-
-            header.writeInt(totalHeaderSize - 4 + entrySize);
-            header.writeInt(new PacketHeader(BookieProtocol.CURRENT_PROTOCOL_VERSION,
-                                             BookieProtocol.ADDENTRY, (short)options).toInt());
-            header.writeBytes(masterKey, 0, BookieProtocol.MASTER_KEY_LENGTH);
-
-            ChannelBuffer wrappedBuffer = ChannelBuffers.wrappedBuffer(header, toSend);
-
-            ChannelFuture future = channel.write(wrappedBuffer);
+        try {
+            ChannelFuture future = channel.write(r);
             future.addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) throws Exception {
@@ -274,64 +261,48 @@ public class PerChannelBookieClient exte
         final CompletionKey key = new CompletionKey(ledgerId, entryId);
         readCompletions.put(key, new ReadCompletion(cb, ctx));
 
-        int totalHeaderSize = 4 // for the length of the packet
-                              + 4 // for request type
-                              + 8 // for ledgerId
-                              + 8 // for entryId
-                              + BookieProtocol.MASTER_KEY_LENGTH; // for masterKey
-
-        ChannelBuffer tmpEntry = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
-        tmpEntry.writeInt(totalHeaderSize - 4);
-
-        tmpEntry.writeInt(new PacketHeader(BookieProtocol.CURRENT_PROTOCOL_VERSION,
-                                           BookieProtocol.READENTRY,
-                                           BookieProtocol.FLAG_DO_FENCING).toInt());
-        tmpEntry.writeLong(ledgerId);
-        tmpEntry.writeLong(entryId);
-        tmpEntry.writeBytes(masterKey, 0, BookieProtocol.MASTER_KEY_LENGTH);
+        final BookieProtocol.ReadRequest r = new BookieProtocol.ReadRequest(
+                BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
+                BookieProtocol.FLAG_DO_FENCING, masterKey);
 
-        ChannelFuture future = channel.write(tmpEntry);
-        future.addListener(new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-                if (future.isSuccess()) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Successfully wrote request for reading entry: " + entryId + " ledger-id: "
-                                  + ledgerId + " bookie: " + channel.getRemoteAddress());
+        try {
+            ChannelFuture future = channel.write(r);
+            future.addListener(new ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(ChannelFuture future) throws Exception {
+                        if (future.isSuccess()) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Successfully wrote request {} to {}",
+                                          r, channel.getRemoteAddress());
+                            }
+                        } else {
+                            errorOutReadKey(key);
+                        }
                     }
-                } else {
-                    errorOutReadKey(key);
-                }
-            }
-        });
+                });
+        } catch(Throwable e) {
+            LOG.warn("Read entry operation " + r + " failed", e);
+            errorOutReadKey(key);
+        }
     }
 
     public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx) {
         final CompletionKey key = new CompletionKey(ledgerId, entryId);
         readCompletions.put(key, new ReadCompletion(cb, ctx));
 
-        int totalHeaderSize = 4 // for the length of the packet
-                              + 4 // for request type
-                              + 8 // for ledgerId
-                              + 8; // for entryId
+        final BookieProtocol.ReadRequest r = new BookieProtocol.ReadRequest(
+                BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
+                BookieProtocol.FLAG_NONE);
 
         try{
-            ChannelBuffer tmpEntry = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
-            tmpEntry.writeInt(totalHeaderSize - 4);
-
-            tmpEntry.writeInt(new PacketHeader(BookieProtocol.CURRENT_PROTOCOL_VERSION,
-                                               BookieProtocol.READENTRY, BookieProtocol.FLAG_NONE).toInt());
-            tmpEntry.writeLong(ledgerId);
-            tmpEntry.writeLong(entryId);
-
-            ChannelFuture future = channel.write(tmpEntry);
+            ChannelFuture future = channel.write(r);
             future.addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) throws Exception {
                     if (future.isSuccess()) {
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug("Successfully wrote request for reading entry: " + entryId + " ledger-id: "
-                                                            + ledgerId + " bookie: " + channel.getRemoteAddress());
+                            LOG.debug("Successfully wrote request {} to {}",
+                                      r, channel.getRemoteAddress());
                         }
                     } else {
                         errorOutReadKey(key);
@@ -339,7 +310,7 @@ public class PerChannelBookieClient exte
                 }
             });
         } catch(Throwable e) {
-            LOG.warn("Read entry operation failed", e);
+            LOG.warn("Read entry operation " + r + " failed", e);
             errorOutReadKey(key);
         }
     }
@@ -460,6 +431,11 @@ public class PerChannelBookieClient exte
         pipeline.addLast("readTimeout", new ReadTimeoutHandler(readTimeoutTimer, 
                                                                conf.getReadTimeout()));
         pipeline.addLast("lengthbasedframedecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
+        pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+        pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder());
+        pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.ResponseDecoder());
+
+
         pipeline.addLast("mainhandler", this);
         return pipeline;
     }
@@ -527,54 +503,41 @@ public class PerChannelBookieClient exte
      */
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-        if (!(e.getMessage() instanceof ChannelBuffer)) {
+        if (!(e.getMessage() instanceof BookieProtocol.Response)) {
             ctx.sendUpstream(e);
             return;
         }
+        final BookieProtocol.Response r = (BookieProtocol.Response)e.getMessage();
 
-        final ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
-        final int rc;
-        final long ledgerId, entryId;
-        final PacketHeader header;
-
-        try {
-            header = PacketHeader.fromInt(buffer.readInt());
-            rc = buffer.readInt();
-            ledgerId = buffer.readLong();
-            entryId = buffer.readLong();
-        } catch (IndexOutOfBoundsException ex) {
-            LOG.error("Unparseable response from bookie: " + addr, ex);
-            return;
-        }
-
-        executor.submitOrdered(ledgerId, new SafeRunnable() {
+        executor.submitOrdered(r.getLedgerId(), new SafeRunnable() {
             @Override
             public void safeRun() {
-                switch (header.getOpCode()) {
+                switch (r.getOpCode()) {
                 case BookieProtocol.ADDENTRY:
-                    handleAddResponse(ledgerId, entryId, rc);
+                    BookieProtocol.AddResponse a = (BookieProtocol.AddResponse)r;
+                    handleAddResponse(a);
                     break;
                 case BookieProtocol.READENTRY:
-                    handleReadResponse(ledgerId, entryId, rc, buffer);
+                    BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse)r;
+                    handleReadResponse(rr);
                     break;
                 default:
-                    LOG.error("Unexpected response, type: " + header.getOpCode() 
-                              + " received from bookie: " + addr + " , ignoring");
+                    LOG.error("Unexpected response, type: {}", r);
                 }
             }
         });
     }
 
-    void handleAddResponse(long ledgerId, long entryId, int rc) {
+    void handleAddResponse(BookieProtocol.AddResponse a) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
-                      + entryId + " rc: " + rc);
+            LOG.debug("Got response for add request from bookie: {} for ledger: {}", addr, a);
         }
 
         // convert to BKException code because thats what the uppper
         // layers expect. This is UGLY, there should just be one set of
         // error codes.
-        switch (rc) {
+        int rc = BKException.Code.WriteException;
+        switch (a.getErrorCode()) {
         case BookieProtocol.EOK:
             rc = BKException.Code.OK;
             break;
@@ -591,50 +554,53 @@ public class PerChannelBookieClient exte
             rc = BKException.Code.WriteOnReadOnlyBookieException;
             break;
         default:
-            LOG.error("Add for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + addr
-                      + " with code: " + rc);
+            LOG.error("Add failed {}", a);
             rc = BKException.Code.WriteException;
             break;
         }
 
         AddCompletion ac;
-        ac = addCompletions.remove(new CompletionKey(ledgerId, entryId));
+        ac = addCompletions.remove(new CompletionKey(a.getLedgerId(),
+                                                     a.getEntryId()));
         if (ac == null) {
-            LOG.error("Unexpected add response received from bookie: " + addr + " for ledger: " + ledgerId
-                      + ", entry: " + entryId + " , ignoring");
+            LOG.error("Unexpected add response from bookie {} for {}", addr, a);
             return;
         }
 
-        // totalBytesOutstanding.addAndGet(-ac.size);
-
-        ac.cb.writeComplete(rc, ledgerId, entryId, addr, ac.ctx);
-
+        ac.cb.writeComplete(rc, a.getLedgerId(), a.getEntryId(), addr, ac.ctx);
     }
 
-    void handleReadResponse(long ledgerId, long entryId, int rc, ChannelBuffer buffer) {
+    void handleReadResponse(BookieProtocol.ReadResponse rr) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Got response for read request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
-                      + entryId + " rc: " + rc + " entry length: " + buffer.readableBytes());
+            LOG.debug("Got response for read request {} entry length: {}",
+                      rr, rr.getData().readableBytes());
         }
 
         // convert to BKException code because thats what the uppper
         // layers expect. This is UGLY, there should just be one set of
         // error codes.
-        if (rc == BookieProtocol.EOK) {
+        int rc = BKException.Code.ReadException;
+        switch (rr.getErrorCode()) {
+        case BookieProtocol.EOK:
             rc = BKException.Code.OK;
-        } else if (rc == BookieProtocol.ENOENTRY || rc == BookieProtocol.ENOLEDGER) {
+            break;
+        case BookieProtocol.ENOENTRY:
+        case BookieProtocol.ENOLEDGER:
             rc = BKException.Code.NoSuchEntryException;
-        } else if (rc == BookieProtocol.EBADVERSION) {
+            break;
+        case BookieProtocol.EBADVERSION:
             rc = BKException.Code.ProtocolVersionException;
-        } else if (rc == BookieProtocol.EUA) {
+            break;
+        case BookieProtocol.EUA:
             rc = BKException.Code.UnauthorizedAccessException;
-        } else {
-            LOG.error("Read for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + addr
-                      + " with code: " + rc);
+            break;
+        default:
+            LOG.error("Read error for {}", rr);
             rc = BKException.Code.ReadException;
+            break;
         }
 
-        CompletionKey key = new CompletionKey(ledgerId, entryId);
+        CompletionKey key = new CompletionKey(rr.getLedgerId(), rr.getEntryId());
         ReadCompletion readCompletion = readCompletions.remove(key);
 
         if (readCompletion == null) {
@@ -644,16 +610,17 @@ public class PerChannelBookieClient exte
              * different entry id.
              */
             
-            readCompletion = readCompletions.remove(new CompletionKey(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED));
+            readCompletion = readCompletions.remove(new CompletionKey(rr.getLedgerId(),
+                                                                      BookieProtocol.LAST_ADD_CONFIRMED));
         }
 
         if (readCompletion == null) {
-            LOG.error("Unexpected read response received from bookie: " + addr + " for ledger: " + ledgerId
-                      + ", entry: " + entryId + " , ignoring");
+            LOG.error("Unexpected read response received from bookie: {} for {}", addr, rr);
             return;
         }
 
-        readCompletion.cb.readEntryComplete(rc, ledgerId, entryId, buffer.slice(), readCompletion.ctx);
+        readCompletion.cb.readEntryComplete(rc, rr.getLedgerId(), rr.getEntryId(),
+                                            rr.getData(), readCompletion.ctx);
     }
 
     /**

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java?rev=1457592&r1=1457591&r2=1457592&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java Sun Mar 17 23:31:20 2013
@@ -21,20 +21,27 @@
 package org.apache.bookkeeper.proto;
 
 import java.nio.ByteBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 
 class ResponseBuilder {
     static BookieProtocol.Response buildErrorResponse(int errorCode, BookieProtocol.Request r) {
-        return new BookieProtocol.ErrorResponse(r.getProtocolVersion(), r.getOpCode(),
-                                                errorCode, r.getLedgerId(), r.getEntryId());
+        if (r.getOpCode() == BookieProtocol.ADDENTRY) {
+            return new BookieProtocol.AddResponse(r.getProtocolVersion(), errorCode,
+                                                  r.getLedgerId(), r.getEntryId());
+        } else {
+            assert(r.getOpCode() == BookieProtocol.READENTRY);
+            return new BookieProtocol.ReadResponse(r.getProtocolVersion(), errorCode,
+                                                   r.getLedgerId(), r.getEntryId());
+        }
     }
 
     static BookieProtocol.Response buildAddResponse(BookieProtocol.Request r) {
-        return new BookieProtocol.AddResponse(r.getProtocolVersion(), r.getLedgerId(),
+        return new BookieProtocol.AddResponse(r.getProtocolVersion(), BookieProtocol.EOK, r.getLedgerId(),
                                               r.getEntryId());
     }
 
     static BookieProtocol.Response buildReadResponse(ByteBuffer data, BookieProtocol.Request r) {
-        return new BookieProtocol.ReadResponse(r.getProtocolVersion(),
-                                               r.getLedgerId(), r.getEntryId(), data);
+        return new BookieProtocol.ReadResponse(r.getProtocolVersion(), BookieProtocol.EOK,
+                r.getLedgerId(), r.getEntryId(), ChannelBuffers.wrappedBuffer(data));
     }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java?rev=1457592&r1=1457591&r2=1457592&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java Sun Mar 17 23:31:20 2013
@@ -58,7 +58,7 @@ public class TestProtoVersions {
         base.tearDown();
     }
 
-    private void testVersion(int version, int expectedresult) throws Exception {
+    private void testVersion(byte version, int expectedresult) throws Exception {
         PerChannelBookieClient bc = new PerChannelBookieClient(base.executor, base.channelFactory, 
                 new InetSocketAddress(InetAddress.getLocalHost(), base.port), new AtomicLong(0));
         final AtomicInteger outerrc = new AtomicInteger(-1);
@@ -83,20 +83,9 @@ public class TestProtoVersions {
         bc.readCompletions.put(bc.newCompletionKey(1, 1),
                                new PerChannelBookieClient.ReadCompletion(cb, this));
         
-        int totalHeaderSize = 4 // for the length of the packet
-            + 4 // for request type
-            + 8 // for ledgerId
-            + 8; // for entryId
-
-        // This will need to updated if the protocol for read changes
-        ChannelBuffer tmpEntry = bc.channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
-        tmpEntry.writeInt(totalHeaderSize - 4);
-        tmpEntry.writeInt(new BookieProtocol.PacketHeader((byte)version, BookieProtocol.READENTRY, (short)0).toInt());
-        tmpEntry.writeLong(1);
-        tmpEntry.writeLong(1);
-        
+        BookieProtocol.ReadRequest req = new BookieProtocol.ReadRequest(version, 1L, 1L, (short)0);
         
-        bc.channel.write(tmpEntry).awaitUninterruptibly();
+        bc.channel.write(req).awaitUninterruptibly();
         readLatch.await(5, TimeUnit.SECONDS);
         assertEquals("Expected result differs", expectedresult, outerrc.get());
         
@@ -105,9 +94,9 @@ public class TestProtoVersions {
 
     @Test(timeout=60000)
     public void testVersions() throws Exception {
-        testVersion(BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION-1, BKException.Code.ProtocolVersionException);
+        testVersion((byte)(BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION-1), BKException.Code.ProtocolVersionException);
         testVersion(BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION, BKException.Code.NoSuchEntryException);
         testVersion(BookieProtocol.CURRENT_PROTOCOL_VERSION, BKException.Code.NoSuchEntryException);
-        testVersion(BookieProtocol.CURRENT_PROTOCOL_VERSION+1, BKException.Code.ProtocolVersionException);
+        testVersion((byte)(BookieProtocol.CURRENT_PROTOCOL_VERSION+1), BKException.Code.ProtocolVersionException);
     }
-}
\ No newline at end of file
+}