You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2014/07/25 00:34:20 UTC

svn commit: r1613315 [3/4] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/ bookkeeper-server/src/main/java/org/apache...

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java Thu Jul 24 22:34:19 2014
@@ -52,8 +52,6 @@ class DefaultPerChannelBookieClientPool 
         this.clients = new PerChannelBookieClient[coreSize];
         for (int i = 0; i < coreSize; i++) {
             this.clients[i] = factory.create(address);
-            // connect proactively
-            this.clients[i].connectIfNeededAndDoOp(this);
         }
     }
 
@@ -63,6 +61,13 @@ class DefaultPerChannelBookieClientPool 
     }
 
     @Override
+    public void intialize() {
+        for (PerChannelBookieClient pcbc : this.clients) {
+            pcbc.connectIfNeededAndDoOp(this);
+        }
+    }
+
+    @Override
     public void obtain(GenericCallback<PerChannelBookieClient> callback) {
         if (1 == clients.length) {
             clients[0].connectIfNeededAndDoOp(callback);
@@ -73,16 +78,16 @@ class DefaultPerChannelBookieClientPool 
     }
 
     @Override
-    public void disconnect() {
+    public void disconnect(boolean wait) {
         for (PerChannelBookieClient pcbc : clients) {
-            pcbc.disconnect();
+            pcbc.disconnect(wait);
         }
     }
 
     @Override
-    public void close() {
+    public void close(boolean wait) {
         for (PerChannelBookieClient pcbc : clients) {
-            pcbc.close();
+            pcbc.close(wait);
         }
     }
 }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java?rev=1613315&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java Thu Jul 24 22:34:19 2014
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookieProtocol.Request;
+import org.jboss.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class PacketProcessorBase implements Runnable {
+    private final static Logger logger = LoggerFactory.getLogger(PacketProcessorBase.class);
+    final Request request;
+    final Channel channel;
+    final Bookie bookie;
+
+    PacketProcessorBase(Request request, Channel channel, Bookie bookie) {
+        this.request = request;
+        this.channel = channel;
+        this.bookie = bookie;
+    }
+
+    protected boolean isVersionCompatible() {
+        byte version = request.getProtocolVersion();
+        if (version < BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION
+                || version > BookieProtocol.CURRENT_PROTOCOL_VERSION) {
+            logger.error("Invalid protocol version, expected something between "
+                    + BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION
+                    + " & " + BookieProtocol.CURRENT_PROTOCOL_VERSION
+                    + ". got " + request.getProtocolVersion());
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void run() {
+        if (!isVersionCompatible()) {
+            channel.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADVERSION, request));
+            return;
+        }
+        processPacket();
+    }
+
+    protected abstract void processPacket();
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java?rev=1613315&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java Thu Jul 24 22:34:19 2014
@@ -0,0 +1,58 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.jboss.netty.channel.Channel;
+
+public abstract class PacketProcessorBaseV3 {
+
+    final Request request;
+    final Channel channel;
+    final Bookie  bookie;
+
+    public PacketProcessorBaseV3(Request request, Channel channel, Bookie bookie) {
+        this.request = request;
+        this.channel = channel;
+        this.bookie = bookie;
+    }
+
+    protected boolean isVersionCompatible() {
+        return this.request.getHeader().getVersion().equals(ProtocolVersion.VERSION_THREE);
+    }
+
+    /**
+     * Build a header with protocol version 3 and the operation type same as what was in the
+     * request.
+     * @return a header carrying VERSION_THREE plus the operation type and txn id copied from the request
+     */
+    protected BKPacketHeader getHeader() {
+        BKPacketHeader.Builder header = BKPacketHeader.newBuilder();
+        header.setVersion(ProtocolVersion.VERSION_THREE);
+        header.setOperation(request.getHeader().getOperation());
+        header.setTxnId(request.getHeader().getTxnId());
+        return header.build();
+    }
+
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Thu Jul 24 22:34:19 2014
@@ -23,12 +23,24 @@ import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.protobuf.ByteString;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeperClientStats;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -40,6 +52,7 @@ import org.apache.bookkeeper.util.Ordere
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
@@ -73,6 +86,7 @@ public class PerChannelBookieClient exte
     static final Logger LOG = LoggerFactory.getLogger(PerChannelBookieClient.class);
 
     public static final int MAX_FRAME_LENGTH = 2 * 1024 * 1024; // 2M
+    public static final AtomicLong txnIdGenerator = new AtomicLong(0);
 
     final BookieSocketAddress addr;
     final ClientSocketChannelFactory channelFactory;
@@ -81,8 +95,7 @@ public class PerChannelBookieClient exte
     final int addEntryTimeout;
     final int readEntryTimeout;
 
-    ConcurrentHashMap<CompletionKey, AddCompletion> addCompletions = new ConcurrentHashMap<CompletionKey, AddCompletion>();
-    ConcurrentHashMap<CompletionKey, ReadCompletion> readCompletions = new ConcurrentHashMap<CompletionKey, ReadCompletion>();
+    private final ConcurrentHashMap<CompletionKey, CompletionValue> completionObjects = new ConcurrentHashMap<CompletionKey, CompletionValue>();
 
     private final StatsLogger statsLogger;
     private final OpStatsLogger readEntryOpLogger;
@@ -150,7 +163,7 @@ public class PerChannelBookieClient exte
     }
 
     private void connect() {
-        LOG.info("Connecting to bookie: {}", addr);
+        LOG.debug("Connecting to bookie: {}", addr);
 
         // Set up the ClientBootStrap so we can create a new Channel connection
         // to the bookie.
@@ -277,19 +290,41 @@ public class PerChannelBookieClient exte
      */
     void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, WriteCallback cb,
                   Object ctx, final int options) {
-        BookieProtocol.AddRequest r = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
-                ledgerId, entryId, (short)options, masterKey, toSend);
+        final long txnId = getTxnId();
         final int entrySize = toSend.readableBytes();
-        final CompletionKey completionKey = new CompletionKey(ledgerId, entryId, BookieProtocol.ADDENTRY);
-        addCompletions.put(completionKey,
-                new AddCompletion(addEntryOpLogger, cb, ctx, scheduleTimeout(completionKey, addEntryTimeout)));
+        final CompletionKey completionKey = new CompletionKey(txnId, OperationType.ADD_ENTRY);
+        completionObjects.put(completionKey,
+                new AddCompletion(addEntryOpLogger, cb, ctx, ledgerId, entryId,
+                                  scheduleTimeout(completionKey, addEntryTimeout)));
+
+        // Build the version-3 protobuf request: packet header plus the add-entry payload.
+        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                .setOperation(OperationType.ADD_ENTRY)
+                .setTxnId(txnId);
+
+        AddRequest.Builder addBuilder = AddRequest.newBuilder()
+                .setLedgerId(ledgerId)
+                .setEntryId(entryId)
+                .setMasterKey(ByteString.copyFrom(masterKey))
+                .setBody(ByteString.copyFrom(toSend.toByteBuffer()));
+
+        if (((short)options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) {
+            addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
+        }
+
+        final Request addRequest = Request.newBuilder()
+                .setHeader(headerBuilder)
+                .setAddRequest(addBuilder)
+                .build();
+
         final Channel c = channel;
         if (c == null) {
             errorOutAddKey(completionKey);
             return;
         }
         try {
-            ChannelFuture future = c.write(r);
+            ChannelFuture future = c.write(addRequest);
             future.addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) throws Exception {
@@ -317,82 +352,110 @@ public class PerChannelBookieClient exte
     public void readEntryAndFenceLedger(final long ledgerId, byte[] masterKey,
                                         final long entryId,
                                         ReadEntryCallback cb, Object ctx) {
-        final CompletionKey key = new CompletionKey(ledgerId, entryId, BookieProtocol.READENTRY);
-        readCompletions.put(key,
-                new ReadCompletion(readEntryOpLogger, cb, ctx, scheduleTimeout(key, readEntryTimeout)));
-
-        final BookieProtocol.ReadRequest r = new BookieProtocol.ReadRequest(
-                BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
-                BookieProtocol.FLAG_DO_FENCING, masterKey);
+        final long txnId = getTxnId();
+        final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY);
+        completionObjects.put(completionKey,
+                new ReadCompletion(readEntryOpLogger, cb, ctx, ledgerId, entryId,
+                                   scheduleTimeout(completionKey, readEntryTimeout)));
+
+        // Build the version-3 protobuf request: packet header plus a read payload flagged FENCE_LEDGER.
+        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                .setOperation(OperationType.READ_ENTRY)
+                .setTxnId(txnId);
+
+        ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
+                .setLedgerId(ledgerId)
+                .setEntryId(entryId)
+                .setMasterKey(ByteString.copyFrom(masterKey))
+                .setFlag(ReadRequest.Flag.FENCE_LEDGER);
+
+        final Request readRequest = Request.newBuilder()
+                .setHeader(headerBuilder)
+                .setReadRequest(readBuilder)
+                .build();
 
         final Channel c = channel;
         if (c == null) {
-            errorOutReadKey(key);
+            errorOutReadKey(completionKey);
             return;
         }
 
         try {
-            ChannelFuture future = c.write(r);
+            ChannelFuture future = c.write(readRequest);
             future.addListener(new ChannelFutureListener() {
                     @Override
                     public void operationComplete(ChannelFuture future) throws Exception {
                         if (future.isSuccess()) {
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug("Successfully wrote request {} to {}",
-                                          r, c.getRemoteAddress());
+                                          readRequest, c.getRemoteAddress());
                             }
                         } else {
                             if (!(future.getCause() instanceof ClosedChannelException)) {
                                 LOG.warn("Writing readEntryAndFenceLedger(lid={}, eid={}) to channel {} failed : ",
                                         new Object[] { ledgerId, entryId, c, future.getCause() });
                             }
-                            errorOutReadKey(key);
+                            errorOutReadKey(completionKey);
                         }
                     }
                 });
         } catch(Throwable e) {
-            LOG.warn("Read entry operation " + r + " failed", e);
-            errorOutReadKey(key);
+            LOG.warn("Read entry operation {} failed", completionKey, e);
+            errorOutReadKey(completionKey);
         }
     }
 
     public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx) {
-        final CompletionKey key = new CompletionKey(ledgerId, entryId, BookieProtocol.READENTRY);
-        readCompletions.put(key,
-                new ReadCompletion(readEntryOpLogger, cb, ctx, scheduleTimeout(key, readEntryTimeout)));
-
-        final BookieProtocol.ReadRequest r = new BookieProtocol.ReadRequest(
-                BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
-                BookieProtocol.FLAG_NONE);
+        final long txnId = getTxnId();
+        final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY);
+        completionObjects.put(completionKey,
+                new ReadCompletion(readEntryOpLogger, cb, ctx, ledgerId, entryId,
+                                   scheduleTimeout(completionKey, readEntryTimeout)));
+
+        // Build the version-3 protobuf request: packet header plus a plain read payload (no flag set).
+        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                .setOperation(OperationType.READ_ENTRY)
+                .setTxnId(txnId);
+
+        ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
+                .setLedgerId(ledgerId)
+                .setEntryId(entryId);
+
+        final Request readRequest = Request.newBuilder()
+                .setHeader(headerBuilder)
+                .setReadRequest(readBuilder)
+                .build();
 
         final Channel c = channel;
         if (c == null) {
-            errorOutReadKey(key);
+            errorOutReadKey(completionKey);
             return;
         }
 
         try{
-            ChannelFuture future = c.write(r);
+            ChannelFuture future = c.write(readRequest);
             future.addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) throws Exception {
                     if (future.isSuccess()) {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Successfully wrote request {} to {}",
-                                      r, c.getRemoteAddress());
+                                      readRequest, c.getRemoteAddress());
                         }
                     } else {
                         if (!(future.getCause() instanceof ClosedChannelException)) {
                             LOG.warn("Writing readEntry(lid={}, eid={}) to channel {} failed : ",
                                     new Object[] { ledgerId, entryId, c, future.getCause() });
                         }
-                        errorOutReadKey(key);
+                        errorOutReadKey(completionKey);
                     }
                 }
             });
         } catch(Throwable e) {
-            LOG.warn("Read entry operation " + r + " failed", e);
-            errorOutReadKey(key);
+            LOG.warn("Read entry operation {} failed", readRequest, e);
+            errorOutReadKey(completionKey);
         }
     }
 
@@ -400,13 +463,23 @@ public class PerChannelBookieClient exte
      * Disconnects the bookie client. It can be reused.
      */
     public void disconnect() {
-        closeInternal(false);
+        disconnect(true);
+    }
+
+    public void disconnect(boolean wait) {
+        LOG.info("Disconnecting the per channel bookie client for {}", addr);
+        closeInternal(false, wait);
     }
 
     /**
      * Closes the bookie client permanently. It cannot be reused.
      */
     public void close() {
+        close(true);
+    }
+
+    public void close(boolean wait) {
+        LOG.info("Closing the per channel bookie client for {}", addr);
         closeLock.writeLock().lock();
         try {
             if (ConnectionState.CLOSED == state) {
@@ -417,10 +490,10 @@ public class PerChannelBookieClient exte
         } finally {
             closeLock.writeLock().unlock();
         }
-        closeInternal(true);
+        closeInternal(true, wait);
     }
 
-    private void closeInternal(boolean permanent) {
+    private void closeInternal(boolean permanent, boolean wait) {
         Channel toClose = null;
         synchronized (this) {
             if (permanent) {
@@ -432,7 +505,10 @@ public class PerChannelBookieClient exte
             channel = null;
         }
         if (toClose != null) {
-            closeChannel(toClose).awaitUninterruptibly();
+            ChannelFuture cf = closeChannel(toClose);
+            if (wait) {
+                cf.awaitUninterruptibly();
+            }
         }
     }
 
@@ -450,27 +526,25 @@ public class PerChannelBookieClient exte
     }
 
     void errorOutReadKey(final CompletionKey key, final int rc) {
-        executor.submitOrdered(key.ledgerId, new SafeRunnable() {
+        final ReadCompletion readCompletion = (ReadCompletion)completionObjects.remove(key);
+        if (null == readCompletion) {
+            return;
+        }
+        executor.submitOrdered(readCompletion.ledgerId, new SafeRunnable() {
             @Override
             public void safeRun() {
-
-                ReadCompletion readCompletion = readCompletions.remove(key);
                 String bAddress = "null";
                 Channel c = channel;
-                if(c != null) {
+                if (c != null) {
                     bAddress = c.getRemoteAddress().toString();
                 }
 
-                if (readCompletion != null) {
-                    LOG.debug("Could not write request for reading entry: {}"
-                              + " ledger-id: {} bookie: {}",
-                              new Object[] { key.entryId, key.ledgerId, bAddress });
+                LOG.debug("Could not write request for reading entry: {} ledger-id: {} bookie: {}",
+                          new Object[]{ readCompletion.entryId, readCompletion.ledgerId, bAddress });
 
-                    readCompletion.cb.readEntryComplete(rc,
-                                                        key.ledgerId, key.entryId, null, readCompletion.ctx);
-                }
+                readCompletion.cb.readEntryComplete(rc, readCompletion.ledgerId, readCompletion.entryId,
+                                                    null, readCompletion.ctx);
             }
-
         });
     }
 
@@ -479,29 +553,26 @@ public class PerChannelBookieClient exte
     }
 
     void errorOutAddKey(final CompletionKey key, final int rc) {
-        executor.submitOrdered(key.ledgerId, new SafeRunnable() {
+        final AddCompletion addCompletion = (AddCompletion)completionObjects.remove(key);
+        if (null == addCompletion) {
+            return;
+        }
+        executor.submitOrdered(addCompletion.ledgerId, new SafeRunnable() {
             @Override
             public void safeRun() {
-
-                AddCompletion addCompletion = addCompletions.remove(key);
-
-                if (addCompletion != null) {
-                    String bAddress = "null";
-                    Channel c = channel;
-                    if(c != null) {
-                        bAddress = c.getRemoteAddress().toString();
-                    }
-                    LOG.debug("Could not write request for adding entry: {} ledger-id: {} bookie: {}",
-                              new Object[] { key.entryId, key.ledgerId, bAddress });
-
-                    addCompletion.cb.writeComplete(rc, key.ledgerId,
-                                                   key.entryId, addr, addCompletion.ctx);
-                    LOG.debug("Invoked callback method: {}", key.entryId);
+                String bAddress = "null";
+                Channel c = channel;
+                if(c != null) {
+                    bAddress = c.getRemoteAddress().toString();
                 }
-            }
+                LOG.debug("Could not write request for adding entry: {} ledger-id: {} bookie: {}",
+                          new Object[] { addCompletion.entryId, addCompletion.ledgerId, bAddress });
 
+                addCompletion.cb.writeComplete(rc, addCompletion.ledgerId, addCompletion.entryId,
+                                               addr, addCompletion.ctx);
+                LOG.debug("Invoked callback method: {}", addCompletion.entryId);
+            }
         });
-
     }
 
     /**
@@ -519,13 +590,17 @@ public class PerChannelBookieClient exte
         // in case they get a write failure on the socket. The one who
         // successfully removes the key from the map is the one responsible for
         // calling the application callback.
-
-        for (CompletionKey key : addCompletions.keySet()) {
-            errorOutAddKey(key, rc);
-        }
-
-        for (CompletionKey key : readCompletions.keySet()) {
-            errorOutReadKey(key, rc);
+        for (CompletionKey key : completionObjects.keySet()) {
+            switch (key.operationType) {
+                case ADD_ENTRY:
+                    errorOutAddKey(key, rc);
+                    break;
+                case READ_ENTRY:
+                    errorOutReadKey(key, rc);
+                    break;
+                default:
+                    break;
+            }
         }
     }
 
@@ -543,8 +618,6 @@ public class PerChannelBookieClient exte
         pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
         pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder());
         pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.ResponseDecoder());
-
-
         pipeline.addLast("mainhandler", this);
         return pipeline;
     }
@@ -609,125 +682,101 @@ public class PerChannelBookieClient exte
      */
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-        if (!(e.getMessage() instanceof BookieProtocol.Response)) {
+        if (!(e.getMessage() instanceof Response)) { // anything that is not a Response is forwarded upstream untouched
             ctx.sendUpstream(e);
             return;
         }
-        final BookieProtocol.Response r = (BookieProtocol.Response)e.getMessage();
 
-        executor.submitOrdered(r.getLedgerId(), new SafeRunnable() {
-            @Override
-            public void safeRun() {
-                switch (r.getOpCode()) {
-                case BookieProtocol.ADDENTRY:
-                    BookieProtocol.AddResponse a = (BookieProtocol.AddResponse)r;
-                    handleAddResponse(a);
-                    break;
-                case BookieProtocol.READENTRY:
-                    BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse)r;
-                    handleReadResponse(rr);
-                    break;
-                default:
-                    LOG.error("Unexpected response, type: {}", r);
-                }
+        final Response response = (Response) e.getMessage();
+        final BKPacketHeader header = response.getHeader();
+
+        final CompletionValue completionValue = completionObjects.remove(newCompletionKey(header.getTxnId(),
+                header.getOperation())); // the pending completion is looked up, and removed, by (txnId, operation)
+
+        if (null == completionValue) {
+            // Unexpected response, so log it. The txnId should have been present.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Unexpected response received from bookie : " + addr + " for type : " + header.             getOperation() +
+                        " and txnId : " + header.getTxnId());
             }
-        });
+
+        } else {
+            long orderingKey = completionValue.ledgerId; // the ledger id comes from the stored completion, not from the response
+            executor.submitOrdered(orderingKey, new SafeRunnable() {
+                @Override
+                public void safeRun() {
+                    OperationType type = header.getOperation();
+                    switch (type) {
+                        case ADD_ENTRY:
+                            handleAddResponse(response.getAddResponse(), completionValue);
+                            break;
+                        case READ_ENTRY:
+                            handleReadResponse(response.getReadResponse(), completionValue);
+                            break;
+                        default:
+                            LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring",
+                                      type, addr); // NOTE(review): the completion was already removed above and its callback is not invoked on this path
+                            break;
+                    }
+                }
+            });
+        }
     }
 
-    void handleAddResponse(BookieProtocol.AddResponse a) {
+    void handleAddResponse(AddResponse response, CompletionValue completionValue) {
+        // The completion value should always be an instance of an AddCompletion object when we reach here.
+        AddCompletion ac = (AddCompletion)completionValue;
+
+        long ledgerId = response.getLedgerId();
+        long entryId = response.getEntryId();
+        StatusCode status = response.getStatus();
+
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Got response for add request from bookie: {} for ledger: {}", addr, a);
+            LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
+                    + entryId + " rc: " + status);
         }
-
         // convert to BKException code because thats what the uppper
         // layers expect. This is UGLY, there should just be one set of
         // error codes.
-        int rc = BKException.Code.WriteException;
-        switch (a.getErrorCode()) {
-        case BookieProtocol.EOK:
-            rc = BKException.Code.OK;
-            break;
-        case BookieProtocol.EBADVERSION:
-            rc = BKException.Code.ProtocolVersionException;
-            break;
-        case BookieProtocol.EFENCED:
-            rc = BKException.Code.LedgerFencedException;
-            break;
-        case BookieProtocol.EUA:
-            rc = BKException.Code.UnauthorizedAccessException;
-            break;
-        case BookieProtocol.EREADONLY:
-            rc = BKException.Code.WriteOnReadOnlyBookieException;
-            break;
-        default:
-            LOG.warn("Add failed {}", a);
-            rc = BKException.Code.WriteException;
-            break;
-        }
-
-        AddCompletion ac;
-        ac = addCompletions.remove(new CompletionKey(a.getLedgerId(),
-                                                     a.getEntryId(), BookieProtocol.ADDENTRY));
-        if (ac == null) {
-            LOG.debug("Unexpected add response from bookie {} for {}", addr, a);
-            return;
+        Integer rcToRet = statusCodeToExceptionCode(status); // null means the status has no known mapping
+        if (null == rcToRet) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Add for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + addr
+                        + " with code:" + status);
+            }
+            rcToRet = BKException.Code.WriteException; // unmapped statuses are reported as a generic write failure
         }
-
-        ac.cb.writeComplete(rc, a.getLedgerId(), a.getEntryId(), addr, ac.ctx);
+        ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx); // rcToRet is non-null here, so unboxing is safe
     }
 
-    void handleReadResponse(BookieProtocol.ReadResponse rr) {
+    /**
+     * Completes a pending read with the bookie's response.
+     *
+     * The response status is converted to a BKException code (ReadException when the
+     * status has no known mapping) and the read callback is invoked with the entry body,
+     * or with an empty buffer when the response carries no body.
+     *
+     * @param response
+     *          the read response received from the bookie
+     * @param completionValue
+     *          the pending completion for this response; expected to be a ReadCompletion
+     */
+    void handleReadResponse(ReadResponse response, CompletionValue completionValue) {
+        // The completion value should always be an instance of a ReadCompletion object when we reach here.
+        ReadCompletion rc = (ReadCompletion)completionValue;
+
+        long ledgerId = response.getLedgerId();
+        long entryId = response.getEntryId();
+        StatusCode status = response.getStatus();
+        ChannelBuffer buffer = ChannelBuffers.buffer(0);
+
+        if (response.hasBody()) {
+            buffer = ChannelBuffers.copiedBuffer(response.getBody().asReadOnlyByteBuffer());
+        }
+
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Got response for read request {} entry length: {}", rr,
-                    rr.getData() != null ? rr.getData().readableBytes() : -1);
+            // Log the response status; 'rc' in this method is the ReadCompletion, not a return code.
+            LOG.debug("Got response for read request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
+                    + entryId + " rc: " + status + " entry length: " + buffer.readableBytes());
         }
 
         // convert to BKException code because thats what the uppper
         // layers expect. This is UGLY, there should just be one set of
         // error codes.
-        int rc = BKException.Code.ReadException;
-        switch (rr.getErrorCode()) {
-        case BookieProtocol.EOK:
-            rc = BKException.Code.OK;
-            break;
-        case BookieProtocol.ENOENTRY:
-        case BookieProtocol.ENOLEDGER:
-            rc = BKException.Code.NoSuchEntryException;
-            break;
-        case BookieProtocol.EBADVERSION:
-            rc = BKException.Code.ProtocolVersionException;
-            break;
-        case BookieProtocol.EUA:
-            rc = BKException.Code.UnauthorizedAccessException;
-            break;
-        default:
-            LOG.warn("Read error for {}", rr);
-            rc = BKException.Code.ReadException;
-            break;
-        }
-
-        CompletionKey key = new CompletionKey(rr.getLedgerId(), rr.getEntryId(), BookieProtocol.READENTRY);
-        ReadCompletion readCompletion = readCompletions.remove(key);
-
-        if (readCompletion == null) {
-            /*
-             * This is a special case. When recovering a ledger, a client
-             * submits a read request with id -1, and receives a response with a
-             * different entry id.
-             */
-
-            readCompletion = readCompletions.remove(new CompletionKey(rr.getLedgerId(),
-                                                                      BookieProtocol.LAST_ADD_CONFIRMED,
-                                                                      BookieProtocol.READENTRY));
+        Integer rcToRet = statusCodeToExceptionCode(status);
+        if (null == rcToRet) {
+            LOG.error("Read entry for ledger:{}, entry:{} failed on bookie:{} with code:{}",
+                      new Object[] { ledgerId, entryId, addr, status });
+            rcToRet = BKException.Code.ReadException;
         }
-
-        if (readCompletion == null) {
-            LOG.debug("Unexpected read response received from bookie: {} for {}", addr, rr);
-            return;
-        }
-
-        readCompletion.cb.readEntryComplete(rc, rr.getLedgerId(), rr.getEntryId(),
-                                            rr.getData(), readCompletion.ctx);
+        rc.cb.readEntryComplete(rcToRet, ledgerId, entryId, buffer.slice(), rc.ctx);
     }
 
     /**
@@ -738,10 +787,15 @@ public class PerChannelBookieClient exte
     // visible for testing
     static abstract class CompletionValue {
         final Object ctx;
+        protected final long ledgerId;
+        protected final long entryId;
         protected final Timeout timeout;
 
-        public CompletionValue(Object ctx, Timeout timeout) {
+        public CompletionValue(Object ctx, long ledgerId, long entryId,
+                               Timeout timeout) {
             this.ctx = ctx;
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
             this.timeout = timeout;
         }
 
@@ -756,14 +810,16 @@ public class PerChannelBookieClient exte
     static class ReadCompletion extends CompletionValue {
         final ReadEntryCallback cb;
 
-        public ReadCompletion(ReadEntryCallback cb, Object ctx) {
-            this(null, cb, ctx, null);
+        public ReadCompletion(ReadEntryCallback cb, Object ctx,
+                              long ledgerId, long entryId) {
+            this(null, cb, ctx, ledgerId, entryId, null);
         }
 
         public ReadCompletion(final OpStatsLogger readEntryOpLogger,
                               final ReadEntryCallback originalCallback,
-                              final Object originalCtx, final Timeout timeout) {
-            super(originalCtx, timeout);
+                              final Object originalCtx, final long ledgerId, final long entryId,
+                              final Timeout timeout) {
+            super(originalCtx, ledgerId, entryId, timeout);
             final long requestTimeMillis = MathUtils.now();
             this.cb = null == readEntryOpLogger ? originalCallback : new ReadEntryCallback() {
                 @Override
@@ -785,15 +841,16 @@ public class PerChannelBookieClient exte
     static class AddCompletion extends CompletionValue {
         final WriteCallback cb;
 
-        public AddCompletion(WriteCallback cb, Object ctx) {
-            this(null, cb, ctx, null);
+        public AddCompletion(WriteCallback cb, Object ctx,
+                             long ledgerId, long entryId) {
+            this(null, cb, ctx, ledgerId, entryId, null);
         }
 
         public AddCompletion(final OpStatsLogger addEntryOpLogger,
                              final WriteCallback originalCallback,
-                             final Object originalCtx,
+                             final Object originalCtx, final long ledgerId, final long entryId,
                              final Timeout timeout) {
-            super(originalCtx, timeout);
+            super(originalCtx, ledgerId, entryId, timeout);
             final long requestTimeMillis = MathUtils.now();
             this.cb = null == addEntryOpLogger ? originalCallback : new WriteCallback() {
                 @Override
@@ -812,8 +869,8 @@ public class PerChannelBookieClient exte
     }
 
     // visable for testing
-    CompletionKey newCompletionKey(long ledgerId, long entryId, byte operationType) {
-        return new CompletionKey(ledgerId, entryId, operationType);
+    CompletionKey newCompletionKey(long txnId, OperationType operationType) {
+        return new CompletionKey(txnId, operationType); // a key is identified by its transaction id and operation type
     }
 
     Timeout scheduleTimeout(CompletionKey key, long timeout) {
@@ -825,16 +882,14 @@ public class PerChannelBookieClient exte
     }
 
     class CompletionKey implements TimerTask {
-        final long ledgerId;
-        final long entryId;
+        final long txnId;
+        final OperationType operationType;
         final long requestAt;
-        final byte operationType;
 
-        CompletionKey(long ledgerId, long entryId, byte opType) {
-            this.ledgerId = ledgerId;
-            this.entryId = entryId;
+        CompletionKey(long txnId, OperationType operationType) {
+            this.txnId = txnId;
+            this.operationType = operationType;
             this.requestAt = MathUtils.nowInNano();
-            this.operationType = opType;
         }
 
         @Override
@@ -843,17 +898,17 @@ public class PerChannelBookieClient exte
                 return false;
             }
             CompletionKey that = (CompletionKey) obj;
-            return this.ledgerId == that.ledgerId && this.entryId == that.entryId;
+            return this.txnId == that.txnId && this.operationType == that.operationType;
         }
 
         @Override
         public int hashCode() {
-            return ((int) ledgerId << 16) ^ ((int) entryId);
+            return ((int) txnId);
         }
 
         @Override
         public String toString() {
-            return String.format("LedgerEntry(%d, %d)", ledgerId, entryId);
+            return String.format("TxnId(%d), OperationType(%s)", txnId, operationType);
         }
 
         private long elapsedTime() {
@@ -865,7 +920,7 @@ public class PerChannelBookieClient exte
             if (timeout.isCancelled()) {
                 return;
             }
-            if (BookieProtocol.ADDENTRY == operationType) {
+            if (OperationType.ADD_ENTRY == operationType) {
                 errorOutAddKey(this);
                 addTimeoutOpLogger.registerSuccessfulEvent(elapsedTime());
             } else {
@@ -875,4 +930,43 @@ public class PerChannelBookieClient exte
         }
     }
 
+    /**
+     * Note : Helper functions follow
+     */
+
+    /**
+     * Converts a protocol status code into the matching BKException code.
+     *
+     * @param status
+     *          status code carried by a bookie response
+     * @return the matching BKException code, or null if the statuscode is unknown;
+     *         callers substitute their own default (write or read failure) for null.
+     */
+    private Integer statusCodeToExceptionCode(StatusCode status) {
+        Integer rcToRet = null;
+        switch (status) {
+            case EOK:
+                rcToRet = BKException.Code.OK;
+                break;
+            case ENOENTRY:
+                rcToRet = BKException.Code.NoSuchEntryException;
+                break;
+            case ENOLEDGER:
+                rcToRet = BKException.Code.NoSuchLedgerExistsException;
+                break;
+            case EBADVERSION:
+                rcToRet = BKException.Code.ProtocolVersionException;
+                break;
+            case EUA:
+                rcToRet = BKException.Code.UnauthorizedAccessException;
+                break;
+            case EFENCED:
+                rcToRet = BKException.Code.LedgerFencedException;
+                break;
+            case EREADONLY:
+                // Keep read-only bookies distinguishable from generic write failures,
+                // as the add path did before the mapping was shared.
+                rcToRet = BKException.Code.WriteOnReadOnlyBookieException;
+                break;
+            default:
+                break;
+        }
+        return rcToRet;
+    }
+
+    private long getTxnId() {
+        return txnIdGenerator.incrementAndGet(); // advances the generator and returns the new value
+    }
+
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java Thu Jul 24 22:34:19 2014
@@ -28,6 +28,11 @@ import org.apache.bookkeeper.proto.Bookk
 interface PerChannelBookieClientPool {
 
     /**
+     * Initialize the pool. The implementation should not block.
+     */
+    void intialize();
+
+    /**
      * Obtain a channel from channel pool to execute operations.
      *
      * @param callback
@@ -37,12 +42,18 @@ interface PerChannelBookieClientPool {
 
     /**
      * Disconnect the connections in the pool.
+     *
+     * @param wait
+     *          whether need to wait until pool disconnected.
      */
-    void disconnect();
+    void disconnect(boolean wait);
 
     /**
      * Close the pool.
+     *
+     * @param wait
+     *          whether need to wait until pool closed.
      */
-    void close();
+    void close(boolean wait);
 
 }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java?rev=1613315&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java Thu Jul 24 22:34:19 2014
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.proto.BookieProtocol.Request;
+import org.apache.bookkeeper.util.MathUtils;
+import org.jboss.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ReadEntryProcessor extends PacketProcessorBase {
+    private final static Logger LOG = LoggerFactory.getLogger(ReadEntryProcessor.class);
+
+    public ReadEntryProcessor(Request request, Channel channel, Bookie bookie) {
+        super(request, channel, bookie);
+    }
+
+    @Override
+    protected void processPacket() {
+        assert (request instanceof BookieProtocol.ReadRequest);
+        BookieProtocol.ReadRequest read = (BookieProtocol.ReadRequest) request;
+
+        LOG.debug("Received new read request: {}", request);
+        int errorCode = BookieProtocol.EIO;
+        long startTime = MathUtils.now();
+        ByteBuffer data = null;
+        try {
+            Future<Boolean> fenceResult = null;
+            if (read.isFencingRequest()) {
+                LOG.warn("Ledger " + request.getLedgerId() + " fenced by " + channel.getRemoteAddress());
+
+                if (read.hasMasterKey()) {
+                    fenceResult = bookie.fenceLedger(read.getLedgerId(), read.getMasterKey());
+                } else {
+                    LOG.error("Password not provided, Not safe to fence {}", read.getLedgerId());
+                    BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
+                    throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
+                }
+            }
+            data = bookie.readEntry(request.getLedgerId(), request.getEntryId());
+            LOG.debug("##### Read entry ##### {}", data.remaining());
+            if (null != fenceResult) {
+                // TODO:
+                // currently we don't have readCallback to run in separated read
+                // threads. after BOOKKEEPER-429 is complete, we could improve
+                // following code to make it not wait here
+                //
+                // For now, since we only try to wait after read entry. so writing
+                // to journal and read entry are executed in different thread
+                // it would be fine.
+                try {
+                    Boolean fenced = fenceResult.get(1000, TimeUnit.MILLISECONDS);
+                    if (null == fenced || !fenced) {
+                        // if failed to fence, fail the read request to make it retry.
+                        errorCode = BookieProtocol.EIO;
+                        data = null;
+                    } else {
+                        errorCode = BookieProtocol.EOK;
+                    }
+                } catch (InterruptedException ie) {
+                    LOG.error("Interrupting fence read entry " + read, ie);
+                    errorCode = BookieProtocol.EIO;
+                    data = null;
+                } catch (ExecutionException ee) {
+                    LOG.error("Failed to fence read entry " + read, ee);
+                    errorCode = BookieProtocol.EIO;
+                    data = null;
+                } catch (TimeoutException te) {
+                    LOG.error("Timeout to fence read entry " + read, te);
+                    errorCode = BookieProtocol.EIO;
+                    data = null;
+                }
+            } else {
+                errorCode = BookieProtocol.EOK;
+            }
+        } catch (Bookie.NoLedgerException e) {
+            if (LOG.isTraceEnabled()) {
+                LOG.error("Error reading " + read, e);
+            }
+            errorCode = BookieProtocol.ENOLEDGER;
+        } catch (Bookie.NoEntryException e) {
+            if (LOG.isTraceEnabled()) {
+                LOG.error("Error reading " + read, e);
+            }
+            errorCode = BookieProtocol.ENOENTRY;
+        } catch (IOException e) {
+            if (LOG.isTraceEnabled()) {
+                LOG.error("Error reading " + read, e);
+            }
+            errorCode = BookieProtocol.EIO;
+        } catch (BookieException e) {
+            LOG.error("Unauthorized access to ledger " + read.getLedgerId(), e);
+            errorCode = BookieProtocol.EUA;
+        }
+
+        LOG.trace("Read entry rc = {} for {}",
+                new Object[] { errorCode, read });
+        if (errorCode == BookieProtocol.EOK) {
+            assert data != null;
+
+            channel.write(ResponseBuilder.buildReadResponse(data, read));
+            long elapsedTime = MathUtils.now() - startTime;
+            BKStats.getInstance().getOpStats(BKStats.STATS_READ).updateLatency(elapsedTime);
+        } else {
+            channel.write(ResponseBuilder.buildErrorResponse(errorCode, read));
+            BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
+        }
+    }
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java?rev=1613315&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java Thu Jul 24 22:34:19 2014
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.util.MathUtils;
+import org.jboss.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+class ReadEntryProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
+
+    private final static Logger LOG = LoggerFactory.getLogger(ReadEntryProcessorV3.class);
+
+    public ReadEntryProcessorV3(Request request, Channel channel, Bookie bookie) {
+        super(request, channel, bookie);
+    }
+
+    private ReadResponse getReadResponse() {
+        long startTime = MathUtils.now();
+        ReadRequest readRequest = request.getReadRequest();
+        long ledgerId = readRequest.getLedgerId();
+        long entryId = readRequest.getEntryId();
+
+        ReadResponse.Builder readResponse = ReadResponse.newBuilder()
+                .setLedgerId(ledgerId)
+                .setEntryId(entryId);
+
+        if (!isVersionCompatible()) {
+            readResponse.setStatus(StatusCode.EBADVERSION);
+            BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
+            return readResponse.build();
+        }
+
+        LOG.debug("Received new read request: {}", request);
+        StatusCode status;
+        ByteBuffer entryBody;
+        try {
+            Future<Boolean> fenceResult = null;
+            if (readRequest.hasFlag() && readRequest.getFlag().equals(ReadRequest.Flag.FENCE_LEDGER)) {
+                LOG.warn("Ledger fence request received for ledger: {} from address: {}", ledgerId,
+                         channel.getRemoteAddress());
+
+                if (readRequest.hasMasterKey()) {
+                    byte[] masterKey = readRequest.getMasterKey().toByteArray();
+                    fenceResult = bookie.fenceLedger(ledgerId, masterKey);
+                } else {
+                    LOG.error("Fence ledger request received without master key for ledger:{} from address: {}",
+                              ledgerId, channel.getRemoteAddress());
+                    BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
+                    throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
+                }
+            }
+            entryBody = bookie.readEntry(ledgerId, entryId);
+            if (null != fenceResult) {
+                // TODO:
+                // currently we don't have readCallback to run in separated read
+                // threads. after BOOKKEEPER-429 is complete, we could improve
+                // following code to make it not wait here
+                //
+                // For now, since we only try to wait after read entry. so writing
+                // to journal and read entry are executed in different thread
+                // it would be fine.
+                try {
+                    Boolean fenced = fenceResult.get(1000, TimeUnit.MILLISECONDS);
+                    if (null == fenced || !fenced) {
+                        // if failed to fence, fail the read request to make it retry.
+                        status = StatusCode.EIO;
+                    } else {
+                        status = StatusCode.EOK;
+                        readResponse.setBody(ByteString.copyFrom(entryBody));
+                    }
+                } catch (InterruptedException ie) {
+                    LOG.error("Interrupting fence read entry (lid: {}, eid: {})",
+                              new Object[] { ledgerId, entryId, ie });
+                    status = StatusCode.EIO;
+                } catch (ExecutionException ee) {
+                    LOG.error("Failed to fence read entry (lid: {}, eid: {})",
+                              new Object[] { ledgerId, entryId, ee });
+                    status = StatusCode.EIO;
+                } catch (TimeoutException te) {
+                    LOG.error("Timeout to fence read entry (lid: {}, eid: {})",
+                              new Object[] { ledgerId, entryId, te });
+                    status = StatusCode.EIO;
+                }
+            } else {
+                readResponse.setBody(ByteString.copyFrom(entryBody));
+                status = StatusCode.EOK;
+            }
+        } catch (Bookie.NoLedgerException e) {
+            status = StatusCode.ENOLEDGER;
+            LOG.error("No ledger found while reading entry:{} from ledger: {}", entryId, ledgerId);
+        } catch (Bookie.NoEntryException e) {
+            status = StatusCode.ENOENTRY;
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("No entry found while reading entry:{} from ledger:{}", entryId, ledgerId);
+            }
+        } catch (IOException e) {
+            status = StatusCode.EIO;
+            LOG.error("IOException while reading entry:{} from ledger:{}", entryId, ledgerId);
+        } catch (BookieException e) {
+            LOG.error("Unauthorized access to ledger:{} while reading entry:{} in request from address: {}",
+                    new Object[]{ledgerId, entryId, channel.getRemoteAddress()});
+            status = StatusCode.EUA;
+        }
+
+        if (status == StatusCode.EOK) {
+            long elapsedTime = MathUtils.now() - startTime;
+            BKStats.getInstance().getOpStats(BKStats.STATS_READ).updateLatency(elapsedTime);
+        } else {
+            BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
+        }
+
+        // Finally set status and return. The body would have been updated if
+        // a read went through.
+        readResponse.setStatus(status);
+        return readResponse.build();
+    }
+
+    @Override
+    public void run() {
+        ReadResponse readResponse = getReadResponse();
+        Response.Builder response = Response.newBuilder()
+                .setHeader(getHeader())
+                .setStatus(readResponse.getStatus())
+                .setReadResponse(readResponse);
+        channel.write(response.build());
+    }
+}
+

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java?rev=1613315&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java Thu Jul 24 22:34:19 2014
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.util.MathUtils;
+import org.jboss.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Processes add entry requests.
+ *
+ * <p>{@link #processPacket()} hands the entry to the bookie and replies immediately only
+ * when the request is rejected (read-only bookie) or the bookie throws. Otherwise the reply
+ * is written from {@link #writeComplete}, because this processor is passed to the bookie as
+ * the {@link WriteCallback} of the add.
+ */
+class WriteEntryProcessor extends PacketProcessorBase implements WriteCallback {
+
+    private final static Logger LOG = LoggerFactory.getLogger(WriteEntryProcessor.class);
+
+    // Taken just before the add is handed to the bookie; read in writeComplete for latency.
+    long startTime;
+
+    public WriteEntryProcessor(Request request, Channel channel, Bookie bookie) {
+        super(request, channel, bookie);
+    }
+
+    /**
+     * Submits the add (or recovery add) to the bookie.
+     *
+     * <p>An error response is written to the channel, and the failed-add counter is bumped,
+     * when the bookie is read-only or when the bookie throws while accepting the entry.
+     */
+    @Override
+    protected void processPacket() {
+        assert (request instanceof BookieProtocol.AddRequest);
+        BookieProtocol.AddRequest add = (BookieProtocol.AddRequest) request;
+
+        if (bookie.isReadOnly()) {
+            LOG.warn("BookieServer is running in readonly mode,"
+                    + " so rejecting the request from the client!");
+            channel.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, add));
+            BKStats.getInstance().getOpStats(BKStats.STATS_ADD).incrementFailedOps();
+            return;
+        }
+
+        startTime = MathUtils.now();
+        int rc = BookieProtocol.EOK;
+        try {
+            if (add.isRecoveryAdd()) {
+                bookie.recoveryAddEntry(add.getDataAsByteBuffer(),
+                                        this, channel, add.getMasterKey());
+            } else {
+                bookie.addEntry(add.getDataAsByteBuffer(),
+                                this, channel, add.getMasterKey());
+            }
+        } catch (IOException e) {
+            LOG.error("Error writing " + add, e);
+            rc = BookieProtocol.EIO;
+        } catch (BookieException.LedgerFencedException lfe) {
+            LOG.error("Attempt to write to fenced ledger", lfe);
+            rc = BookieProtocol.EFENCED;
+        } catch (BookieException e) {
+            LOG.error("Unauthorized access to ledger " + add.getLedgerId(), e);
+            rc = BookieProtocol.EUA;
+        }
+        // On success nothing is written here: the reply comes from writeComplete.
+        if (rc != BookieProtocol.EOK) {
+            channel.write(ResponseBuilder.buildErrorResponse(rc, add));
+            BKStats.getInstance().getOpStats(BKStats.STATS_ADD).incrementFailedOps();
+        }
+    }
+
+    /**
+     * Write callback: replies to the client and updates the add statistics.
+     *
+     * @param rc       result code of the write; {@link BookieProtocol#EOK} means success
+     * @param ledgerId ledger the entry was written to (unused here)
+     * @param entryId  id of the written entry (unused here)
+     * @param addr     bookie address (unused here)
+     * @param ctx      context object handed to the bookie with the add (unused here)
+     */
+    @Override
+    public void writeComplete(int rc, long ledgerId, long entryId,
+                              BookieSocketAddress addr, Object ctx) {
+        if (BookieProtocol.EOK == rc) {
+            channel.write(ResponseBuilder.buildAddResponse(request));
+            // for add operations, we compute latency in writeComplete callbacks.
+            long elapsedTime = MathUtils.now() - startTime;
+            BKStats.getInstance().getOpStats(BKStats.STATS_ADD).updateLatency(elapsedTime);
+        } else {
+            // A failed write must not be acknowledged as a successful add:
+            // report the failure code to the client, as processPacket does.
+            channel.write(ResponseBuilder.buildErrorResponse(rc, request));
+            BKStats.getInstance().getOpStats(BKStats.STATS_ADD).incrementFailedOps();
+        }
+    }
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java?rev=1613315&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java Thu Jul 24 22:34:19 2014
@@ -0,0 +1,143 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.jboss.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Processes add entry requests expressed with the {@code BookkeeperProtocol} messages.
+ *
+ * <p>When the entry is accepted by the bookie, the reply is written by the write callback
+ * created in {@link #getAddResponse()}; {@link #run()} itself only replies on failure.
+ */
+class WriteEntryProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
+    private final static Logger logger = LoggerFactory.getLogger(WriteEntryProcessorV3.class);
+
+    public WriteEntryProcessorV3(Request request, Channel channel, Bookie bookie) {
+        super(request, channel, bookie);
+    }
+
+    /**
+     * Validates the add request and hands the entry to the bookie.
+     *
+     * @return an {@link AddResponse} carrying the failure status when the request is rejected
+     *         (incompatible version, read-only bookie) or the bookie throws; {@code null}
+     *         when the entry was handed over, in which case the write callback replies
+     */
+    private AddResponse getAddResponse() {
+        AddRequest addRequest = request.getAddRequest();
+        long ledgerId = addRequest.getLedgerId();
+        long entryId = addRequest.getEntryId();
+
+        final AddResponse.Builder addResponse = AddResponse.newBuilder()
+                .setLedgerId(ledgerId)
+                .setEntryId(entryId);
+
+        if (!isVersionCompatible()) {
+            addResponse.setStatus(StatusCode.EBADVERSION);
+            return addResponse.build();
+        }
+
+        if (bookie.isReadOnly()) {
+            logger.warn("BookieServer is running as readonly mode, so rejecting the request from the client!");
+            addResponse.setStatus(StatusCode.EREADONLY);
+            return addResponse.build();
+        }
+
+        // Sends the reply once the write completes; the channel arrives as the callback context.
+        BookkeeperInternalCallbacks.WriteCallback wcb = new BookkeeperInternalCallbacks.WriteCallback() {
+            @Override
+            public void writeComplete(int rc, long ledgerId, long entryId,
+                                      BookieSocketAddress addr, Object ctx) {
+                Channel conn = (Channel) ctx;
+                StatusCode status;
+                // Only EOK and EIO are translated one-to-one; any other code is reported as EUA.
+                switch (rc) {
+                    case BookieProtocol.EOK:
+                        status = StatusCode.EOK;
+                        break;
+                    case BookieProtocol.EIO:
+                        status = StatusCode.EIO;
+                        break;
+                    default:
+                        status = StatusCode.EUA;
+                        break;
+                }
+                addResponse.setStatus(status);
+                Response.Builder response = Response.newBuilder()
+                        .setHeader(getHeader())
+                        .setStatus(addResponse.getStatus())
+                        .setAddResponse(addResponse);
+                Response resp = response.build();
+                conn.write(resp);
+            }
+        };
+        // Assigned on every path below: at the end of the try block or in each catch clause.
+        StatusCode status;
+        byte[] masterKey = addRequest.getMasterKey().toByteArray();
+        ByteBuffer entryToAdd = addRequest.getBody().asReadOnlyByteBuffer();
+        try {
+            if (addRequest.hasFlag() && addRequest.getFlag().equals(AddRequest.Flag.RECOVERY_ADD)) {
+                bookie.recoveryAddEntry(entryToAdd, wcb, channel, masterKey);
+            } else {
+                bookie.addEntry(entryToAdd, wcb, channel, masterKey);
+            }
+            status = StatusCode.EOK;
+        } catch (IOException e) {
+            logger.error("Error writing entry:{} to ledger:{}",
+                         new Object[] { entryId, ledgerId, e });
+            status = StatusCode.EIO;
+        } catch (BookieException.LedgerFencedException e) {
+            logger.debug("Ledger fenced while writing entry:{} to ledger:{}",
+                         entryId, ledgerId);
+            status = StatusCode.EFENCED;
+        } catch (BookieException e) {
+            // Pass the exception as the trailing argument so its stack trace is logged.
+            logger.error("Unauthorized access to ledger:{} while writing entry:{}",
+                         new Object[] { ledgerId, entryId, e });
+            status = StatusCode.EUA;
+        } catch (Throwable t) {
+            logger.error("Unexpected exception while writing {}@{} : ",
+                         new Object[] { entryId, ledgerId, t });
+            // some bad request which cause unexpected exception
+            status = StatusCode.EBADREQ;
+        }
+
+        // If everything is okay, we return null so that the calling function
+        // doesn't return a response back to the caller.
+        if (status != StatusCode.EOK) {
+            addResponse.setStatus(status);
+            return addResponse.build();
+        }
+        return null;
+    }
+
+    /**
+     * Runs the add and, only if it failed before reaching the write callback,
+     * writes the error response to the channel.
+     */
+    @Override
+    public void run() {
+        AddResponse addResponse = getAddResponse();
+        if (null != addResponse) {
+            // This means there was an error and we should send this back.
+            Response.Builder response = Response.newBuilder()
+                    .setHeader(getHeader())
+                    .setStatus(addResponse.getStatus())
+                    .setAddResponse(addResponse);
+            Response resp = response.build();
+            channel.write(resp);
+        }
+    }
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto?rev=1613315&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto Thu Jul 24 22:34:19 2014
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Java package of the classes generated from this file.
+ option java_package = "org.apache.bookkeeper.proto";
+// Generate code optimized for speed.
+ option optimize_for = SPEED;
+
+/**
+ * Protocol Versions.
+ *
+ * Carried in the version field of BKPacketHeader.
+ */
+enum ProtocolVersion {
+    VERSION_ONE = 1;
+    VERSION_TWO = 2;
+    VERSION_THREE = 3;
+}
+
+/**
+ * Status codes.
+ *
+ * Used as the type of the status field of Response, ReadResponse and
+ * AddResponse.
+ */
+enum StatusCode {
+    // The request succeeded.
+    EOK = 0;
+
+    // Server side Errors 4xx
+    ENOLEDGER = 402;
+    ENOENTRY = 403;
+    // Bad or unsupported request.
+    EBADREQ = 404;
+
+    // IO/access errors 5xx
+    // I/O failure while serving the request.
+    EIO = 501;
+    // Unauthorized access to the ledger.
+    EUA = 502;
+    // The request's protocol version is not compatible.
+    EBADVERSION = 503;
+    // The ledger has been fenced.
+    EFENCED = 504;
+    // The bookie is running in read-only mode.
+    EREADONLY = 505;
+}
+
+/**
+ * Supported operations by this protocol.
+ *
+ * Carried in the operation field of BKPacketHeader.
+ */
+enum OperationType {
+    READ_ENTRY = 1;
+    ADD_ENTRY = 2;
+    // Not supported yet.
+    RANGE_READ_ENTRY = 3;
+    RANGE_ADD_ENTRY = 4;
+}
+
+/**
+ * Packet header for all requests.
+ *
+ * The same message type is also the header field of Response.
+ */
+message BKPacketHeader {
+    required ProtocolVersion version = 1;
+    required OperationType operation = 2;
+    // NOTE(review): presumably lets a client pair a response with its
+    // request - confirm against the client code.
+    required uint64 txnId = 3;
+}
+
+/**
+ * Top-level request message: the common header plus the optional
+ * per-operation request bodies.
+ */
+message Request {
+    required BKPacketHeader header = 1;
+    // Requests
+    optional ReadRequest readRequest = 100;
+    optional AddRequest addRequest = 101;
+}
+
+/**
+ * Request to read one entry of a ledger.
+ */
+message ReadRequest {
+    // Optional modifier of the read.
+    enum Flag {
+        FENCE_LEDGER = 1;
+    }
+    optional Flag flag = 100;
+    required int64 ledgerId = 1;
+    // entryId will be -1 for reading the LAST_ADD_CONFIRMED entry.
+    required int64 entryId = 2;
+    // Used while fencing a ledger.
+    optional bytes masterKey = 3;
+}
+
+/**
+ * Request to add one entry to a ledger.
+ */
+message AddRequest {
+    // Optional modifier of the add.
+    enum Flag {
+        // The add is a recovery add.
+        RECOVERY_ADD = 1;
+    }
+    optional Flag flag = 100;
+    required int64 ledgerId = 1;
+    required int64 entryId = 2;
+    // Master key of the ledger; handed to the bookie together with the body.
+    required bytes masterKey = 3;
+    // Entry payload.
+    required bytes body = 4;
+}
+
+/**
+ * Top-level response message: the header, an overall status and the
+ * optional per-operation response bodies.
+ */
+message Response {
+
+    required BKPacketHeader header = 1;
+    // EOK if the underlying request succeeded. Each individual response
+    // has a more meaningful status. EBADREQ if we have an unsupported request.
+    required StatusCode status = 2;
+    // Response
+    optional ReadResponse readResponse = 100;
+    optional AddResponse addResponse = 101;
+
+}
+
+/**
+ * Result of a ReadRequest.
+ */
+message ReadResponse {
+    required StatusCode status = 1;
+    required int64 ledgerId = 2;
+    required int64 entryId = 3;
+    // Entry payload; optional, so it may be absent from a response.
+    optional bytes body = 4;
+}
+
+/**
+ * Result of an AddRequest, identifying the ledger and entry it refers to.
+ */
+message AddResponse {
+    required StatusCode status = 1;
+    required int64 ledgerId = 2;
+    required int64 entryId = 3;
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml Thu Jul 24 22:34:19 2014
@@ -21,6 +21,10 @@
     <Class name="~org\.apache\.bookkeeper\.proto\.DataFormats.*" />
   </Match>
   <Match>
+    <!-- generated code, we can't be held responsible for findbugs in it //-->
+    <Class name="~org\.apache\.bookkeeper\.proto\.BookkeeperProtocol.*" />
+  </Match>
+  <Match>
     <!-- it is safe to store external bytes reference here. since we are using
          bytes from a slab. //-->
     <Class name="org.apache.bookkeeper.bookie.EntryKeyValue" />

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java Thu Jul 24 22:34:19 2014
@@ -226,7 +226,9 @@ public class AuditorPeriodicCheckTest ex
             for (int j = 0; j < 100; j++) {
                 lh.asyncAddEntry("testdata".getBytes(), new AddCallback() {
                         public void addComplete(int rc2, LedgerHandle lh, long entryId, Object ctx) {
-                            rc.compareAndSet(BKException.Code.OK, rc2);
+                            // compareAndSet(OK, OK) also succeeds, so test rc2 first:
+                            // only a real failure is recorded and logged.
+                            if (rc2 != BKException.Code.OK && rc.compareAndSet(BKException.Code.OK, rc2)) {
+                                LOG.info("Failed to add entry : {}", BKException.getMessage(rc2));
+                            }
                             completeLatch.countDown();
                         }
                     }, null);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java Thu Jul 24 22:34:19 2014
@@ -240,7 +240,7 @@ public class BookieClientTest {
         synchronized (arc) {
             bc.readEntry(addr, 2, 13, recb, arc);
             arc.wait(1000);
-            assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
+            assertEquals(BKException.Code.NoSuchLedgerExistsException, arc.rc);
         }
     }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java Thu Jul 24 22:34:19 2014
@@ -259,6 +259,103 @@ public class TestBackwardCompat {
     }
 
     /**
+     * Version 4.2.0 classes
+     */
+    /**
+     * Test helper that configures, starts and stops a bookie server built from
+     * the {@code org.apache.bk_v4_2_0} classes.
+     */
+    static class Server420 {
+        org.apache.bk_v4_2_0.bookkeeper.conf.ServerConfiguration conf;
+        // Stays null until start() has created the server.
+        org.apache.bk_v4_2_0.bookkeeper.proto.BookieServer server;
+
+        /**
+         * Prepares the configuration; the server itself is created by {@link #start()}.
+         */
+        Server420(File journalDir, File ledgerDir, int port) throws Exception {
+            this.conf = new org.apache.bk_v4_2_0.bookkeeper.conf.ServerConfiguration();
+            this.conf.setBookiePort(port);
+            this.conf.setZkServers(zkUtil.getZooKeeperConnectString());
+            this.conf.setJournalDirName(journalDir.getPath());
+            final String[] ledgerDirNames = { ledgerDir.getPath() };
+            this.conf.setLedgerDirNames(ledgerDirNames);
+        }
+
+        /** Creates and starts the server, then waits for its port to come up. */
+        void start() throws Exception {
+            this.server = new org.apache.bk_v4_2_0.bookkeeper.proto.BookieServer(this.conf);
+            this.server.start();
+            waitUp(this.conf.getBookiePort());
+        }
+
+        org.apache.bk_v4_2_0.bookkeeper.conf.ServerConfiguration getConf() {
+            return this.conf;
+        }
+
+        /** Shuts the server down; does nothing if it was never started. */
+        void stop() throws Exception {
+            if (this.server == null) {
+                return;
+            }
+            this.server.shutdown();
+        }
+    }
+
+    /**
+     * Test helper pairing a client and a ledger handle, both from the
+     * {@code org.apache.bk_v4_2_0} classes.
+     */
+    static class Ledger420 {
+        org.apache.bk_v4_2_0.bookkeeper.client.BookKeeper bk;
+        org.apache.bk_v4_2_0.bookkeeper.client.LedgerHandle lh;
+
+        private Ledger420(org.apache.bk_v4_2_0.bookkeeper.client.BookKeeper bk,
+                          org.apache.bk_v4_2_0.bookkeeper.client.LedgerHandle lh) {
+            this.bk = bk;
+            this.lh = lh;
+        }
+
+        /** Connects a new client and creates a ledger with it. */
+        static Ledger420 newLedger() throws Exception {
+            final org.apache.bk_v4_2_0.bookkeeper.client.BookKeeper client
+                = new org.apache.bk_v4_2_0.bookkeeper.client.BookKeeper(zkUtil.getZooKeeperConnectString());
+            final org.apache.bk_v4_2_0.bookkeeper.client.LedgerHandle handle
+                = client.createLedger(1, 1,
+                                  org.apache.bk_v4_2_0.bookkeeper.client.BookKeeper.DigestType.CRC32,
+                                  "foobar".getBytes());
+            return new Ledger420(client, handle);
+        }
+
+        /** Connects a new client and opens the ledger with the given id. */
+        static Ledger420 openLedger(long id) throws Exception {
+            final org.apache.bk_v4_2_0.bookkeeper.client.BookKeeper client
+                = new org.apache.bk_v4_2_0.bookkeeper.client.BookKeeper(zkUtil.getZooKeeperConnectString());
+            final org.apache.bk_v4_2_0.bookkeeper.client.LedgerHandle handle
+                = client.openLedger(id,
+                                org.apache.bk_v4_2_0.bookkeeper.client.BookKeeper.DigestType.CRC32,
+                                "foobar".getBytes());
+            return new Ledger420(client, handle);
+        }
+
+        long getId() {
+            return this.lh.getId();
+        }
+
+        /** Adds ENTRY_DATA to the ledger one hundred times. */
+        void write100() throws Exception {
+            int remaining = 100;
+            while (remaining > 0) {
+                this.lh.addEntry(ENTRY_DATA);
+                remaining--;
+            }
+        }
+
+        /**
+         * Reads entries 0 through last-add-confirmed, asserting that each one
+         * equals ENTRY_DATA.
+         *
+         * @return the number of entries read
+         */
+        long readAll() throws Exception {
+            long seen = 0;
+            final Enumeration<org.apache.bk_v4_2_0.bookkeeper.client.LedgerEntry> cursor
+                = this.lh.readEntries(0, this.lh.getLastAddConfirmed());
+            for (; cursor.hasMoreElements(); seen++) {
+                final byte[] payload = cursor.nextElement().getEntry();
+                assertTrue("entry data doesn't match", Arrays.equals(payload, ENTRY_DATA));
+            }
+            return seen;
+        }
+
+        /** Closes the handle, then the client; the client is closed even if the handle fails. */
+        void close() throws Exception {
+            try {
+                if (null != this.lh) {
+                    this.lh.close();
+                }
+            } finally {
+                if (null != this.bk) {
+                    this.bk.close();
+                }
+            }
+        }
+    }
+
+    /**
      * Current verion classes
      */
     static class ServerCurrent {
@@ -527,7 +624,12 @@ public class TestBackwardCompat {
 
         // Check that current client can to write to old server
         LedgerCurrent lcur = LedgerCurrent.newLedger();
-        lcur.write100();
+        try {
+            lcur.write100();
+            fail("Shouldn't be able to write");
+        } catch (Exception e) {
+            // correct behaviour
+        }
         lcur.close();
 
         s410.stop();
@@ -619,12 +721,12 @@ public class TestBackwardCompat {
     }
 
     /**
-     * Test compatability between version 4.1.0 and the current version. - 4.1.0
-     * server restarts with useHostNameAsBookieID=true. Read ledgers with old
-     * and new clients
+     * Test compatibility between old versions and the current version.
+     * - old server restarts with useHostNameAsBookieID=true.
+     * - Read ledgers with old and new clients
      */
     @Test(timeout = 60000)
-    public void testCompat410ReadLedgerOnRestartedServer() throws Exception {
+    public void testCompatReads() throws Exception {
         File journalDir = File.createTempFile("bookie", "journal");
         journalDir.delete();
         journalDir.mkdir();
@@ -642,10 +744,11 @@ public class TestBackwardCompat {
         long oldLedgerId = l410.getId();
         l410.close();
 
-        // Check that current client can to write to old server
-        LedgerCurrent lcur = LedgerCurrent.newLedger();
-        lcur.write100();
-        lcur.close();
+        // Check that 420 client can write to 410 server
+        Ledger420 l420 = Ledger420.newLedger();
+        l420.write100();
+        long lid420 = l420.getId();
+        l420.close();
 
         s410.stop();
 
@@ -659,19 +762,25 @@ public class TestBackwardCompat {
         assertEquals(100, l410.readAll());
         l410.close();
 
+        // Check that 420 client can read old ledgers on new server
+        l420 = Ledger420.openLedger(lid420);
+        assertEquals("Failed to read entries!", 100, l420.readAll());
+        l420.close();
+
         // Check that current client can read old ledgers on new server
-        final LedgerCurrent curledger = LedgerCurrent.openLedger(lcur.getId());
+        final LedgerCurrent curledger = LedgerCurrent.openLedger(lid420);
         assertEquals("Failed to read entries!", 100, curledger.readAll());
         curledger.close();
     }
 
     /**
-     * Test compatability between version 4.1.0 and the current version. - 4.1.0
-     * server restarts with useHostNameAsBookieID=true. Write ledgers with old
-     * and new clients
+     * Test compatibility between old versions and the current version.
+     * - 4.1.0 server restarts with useHostNameAsBookieID=true.
+     * - Write ledgers with old and new clients
+     * - Read ledgers written by old clients.
      */
     @Test(timeout = 60000)
-    public void testCompat410WriteLedgerOnRestartedServer() throws Exception {
+    public void testCompatWrites() throws Exception {
         File journalDir = File.createTempFile("bookie", "journal");
         journalDir.delete();
         journalDir.mkdir();
@@ -697,15 +806,26 @@ public class TestBackwardCompat {
         final LedgerCurrent curledger = LedgerCurrent.openLedger(lcur.getId());
         assertEquals("Failed to read entries!", 100, curledger.readAll());
 
-        // Check that current client can write to server
+        // Check that 410 client can write to server
         Ledger410 l410 = Ledger410.newLedger();
         l410.write100();
         long oldLedgerId = l410.getId();
         l410.close();
 
+        // Check that 420 client can write to server
+        // (use the 4.2.0 helper; Ledger410 here would repeat the 410 check above)
+        Ledger420 l420 = Ledger420.newLedger();
+        l420.write100();
+        long lid420 = l420.getId();
+        l420.close();
+
         // check that new client can read old ledgers on new server
         LedgerCurrent oldledger = LedgerCurrent.openLedger(oldLedgerId);
         assertEquals("Failed to read entries!", 100, oldledger.readAll());
         oldledger.close();
+
+        // check that new client can read old ledgers on new server
+        oldledger = LedgerCurrent.openLedger(lid420);
+        assertEquals("Failed to read entries!", 100, oldledger.readAll());
+        oldledger.close();
     }
 }

Modified: zookeeper/bookkeeper/trunk/compat-deps/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/compat-deps/pom.xml?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/compat-deps/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/compat-deps/pom.xml Thu Jul 24 22:34:19 2014
@@ -33,8 +33,10 @@
   <modules>
     <module>bookkeeper-server-compat-4.0.0</module>
     <module>bookkeeper-server-compat-4.1.0</module>
+    <module>bookkeeper-server-compat-4.2.0</module>
     <module>hedwig-server-compat-4.0.0</module>
     <module>hedwig-server-compat-4.1.0</module>
+    <module>hedwig-server-compat-4.2.0</module>
   </modules>
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

Modified: zookeeper/bookkeeper/trunk/hedwig-server/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/pom.xml?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/pom.xml Thu Jul 24 22:34:19 2014
@@ -125,6 +125,30 @@
     </dependency>
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>hedwig-server-compat420</artifactId>
+      <version>4.2.0</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.bookkeeper</groupId>
+          <artifactId>bookkeeper-server</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.bookkeeper</groupId>
+          <artifactId>hedwig-server</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.bookkeeper</groupId>
+          <artifactId>hedwig-protocol</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.bookkeeper</groupId>
+          <artifactId>hedwig-client</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
       <artifactId>hedwig-server-compat410</artifactId>
       <version>4.1.0</version>
       <scope>test</scope>