You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2014/07/25 00:34:20 UTC

svn commit: r1613315 [1/4] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/ bookkeeper-server/src/main/java/org/apache...

Author: fpj
Date: Thu Jul 24 22:34:19 2014
New Revision: 1613315

URL: http://svn.apache.org/r1613315
Log:
BOOKKEEPER-582: Make bookie and client use protobuf for requests (non-wire part)
(sijie via fpj)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
Removed:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java
    zookeeper/bookkeeper/trunk/compat-deps/pom.xml
    zookeeper/bookkeeper/trunk/hedwig-server/pom.xml
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Jul 24 22:34:19 2014
@@ -303,6 +303,9 @@ Trunk (unreleased changes)
       BOOKKEEPER-562: Ability to tell if a ledger is closed or not (fpj)
 
       BOOKKEEPER-257: Ability to list all ledgers (fpj via ivank)
+  
+      BOOKKEEPER-582: Make bookie and client use protobuf for requests (non-wire part)
+      (sijie via fpj)
 
 Release 4.2.0 - 2013-01-14
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml Thu Jul 24 22:34:19 2014
@@ -161,6 +161,18 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-server-compat420</artifactId>
+      <version>4.2.0</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.bookkeeper</groupId>
+          <artifactId>bookkeeper-server</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
@@ -249,6 +261,7 @@
           <excludes>
             <!-- exclude generated file //-->
             <exclude>**/DataFormats.java</exclude>
+            <exclude>**/BookkeeperProtocol.java</exclude>
           </excludes>
         </configuration>
       </plugin>

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java Thu Jul 24 22:34:19 2014
@@ -127,7 +127,8 @@ public class LedgerChecker {
 
         public void readEntryComplete(int rc, long ledgerId, long entryId,
                                       ChannelBuffer buffer, Object ctx) {
-            if (rc != BKException.Code.NoSuchEntryException) {
+            if (BKException.Code.NoSuchEntryException != rc &&
+                BKException.Code.NoSuchLedgerExistsException != rc) {
                 entryMayExist.set(true);
             }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Thu Jul 24 22:34:19 2014
@@ -186,15 +186,19 @@ class PendingReadOp implements Enumerati
 
         synchronized void logErrorAndReattemptRead(BookieSocketAddress host, String errMsg, int rc) {
             if (BKException.Code.OK == firstError ||
-                BKException.Code.NoSuchEntryException == firstError) {
+                BKException.Code.NoSuchEntryException == firstError ||
+                BKException.Code.NoSuchLedgerExistsException == firstError) {
                 firstError = rc;
             } else if (BKException.Code.BookieHandleNotAvailableException == firstError &&
-                       BKException.Code.NoSuchEntryException != rc) {
-                // if other exception rather than NoSuchEntryException is returned
-                // we need to update firstError to indicate that it might be a valid read but just failed.
+                       BKException.Code.NoSuchEntryException != rc &&
+                       BKException.Code.NoSuchLedgerExistsException != rc) {
+                // if other exception rather than NoSuchEntryException or NoSuchLedgerExistsException is
+                // returned we need to update firstError to indicate that it might be a valid read but just
+                // failed.
                 firstError = rc;
             }
-            if (BKException.Code.NoSuchEntryException == rc) {
+            if (BKException.Code.NoSuchEntryException == rc ||
+                BKException.Code.NoSuchLedgerExistsException == rc) {
                 ++numMissedEntryReads;
                 LOG.debug("No such entry found on bookie.  L{} E{} bookie: {}",
                         new Object[] { lh.ledgerId, entryId, host });

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java Thu Jul 24 22:34:19 2014
@@ -20,7 +20,6 @@
  */
 package org.apache.bookkeeper.processor;
 
-import org.apache.bookkeeper.proto.BookieProtocol;
 import org.jboss.netty.channel.Channel;
 
 public interface RequestProcessor {
@@ -38,6 +37,6 @@ public interface RequestProcessor {
      * @param channel
      *          channel received the given request <i>r</i>
      */
-    public void processRequest(BookieProtocol.Request r, Channel channel);
+    public void processRequest(Object r, Channel channel);
 
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java Thu Jul 24 22:34:19 2014
@@ -118,9 +118,11 @@ public class BookieClient implements Per
                 PerChannelBookieClientPool oldClientPool = channels.putIfAbsent(addr, newClientPool);
                 if (null == oldClientPool) {
                     clientPool = newClientPool;
+                    // initialize the pool only after we put the pool into the map
+                    clientPool.intialize();
                 } else {
                     clientPool = oldClientPool;
-                    newClientPool.close();
+                    newClientPool.close(false);
                 }
             } finally {
                 closeLock.readLock().unlock();
@@ -130,10 +132,10 @@ public class BookieClient implements Per
     }
 
     public void closeClients(Set<BookieSocketAddress> addrs) {
+        final HashSet<PerChannelBookieClientPool> clients =
+                new HashSet<PerChannelBookieClientPool>();
         closeLock.readLock().lock();
         try {
-            final HashSet<PerChannelBookieClientPool> clients =
-                    new HashSet<PerChannelBookieClientPool>();
             for (BookieSocketAddress a : addrs) {
                 PerChannelBookieClientPool c = channels.get(a);
                 if (c != null) {
@@ -144,17 +146,12 @@ public class BookieClient implements Per
             if (clients.size() == 0) {
                 return;
             }
-            executor.submit(new SafeRunnable() {
-                    @Override
-                    public void safeRun() {
-                        for (PerChannelBookieClientPool c : clients) {
-                            c.disconnect();
-                        }
-                    }
-                });
         } finally {
             closeLock.readLock().unlock();
         }
+        for (PerChannelBookieClientPool c : clients) {
+            c.disconnect(false);
+        }
     }
 
     public void addEntry(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
@@ -279,7 +276,7 @@ public class BookieClient implements Per
         try {
             closed = true;
             for (PerChannelBookieClientPool pool : channels.values()) {
-                pool.close();
+                pool.close(true);
             }
             channels.clear();
         } finally {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java Thu Jul 24 22:34:19 2014
@@ -20,6 +20,9 @@
  */
 package org.apache.bookkeeper.proto;
 
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.jboss.netty.buffer.ChannelBufferFactory;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
@@ -35,9 +38,38 @@ import org.slf4j.LoggerFactory;
 public class BookieProtoEncoding {
     private final static Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class);
 
-    public static class RequestEncoder extends OneToOneEncoder {
+    static final EnDecoder REQ_PREV3 = new RequestEnDeCoderPreV3();
+    static final EnDecoder REP_PREV3 = new ResponseEnDeCoderPreV3();
+    static final EnDecoder REQ_V3 = new RequestEnDecoderV3();
+    static final EnDecoder REP_V3 = new ResponseEnDecoderV3();
+
+    static interface EnDecoder {
+
+        /**
+         * Encode a <i>object</i> into channel buffer.
+         *
+         * @param object
+         *          object.
+         * @return encode buffer.
+         * @throws Exception
+         */
+        public Object encode(Object object, ChannelBufferFactory factory) throws Exception;
+
+        /**
+         * Decode a <i>packet</i> into an object.
+         *
+         * @param packet
+         *          received packet.
+         * @return parsed object.
+         * @throws Exception
+         */
+        public Object decode(ChannelBuffer packet) throws Exception;
+
+    }
+
+    static class RequestEnDeCoderPreV3 implements EnDecoder {
         @Override
-        public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+        public Object encode(Object msg, ChannelBufferFactory bufferFactory)
                 throws Exception {
             if (!(msg instanceof BookieProtocol.Request)) {
                 return msg;
@@ -47,7 +79,7 @@ public class BookieProtoEncoding {
                 BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest)r;
                 int totalHeaderSize = 4 // for the header
                     + BookieProtocol.MASTER_KEY_LENGTH; // for the master key
-                ChannelBuffer buf = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
+                ChannelBuffer buf = bufferFactory.getBuffer(totalHeaderSize);
                 buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt());
                 buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
                 return ChannelBuffers.wrappedBuffer(buf, ar.getData());
@@ -60,7 +92,7 @@ public class BookieProtoEncoding {
                     totalHeaderSize += BookieProtocol.MASTER_KEY_LENGTH;
                 }
 
-                ChannelBuffer buf = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
+                ChannelBuffer buf = bufferFactory.getBuffer(totalHeaderSize);
                 buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt());
                 buf.writeLong(r.getLedgerId());
                 buf.writeLong(r.getEntryId());
@@ -71,17 +103,10 @@ public class BookieProtoEncoding {
                 return buf;
             }
         }
-    }
 
-    public static class RequestDecoder extends OneToOneDecoder {
         @Override
-        public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
+        public Object decode(ChannelBuffer packet)
                 throws Exception {
-            if (!(msg instanceof ChannelBuffer)) {
-                return msg;
-            }
-            ChannelBuffer packet = (ChannelBuffer)msg;
-
             PacketHeader h = PacketHeader.fromInt(packet.readInt());
 
             // packet format is different between ADDENTRY and READENTRY
@@ -117,20 +142,19 @@ public class BookieProtoEncoding {
                     return new BookieProtocol.ReadRequest(h.getVersion(), ledgerId, entryId, flags);
                 }
             }
-            return msg;
+            return packet;
         }
     }
 
-    public static class ResponseEncoder extends OneToOneEncoder {
+    static class ResponseEnDeCoderPreV3 implements EnDecoder {
         @Override
-        public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+        public Object encode(Object msg, ChannelBufferFactory bufferFactory)
                 throws Exception {
             if (!(msg instanceof BookieProtocol.Response)) {
                 return msg;
             }
             BookieProtocol.Response r = (BookieProtocol.Response)msg;
-            ChannelBuffer buf = ctx.getChannel().getConfig().getBufferFactory()
-                .getBuffer(24);
+            ChannelBuffer buf = bufferFactory.getBuffer(24);
             buf.writeInt(new PacketHeader(r.getProtocolVersion(),
                                           r.getOpCode(), (short)0).toInt());
             buf.writeInt(r.getErrorCode());
@@ -153,17 +177,9 @@ public class BookieProtoEncoding {
                 return msg;
             }
         }
-    }
-
-    public static class ResponseDecoder extends OneToOneDecoder {
         @Override
-        public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
+        public Object decode(ChannelBuffer buffer)
                 throws Exception {
-            if (!(msg instanceof ChannelBuffer)) {
-                return msg;
-            }
-
-            final ChannelBuffer buffer = (ChannelBuffer)msg;
             final int rc;
             final long ledgerId, entryId;
             final PacketHeader header;
@@ -185,10 +201,130 @@ public class BookieProtoEncoding {
                                                            ledgerId, entryId);
                 }
             default:
-                LOG.error("Unexpected response of type {} received from {}",
-                          header.getOpCode(), channel.getRemoteAddress());
+                return buffer;
+            }
+        }
+    }
+
+    static class RequestEnDecoderV3 implements EnDecoder {
+
+        @Override
+        public Object decode(ChannelBuffer packet) throws Exception {
+            return BookkeeperProtocol.Request.parseFrom(new ChannelBufferInputStream(packet));
+        }
+
+        @Override
+        public Object encode(Object msg, ChannelBufferFactory factory) throws Exception {
+            BookkeeperProtocol.Request request = (BookkeeperProtocol.Request) msg;
+            return ChannelBuffers.wrappedBuffer(request.toByteArray());
+        }
+
+    }
+
+    static class ResponseEnDecoderV3 implements EnDecoder {
+
+        @Override
+        public Object decode(ChannelBuffer packet) throws Exception {
+            return BookkeeperProtocol.Response.parseFrom(new ChannelBufferInputStream(packet));
+        }
+
+        @Override
+        public Object encode(Object msg, ChannelBufferFactory factory) throws Exception {
+            BookkeeperProtocol.Response response = (BookkeeperProtocol.Response) msg;
+            return ChannelBuffers.wrappedBuffer(response.toByteArray());
+        }
+
+    }
+
+    public static class RequestEncoder extends OneToOneEncoder {
+
+        @Override
+        protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+                throws Exception {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Encode request {} to channel {}.", msg, channel);
+            }
+            if (msg instanceof BookkeeperProtocol.Request) {
+                return REQ_V3.encode(msg, ctx.getChannel().getConfig().getBufferFactory());
+            } else if (msg instanceof BookieProtocol.Request) {
+                return REQ_PREV3.encode(msg, ctx.getChannel().getConfig().getBufferFactory());
+            } else {
+                LOG.error("Invalid request to encode to {}: {}", channel, msg.getClass().getName());
                 return msg;
             }
         }
     }
+
+    public static class RequestDecoder extends OneToOneDecoder {
+
+        @Override
+        protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
+                throws Exception {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Received request {} from channel {} to decode.", msg, channel);
+            }
+            if (!(msg instanceof ChannelBuffer)) {
+                return msg;
+            }
+            ChannelBuffer buffer = (ChannelBuffer) msg;
+            try {
+                buffer.markReaderIndex();
+                try {
+                    return REQ_V3.decode(buffer);
+                } catch (InvalidProtocolBufferException e) {
+                    buffer.resetReaderIndex();
+                    return REQ_PREV3.decode(buffer);
+                }
+            } catch (Exception e) {
+                LOG.error("Failed to decode a request from {} : ", channel, e);
+                throw e;
+            }
+        }
+    }
+
+    public static class ResponseEncoder extends OneToOneEncoder {
+
+        @Override
+        protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+                throws Exception {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Encode response {} to channel {}.", msg, channel);
+            }
+            if (msg instanceof BookkeeperProtocol.Response) {
+                return REP_V3.encode(msg, ctx.getChannel().getConfig().getBufferFactory());
+            } else if (msg instanceof BookieProtocol.Response) {
+                return REP_PREV3.encode(msg, ctx.getChannel().getConfig().getBufferFactory());
+            } else {
+                LOG.error("Invalid response to encode to {}: {}", channel, msg.getClass().getName());
+                return msg;
+            }
+        }
+    }
+
+    public static class ResponseDecoder extends OneToOneDecoder {
+
+        @Override
+        protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
+                throws Exception {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Received response {} from channel {} to decode.", msg, channel);
+            }
+            if (!(msg instanceof ChannelBuffer)) {
+                return msg;
+            }
+            ChannelBuffer buffer = (ChannelBuffer) msg;
+            try {
+                buffer.markReaderIndex();
+                try {
+                    return REP_V3.decode(buffer);
+                } catch (InvalidProtocolBufferException e) {
+                    buffer.resetReaderIndex();
+                    return REP_PREV3.decode(buffer);
+                }
+            } catch (Exception e) {
+                LOG.error("Failed to decode a response from channel {} : ", channel, e);
+                throw e;
+            }
+        }
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java Thu Jul 24 22:34:19 2014
@@ -22,7 +22,6 @@ package org.apache.bookkeeper.proto;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.processor.RequestProcessor;
-import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.ExceptionEvent;
@@ -71,6 +70,7 @@ class BookieRequestHandler extends Simpl
         LOG.debug("Channel connected {}", e);
     }
 
+    @Override
     public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
             throws Exception {
         LOG.debug("Channel disconnected {}", e);
@@ -78,14 +78,12 @@ class BookieRequestHandler extends Simpl
 
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-        if (!(e.getMessage() instanceof BookieProtocol.Request)) {
+        Object event = e.getMessage();
+        if (!(event instanceof BookkeeperProtocol.Request || event instanceof BookieProtocol.Request)) {
             ctx.sendUpstream(e);
             return;
         }
-        BookieProtocol.Request r = (BookieProtocol.Request)e.getMessage();
-        Channel c = ctx.getChannel();
-        requestProcessor.processRequest(r, c);
+        requestProcessor.processRequest(event, ctx.getChannel());
     }
 
-
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java Thu Jul 24 22:34:19 2014
@@ -21,26 +21,18 @@
 package org.apache.bookkeeper.proto;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.processor.RequestProcessor;
-import org.apache.bookkeeper.util.MathUtils;
 import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BookieRequestProcessor implements RequestProcessor, BookkeeperInternalCallbacks.WriteCallback {
+public class BookieRequestProcessor implements RequestProcessor {
 
     private final static Logger LOG = LoggerFactory.getLogger(BookieRequestProcessor.class);
     /**
@@ -101,234 +93,83 @@ public class BookieRequestProcessor impl
     }
 
     @Override
-    public void processRequest(BookieProtocol.Request r, Channel c) {
-        if (r.getProtocolVersion() < BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION
-                        || r.getProtocolVersion() > BookieProtocol.CURRENT_PROTOCOL_VERSION) {
-            LOG.error("Invalid protocol version, expected something between "
-                            + BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION
-                            + " & " + BookieProtocol.CURRENT_PROTOCOL_VERSION
-                            + ". got " + r.getProtocolVersion());
-            c.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADVERSION, r));
-            return;
-        }
-
-        switch (r.getOpCode()) {
-        case BookieProtocol.ADDENTRY:
-            processAddRequest(r, c);
-            break;
-        case BookieProtocol.READENTRY:
-            processReadRequest(r, c);
-            break;
-        default:
-            LOG.error("Unknown op type {}, sending error", r.getOpCode());
-            c.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
-            if (statsEnabled) {
-                bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
+    public void processRequest(Object msg, Channel c) {
+        // If we can decode this packet as a Request protobuf packet, process
+        // it as a version 3 packet. Else, just use the old protocol.
+        if (msg instanceof BookkeeperProtocol.Request) {
+            BookkeeperProtocol.Request r = (BookkeeperProtocol.Request) msg;
+            BookkeeperProtocol.BKPacketHeader header = r.getHeader();
+            switch (header.getOperation()) {
+                case ADD_ENTRY:
+                    processAddRequestV3(r, c);
+                    break;
+                case READ_ENTRY:
+                    processReadRequestV3(r, c);
+                    break;
+                default:
+                    BookkeeperProtocol.Response.Builder response =
+                            BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
+                            .setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
+                    c.write(response.build());
+                    if (statsEnabled) {
+                        bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
+                    }
+                    break;
             }
-            break;
-        }
-    }
-
-    class AddCtx {
-        final Channel c;
-        final BookieProtocol.AddRequest r;
-        final long startTime;
-
-        AddCtx(Channel c, BookieProtocol.AddRequest r) {
-            this.c = c;
-            this.r = r;
-
-            if (statsEnabled) {
-                startTime = MathUtils.now();
-            } else {
-                startTime = 0;
+        } else {
+            BookieProtocol.Request r = (BookieProtocol.Request) msg;
+            // process packet
+            switch (r.getOpCode()) {
+                case BookieProtocol.ADDENTRY:
+                    processAddRequest(r, c);
+                    break;
+                case BookieProtocol.READENTRY:
+                    processReadRequest(r, c);
+                    break;
+                default:
+                    LOG.error("Unknown op type {}, sending error", r.getOpCode());
+                    c.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
+                    if (statsEnabled) {
+                        bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
+                    }
+                    break;
             }
         }
     }
 
-    private void processAddRequest(final BookieProtocol.Request r, final Channel c) {
+    private void processAddRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
+        WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, bookie);
         if (null == writeThreadPool) {
-            handleAdd(r, c);
+            write.run();
         } else {
-            writeThreadPool.submit(new Runnable() {
-                @Override
-                public void run() {
-                    handleAdd(r, c);
-                }
-            });
+            writeThreadPool.submit(write);
         }
     }
 
-    private void handleAdd(BookieProtocol.Request r, Channel c) {
-        assert (r instanceof BookieProtocol.AddRequest);
-        BookieProtocol.AddRequest add = (BookieProtocol.AddRequest) r;
-
-        if (bookie.isReadOnly()) {
-            LOG.warn("BookieServer is running as readonly mode,"
-                            + " so rejecting the request from the client!");
-            c.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, add));
-            if (statsEnabled) {
-                bkStats.getOpStats(BKStats.STATS_ADD).incrementFailedOps();
-            }
-            return;
-        }
-
-        int rc = BookieProtocol.EOK;
-        try {
-            if (add.isRecoveryAdd()) {
-                bookie.recoveryAddEntry(add.getDataAsByteBuffer(), this, new AddCtx(c, add),
-                                add.getMasterKey());
-            } else {
-                bookie.addEntry(add.getDataAsByteBuffer(),
-                                this, new AddCtx(c, add), add.getMasterKey());
-            }
-        } catch (IOException e) {
-            LOG.error("Error writing " + add, e);
-            rc = BookieProtocol.EIO;
-        } catch (BookieException.LedgerFencedException lfe) {
-            LOG.error("Attempt to write to fenced ledger", lfe);
-            rc = BookieProtocol.EFENCED;
-        } catch (BookieException e) {
-            LOG.error("Unauthorized access to ledger " + add.getLedgerId(), e);
-            rc = BookieProtocol.EUA;
-        }
-        if (rc != BookieProtocol.EOK) {
-            c.write(ResponseBuilder.buildErrorResponse(rc, add));
-            if (statsEnabled) {
-                bkStats.getOpStats(BKStats.STATS_ADD).incrementFailedOps();
-            }
+    private void processReadRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
+        ReadEntryProcessorV3 read = new ReadEntryProcessorV3(r, c, bookie);
+        if (null == readThreadPool) {
+            read.run();
+        } else {
+            readThreadPool.submit(read);
         }
     }
 
-    @Override
-    public void writeComplete(int rc, long ledgerId, long entryId,
-                              BookieSocketAddress addr, Object ctx) {
-        assert (ctx instanceof AddCtx);
-        AddCtx addctx = (AddCtx) ctx;
-        addctx.c.write(ResponseBuilder.buildAddResponse(addctx.r));
-
-        if (statsEnabled) {
-            // compute the latency
-            if (0 == rc) {
-                // for add operations, we compute latency in writeComplete callbacks.
-                long elapsedTime = MathUtils.now() - addctx.startTime;
-                bkStats.getOpStats(BKStats.STATS_ADD).updateLatency(elapsedTime);
-            } else {
-                bkStats.getOpStats(BKStats.STATS_ADD).incrementFailedOps();
-            }
+    private void processAddRequest(final BookieProtocol.Request r, final Channel c) {
+        WriteEntryProcessor write = new WriteEntryProcessor(r, c, bookie);
+        if (null == writeThreadPool) {
+            write.run();
+        } else {
+            writeThreadPool.submit(write);
         }
     }
 
     private void processReadRequest(final BookieProtocol.Request r, final Channel c) {
+        ReadEntryProcessor read = new ReadEntryProcessor(r, c, bookie);
         if (null == readThreadPool) {
-            handleRead(r, c);
-        } else {
-            readThreadPool.submit(new Runnable() {
-                @Override
-                public void run() {
-                    handleRead(r, c);
-                }
-            });
-        }
-    }
-
-    private void handleRead(BookieProtocol.Request r, Channel c) {
-        assert (r instanceof BookieProtocol.ReadRequest);
-        BookieProtocol.ReadRequest read = (BookieProtocol.ReadRequest) r;
-
-        LOG.debug("Received new read request: {}", r);
-        int errorCode = BookieProtocol.EIO;
-        long startTime = 0;
-        if (statsEnabled) {
-            startTime = MathUtils.now();
-        }
-        ByteBuffer data = null;
-        try {
-            Future<Boolean> fenceResult = null;
-            if (read.isFencingRequest()) {
-                LOG.warn("Ledger " + r.getLedgerId() + " fenced by " + c.getRemoteAddress());
-
-                if (read.hasMasterKey()) {
-                    fenceResult = bookie.fenceLedger(read.getLedgerId(), read.getMasterKey());
-                } else {
-                    LOG.error("Password not provided, Not safe to fence {}", read.getLedgerId());
-                    if (statsEnabled) {
-                        bkStats.getOpStats(BKStats.STATS_READ).incrementFailedOps();
-                    }
-                    throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
-                }
-            }
-            data = bookie.readEntry(r.getLedgerId(), r.getEntryId());
-            LOG.debug("##### Read entry ##### {}", data.remaining());
-            if (null != fenceResult) {
-                // TODO:
-                // currently we don't have readCallback to run in separated read
-                // threads. after BOOKKEEPER-429 is complete, we could improve
-                // following code to make it not wait here
-                //
-                // For now, since we only try to wait after read entry. so writing
-                // to journal and read entry are executed in different thread
-                // it would be fine.
-                try {
-                    Boolean fenced = fenceResult.get(1000, TimeUnit.MILLISECONDS);
-                    if (null == fenced || !fenced) {
-                        // if failed to fence, fail the read request to make it retry.
-                        errorCode = BookieProtocol.EIO;
-                        data = null;
-                    } else {
-                        errorCode = BookieProtocol.EOK;
-                    }
-                } catch (InterruptedException ie) {
-                    LOG.error("Interrupting fence read entry " + read, ie);
-                    errorCode = BookieProtocol.EIO;
-                    data = null;
-                } catch (ExecutionException ee) {
-                    LOG.error("Failed to fence read entry " + read, ee);
-                    errorCode = BookieProtocol.EIO;
-                    data = null;
-                } catch (TimeoutException te) {
-                    LOG.error("Timeout to fence read entry " + read, te);
-                    errorCode = BookieProtocol.EIO;
-                    data = null;
-                }
-            } else {
-                errorCode = BookieProtocol.EOK;
-            }
-        } catch (Bookie.NoLedgerException e) {
-            if (LOG.isTraceEnabled()) {
-                LOG.error("Error reading " + read, e);
-            }
-            errorCode = BookieProtocol.ENOLEDGER;
-        } catch (Bookie.NoEntryException e) {
-            if (LOG.isTraceEnabled()) {
-                LOG.error("Error reading " + read, e);
-            }
-            errorCode = BookieProtocol.ENOENTRY;
-        } catch (IOException e) {
-            if (LOG.isTraceEnabled()) {
-                LOG.error("Error reading " + read, e);
-            }
-            errorCode = BookieProtocol.EIO;
-        } catch (BookieException e) {
-            LOG.error("Unauthorized access to ledger " + read.getLedgerId(), e);
-            errorCode = BookieProtocol.EUA;
-        }
-
-        LOG.trace("Read entry rc = {} for {}",
-                        new Object[] { errorCode, read });
-        if (errorCode == BookieProtocol.EOK) {
-            assert data != null;
-
-            c.write(ResponseBuilder.buildReadResponse(data, read));
-            if (statsEnabled) {
-                long elapsedTime = MathUtils.now() - startTime;
-                bkStats.getOpStats(BKStats.STATS_READ).updateLatency(elapsedTime);
-            }
+            read.run();
         } else {
-            c.write(ResponseBuilder.buildErrorResponse(errorCode, read));
-            if (statsEnabled) {
-                bkStats.getOpStats(BKStats.STATS_READ).incrementFailedOps();
-            }
+            readThreadPool.submit(read);
         }
     }