You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/01/31 03:02:00 UTC

[1/3] bookkeeper git commit: BOOKKEEPER-874: Explicit LAC from Writer to Bookies

Repository: bookkeeper
Updated Branches:
  refs/heads/master 42e8f1294 -> c813b3d32


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index a4fb761..3fb73e4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.auth.ClientAuthProvider;
+import com.google.protobuf.ByteString;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeperClientStats;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -39,13 +40,22 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -124,7 +134,11 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     private final OpStatsLogger readEntryOpLogger;
     private final OpStatsLogger readTimeoutOpLogger;
     private final OpStatsLogger addEntryOpLogger;
+    private final OpStatsLogger writeLacOpLogger;
+    private final OpStatsLogger readLacOpLogger;
     private final OpStatsLogger addTimeoutOpLogger;
+    private final OpStatsLogger writeLacTimeoutOpLogger;
+    private final OpStatsLogger readLacTimeoutOpLogger;
 
     /**
      * The following member variables do not need to be concurrent, or volatile
@@ -192,8 +206,12 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
 
         readEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_OP);
         addEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_ADD_OP);
+        writeLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_WRITE_LAC_OP);
+        readLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_LAC_OP);
         readTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ);
         addTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD);
+        writeLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_WRITE_LAC);
+        readLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ_LAC);
 
         this.pcbcPool = pcbcPool;
 
@@ -238,6 +256,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     }
 
     private void completeOperation(GenericCallback<PerChannelBookieClient> op, int rc) {
+        //Thread.dumpStack();
         closeLock.readLock().lock();
         try {
             if (ConnectionState.CLOSED == state) {
@@ -365,6 +384,60 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
 
     }
 
+    /**
+     * Sends an explicit-LAC write request for a ledger to the bookie behind this channel.
+     *
+     * <p>A completion object is registered under a fresh transaction id before the request is
+     * written; it is errored out via {@link #errorOutWriteLacKey(CompletionKey)} if the channel
+     * is missing, the channel write fails, or submitting the write throws.
+     *
+     * @param ledgerId ledger whose LAC is being written
+     * @param masterKey master key copied into the request
+     * @param lac last-add-confirmed value placed in the request
+     * @param toSend buffer whose contents become the request body
+     * @param cb callback notified when the operation completes
+     * @param ctx opaque context handed back to {@code cb}
+     */
+    void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ChannelBuffer toSend, WriteLacCallback cb, Object ctx) {
+        final long txnId = getTxnId();
+        final CompletionKey completionKey = new CompletionKey(txnId, OperationType.WRITE_LAC);
+        // writeLac is mostly like addEntry hence uses addEntryTimeout.
+        // The completion must carry the ledger id (not the lac value): the error path orders its
+        // callback on, and reports, WriteLacCompletion.ledgerId.
+        completionObjects.put(completionKey,
+                new WriteLacCompletion(writeLacOpLogger, cb, ctx, ledgerId,
+                        scheduleTimeout(completionKey, addEntryTimeout)));
+
+        // Build the request
+        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                .setOperation(OperationType.WRITE_LAC)
+                .setTxnId(txnId);
+        WriteLacRequest.Builder writeLacBuilder = WriteLacRequest.newBuilder()
+                .setLedgerId(ledgerId)
+                .setLac(lac)
+                .setMasterKey(ByteString.copyFrom(masterKey))
+                .setBody(ByteString.copyFrom(toSend.toByteBuffer()));
+
+        final Request writeLacRequest = Request.newBuilder()
+                .setHeader(headerBuilder)
+                .setWriteLacRequest(writeLacBuilder)
+                .build();
+
+        // Read the channel field once so the null check and the write use the same reference.
+        final Channel c = channel;
+        if (c == null) {
+            errorOutWriteLacKey(completionKey);
+            return;
+        }
+        try {
+            ChannelFuture future = c.write(writeLacRequest);
+            future.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if (future.isSuccess()) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Successfully wrote request for writeLac LedgerId: {} bookie: {}",
+                                    ledgerId, c.getRemoteAddress());
+                        }
+                    } else {
+                        // A closed channel is not logged here; the completion is still failed below.
+                        if (!(future.getCause() instanceof ClosedChannelException)) {
+                            LOG.warn("Writing Lac(lid={}) to channel {} failed : ",
+                                    new Object[] { ledgerId, c, future.getCause() });
+                        }
+                        errorOutWriteLacKey(completionKey);
+                    }
+                }
+            });
+        } catch (Throwable e) {
+            LOG.warn("writeLac operation failed", e);
+            errorOutWriteLacKey(completionKey);
+        }
+    }
+
     /**
      * This method should be called only after connection has been checked for
      * {@link #connectIfNeededAndDoOp(GenericCallback)}
@@ -502,6 +575,52 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         }
     }
 
+    /**
+     * Sends a read-LAC request for a ledger to the bookie behind this channel.
+     *
+     * <p>A completion object is registered under a fresh transaction id before the request is
+     * written; it is errored out via {@link #errorOutReadLacKey(CompletionKey)} if the channel
+     * is missing, the channel write fails, or submitting the write throws.
+     *
+     * @param ledgerId ledger whose LAC is requested
+     * @param cb callback notified when the operation completes
+     * @param ctx opaque context handed back to {@code cb}
+     */
+    public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
+        final long txnId = getTxnId();
+        final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_LAC);
+        // readLac reuses the read-entry timeout.
+        completionObjects.put(completionKey,
+                new ReadLacCompletion(readLacOpLogger, cb, ctx, ledgerId,
+                        scheduleTimeout(completionKey, readEntryTimeout)));
+        // Build the request
+        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                .setOperation(OperationType.READ_LAC)
+                .setTxnId(txnId);
+        ReadLacRequest.Builder readLacBuilder = ReadLacRequest.newBuilder()
+                .setLedgerId(ledgerId);
+        final Request readLacRequest = Request.newBuilder()
+                .setHeader(headerBuilder)
+                .setReadLacRequest(readLacBuilder)
+                .build();
+        // Read the channel field once so the null check and the write use the same reference.
+        final Channel c = channel;
+        if (c == null) {
+            errorOutReadLacKey(completionKey);
+            return;
+        }
+
+        try {
+            ChannelFuture future = c.write(readLacRequest);
+            future.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if (future.isSuccess()) {
+                        LOG.debug("Successfully wrote request {} to {}",
+                                    readLacRequest, c.getRemoteAddress());
+                    } else {
+                        // A closed channel is not logged here; the completion is still failed below.
+                        if (!(future.getCause() instanceof ClosedChannelException)) {
+                            LOG.warn("Writing readLac(lid = {}) to channel {} failed : ",
+                                    new Object[] { ledgerId, c, future.getCause() });
+                        }
+                        errorOutReadLacKey(completionKey);
+                    }
+                }
+            });
+        } catch(Throwable e) {
+            LOG.warn("Read LAC operation {} failed", readLacRequest, e);
+            errorOutReadLacKey(completionKey);
+        }
+    }
+
     public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx) {
         final long txnId = getTxnId();
         final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY);
@@ -649,6 +768,54 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         });
     }
 
+    /**
+     * Fails the pending writeLac operation registered under {@code key} with
+     * {@code BookieHandleNotAvailableException}.
+     */
+    void errorOutWriteLacKey(final CompletionKey key) {
+        final int rc = BKException.Code.BookieHandleNotAvailableException;
+        errorOutWriteLacKey(key, rc);
+    }
+
+    /**
+     * Removes the pending writeLac operation registered under {@code key} and, if one was
+     * present, submits a task (ordered by its ledger id) that invokes its callback with {@code rc}.
+     */
+    void errorOutWriteLacKey(final CompletionKey key, final int rc) {
+        final WriteLacCompletion pending = (WriteLacCompletion) completionObjects.remove(key);
+        if (pending != null) {
+            final SafeRunnable notifyFailure = new SafeRunnable() {
+                @Override
+                public void safeRun() {
+                    // The channel is sampled when the task runs, not when it is submitted.
+                    final Channel current = channel;
+                    final String bookieAddress =
+                            (current == null) ? "null" : current.getRemoteAddress().toString();
+                    LOG.debug("Could not write request writeLac for ledgerId: {} bookie: {}",
+                              new Object[] { pending.ledgerId, bookieAddress });
+                    pending.cb.writeLacComplete(rc, pending.ledgerId, addr, pending.ctx);
+                }
+            };
+            executor.submitOrdered(pending.ledgerId, notifyFailure);
+        }
+    }
+
+    /**
+     * Fails the pending readLac operation registered under {@code key} with
+     * {@code BookieHandleNotAvailableException}.
+     */
+    void errorOutReadLacKey(final CompletionKey key) {
+        final int rc = BKException.Code.BookieHandleNotAvailableException;
+        errorOutReadLacKey(key, rc);
+    }
+
+    /**
+     * Removes the pending readLac operation registered under {@code key} and, if one was
+     * present, submits a task (ordered by its ledger id) that invokes its callback with {@code rc}
+     * and null buffers.
+     */
+    void errorOutReadLacKey(final CompletionKey key, final int rc) {
+        final ReadLacCompletion pending = (ReadLacCompletion) completionObjects.remove(key);
+        if (pending != null) {
+            final SafeRunnable notifyFailure = new SafeRunnable() {
+                @Override
+                public void safeRun() {
+                    // The channel is sampled when the task runs, not when it is submitted.
+                    final Channel current = channel;
+                    final String bookieAddress =
+                            (current == null) ? "null" : current.getRemoteAddress().toString();
+                    LOG.debug("Could not write request readLac for ledgerId: {} bookie: {}",
+                              new Object[] { pending.ledgerId, bookieAddress });
+                    pending.cb.readLacComplete(rc, pending.ledgerId, null, null, pending.ctx);
+                }
+            };
+            executor.submitOrdered(pending.ledgerId, notifyFailure);
+        }
+    }
+
     void errorOutAddKey(final CompletionKey key) {
         errorOutAddKey(key, BKException.Code.BookieHandleNotAvailableException);
     }
@@ -836,6 +1003,12 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
                         case READ_ENTRY:
                             handleReadResponse(response, completionValue);
                             break;
+                        case WRITE_LAC:
+                            handleWriteLacResponse(response.getWriteLacResponse(), completionValue);
+                            break;
+                        case READ_LAC:
+                            handleReadLacResponse(response.getReadLacResponse(), completionValue);
+                            break;
                         default:
                             LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring",
                                       type, addr);
@@ -853,7 +1026,26 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         }
     }
 
-    void handleAddResponse(Response response, CompletionValue completionValue) {
+    /**
+     * Completes a pending writeLac operation from the bookie's response.
+     *
+     * <p>The response status is translated with {@code statusCodeToExceptionCode}; a status with
+     * no mapping is reported to the callback as {@code BKException.Code.WriteException}.
+     *
+     * @param writeLacResponse response payload received from the bookie
+     * @param completionValue pending completion; cast to {@link WriteLacCompletion}
+     */
+    void handleWriteLacResponse(WriteLacResponse writeLacResponse, CompletionValue completionValue) {
+        // The completion value should always be an instance of a WriteLacCompletion object when we reach here.
+        WriteLacCompletion plc = (WriteLacCompletion)completionValue;
+
+        long ledgerId = writeLacResponse.getLedgerId();
+        StatusCode status = writeLacResponse.getStatus();
+
+        // Parameterized logging: the message is only formatted when debug is enabled.
+        LOG.debug("Got response for writeLac request from bookie: {} for ledger: {} rc: {}",
+                new Object[] { addr, ledgerId, status });
+
+        // convert to BKException code
+        Integer rcToRet = statusCodeToExceptionCode(status);
+        if (null == rcToRet) {
+            LOG.error("writeLac for ledger: {} failed on bookie: {} with code:{}",
+                    new Object[] { ledgerId, addr, status });
+            rcToRet = BKException.Code.WriteException;
+        }
+        plc.cb.writeLacComplete(rcToRet, ledgerId, addr, plc.ctx);
+    }
+
+ void handleAddResponse(Response response, CompletionValue completionValue) {
         // The completion value should always be an instance of an AddCompletion object when we reach here.
         AddCompletion ac = (AddCompletion)completionValue;
         AddResponse addResponse = response.getAddResponse();
@@ -866,7 +1058,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
             LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
                     + entryId + " rc: " + status);
         }
-        // convert to BKException code because thats what the uppper
+        // convert to BKException code because thats what the upper
         // layers expect. This is UGLY, there should just be one set of
         // error codes.
         Integer rcToRet = statusCodeToExceptionCode(status);
@@ -880,6 +1072,36 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx);
     }
 
+    /**
+     * Completes a pending readLac operation from the bookie's response.
+     *
+     * <p>The LAC body and last-entry body are copied into buffers when present in the response;
+     * otherwise zero-capacity buffers are passed on. The response status is translated with
+     * {@code statusCodeToExceptionCode}; a status with no mapping is reported to the callback as
+     * {@code BKException.Code.ReadException}.
+     *
+     * @param readLacResponse response payload received from the bookie
+     * @param completionValue pending completion; cast to {@link ReadLacCompletion}
+     */
+    void handleReadLacResponse(ReadLacResponse readLacResponse, CompletionValue completionValue) {
+        // The completion value should always be an instance of a ReadLacCompletion object when we reach here.
+        ReadLacCompletion glac = (ReadLacCompletion)completionValue;
+
+        long ledgerId = readLacResponse.getLedgerId();
+        StatusCode status = readLacResponse.getStatus();
+        ChannelBuffer lacBuffer = ChannelBuffers.buffer(0);
+        ChannelBuffer lastEntryBuffer = ChannelBuffers.buffer(0);
+
+        if (readLacResponse.hasLacBody()) {
+            lacBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
+        }
+
+        if (readLacResponse.hasLastEntryBody()) {
+            lastEntryBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
+        }
+
+        // Parameterized logging: the message is only formatted when debug is enabled.
+        LOG.debug("Got response for readLac request from bookie: {} for ledger: {} rc: {}",
+                new Object[] { addr, ledgerId, status });
+        // convert to BKException code
+        Integer rcToRet = statusCodeToExceptionCode(status);
+        if (null == rcToRet) {
+            LOG.debug("readLac for ledger: {} failed on bookie: {} with code:{}",
+                    new Object[] { ledgerId, addr, status });
+            rcToRet = BKException.Code.ReadException;
+        }
+        glac.cb.readLacComplete(rcToRet, ledgerId, lacBuffer.slice(), lastEntryBuffer.slice(), glac.ctx);
+    }
+
     void handleReadResponse(Response response, CompletionValue completionValue) {
         // The completion value should always be an instance of a ReadCompletion object when we reach here.
         ReadCompletion rc = (ReadCompletion)completionValue;
@@ -940,6 +1162,63 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     }
 
     // visible for testing
+    /**
+     * Pending-operation record for a writeLac request. When an {@link OpStatsLogger} is supplied,
+     * the stored callback wraps the caller's callback to cancel the timeout and record latency.
+     */
+    static class WriteLacCompletion extends CompletionValue {
+        final WriteLacCallback cb;
+
+        /** Creates a completion with no stats logger and no timeout. */
+        public WriteLacCompletion(WriteLacCallback cb, Object ctx, long ledgerId) {
+            this(null, cb, ctx, ledgerId, null);
+        }
+
+        /**
+         * Creates a completion whose callback, if {@code writeLacOpLogger} is non-null, records
+         * the elapsed time as a success or failure before delegating to {@code originalCallback}.
+         */
+        public WriteLacCompletion(final OpStatsLogger writeLacOpLogger, final WriteLacCallback originalCallback,
+                final Object originalCtx, final long ledgerId, final Timeout timeout) {
+            super(originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, timeout);
+            final long startTime = MathUtils.nowInNano();
+            if (writeLacOpLogger == null) {
+                // No stats requested: hand the caller's callback through untouched.
+                this.cb = originalCallback;
+            } else {
+                this.cb = new WriteLacCallback() {
+                    @Override
+                    public void writeLacComplete(int rc, long ledgerId, BookieSocketAddress addr, Object ctx) {
+                        cancelTimeout();
+                        final long elapsedNanos = MathUtils.elapsedNanos(startTime);
+                        if (rc == BKException.Code.OK) {
+                            writeLacOpLogger.registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
+                        } else {
+                            writeLacOpLogger.registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
+                        }
+                        // The context captured at construction is passed on, not the ctx argument.
+                        originalCallback.writeLacComplete(rc, ledgerId, addr, originalCtx);
+                    }
+                };
+            }
+        }
+    }
+
+    // visible for testing
+    /**
+     * Pending-operation record for a readLac request. When an {@link OpStatsLogger} is supplied,
+     * the stored callback wraps the caller's callback to cancel the timeout and record latency.
+     */
+    static class ReadLacCompletion extends CompletionValue {
+        final ReadLacCallback cb;
+
+        /** Creates a completion with no stats logger and no timeout. */
+        public ReadLacCompletion(ReadLacCallback cb, Object ctx, long ledgerId) {
+            this (null, cb, ctx, ledgerId, null);
+        }
+
+        /**
+         * Creates a completion whose callback, if {@code readLacOpLogger} is non-null, records
+         * the elapsed time as a success or failure before delegating to {@code originalCallback}.
+         */
+        public ReadLacCompletion(final OpStatsLogger readLacOpLogger, final ReadLacCallback originalCallback,
+                final Object ctx, final long ledgerId, final Timeout timeout) {
+            super(ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, timeout);
+            final long startTime = MathUtils.nowInNano();
+            if (readLacOpLogger == null) {
+                // No stats requested: hand the caller's callback through untouched.
+                this.cb = originalCallback;
+            } else {
+                this.cb = new ReadLacCallback() {
+                    @Override
+                    public void readLacComplete(int rc, long ledgerId, ChannelBuffer lacBuffer, ChannelBuffer lastEntryBuffer, Object ctx) {
+                        cancelTimeout();
+                        final long elapsedNanos = MathUtils.elapsedNanos(startTime);
+                        if (rc == BKException.Code.OK) {
+                            readLacOpLogger.registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
+                        } else {
+                            readLacOpLogger.registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
+                        }
+                        // The ctx argument of this call (not the constructor's) is forwarded.
+                        originalCallback.readLacComplete(rc, ledgerId, lacBuffer, lastEntryBuffer, ctx);
+                    }
+                };
+            }
+        }
+    }
+
+    // visible for testing
     static class ReadCompletion extends CompletionValue {
         final ReadEntryCallback cb;
 
@@ -1070,11 +1349,17 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
             if (OperationType.ADD_ENTRY == operationType) {
                 errorOutAddKey(this, BKException.Code.TimeoutException);
                 addTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
-            } else {
+            } else if (OperationType.READ_ENTRY == operationType) {
                 errorOutReadKey(this, BKException.Code.TimeoutException);
                 readTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
+            } else if (OperationType.WRITE_LAC == operationType) {
+                errorOutWriteLacKey(this, BKException.Code.TimeoutException);
+                writeLacTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
+            } else {
+                errorOutReadLacKey(this, BKException.Code.TimeoutException);
+                readLacTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
             }
-        }
+	}
     }
 
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
new file mode 100644
index 0000000..e9a4c13
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.util.MathUtils;
+import org.jboss.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Processes a V3 readLac request: reads the ledger's explicit LAC and its LAST_ADD_CONFIRMED
+ * entry from the bookie and sends them back in a {@link ReadLacResponse}.
+ */
+class ReadLacProcessorV3 extends PacketProcessorBaseV3 {
+    private final static Logger logger = LoggerFactory.getLogger(ReadLacProcessorV3.class);
+
+    public ReadLacProcessorV3(Request request, Channel channel,
+                             BookieRequestProcessor requestProcessor) {
+        super(request, channel, requestProcessor);
+    }
+
+    /**
+     * Builds the response for the current readLac request.
+     *
+     * <p>The status is EBADVERSION when the request version is not compatible, ENOENTRY when the
+     * bookie has no explicit LAC for the ledger, ENOLEDGER when the ledger is not found, EIO on
+     * any other I/O failure, and EOK otherwise (with both bodies populated).
+     *
+     * @return the built response; never null
+     */
+    private ReadLacResponse getReadLacResponse() {
+        final long startTimeNanos = MathUtils.nowInNano();
+        ReadLacRequest readLacRequest = request.getReadLacRequest();
+        long ledgerId = readLacRequest.getLedgerId();
+
+        final ReadLacResponse.Builder readLacResponse = ReadLacResponse.newBuilder().setLedgerId(ledgerId);
+
+        // Version mismatches are answered immediately and are not recorded in readLacStats.
+        if (!isVersionCompatible()) {
+            readLacResponse.setStatus(StatusCode.EBADVERSION);
+            return readLacResponse.build();
+        }
+
+        logger.debug("Received ReadLac request: {}", request);
+        StatusCode status = StatusCode.EOK;
+        ByteBuffer lastEntry;
+        ByteBuffer lac;
+        try {
+            lastEntry = requestProcessor.bookie.readEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
+            lac = requestProcessor.bookie.getExplicitLac(ledgerId);
+            if (lac != null) {
+                readLacResponse.setLacBody(ByteString.copyFrom(lac));
+                readLacResponse.setLastEntryBody(ByteString.copyFrom(lastEntry));
+            } else {
+                status = StatusCode.ENOENTRY;
+            }
+        } catch (Bookie.NoLedgerException e) {
+            status = StatusCode.ENOLEDGER;
+            logger.error("No ledger found while performing readLac from ledger: {}", ledgerId);
+        } catch (IOException e) {
+            status = StatusCode.EIO;
+            // Pass the exception as the trailing argument so the cause is not lost from the log.
+            logger.error("IOException while performing readLac from ledger: {}", ledgerId, e);
+        }
+        if (status == StatusCode.EOK) {
+            requestProcessor.readLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
+                    TimeUnit.NANOSECONDS);
+        } else {
+            requestProcessor.readLacStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+                    TimeUnit.NANOSECONDS);
+        }
+        // Finally set the status and return
+        readLacResponse.setStatus(status);
+        return readLacResponse.build();
+    }
+
+    /** Builds the readLac response and sends it back on the channel. */
+    @Override
+    public void safeRun() {
+        ReadLacResponse readLacResponse = getReadLacResponse();
+        sendResponse(readLacResponse);
+    }
+
+    /**
+     * Wraps {@code readLacResponse} in a {@link Response} carrying the request header and the
+     * same status, then hands it to the base-class sender.
+     */
+    private void sendResponse(ReadLacResponse readLacResponse) {
+        Response.Builder response = Response.newBuilder()
+            .setHeader(getHeader())
+            .setStatus(readLacResponse.getStatus())
+            .setReadLacResponse(readLacResponse);
+        // NOTE(review): this passes readRequestStats rather than a readLac-specific stats
+        // logger -- confirm that is intended.
+        sendResponse(response.getStatus(),
+                response.build(),
+                requestProcessor.readRequestStats);
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
new file mode 100644
index 0000000..104f561
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.util.MathUtils;
+import org.jboss.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Processes a V3 writeLac request: stores the explicit LAC carried in the request on the bookie
+ * and sends a {@link WriteLacResponse} back on the channel.
+ */
+class WriteLacProcessorV3 extends PacketProcessorBaseV3 {
+    private final static Logger logger = LoggerFactory.getLogger(WriteLacProcessorV3.class);
+
+    public WriteLacProcessorV3(Request request, Channel channel,
+                             BookieRequestProcessor requestProcessor) {
+        super(request, channel, requestProcessor);
+    }
+
+    /**
+     * Builds the response for the current writeLac request.
+     *
+     * <p>The status is EBADVERSION when the request version is not compatible, EREADONLY when
+     * the bookie is read-only, EIO on an {@link IOException}, EUA on a {@link BookieException},
+     * EBADREQ on any other throwable, and EOK when the LAC was stored.
+     *
+     * @return the built response; never null
+     */
+    private WriteLacResponse getWriteLacResponse() {
+        final long startTimeNanos = MathUtils.nowInNano();
+        WriteLacRequest writeLacRequest = request.getWriteLacRequest();
+        long lac = writeLacRequest.getLac();
+        long ledgerId = writeLacRequest.getLedgerId();
+
+        final WriteLacResponse.Builder writeLacResponse = WriteLacResponse.newBuilder().setLedgerId(ledgerId);
+
+        // The two early rejections below are not recorded in writeLacStats by this method.
+        if (!isVersionCompatible()) {
+            writeLacResponse.setStatus(StatusCode.EBADVERSION);
+            return writeLacResponse.build();
+        }
+
+        if (requestProcessor.bookie.isReadOnly()) {
+            logger.warn("BookieServer is running as readonly mode, so rejecting the request from the client!");
+            writeLacResponse.setStatus(StatusCode.EREADONLY);
+            return writeLacResponse.build();
+        }
+
+        StatusCode status;
+        ByteBuffer lacToAdd = writeLacRequest.getBody().asReadOnlyByteBuffer();
+        byte[] masterKey = writeLacRequest.getMasterKey().toByteArray();
+
+        try {
+            requestProcessor.bookie.setExplicitLac(lacToAdd, channel, masterKey);
+            status = StatusCode.EOK;
+        } catch (IOException e) {
+            // One placeholder per value; the trailing exception is logged with its stack trace.
+            logger.error("Error saving lac:{} for ledger:{}",
+                          new Object[] { lac, ledgerId, e });
+            status = StatusCode.EIO;
+        } catch (BookieException e) {
+            logger.error("Unauthorized access to ledger:{} while adding lac:{}",
+                                                  ledgerId, lac);
+            status = StatusCode.EUA;
+        } catch (Throwable t) {
+            logger.error("Unexpected exception while writing {}@{} : ",
+                    new Object[] { lac, ledgerId, t });
+            // some bad request which cause unexpected exception
+            status = StatusCode.EBADREQ;
+        }
+
+        // Record the outcome, then return the response for every status (success included).
+        if (StatusCode.EOK == status) {
+            requestProcessor.writeLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
+        } else {
+            requestProcessor.writeLacStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
+        }
+        writeLacResponse.setStatus(status);
+        return writeLacResponse.build();
+    }
+
+    /** Builds the writeLac response, wraps it with the request header and sends it back. */
+    @Override
+    public void safeRun() {
+        // getWriteLacResponse() always returns a built response, so no null check is needed.
+        WriteLacResponse writeLacResponse = getWriteLacResponse();
+        Response.Builder response = Response.newBuilder()
+                .setHeader(getHeader())
+                .setStatus(writeLacResponse.getStatus())
+                .setWriteLacResponse(writeLacResponse);
+        Response resp = response.build();
+        sendResponse(writeLacResponse.getStatus(), resp, requestProcessor.writeLacStats);
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
index f1d0e9f..64e524d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
@@ -21,12 +21,15 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Random;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -58,9 +61,8 @@ import org.slf4j.LoggerFactory;
 public class OrderedSafeExecutor {
     final static long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
     final String name;
-    final ThreadPoolExecutor threads[];
+    final ScheduledThreadPoolExecutor threads[];
     final long threadIds[];
-    final BlockingQueue<Runnable> queues[];
     final Random rand = new Random();
     final OpStatsLogger taskExecutionStats;
     final OpStatsLogger taskPendingStats;
@@ -173,17 +175,15 @@ public class OrderedSafeExecutor {
 
         this.warnTimeMicroSec = warnTimeMicroSec;
         name = baseName;
-        threads = new ThreadPoolExecutor[numThreads];
+        threads = new ScheduledThreadPoolExecutor[numThreads];
         threadIds = new long[numThreads];
-        queues = new BlockingQueue[numThreads];
         for (int i = 0; i < numThreads; i++) {
-            queues[i] = new LinkedBlockingQueue<Runnable>();
-            threads[i] =  new ThreadPoolExecutor(1, 1,
-                    0L, TimeUnit.MILLISECONDS, queues[i],
+            threads[i] =  new ScheduledThreadPoolExecutor(1,
                     new ThreadFactoryBuilder()
                         .setNameFormat(name + "-orderedsafeexecutor-" + i + "-%d")
                         .setThreadFactory(threadFactory)
                         .build());
+            threads[i].setMaximumPoolSize(1);
 
             // Save thread ids
             final int idx = i;
@@ -209,7 +209,7 @@ public class OrderedSafeExecutor {
 
                 @Override
                 public Number getSample() {
-                    return queues[idx].size();
+                    return threads[idx].getQueue().size();
                 }
             });
             statsLogger.registerGauge(String.format("%s-completed-tasks-%d", name, idx), new Gauge<Number>() {
@@ -242,7 +242,7 @@ public class OrderedSafeExecutor {
         this.traceTaskExecution = traceTaskExecution;
     }
 
-    ExecutorService chooseThread() {
+    ScheduledExecutorService chooseThread() {
         // skip random # generation in this special case
         if (threads.length == 1) {
             return threads[0];
@@ -252,7 +252,7 @@ public class OrderedSafeExecutor {
 
     }
 
-    ExecutorService chooseThread(Object orderingKey) {
+    ScheduledExecutorService chooseThread(Object orderingKey) {
         // skip hashcode generation in this special case
         if (threads.length == 1) {
             return threads[0];
@@ -286,6 +286,104 @@ public class OrderedSafeExecutor {
         chooseThread(orderingKey).submit(timedRunnable(r));
     }
 
+    /**
+     * Schedules a one-shot task that runs once the given delay has elapsed, on the
+     * executor returned by {@code chooseThread()}.
+     *
+     * @param command - the SafeRunnable to execute
+     * @param delay - how long to wait, from now, before running the task
+     * @param unit - the time unit of the delay parameter
+     * @return a ScheduledFuture representing the pending task; its get() method returns null upon completion
+     */
+    public ScheduledFuture<?> schedule(SafeRunnable command, long delay, TimeUnit unit) {
+        final ScheduledExecutorService worker = chooseThread();
+        return worker.schedule(command, delay, unit);
+    }
+
+    /**
+     * Schedules a one-shot task that runs once the given delay has elapsed, on the
+     * executor returned by {@code chooseThread(orderingKey)}.
+     *
+     * @param orderingKey - the key used for ordering
+     * @param command - the SafeRunnable to execute
+     * @param delay - how long to wait, from now, before running the task
+     * @param unit - the time unit of the delay parameter
+     * @return a ScheduledFuture representing the pending task; its get() method returns null upon completion
+     */
+    public ScheduledFuture<?> scheduleOrdered(Object orderingKey, SafeRunnable command, long delay, TimeUnit unit) {
+        final ScheduledExecutorService worker = chooseThread(orderingKey);
+        return worker.schedule(command, delay, unit);
+    }
+
+    /**
+     * Schedules a periodic task on the executor returned by {@code chooseThread()}:
+     * the first run happens after the initial delay, later runs follow at the given period.
+     *
+     * For more details check scheduleAtFixedRate in interface ScheduledExecutorService
+     *
+     * @param command - the SafeRunnable to execute
+     * @param initialDelay - the time to delay first execution
+     * @param period - the period between successive executions
+     * @param unit - the time unit of the initialDelay and period parameters
+     * @return a ScheduledFuture representing the pending task, whose get() method
+     * will throw an exception upon cancellation
+     */
+    public ScheduledFuture<?> scheduleAtFixedRate(SafeRunnable command, long initialDelay, long period, TimeUnit unit) {
+        final ScheduledExecutorService worker = chooseThread();
+        return worker.scheduleAtFixedRate(command, initialDelay, period, unit);
+    }
+
+    /**
+     * Schedules a periodic task on the executor returned by {@code chooseThread(orderingKey)}:
+     * the first run happens after the initial delay, later runs follow at the given period.
+     *
+     * For more details check scheduleAtFixedRate in interface ScheduledExecutorService
+     *
+     * @param orderingKey - the key used for ordering
+     * @param command - the SafeRunnable to execute
+     * @param initialDelay - the time to delay first execution
+     * @param period - the period between successive executions
+     * @param unit - the time unit of the initialDelay and period parameters
+     * @return a ScheduledFuture representing the pending task, whose get() method
+     * will throw an exception upon cancellation
+     */
+    public ScheduledFuture<?> scheduleAtFixedRateOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
+            long period, TimeUnit unit) {
+        final ScheduledExecutorService worker = chooseThread(orderingKey);
+        return worker.scheduleAtFixedRate(command, initialDelay, period, unit);
+    }
+
+    /**
+     * Schedules a periodic task on the executor returned by {@code chooseThread()}:
+     * the first run happens after the initial delay, and each later run starts the given
+     * delay after the previous run has terminated.
+     *
+     * For more details check scheduleWithFixedDelay in interface ScheduledExecutorService
+     *
+     * @param command - the SafeRunnable to execute
+     * @param initialDelay - the time to delay first execution
+     * @param delay - the delay between the termination of one execution and the commencement of the next
+     * @param unit - the time unit of the initialDelay and delay parameters
+     * @return a ScheduledFuture representing the pending task, whose get() method
+     * will throw an exception upon cancellation
+     */
+    public ScheduledFuture<?> scheduleWithFixedDelay(SafeRunnable command, long initialDelay, long delay,
+            TimeUnit unit) {
+        final ScheduledExecutorService worker = chooseThread();
+        return worker.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+    }
+
+    /**
+     * Schedules a periodic task on the executor returned by {@code chooseThread(orderingKey)}:
+     * the first run happens after the initial delay, and each later run starts the given
+     * delay after the previous run has terminated.
+     *
+     * For more details check scheduleWithFixedDelay in interface ScheduledExecutorService
+     *
+     * @param orderingKey - the key used for ordering
+     * @param command - the SafeRunnable to execute
+     * @param initialDelay - the time to delay first execution
+     * @param delay - the delay between the termination of one execution and the commencement of the next
+     * @param unit - the time unit of the initialDelay and delay parameters
+     * @return a ScheduledFuture representing the pending task, whose get() method
+     * will throw an exception upon cancellation
+     */
+    public ScheduledFuture<?> scheduleWithFixedDelayOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
+            long delay, TimeUnit unit) {
+        final ScheduledExecutorService worker = chooseThread(orderingKey);
+        return worker.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+    }
+
     private long getThreadID(Object orderingKey) {
         // skip hashcode generation in this special case
         if (threadIds.length == 1) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
index aabf80b..9ce9baf 100644
--- a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
@@ -57,6 +57,8 @@ enum OperationType {
     RANGE_ADD_ENTRY = 4;
 
     AUTH = 5;
+    WRITE_LAC = 6;
+    READ_LAC = 7;
 }
 
 /**
@@ -74,6 +76,8 @@ message Request {
     optional ReadRequest readRequest = 100;
     optional AddRequest addRequest = 101;
     optional AuthMessage authRequest = 102;
+    optional WriteLacRequest writeLacRequest = 103;
+    optional ReadLacRequest readLacRequest = 104;
 }
 
 message ReadRequest {
@@ -99,6 +103,17 @@ message AddRequest {
     required bytes body = 4;
 }
 
+// Payload of a WRITE_LAC operation; carried in Request.writeLacRequest (field 103).
+// NOTE(review): body presumably holds the serialized explicit-LAC payload — confirm.
+message WriteLacRequest {
+    required int64 ledgerId = 1;
+    required int64 lac = 2;
+    required bytes masterKey = 3;
+    required bytes body = 4;
+}
+
+// Payload of a READ_LAC operation; carried in Request.readLacRequest (field 104).
+// Identifies the ledger only.
+message ReadLacRequest {
+    required int64 ledgerId = 1;
+}
+
 message Response {
 
     required BKPacketHeader header = 1;
@@ -109,6 +124,8 @@ message Response {
     optional ReadResponse readResponse = 100;
     optional AddResponse addResponse = 101;
     optional AuthMessage authResponse = 102;
+    optional WriteLacResponse writeLacResponse = 103;
+    optional ReadLacResponse readLacResponse = 104;
 }
 
 message ReadResponse {
@@ -127,4 +144,16 @@ message AddResponse {
 message AuthMessage {
     required string authPluginName = 1;
     required bytes payload = 2;
-}
\ No newline at end of file
+}
+
+// Reply to a WriteLacRequest; carried in Response.writeLacResponse (field 103).
+message WriteLacResponse {
+    required StatusCode status = 1;
+    required int64 ledgerId = 2;
+}
+
+// Reply to a ReadLacRequest; carried in Response.readLacResponse (field 104).
+// Both bodies are optional, so either may be absent from a reply.
+message ReadLacResponse {
+    required StatusCode status = 1;
+    required int64 ledgerId = 2;
+    optional bytes lacBody = 3; // explicit LAC sent by the writer (presumably WriteLacRequest.body; confirm)
+    optional bytes lastEntryBody = 4; // Actual last entry on the disk
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index 988c2a2..1ce30e9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -334,6 +334,15 @@ public class TestSyncThread {
         }
 
         @Override
+        public void setExplicitlac(long ledgerId, ByteBuffer lac) {
+            // Intentionally empty: this test stub ignores the explicit LAC.
+        }
+
+        @Override
+        public ByteBuffer getExplicitLac(long ledgerId) {
+            // Test stub: no explicit LAC is stored, so there is never one to return.
+            return null;
+        }
+
+        @Override
         public Checkpoint checkpoint(Checkpoint checkpoint)
                 throws IOException {
             return checkpoint;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index e87fdc0..a2532c9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -309,4 +309,128 @@ public class BookKeeperTest extends BaseTestCase {
         }
         Assert.assertTrue("BookKeeper should be closed!", _bkc.closed);
     }
+
+    /**
+     * Verifies reader behaviour when the explicit LAC interval is 0 (feature disabled):
+     * a no-recovery read handle keeps the LAC it saw when it was opened,
+     * readExplicitLastConfirmed() returns LedgerHandle.INVALID_ENTRY_ID without throwing,
+     * and reading past the known LAC fails with a BKReadException.
+     */
+    @Test(timeout = 60000)
+    public void testReadHandleWithNoExplicitLAC() throws Exception {
+        ClientConfiguration confWithNoExplicitLAC = new ClientConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString());
+        confWithNoExplicitLAC.setExplictLacInterval(0);
+
+        BookKeeper bkcWithNoExplicitLAC = new BookKeeper(confWithNoExplicitLAC);
+
+        LedgerHandle wlh = bkcWithNoExplicitLAC.createLedger(digestType, "testPasswd".getBytes());
+        long ledgerId = wlh.getId();
+        int numOfEntries = 5;
+        for (int i = 0; i < numOfEntries; i++) {
+            wlh.addEntry(("foobar" + i).getBytes());
+        }
+
+        // The reader is expected to see a LAC one behind the last written entry
+        // (presumably because the LAC is piggybacked on the following add; confirm).
+        LedgerHandle rlh = bkcWithNoExplicitLAC.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());
+        Assert.assertTrue(
+                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+        Enumeration<LedgerEntry> entries = rlh.readEntries(0, numOfEntries - 2);
+        int entryId = 0;
+        while (entries.hasMoreElements()) {
+            LedgerEntry entry = entries.nextElement();
+            String entryString = new String(entry.getEntry());
+            Assert.assertTrue("Expected entry String: " + ("foobar" + entryId) + " actual entry String: " + entryString,
+                    entryString.equals("foobar" + entryId));
+            entryId++;
+        }
+
+        for (int i = numOfEntries; i < 2 * numOfEntries; i++) {
+            wlh.addEntry(("foobar" + i).getBytes());
+        }
+
+        Thread.sleep(3000);
+        // The explicit LAC flush policy is not enabled for the write handle, so reading
+        // the explicit LAC through rlh yields LedgerHandle.INVALID_ENTRY_ID. The call
+        // itself must not throw.
+        long explicitlac = rlh.readExplicitLastConfirmed();
+        Assert.assertTrue(
+                "Expected Explicit LAC of rlh: " + LedgerHandle.INVALID_ENTRY_ID + " actual ExplicitLAC of rlh: " + explicitlac,
+                (explicitlac == LedgerHandle.INVALID_ENTRY_ID));
+        // Fixed message: this assertion checks the write handle, not the read handle.
+        Assert.assertTrue(
+                "Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of wlh: " + wlh.getLastAddConfirmed(),
+                (wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
+        Assert.assertTrue(
+                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+        try {
+            rlh.readEntries(numOfEntries - 1, numOfEntries - 1);
+            fail("rlh readEntries beyond " + (numOfEntries - 2) + " should fail with ReadException");
+        } catch (BKException.BKReadException readException) {
+            // Expected: the entry lies beyond the LAC known to the read handle.
+        }
+
+        rlh.close();
+        wlh.close();
+        bkcWithNoExplicitLAC.close();
+    }
+
+    /**
+     * Verifies reader behaviour when the explicit LAC interval is 1 second: after the
+     * writer has had time to send an explicit LAC, readExplicitLastConfirmed() on a
+     * no-recovery read handle returns the writer's LAC, updates the read handle's own
+     * LAC, and the newly covered entries become readable.
+     */
+    @Test(timeout = 60000)
+    public void testReadHandleWithExplicitLAC() throws Exception {
+        ClientConfiguration confWithExplicitLAC = new ClientConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString());
+        int explictLacInterval = 1;
+        confWithExplicitLAC.setExplictLacInterval(explictLacInterval);
+
+        BookKeeper bkcWithExplicitLAC = new BookKeeper(confWithExplicitLAC);
+
+        LedgerHandle wlh = bkcWithExplicitLAC.createLedger(digestType, "testPasswd".getBytes());
+        long ledgerId = wlh.getId();
+        int numOfEntries = 5;
+        for (int i = 0; i < numOfEntries; i++) {
+            wlh.addEntry(("foobar" + i).getBytes());
+        }
+
+        LedgerHandle rlh = bkcWithExplicitLAC.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());
+
+        Assert.assertTrue(
+                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+        for (int i = numOfEntries; i < 2 * numOfEntries; i++) {
+            wlh.addEntry(("foobar" + i).getBytes());
+        }
+
+        // We need to wait for at least two explicit LAC intervals: on the first
+        // timer run in the write handle, lh.getExplicitLastAddConfirmed() will be <
+        // lh.getPiggyBackedLastAddConfirmed(), so no explicit write-LAC is sent
+        // during that first run.
+        Thread.sleep((2 * explictLacInterval + 1) * 1000);
+        Assert.assertTrue(
+                "Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of wlh: " + wlh.getLastAddConfirmed(),
+                (wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
+        // The read handle's last-add-confirmed won't be updated until readExplicitLastConfirmed() is called.
+        Assert.assertTrue(
+                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+        long explicitlac = rlh.readExplicitLastConfirmed();
+        Assert.assertTrue(
+                "Expected Explicit LAC of rlh: " + (2 * numOfEntries - 1) + " actual ExplicitLAC of rlh: " + explicitlac,
+                (explicitlac == (2 * numOfEntries - 1)));
+        // readExplicitLastConfirmed() also updates the LAC of rlh.
+        Assert.assertTrue(
+                "Expected LAC of rlh: " + (2 * numOfEntries - 1) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+                (rlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
+
+        // The second batch of entries is now within the read handle's LAC and must be readable.
+        Enumeration<LedgerEntry> entries = rlh.readEntries(numOfEntries, 2 * numOfEntries - 1);
+        int entryId = numOfEntries;
+        while (entries.hasMoreElements()) {
+            LedgerEntry entry = entries.nextElement();
+            String entryString = new String(entry.getEntry());
+            Assert.assertTrue("Expected entry String: " + ("foobar" + entryId) + " actual entry String: " + entryString,
+                    entryString.equals("foobar" + entryId));
+            entryId++;
+        }
+
+        rlh.close();
+        wlh.close();
+        bkcWithExplicitLAC.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 5387424..4c2ddaa 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -206,5 +206,17 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
         @Override
         public void flushEntriesLocationsIndex() throws IOException {
         }
+
+        @Override
+        public void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException {
+            // Intentionally empty: this stub does not record the explicit LAC.
+
+        }
+
+        @Override
+        public ByteBuffer getExplicitLac(long ledgerId) {
+            // Stub: no explicit LAC is tracked here, so always return null.
+            return null;
+        }
     }
 }


[3/3] bookkeeper git commit: BOOKKEEPER-874: Explict LAC from Writer to Bookies

Posted by si...@apache.org.
BOOKKEEPER-874: Explict LAC from Writer to Bookies

Introduce a new feature for sending explicit LAC to bookies.
A writable LedgerHandle creates a timer thread to send explicit LACs
at the interval specified through the configuration parameter
explicitLacInterval. If this is set to zero, the feature is disabled and
no timer thread is created.

Explicit LAC is sent only if the client did not get a chance to send
LAC through piggyback method for "explicitLacInterval" time.
To implement this, introduced two new protocol messages to the
Bookkeeper protocol -  WRITE_LAC and READ_LAC, in addition to its
current READ_ENTRY and ADD_ENTRY.

Reviewed-by: Charan Reddy Guttapalem <cguttapalemsalesforce.com>
Signed-off-by: Venkateswararao Jujjuri (JV) <vjujjurisalesforce.com>
Co-Author : Charan Reddy Guttapalem <cguttapalemsalesforce.com>

Author: JV <vj...@salesforce.com>

Reviewers: Sijie Guo <si...@apache.org>

Closes #89 from reddycharan/explicitlacsinglecommit


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/c813b3d3
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/c813b3d3
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/c813b3d3

Branch: refs/heads/master
Commit: c813b3d3298586ded02032a57a99d5fc6c974581
Parents: 42e8f12
Author: JV <vj...@salesforce.com>
Authored: Mon Jan 30 19:01:48 2017 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Mon Jan 30 19:01:48 2017 -0800

----------------------------------------------------------------------
 bookkeeper-server/conf/bk_server.conf           |    3 +
 .../bookie/BookKeeperServerStats.java           |    2 +
 .../org/apache/bookkeeper/bookie/Bookie.java    |   26 +
 .../org/apache/bookkeeper/bookie/FileInfo.java  |   33 +
 .../bookkeeper/bookie/IndexPersistenceMgr.java  |   28 +
 .../bookie/InterleavedLedgerStorage.java        |    8 +
 .../apache/bookkeeper/bookie/LedgerCache.java   |    3 +
 .../bookkeeper/bookie/LedgerCacheImpl.java      |    9 +
 .../bookkeeper/bookie/LedgerDescriptor.java     |    5 +
 .../bookkeeper/bookie/LedgerDescriptorImpl.java |    9 +
 .../apache/bookkeeper/bookie/LedgerStorage.java |    4 +
 .../apache/bookkeeper/client/AsyncCallback.java |   14 +
 .../apache/bookkeeper/client/BKException.java   |    2 +-
 .../apache/bookkeeper/client/BookKeeper.java    |   21 +-
 .../client/BookKeeperClientStats.java           |    6 +
 .../apache/bookkeeper/client/DigestManager.java |   55 +
 .../client/ExplicitLacFlushPolicy.java          |  153 +
 .../apache/bookkeeper/client/LedgerHandle.java  |  140 +-
 .../bookkeeper/client/PendingReadLacOp.java     |  145 +
 .../bookkeeper/client/PendingWriteLacOp.java    |  114 +
 .../bookkeeper/client/ReadOnlyLedgerHandle.java |    8 +
 .../bookkeeper/conf/ClientConfiguration.java    |   26 +-
 .../apache/bookkeeper/proto/BookieClient.java   |   71 +
 .../proto/BookieRequestProcessor.java           |   31 +
 .../proto/BookkeeperInternalCallbacks.java      |    8 +
 .../bookkeeper/proto/BookkeeperProtocol.java    | 6115 +++++++++++++-----
 .../proto/PerChannelBookieClient.java           |  293 +-
 .../bookkeeper/proto/ReadLacProcessorV3.java    |  108 +
 .../bookkeeper/proto/WriteLacProcessorV3.java   |  113 +
 .../bookkeeper/util/OrderedSafeExecutor.java    |  118 +-
 .../src/main/proto/BookkeeperProtocol.proto     |   31 +-
 .../bookkeeper/bookie/TestSyncThread.java       |    9 +
 .../bookkeeper/client/BookKeeperTest.java       |  124 +
 .../bookkeeper/meta/LedgerManagerTestCase.java  |   12 +
 34 files changed, 6354 insertions(+), 1493 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/conf/bk_server.conf
----------------------------------------------------------------------
diff --git a/bookkeeper-server/conf/bk_server.conf b/bookkeeper-server/conf/bk_server.conf
index e2a2be6..c7fd2ca 100644
--- a/bookkeeper-server/conf/bk_server.conf
+++ b/bookkeeper-server/conf/bk_server.conf
@@ -91,6 +91,9 @@ ledgerDirectories=/tmp/bk-data
 # If it is set to less than zero, the minor compaction is disabled. 
 # minorCompactionInterval=3600
 
+# Interval, in seconds, between explicit LAC sends. If set to zero, explicit LAC is disabled.
+explicitLacInterval = 1
+
 # Threshold of major compaction
 # For those entry log files whose remaining size percentage reaches below
 # this threshold will be compacted in a major compaction.

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 239f923..9f1dbbb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -35,6 +35,8 @@ public interface BookKeeperServerStats {
     public final static String READ_ENTRY_FENCE_REQUEST = "READ_ENTRY_FENCE_REQUEST";
     public final static String READ_ENTRY_FENCE_WAIT = "READ_ENTRY_FENCE_WAIT";
     public final static String READ_ENTRY_FENCE_READ = "READ_ENTRY_FENCE_READ";
+    public final static String WRITE_LAC = "WRITE_LAC";
+    public final static String READ_LAC = "READ_LAC";
 
     // Bookie Operations
     public final static String BOOKIE_ADD_ENTRY_BYTES = "BOOKIE_ADD_ENTRY_BYTES";

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 82db3b0..bbbfa51 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -57,6 +57,7 @@ import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNS;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -1325,6 +1326,30 @@ public class Bookie extends BookieCriticalThread {
         }
     }
 
+    /**
+     * Records an explicit LAC on the ledger identified by the first long of {@code entry}.
+     *
+     * The ledger id is read from the buffer, the ledger handle is looked up with the
+     * master key, the buffer is rewound, and the whole buffer is handed to the handle
+     * while holding the handle's monitor.
+     *
+     * @param entry buffer that starts with the ledger id; it is rewound before being passed on
+     * @param ctx not used by this method
+     * @param masterKey master key passed to {@code handles.getHandle}
+     * @throws IOException from the underlying calls, or wrapping a
+     *         NoWritableLedgerDirException after {@code transitionToReadOnlyMode()} was invoked
+     * @throws BookieException from the underlying calls
+     */
+    public void setExplicitLac(ByteBuffer entry, Object ctx, byte[] masterKey)
+            throws IOException, BookieException {
+        try {
+            final long id = entry.getLong();
+            final LedgerDescriptor descriptor = handles.getHandle(id, masterKey);
+            entry.rewind();
+            synchronized (descriptor) {
+                descriptor.setExplicitLac(entry);
+            }
+        } catch (NoWritableLedgerDirException noWritableDir) {
+            transitionToReadOnlyMode();
+            throw new IOException(noWritableDir);
+        }
+    }
+
+    /**
+     * Returns the explicit LAC of the given ledger, read through its read-only handle
+     * while holding the handle's monitor.
+     *
+     * @param ledgerId id of the ledger to query
+     * @return whatever the handle's {@code getExplicitLac()} returns
+     * @throws IOException from the handle lookup
+     * @throws Bookie.NoLedgerException from the handle lookup
+     */
+    public ByteBuffer getExplicitLac(long ledgerId) throws IOException, Bookie.NoLedgerException {
+        final LedgerDescriptor descriptor = handles.getReadOnlyHandle(ledgerId);
+        synchronized (descriptor) {
+            return descriptor.getExplicitLac();
+        }
+    }
+
     /**
      * Add entry to a ledger.
      * @throws BookieException.LedgerFencedException if the ledger is fenced
@@ -1566,4 +1591,5 @@ public class Bookie extends BookieCriticalThread {
     public int getExitCode() {
         return exitCode;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index 38ff0d9..307b46b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -62,6 +62,8 @@ class FileInfo {
 
     private FileChannel fc;
     private File lf;
+    private ByteBuffer explicitLac = null;
+
     byte[] masterKey;
 
     /**
@@ -112,6 +114,37 @@ class FileInfo {
         return sizeSinceLastwrite;
     }
 
+    /**
+     * Returns a copy of the stored explicit LAC buffer.
+     *
+     * The copy has the same capacity as the stored buffer and is flipped, so it is
+     * ready to be read from position 0. The stored buffer is rewound before and after copying.
+     *
+     * @return a fresh copy of the explicit LAC, or null if none has been set
+     */
+    public ByteBuffer getExplicitLac() {
+        LOG.debug("fileInfo:GetLac: {}", explicitLac);
+        synchronized(this) {
+            if (explicitLac == null) {
+                return null;
+            }
+            final ByteBuffer copy = ByteBuffer.allocate(explicitLac.capacity());
+            explicitLac.rewind();//copy from the beginning
+            copy.put(explicitLac);
+            explicitLac.rewind();
+            copy.flip();
+            return copy;
+        }
+    }
+
+    /**
+     * Stores a copy of the given explicit LAC buffer and updates the last-add-confirmed
+     * value from it.
+     *
+     * The internal buffer is allocated on first use with the capacity of {@code lac};
+     * later calls overwrite it from the start. The stored bytes are read as a ledger id
+     * (first long, skipped) followed by the LAC value (second long), which is passed to
+     * {@code setLastAddConfirmed}.
+     *
+     * NOTE(review): a later {@code lac} with more remaining bytes than the internal
+     * buffer's capacity would overflow it. Presumably all LAC buffers for a ledger have
+     * the same size; confirm against the callers.
+     *
+     * @param lac buffer to copy from its current position; the relative bulk put advances
+     *            its position to its limit
+     */
+    public void setExplicitLac(ByteBuffer lac) {
+        synchronized(this) {
+            if (explicitLac == null) {
+                explicitLac = ByteBuffer.allocate(lac.capacity());
+            }
+            explicitLac.put(lac);
+            explicitLac.rewind();
+
+            // The ledger id comes first and is not needed here; skip over it.
+            explicitLac.getLong();
+            long explicitLacValue = explicitLac.getLong();
+            setLastAddConfirmed(explicitLacValue);
+            explicitLac.rewind();
+        }
+        LOG.debug("fileInfo:SetLac: {}", explicitLac);
+    }
+
     synchronized public void readHeader() throws IOException {
         if (lf.exists()) {
             if (fc != null) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index f1ffcde..1ea000c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -389,6 +389,34 @@ public class IndexPersistenceMgr {
         }
     }
 
+    /**
+     * Stores the explicit LAC on the FileInfo of the given ledger. The FileInfo
+     * reference is released afterwards whether or not the update succeeded.
+     *
+     * @param ledgerId id of the ledger whose FileInfo is updated
+     * @param lac buffer passed on to {@code FileInfo.setExplicitLac}
+     * @throws IOException if the FileInfo cannot be obtained
+     */
+    void setExplicitLac(long ledgerId, ByteBuffer lac) throws IOException {
+        FileInfo fileInfo = null;
+        try {
+            fileInfo = getFileInfo(ledgerId, null);
+            fileInfo.setExplicitLac(lac);
+        } finally {
+            if (fileInfo != null) {
+                fileInfo.release();
+            }
+        }
+    }
+
+    /**
+     * Returns the explicit LAC stored on the FileInfo of the given ledger. The FileInfo
+     * reference is released afterwards in all cases.
+     *
+     * @param ledgerId id of the ledger to query
+     * @return the buffer returned by {@code FileInfo.getExplicitLac()}, or null if the
+     *         FileInfo could not be obtained (the IOException is logged, not rethrown)
+     */
+    public ByteBuffer getExplicitLac(long ledgerId) {
+        FileInfo fi = null;
+        try {
+            fi = getFileInfo(ledgerId, null);
+            return fi.getExplicitLac();
+        } catch (IOException e) {
+            // Name the actual operation and the ledger. The exception is passed as the
+            // trailing argument so it is logged with its stack trace.
+            LOG.error("Exception during getExplicitLac for ledger {}", ledgerId, e);
+            return null;
+        } finally {
+            if (null != fi) {
+                fi.release();
+            }
+        }
+    }
+
     int getOpenFileLimit() {
         return openFileLimit;
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index ee6edd5..308110b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -215,6 +215,18 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
         return ledgerCache.isFenced(ledgerId);
     }
 
+    /** Forwards the explicit LAC buffer for the ledger to the ledger cache. */
+    @Override
+    public void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException {
+        ledgerCache.setExplicitLac(ledgerId, lac);
+    }
+
+    /** Returns whatever the ledger cache reports as explicit LAC for the ledger. */
+    @Override
+    public ByteBuffer getExplicitLac(long ledgerId) {
+        return ledgerCache.getExplicitLac(ledgerId);
+    }
+
     @Override
     public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
         ledgerCache.setMasterKey(ledgerId, masterKey);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
index 4e0fdc1..e004cb6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.bookie;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 /**
  * This class maps a ledger entry number into a location (entrylogid, offset) in
@@ -50,4 +51,6 @@ interface LedgerCache extends Closeable {
     void deleteLedger(long ledgerId) throws IOException;
 
     LedgerCacheBean getJMXBean();
+    void setExplicitLac(long ledgerId, ByteBuffer lac) throws IOException;
+    ByteBuffer getExplicitLac(long ledgerId);
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
index db84268..cece79f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
@@ -22,6 +22,7 @@
 package org.apache.bookkeeper.bookie;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -136,6 +137,18 @@ public class LedgerCacheImpl implements LedgerCache {
         return indexPersistenceManager.isFenced(ledgerId);
     }
 
+    /** Forwards the explicit LAC buffer for the ledger to the index persistence manager. */
+    @Override
+    public void setExplicitLac(long ledgerId, ByteBuffer lac) throws IOException {
+        indexPersistenceManager.setExplicitLac(ledgerId, lac);
+    }
+
+    /** Returns whatever the index persistence manager reports as explicit LAC for the ledger. */
+    @Override
+    public ByteBuffer getExplicitLac(long ledgerId) {
+        return indexPersistenceManager.getExplicitLac(ledgerId);
+    }
+
     @Override
     public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
         indexPersistenceManager.setMasterKey(ledgerId, masterKey);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
index c7a8c97..bcb0c30 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
@@ -59,5 +59,10 @@ public abstract class LedgerDescriptor {
 
     abstract long addEntry(ByteBuffer entry) throws IOException;
     abstract ByteBuffer readEntry(long entryId) throws IOException;
+
     abstract long getLastAddConfirmed() throws IOException;
+
+    abstract void setExplicitLac(ByteBuffer entry) throws IOException;
+
+    abstract  ByteBuffer getExplicitLac();
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index 266236d..bf1c129 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -69,6 +69,15 @@ public class LedgerDescriptorImpl extends LedgerDescriptor {
     }
 
     @Override
+    void setExplicitLac(ByteBuffer lac) throws IOException {
+        ledgerStorage.setExplicitlac(ledgerId, lac);
+    }
+
+    @Override
+    ByteBuffer getExplicitLac() {
+        return ledgerStorage.getExplicitLac(ledgerId);
+    }
+    @Override
     long addEntry(ByteBuffer entry) throws IOException {
         long ledgerId = entry.getLong();
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index b0015bd..84a309f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -149,4 +149,8 @@ public interface LedgerStorage {
      * Get the JMX management bean for this LedgerStorage
      */
     BKMBeanInfo getJMXBean();
+
+    void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException;
+
+    ByteBuffer getExplicitLac(long ledgerId);
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
index d3f1728..05067d0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
@@ -36,6 +36,20 @@ public interface AsyncCallback {
         void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx);
     }
 
+    public interface AddLacCallback {
+        /**
+         * Callback invoked with the outcome of an explicit LAC write
+         *
+         * @param rc
+         *          return code
+         * @param lh
+         *          ledger handle
+         * @param ctx
+         *          context object
+         */
+        void addLacComplete(int rc, LedgerHandle lh, Object ctx);
+    }
+
     public interface CloseCallback {
         /**
          * Callback definition

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index 349709d..2377c1c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -40,7 +40,7 @@ public abstract class BKException extends Exception {
     /**
      * Create an exception from an error code
      * @param code return error code
-     * @return correponding exception
+     * @return corresponding exception
      */
     public static BKException create(int code) {
         switch (code) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 2f8a0b8..8959462 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -95,6 +95,9 @@ public class BookKeeper implements AutoCloseable {
     private OpStatsLogger deleteOpLogger;
     private OpStatsLogger readOpLogger;
     private OpStatsLogger addOpLogger;
+    private OpStatsLogger writeLacOpLogger;
+    private OpStatsLogger readLacOpLogger;
+
 
     // whether the socket factory is one we created, or is owned by whoever
     // instantiated us
@@ -121,6 +124,7 @@ public class BookKeeper implements AutoCloseable {
     final EnsemblePlacementPolicy placementPolicy;
 
     final ClientConfiguration conf;
+    final int explicitLacInterval;
 
     // Close State
     boolean closed = false;
@@ -275,7 +279,7 @@ public class BookKeeper implements AutoCloseable {
     }
 
     /**
-     * Contructor for use with the builder. Other constructors also use it.
+     * Constructor for use with the builder. Other constructors also use it.
      */
     private BookKeeper(ClientConfiguration conf,
                        ZooKeeper zkc,
@@ -369,10 +373,16 @@ public class BookKeeper implements AutoCloseable {
         this.ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
         this.ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
         this.ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator();
+        this.explicitLacInterval = conf.getExplictLacInterval();
+        LOG.debug("Explicit LAC Interval : {}", this.explicitLacInterval);
 
         scheduleBookieHealthCheckIfEnabled();
     }
 
+    public int getExplicitLacInterval() {
+        return explicitLacInterval;
+    }
+
     private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
                                                                       DNSToSwitchMapping dnsResolver,
                                                                       HashedWheelTimer timer,
@@ -906,8 +916,11 @@ public class BookKeeper implements AutoCloseable {
      * to add entries to the ledger. Any attempt to add entries will throw an
      * exception.
      *
-     * Reads from the returned ledger will only be able to read entries up until
+     * Reads from the returned ledger will be able to read entries up until
      * the lastConfirmedEntry at the point in time at which the ledger was opened.
+     * If an attempt is made to read beyond the ledger handle's LAC, an attempt is made
+     * to get the latest LAC from bookies or metadata, and if the entry_id of the read request
+     * is less than or equal to the new LAC, read will be allowed to proceed.
      *
      * @param lId
      *          ledger identifier
@@ -1199,6 +1212,8 @@ public class BookKeeper implements AutoCloseable {
         openOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.OPEN_OP);
         readOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_OP);
         addOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.ADD_OP);
+        writeLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.WRITE_LAC_OP);
+        readLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_LAC_OP);
     }
 
     OpStatsLogger getCreateOpLogger() { return createOpLogger; }
@@ -1206,4 +1221,6 @@ public class BookKeeper implements AutoCloseable {
     OpStatsLogger getDeleteOpLogger() { return deleteOpLogger; }
     OpStatsLogger getReadOpLogger() { return readOpLogger; }
     OpStatsLogger getAddOpLogger() { return addOpLogger; }
+    OpStatsLogger getWriteLacOpLogger() { return writeLacOpLogger; }
+    OpStatsLogger getReadLacOpLogger() { return readLacOpLogger; }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index e245ea3..a020425 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -28,6 +28,8 @@ public interface BookKeeperClientStats {
     public final static String OPEN_OP = "LEDGER_OPEN";
     public final static String ADD_OP = "ADD_ENTRY";
     public final static String READ_OP = "READ_ENTRY";
+    public final static String WRITE_LAC_OP = "WRITE_LAC";
+    public final static String READ_LAC_OP = "READ_LAC";
     public final static String PENDING_ADDS = "NUM_PENDING_ADD";
     public final static String ENSEMBLE_CHANGES = "NUM_ENSEMBLE_CHANGE";
     public final static String LAC_UPDATE_HITS = "LAC_UPDATE_HITS";
@@ -40,4 +42,8 @@ public interface BookKeeperClientStats {
     public final static String CHANNEL_TIMEOUT_READ = "TIMEOUT_READ_ENTRY";
     public final static String CHANNEL_ADD_OP = "ADD_ENTRY";
     public final static String CHANNEL_TIMEOUT_ADD = "TIMEOUT_ADD_ENTRY";
+    public final static String CHANNEL_WRITE_LAC_OP = "WRITE_LAC";
+    public final static String CHANNEL_TIMEOUT_WRITE_LAC = "TIMEOUT_WRITE_LAC";
+    public final static String CHANNEL_READ_LAC_OP = "READ_LAC";
+    public final static String CHANNEL_TIMEOUT_READ_LAC = "TIMEOUT_READ_LAC";
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
index 2753680..c72f31a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
@@ -40,6 +40,7 @@ abstract class DigestManager {
     static final Logger logger = LoggerFactory.getLogger(DigestManager.class);
 
     static final int METADATA_LENGTH = 32;
+    static final int LAC_METADATA_LENGTH = 16;
 
     long ledgerId;
 
@@ -102,6 +103,32 @@ abstract class DigestManager {
         return ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer), ChannelBuffers.wrappedBuffer(data, doffset, dlength));
     }
 
+    /**
+     * Packages an explicit LAC for sending: ledger id, LAC, then the digest of both.
+     *
+     * @param lac last add confirmed entry id to package
+     * @return buffer of LAC_METADATA_LENGTH header bytes followed by the digest
+     */
+    public ChannelBuffer computeDigestAndPackageForSendingLac(long lac) {
+        byte[] bufferArray = new byte[LAC_METADATA_LENGTH + macCodeLength];
+        ByteBuffer buffer = ByteBuffer.wrap(bufferArray);
+
+        // header: ledger id followed by the LAC, one long each
+        buffer.putLong(ledgerId);
+        buffer.putLong(lac);
+
+        // the digest covers only the header written above
+        update(bufferArray, 0, LAC_METADATA_LENGTH);
+        byte[] digest = getValueAndReset();
+
+        // append the digest right behind the header, then flip for reading
+        buffer.position(LAC_METADATA_LENGTH);
+        buffer.put(digest);
+        buffer.flip();
+
+        return ChannelBuffers.wrappedBuffer(buffer);
+    }
+
     private void verifyDigest(ChannelBuffer dataReceived) throws BKDigestMatchException {
         verifyDigest(LedgerHandle.INVALID_ENTRY_ID, dataReceived, true);
     }
@@ -153,6 +180,55 @@ abstract class DigestManager {
 
     }
 
+    /**
+     * Verifies the digest of an explicit LAC message and returns the LAC.
+     *
+     * The message must start with the ledger id and the LAC, followed by the
+     * digest computed over those two fields. Both long fields are consumed from
+     * {@code dataReceived} once the digest has been verified.
+     *
+     * @param dataReceived
+     *          message to verify; NOTE(review): must expose a backing array through
+     *          {@code toByteBuffer().array()} -- confirm for all callers
+     * @return the LAC carried in the message
+     * @throws BKDigestMatchException
+     *          if the message is too short, the digest does not match or the
+     *          ledger id in the message differs from the expected one
+     */
+    long verifyDigestAndReturnLac(ChannelBuffer dataReceived) throws BKDigestMatchException {
+        if ((LAC_METADATA_LENGTH + macCodeLength) > dataReceived.readableBytes()) {
+            logger.error("Data received is smaller than the minimum for this digest type."
+                    + " Either the packet is corrupt, or the wrong digest is configured. "
+                    + " Digest type: {}, Packet Length: {}",
+                    this.getClass().getName(), dataReceived.readableBytes());
+            throw new BKDigestMatchException();
+        }
+
+        // digest the header at its real location in the backing array
+        ByteBuffer dataReceivedBuffer = dataReceived.toByteBuffer();
+        update(dataReceivedBuffer.array(),
+               dataReceivedBuffer.arrayOffset() + dataReceivedBuffer.position(), LAC_METADATA_LENGTH);
+        byte[] digest = getValueAndReset();
+
+        // getByte() takes absolute indices, so start from the reader index
+        int digestStart = dataReceived.readerIndex() + LAC_METADATA_LENGTH;
+        for (int i = 0; i < digest.length; i++) {
+            if (digest[i] != dataReceived.getByte(digestStart + i)) {
+                logger.error("Mac mismatch for ledger-id LAC: " + ledgerId);
+                throw new BKDigestMatchException();
+            }
+        }
+
+        long actualLedgerId = dataReceived.readLong();
+        long lac = dataReceived.readLong();
+        if (actualLedgerId != ledgerId) {
+            logger.error("Ledger-id mismatch in authenticated message, expected: " + ledgerId + " , actual: "
+                         + actualLedgerId);
+            throw new BKDigestMatchException();
+        }
+        return lac;
+    }
+
     /**
      * Verify that the digest matches and returns the data in the entry.
      *

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
new file mode 100644
index 0000000..65ef8af
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
@@ -0,0 +1,185 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+
+import org.apache.bookkeeper.client.LedgerHandle.LastAddConfirmedCallback;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Policy deciding whether and when a ledger handle pushes its last add
+ * confirmed (LAC) to the bookies explicitly.
+ */
+interface ExplicitLacFlushPolicy {
+    /**
+     * Stops any periodic explicit LAC flush started by this policy.
+     */
+    void stopExplicitLacFlush();
+
+    /**
+     * Records a LAC value that the ledger handle reports as piggybacked.
+     *
+     * @param piggyBackedLac
+     *          the reported LAC
+     */
+    void updatePiggyBackedLac(long piggyBackedLac);
+
+    /** Policy that never flushes; both operations do nothing. */
+    static final ExplicitLacFlushPolicy VOID_EXPLICITLAC_FLUSH_POLICY = new ExplicitLacFlushPolicy() {
+        @Override
+        public void stopExplicitLacFlush() {
+            // void method
+        }
+
+        @Override
+        public void updatePiggyBackedLac(long piggyBackedLac) {
+            // void method
+        }
+    };
+
+    /**
+     * Policy that periodically sends the ledger handle's LAC to the bookies
+     * unless piggybacked LAC updates have made progress since the last run.
+     */
+    class ExplicitLacFlushPolicyImpl implements ExplicitLacFlushPolicy {
+        final static Logger LOG = LoggerFactory.getLogger(ExplicitLacFlushPolicyImpl.class);
+
+        volatile long piggyBackedLac = LedgerHandle.INVALID_ENTRY_ID;
+        volatile long explicitLac = LedgerHandle.INVALID_ENTRY_ID;
+        final LedgerHandle lh;
+        // remains null if the worker pool rejected the periodic task
+        ScheduledFuture<?> scheduledFuture;
+
+        ExplicitLacFlushPolicyImpl(LedgerHandle lh) {
+            this.lh = lh;
+            scheduleExplictLacFlush();
+            LOG.debug("Scheduled Explicit Last Add Confirmed Update");
+        }
+
+        private long getExplicitLac() {
+            return explicitLac;
+        }
+
+        private void setExplicitLac(long explicitLac) {
+            this.explicitLac = explicitLac;
+        }
+
+        private long getPiggyBackedLac() {
+            return piggyBackedLac;
+        }
+
+        public void setPiggyBackedLac(long piggyBackedLac) {
+            this.piggyBackedLac = piggyBackedLac;
+        }
+
+        /**
+         * Schedules the periodic flush task, using the client's explicit LAC
+         * interval (in seconds) as both the initial delay and the period.
+         */
+        private void scheduleExplictLacFlush() {
+            int explicitLacIntervalInSec = lh.bk.getExplicitLacInterval();
+            final SafeRunnable updateLacTask = new SafeRunnable() {
+                @Override
+                public void safeRun() {
+                    // Made progress since previous explicitLAC through
+                    // Piggyback, so no need to send an explicit LAC update to
+                    // bookies.
+                    final long piggyBacked = getPiggyBackedLac();
+                    if (getExplicitLac() < piggyBacked) {
+                        LOG.debug("ledgerid: {}", lh.getId());
+                        LOG.debug("explicitLac:{} piggybackLac:{}", getExplicitLac(), piggyBacked);
+                        setExplicitLac(piggyBacked);
+                        return;
+                    }
+
+                    // Read the LAC once so that the value sent and the value
+                    // recorded as explicit LAC are the same.
+                    final long lac = lh.getLastAddConfirmed();
+                    if (lac > getExplicitLac()) {
+                        // Send Explicit LAC
+                        LOG.debug("ledgerid: {}", lh.getId());
+                        asyncExplicitLacFlush(lac);
+                        setExplicitLac(lac);
+                        LOG.debug("After sending explict LAC lac: {}  explicitLac:{}", lac, getExplicitLac());
+                    }
+                }
+
+                @Override
+                public String toString() {
+                    return String.format("UpdateLacTask ledgerId - (%d)", lh.getId());
+                }
+            };
+            try {
+                scheduledFuture = lh.bk.mainWorkerPool.scheduleAtFixedRateOrdered(lh.getId(), updateLacTask,
+                        explicitLacIntervalInSec, explicitLacIntervalInSec, SECONDS);
+            } catch (RejectedExecutionException re) {
+                // the exception is passed last, without a placeholder, so it is logged with its stack trace
+                LOG.error("Scheduling of ExplictLastAddConfirmedFlush for ledger: {} has failed", lh.getId(), re);
+            }
+        }
+
+        /**
+         * Make a LastAddUpdate request carrying the given LAC.
+         *
+         * @param explicitLac
+         *          LAC set on the write op and packaged, with its digest, for sending
+         */
+        void asyncExplicitLacFlush(final long explicitLac) {
+            final LastAddConfirmedCallback cb = LastAddConfirmedCallback.INSTANCE;
+            final PendingWriteLacOp op = new PendingWriteLacOp(lh, cb, null);
+            op.setLac(explicitLac);
+            try {
+                LOG.debug("Sending Explicit LAC: {}", explicitLac);
+                lh.bk.mainWorkerPool.submit(new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        // package the very LAC that was set on the op above
+                        ChannelBuffer toSend = lh.macManager
+                                .computeDigestAndPackageForSendingLac(explicitLac);
+                        op.initiate(toSend);
+                    }
+                });
+            } catch (RejectedExecutionException e) {
+                cb.addLacComplete(lh.bk.getReturnRc(BKException.Code.InterruptedException), lh, null);
+            }
+        }
+
+        @Override
+        public void stopExplicitLacFlush() {
+            // nothing to cancel if scheduling was rejected in the constructor
+            if (scheduledFuture != null) {
+                scheduledFuture.cancel(true);
+            }
+        }
+
+        @Override
+        public void updatePiggyBackedLac(long piggyBackedLac) {
+            setPiggyBackedLac(piggyBackedLac);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 11212a7..290caa9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -31,12 +31,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
@@ -55,8 +57,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
 /**
  * Ledger handle contains ledger metadata and is used to access the read and
@@ -71,6 +71,7 @@ public class LedgerHandle implements AutoCloseable {
     final long ledgerId;
     long lastAddPushed;
     volatile long lastAddConfirmed;
+
     long length;
     final DigestManager macManager;
     final DistributionSchedule distributionSchedule;
@@ -85,6 +86,7 @@ public class LedgerHandle implements AutoCloseable {
 
     final AtomicInteger blockAddCompletions = new AtomicInteger(0);
     Queue<PendingAddOp> pendingAddOps;
+    ExplicitLacFlushPolicy explicitLacFlushPolicy;
 
     final Counter ensembleChangeCounter;
     final Counter lacUpdateHitsCounter;
@@ -97,6 +99,7 @@ public class LedgerHandle implements AutoCloseable {
         this.metadata = metadata;
         this.pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>();
 
+
         if (metadata.isClosed()) {
             lastAddConfirmed = lastAddPushed = metadata.getLastEntryId();
             length = metadata.getLength();
@@ -130,6 +133,15 @@ public class LedgerHandle implements AutoCloseable {
                                                   return pendingAddOps.size();
                                               }
                                           });
+        initializeExplicitLacFlushPolicy();
+    }
+
+    protected void initializeExplicitLacFlushPolicy() {
+        if (!metadata.isClosed() && bk.getExplicitLacInterval() > 0) {
+            explicitLacFlushPolicy = new ExplicitLacFlushPolicy.ExplicitLacFlushPolicyImpl(this);
+        } else {
+            explicitLacFlushPolicy = ExplicitLacFlushPolicy.VOID_EXPLICITLAC_FLUSH_POLICY;
+        }
     }
 
     /**
@@ -272,6 +284,8 @@ public class LedgerHandle implements AutoCloseable {
 
         asyncClose(new SyncCloseCallback(), counter);
 
+        explicitLacFlushPolicy.stopExplicitLacFlush();
+        
         SynchCallbackUtils.waitForResult(counter);
     }
 
@@ -478,11 +492,18 @@ public class LedgerHandle implements AutoCloseable {
      * @param ctx
      *          control object
      */
-    public void asyncReadEntries(long firstEntry, long lastEntry,
-                                 ReadCallback cb, Object ctx) {
+    public void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, Object ctx) {
         // Little sanity check
-        if (firstEntry < 0 || lastEntry > lastAddConfirmed
-                || firstEntry > lastEntry) {
+        if (firstEntry < 0 || firstEntry > lastEntry) {
+            LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} lastEntry:{}",
+                    new Object[] { ledgerId, firstEntry, lastEntry });
+            cb.readComplete(BKException.Code.IncorrectParameterException, this, null, ctx);
+            return;
+        }
+
+        if (lastEntry > lastAddConfirmed) {
+            LOG.error("ReadException on ledgerId:{} firstEntry:{} lastEntry:{}",
+                    new Object[] { ledgerId, firstEntry, lastEntry });
             cb.readComplete(BKException.Code.ReadException, this, null, ctx);
             return;
         }
@@ -929,6 +950,86 @@ public class LedgerHandle implements AutoCloseable {
         return ctx.getlastConfirmed();
     }
 
+    /**
+     * Asynchronously reads the explicit last add confirmed from a quorum of
+     * bookies. Each bookie reports the explicit last add confirmed it has
+     * received for this ledger and the maximum is returned. If the explicitLAC
+     * feature is not enabled in the write LedgerHandle, the result is
+     * {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID}. If the explicit
+     * lastaddconfirmed read is greater than getLastAddConfirmed, the
+     * lastAddConfirmed of this ledgerhandle is updated. If the ledger has been
+     * closed, the last add confirmed from the metadata is returned instead.
+     *
+     * @see #getLastAddConfirmed()
+     *
+     * @param cb
+     *          callback to return read explicit last confirmed
+     * @param ctx
+     *          callback context
+     */
+    public void asyncReadExplicitLastConfirmed(final ReadLastConfirmedCallback cb, final Object ctx) {
+        final boolean ledgerClosed;
+        synchronized (this) {
+            ledgerClosed = metadata.isClosed();
+            if (ledgerClosed) {
+                // a closed ledger takes its lac and length from the metadata
+                lastAddConfirmed = metadata.getLastEntryId();
+                length = metadata.getLength();
+            }
+        }
+        if (ledgerClosed) {
+            cb.readLastConfirmedComplete(BKException.Code.OK, lastAddConfirmed, ctx);
+            return;
+        }
+
+        PendingReadLacOp.LacCallback lacCallback = new PendingReadLacOp.LacCallback() {
+            @Override
+            public void getLacComplete(int rc, long lac) {
+                if (rc != BKException.Code.OK) {
+                    cb.readLastConfirmedComplete(rc, INVALID_ENTRY_ID, ctx);
+                    return;
+                }
+                // here we are trying to update lac only but not length
+                updateLastConfirmed(lac, 0);
+                cb.readLastConfirmedComplete(rc, lac, ctx);
+            }
+        };
+        new PendingReadLacOp(this, lacCallback).initiate();
+    }
+
+    /**
+     * Synchronously reads the explicit last add confirmed from a quorum of
+     * bookies. Each bookie reports the explicit last add confirmed it has
+     * received for this ledger and the maximum is returned. If the explicit
+     * lastaddconfirmed read is greater than getLastAddConfirmed, the
+     * lastAddConfirmed of this ledgerhandle is updated. If the ledger has been
+     * closed, the last add confirmed from the metadata is returned instead.
+     *
+     * @see #getLastAddConfirmed()
+     *
+     * @return The entry id of the explicit last confirmed write or
+     *         {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID} if no entry has been
+     *         confirmed or if explicitLAC feature is not enabled in write
+     *         LedgerHandle.
+     * @throws InterruptedException
+     *          if the calling thread is interrupted while waiting
+     * @throws BKException
+     *          if the read completes with a return code other than OK
+     */
+    public long readExplicitLastConfirmed() throws InterruptedException, BKException {
+        final LastConfirmedCtx syncCtx = new LastConfirmedCtx();
+        asyncReadExplicitLastConfirmed(new SyncReadLastConfirmedCallback(), syncCtx);
+        synchronized (syncCtx) {
+            while (!syncCtx.ready()) {
+                syncCtx.wait();
+            }
+        }
+        if (syncCtx.getRC() == BKException.Code.OK) {
+            return syncCtx.getlastConfirmed();
+        }
+        throw BKException.create(syncCtx.getRC());
+    }
+
     // close the ledger and send fails to all the adds in the pipeline
     void handleUnrecoverableErrorDuringAdd(int rc) {
         if (metadata.isInRecovery()) {
@@ -976,8 +1077,11 @@ public class LedgerHandle implements AutoCloseable {
                 LOG.debug("Head of the queue entryId: {} is not lac: {} + 1", pendingAddOp.entryId, lastAddConfirmed);
                 return;
             }
+
             pendingAddOps.remove();
+            explicitLacFlushPolicy.updatePiggyBackedLac(lastAddConfirmed);
             lastAddConfirmed = pendingAddOp.entryId;
+
             pendingAddOp.submitCallback(BKException.Code.OK);
         }
 
@@ -1327,6 +1431,30 @@ public class LedgerHandle implements AutoCloseable {
         }
     }
 
+    /**
+     * {@link AddLacCallback} that does nothing but log the outcome of a
+     * last-add-confirmed update. It holds no state, so a single shared
+     * {@link #INSTANCE} is provided.
+     */
+    static class LastAddConfirmedCallback implements AddLacCallback {
+        static final LastAddConfirmedCallback INSTANCE = new LastAddConfirmedCallback();
+
+        /**
+         * Logs the result of a LAC update: a warning carrying the error
+         * message on failure, a debug line carrying the ledger id on success.
+         *
+         * @param rc
+         *          return code
+         * @param lh
+         *          ledger handle the update was issued for
+         * @param ctx
+         *          control object (not used by this implementation)
+         */
+        @Override
+        public void addLacComplete(int rc, LedgerHandle lh, Object ctx) {
+            if (BKException.Code.OK == rc) {
+                LOG.debug("Callback LAC Updated for: {} ", lh.getId());
+                return;
+            }
+            LOG.warn("LastAddConfirmedUpdate failed: {} ", BKException.getMessage(rc));
+        }
+    }
+
     static class SyncReadCallback implements ReadCallback {
         /**
          * Implementation of callback interface for synchronous read method.

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
new file mode 100644
index 0000000..64e266f
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.client.DigestManager.RecoveryData;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * This represents a pending ReadLac operation.
+ *
+ * LAC is stored in two places on bookies.
+ * 1. WriteLac operation sends Explicit LAC and is stored in memory on each bookie.
+ * 2. Each AddEntry operation piggy-backs LAC which is stored on bookie's disk.
+ *
+ * This operation returns both of those entries and we pick the latest LAC out of
+ * available answers.
+ *
+ * This is an optional protocol operation to facilitate tailing readers
+ * to be up to date with the writer. This is best effort to get latest LAC
+ * from bookies, and doesn't affect the correctness of the protocol.
+ */
+
+class PendingReadLacOp implements ReadLacCallback {
+    static final Logger LOG = LoggerFactory.getLogger(PendingReadLacOp.class);
+    LedgerHandle lh;
+    LacCallback cb;
+    // Number of bookie responses still outstanding; starts at the ensemble size.
+    int numResponsesPending;
+    // True once cb has been invoked; any bookie response arriving later is dropped.
+    volatile boolean completed = false;
+    // Last failure code seen; reported if all bookies answered without reaching coverage.
+    int lastSeenError = BKException.Code.ReadException;
+    final DistributionSchedule.QuorumCoverageSet coverageSet;
+    // Highest LAC extracted from any valid response so far.
+    long maxLac = LedgerHandle.INVALID_ENTRY_ID;
+
+    /*
+     * Wrapper to get Lac from the request
+     */
+    interface LacCallback {
+        public void getLacComplete(int rc, long lac);
+    }
+
+    /**
+     * Creates the operation; nothing is sent until {@link #initiate()} is called.
+     *
+     * @param lh
+     *          ledger handle whose current ensemble is queried
+     * @param cb
+     *          invoked at most once with the result code and the highest LAC seen
+     */
+    PendingReadLacOp(LedgerHandle lh, LacCallback cb) {
+        this.lh = lh;
+        this.cb = cb;
+        this.numResponsesPending = lh.metadata.getEnsembleSize();
+        this.coverageSet = lh.distributionSchedule.getCoverageSet();
+    }
+
+    /**
+     * Sends a ReadLac request to every bookie of the current ensemble. The
+     * bookie's index in the ensemble is used as the request context so that
+     * {@link #readLacComplete} can tell the responses apart.
+     */
+    public void initiate() {
+        for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
+            lh.bk.bookieClient.readLac(lh.metadata.currentEnsemble.get(i),
+                    lh.ledgerId, this, i);
+        }
+    }
+
+    /**
+     * Tells whether a response buffer actually carries bytes to verify.
+     */
+    private static boolean hasData(ChannelBuffer buffer) {
+        return buffer != null && buffer.readableBytes() > 0;
+    }
+
+    /**
+     * Handles the response of a single bookie.
+     *
+     * @param rc
+     *          return code reported for this bookie
+     * @param ledgerId
+     *          ledger identifier
+     * @param lacBuffer
+     *          explicit LAC (with digest) held by the bookie; may be null or empty
+     * @param lastEntryBuffer
+     *          last entry of the ledger, carrying the piggy-backed LAC; may be null or empty
+     * @param ctx
+     *          index of the responding bookie in the ensemble, as passed by {@link #initiate()}
+     */
+    @Override
+    public void readLacComplete(int rc, long ledgerId, final ChannelBuffer lacBuffer, final ChannelBuffer lastEntryBuffer, Object ctx) {
+        int bookieIndex = (Integer) ctx;
+        numResponsesPending--;
+        boolean heardValidResponse = false;
+
+        if (completed) {
+            return;
+        }
+
+        if (rc == BKException.Code.OK) {
+            try {
+                // Each bookie may store the LAC in two places.
+                // One is the in-memory copy in FileInfo and the other is
+                // the piggy-backed LAC on the last entry.
+                // This routine looks at both of them and keeps the latest LAC.
+                //
+                // Only buffers that actually carry data are verified, so a
+                // missing body is not mistaken for a digest failure.
+                // NOTE(review): assumes a bookie may answer OK without one of
+                // the two bodies - confirm against the server-side processor.
+
+                // Extract lac from FileInfo on the ledger.
+                if (hasData(lacBuffer)) {
+                    long lac = lh.macManager.verifyDigestAndReturnLac(lacBuffer);
+                    if (lac > maxLac) {
+                        maxLac = lac;
+                    }
+                }
+
+                // Extract lac from last entry on the disk
+                if (hasData(lastEntryBuffer)) {
+                    RecoveryData recoveryData = lh.macManager.verifyDigestAndReturnLastConfirmed(lastEntryBuffer);
+                    if (recoveryData.lastAddConfirmed > maxLac) {
+                        maxLac = recoveryData.lastAddConfirmed;
+                    }
+                }
+                heardValidResponse = true;
+            } catch (BKDigestMatchException e) {
+                // Too bad, this bookie did not give us a valid answer, we
+                // still might be able to recover. So, continue
+                LOG.error("Mac mismatch while reading  ledger: {} LAC from bookie: {}",
+                        ledgerId, lh.metadata.currentEnsemble.get(bookieIndex));
+                rc = BKException.Code.DigestMatchException;
+            }
+        }
+
+        // A bookie that knows nothing about the ledger/entry still counts as an answer.
+        if (rc == BKException.Code.NoSuchLedgerExistsException || rc == BKException.Code.NoSuchEntryException) {
+            heardValidResponse = true;
+        }
+
+        // Authorization failures are reported to the caller immediately.
+        if (rc == BKException.Code.UnauthorizedAccessException) {
+            completed = true;
+            cb.getLacComplete(rc, maxLac);
+            return;
+        }
+
+        if (!heardValidResponse && BKException.Code.OK != rc) {
+            lastSeenError = rc;
+        }
+
+        // We don't consider a success until we have coverage set responses.
+        if (heardValidResponse
+                && coverageSet.addBookieAndCheckCovered(bookieIndex)) {
+            completed = true;
+            LOG.debug("Read LAC complete with enough validResponse for ledger: {} LAC: {}",
+                    ledgerId, maxLac);
+            cb.getLacComplete(BKException.Code.OK, maxLac);
+            return;
+        }
+
+        // Everybody answered but coverage was never reached: fail with the last error.
+        if (numResponsesPending == 0) {
+            completed = true;
+            LOG.info("While readLac ledger: {} did not hear success responses from all of ensemble", ledgerId);
+            cb.getLacComplete(lastSeenError, maxLac);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
new file mode 100644
index 0000000..dc7368b
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This represents a pending WriteLac operation. When it has got
+ * success from Ack Quorum bookies, sends success back to the application,
+ * otherwise failure is sent back to the caller.
+ *
+ * This is an optional protocol operation to facilitate tailing readers
+ * to be up to date with the writer. This is best effort to get latest LAC
+ * from bookies, and doesn't affect the correctness of the protocol.
+ */
+class PendingWriteLacOp implements WriteLacCallback {
+    private final static Logger LOG = LoggerFactory.getLogger(PendingWriteLacOp.class);
+    ChannelBuffer toSend;
+    AddLacCallback cb;
+    long lac;
+    // Application context handed back through cb; not to be confused with the
+    // per-request context (the bookie index) received in writeLacComplete.
+    Object ctx;
+    // Ensemble indexes of the bookies the LAC is written to; set by setLac().
+    Set<Integer> writeSet;
+    // Despite its name, this holds the bookies that have NOT answered yet: it
+    // starts as a copy of writeSet and shrinks with every response.
+    Set<Integer> receivedResponseSet;
+
+    DistributionSchedule.AckSet ackSet;
+    boolean completed = false;
+    int lastSeenError = BKException.Code.WriteException;
+
+    LedgerHandle lh;
+    OpStatsLogger putLacOpLogger;
+
+    /**
+     * Creates the operation. {@link #setLac(long)} must be called before
+     * {@link #initiate(ChannelBuffer)}, otherwise there is no write set to send to.
+     *
+     * @param lh
+     *          ledger handle the LAC belongs to
+     * @param cb
+     *          callback invoked once with the overall result
+     * @param ctx
+     *          application context passed back to {@code cb}
+     */
+    PendingWriteLacOp(LedgerHandle lh, AddLacCallback cb, Object ctx) {
+        this.lh = lh;
+        this.cb = cb;
+        this.ctx = ctx;
+        this.lac = LedgerHandle.INVALID_ENTRY_ID;
+        ackSet = lh.distributionSchedule.getAckSet();
+        putLacOpLogger = lh.bk.getWriteLacOpLogger();
+    }
+
+    /**
+     * Records the LAC to write and derives from it the set of bookies to contact.
+     *
+     * @param lac
+     *          last add confirmed entry id to publish
+     */
+    void setLac(long lac) {
+        this.lac = lac;
+        this.writeSet = new HashSet<Integer>(lh.distributionSchedule.getWriteSet(lac));
+        this.receivedResponseSet = new HashSet<Integer>(writeSet);
+    }
+
+    /**
+     * Sends the WriteLac request to one bookie, using its ensemble index as
+     * the request context.
+     *
+     * @param bookieIndex
+     *          index of the target bookie in the current ensemble
+     */
+    void sendWriteLacRequest(int bookieIndex) {
+        lh.bk.bookieClient.writeLac(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId, lh.ledgerKey,
+                lac, toSend, this, bookieIndex);
+    }
+
+    /**
+     * Sends the given payload to every bookie of the write set.
+     *
+     * @param toSend
+     *          payload of the WriteLac request
+     */
+    void initiate(ChannelBuffer toSend) {
+        this.toSend = toSend;
+        for (int bookieIndex: writeSet) {
+            sendWriteLacRequest(bookieIndex);
+        }
+    }
+
+    /**
+     * Handles the response of a single bookie and completes the operation
+     * either when the ack set reports success or when every bookie of the
+     * write set has answered.
+     *
+     * @param rc
+     *          return code reported for this bookie
+     * @param ledgerId
+     *          ledger identifier
+     * @param addr
+     *          address of the responding bookie
+     * @param ctx
+     *          per-request context: the bookie index passed by {@link #sendWriteLacRequest(int)}
+     */
+    @Override
+    public void writeLacComplete(int rc, long ledgerId, BookieSocketAddress addr, Object ctx) {
+        int bookieIndex = (Integer) ctx;
+
+        if (completed) {
+            return;
+        }
+
+        if (BKException.Code.OK != rc) {
+            lastSeenError = rc;
+        }
+
+        // We got response.
+        receivedResponseSet.remove(bookieIndex);
+
+        if (rc == BKException.Code.OK) {
+            if (ackSet.addBookieAndCheck(bookieIndex)) {
+                completed = true;
+                // Hand back the application's context (the field), not the
+                // bookie index that the parameter of the same name carries.
+                cb.addLacComplete(rc, lh, this.ctx);
+                return;
+            }
+        } else {
+            LOG.warn("WriteLac did not succeed: Ledger {} on {}", ledgerId, addr);
+        }
+
+        // Every bookie answered without the ack set being satisfied: fail.
+        if (receivedResponseSet.isEmpty()) {
+            completed = true;
+            cb.addLacComplete(lastSeenError, lh, this.ctx);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index 711f209..1834eff 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -54,6 +54,10 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListene
                     ReadOnlyLedgerHandle.this.metadata.getVersion().compare(this.m.getVersion());
             if (Version.Occurred.BEFORE == occurred) {
                 LOG.info("Updated ledger metadata for ledger {} to {}.", ledgerId, this.m);
+                if (this.m.isClosed()) {
+                        ReadOnlyLedgerHandle.this.lastAddConfirmed = this.m.getLastEntryId();
+                        ReadOnlyLedgerHandle.this.length = this.m.getLength();
+                }
                 ReadOnlyLedgerHandle.this.metadata = this.m;
             }
         }
@@ -170,4 +174,8 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListene
         return String.format("ReadOnlyLedgerHandle(lid = %d, id = %d)", ledgerId, super.hashCode());
     }
 
+    /**
+     * Installs {@link ExplicitLacFlushPolicy#VOID_EXPLICITLAC_FLUSH_POLICY} as
+     * the flush policy of this read-only handle.
+     * NOTE(review): presumably a no-op policy, since a read-only handle has no
+     * LAC of its own to publish - confirm against ExplicitLacFlushPolicy.
+     */
+    @Override
+    protected void initializeExplicitLacFlushPolicy() {
+        this.explicitLacFlushPolicy = ExplicitLacFlushPolicy.VOID_EXPLICITLAC_FLUSH_POLICY;
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 7353c3f..fa42dc9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -65,6 +65,7 @@ public class ClientConfiguration extends AbstractConfiguration {
     protected final static String ADD_ENTRY_QUORUM_TIMEOUT_SEC = "addEntryQuorumTimeoutSec";
     protected final static String READ_ENTRY_TIMEOUT_SEC = "readEntryTimeoutSec";
     protected final static String TIMEOUT_TASK_INTERVAL_MILLIS = "timeoutTaskIntervalMillis";
+    protected final static String EXPLICIT_LAC_INTERVAL = "explicitLacInterval";
     protected final static String PCBC_TIMEOUT_TIMER_TICK_DURATION_MS = "pcbcTimeoutTimerTickDurationMs";
     protected final static String PCBC_TIMEOUT_TIMER_NUM_TICKS = "pcbcTimeoutTimerNumTicks";
     protected final static String TIMEOUT_TIMER_TICK_DURATION_MS = "timeoutTimerTickDurationMs";
@@ -76,7 +77,7 @@ public class ClientConfiguration extends AbstractConfiguration {
     protected final static String BOOKIE_ERROR_THRESHOLD_PER_INTERVAL = "bookieErrorThresholdPerInterval";
     protected final static String BOOKIE_QUARANTINE_TIME_SECONDS = "bookieQuarantineTimeSeconds";
 
-    // Number Woker Threads
+    // Number Worker Threads
     protected final static String NUM_WORKER_THREADS = "numWorkerThreads";
 
     // Ensemble Placement Policy
@@ -595,6 +596,29 @@ public class ClientConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Get the configured interval between explicit LACs sent to bookies.
+     * LACs are normally piggy-backed on writes; this interval controls how
+     * often the additional explicit LAC protocol messages are sent. The
+     * value is read from the {@code explicitLacInterval} property and
+     * defaults to '0', which disables sending any explicit LACs.
+     *
+     * @return interval between explicit LACs (the setter documents the unit
+     *         as seconds)
+     */
+    public int getExplictLacInterval() {
+        return getInt(EXPLICIT_LAC_INTERVAL, 0);
+    }
+
+    /**
+     * Set the interval to check the need for sending an explicit LAC.
+     * The value is stored under the {@code explicitLacInterval} property and
+     * is read back by {@link #getExplictLacInterval()}.
+     *
+     * @param interval
+     *        Number of seconds between checking the need for sending an explicit LAC.
+     * @return Client configuration.
+     */
+    public ClientConfiguration setExplictLacInterval(int interval) {
+        setProperty(EXPLICIT_LAC_INTERVAL, interval);
+        return this;
+    }
+
+    /**
      * Get the tick duration in milliseconds that used for the
      * {@link org.jboss.netty.util.HashedWheelTimer} that used by PCBC to timeout
      * requests.

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 9b0865a..4a742da 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -33,10 +33,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.ClientAuthProvider;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -166,6 +169,41 @@ public class BookieClient implements PerChannelBookieClientFactory {
         return clientPool;
     }
 
+    /**
+     * Sends an explicit LAC write for a ledger to a single bookie.
+     *
+     * <p>The whole call runs under the read side of {@code closeLock}. If no
+     * client pool is available for the bookie, or no channel can be obtained
+     * from it, the failure is reported through {@code cb}.
+     *
+     * @param addr
+     *          address of the target bookie
+     * @param ledgerId
+     *          ledger identifier
+     * @param masterKey
+     *          master key of the ledger
+     * @param lac
+     *          last add confirmed entry id to publish; also used as the key for the client lookup
+     * @param toSend
+     *          payload of the request
+     * @param cb
+     *          callback notified of the outcome
+     * @param ctx
+     *          context object handed back to {@code cb}
+     */
+    public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
+            final long lac, final ChannelBuffer toSend, final WriteLacCallback cb, final Object ctx) {
+        closeLock.readLock().lock();
+        try {
+            final PerChannelBookieClientPool pool = lookupClient(addr, lac);
+            if (null == pool) {
+                final int unavailableRc = getRc(BKException.Code.BookieHandleNotAvailableException);
+                cb.writeLacComplete(unavailableRc, ledgerId, addr, ctx);
+                return;
+            }
+
+            pool.obtain(new GenericCallback<PerChannelBookieClient>() {
+                @Override
+                public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
+                    if (BKException.Code.OK == rc) {
+                        pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
+                        return;
+                    }
+
+                    // No usable channel: report the failure through the
+                    // executor, keyed by ledger id.
+                    final SafeRunnable failureNotifier = new SafeRunnable() {
+                        @Override
+                        public void safeRun() {
+                            cb.writeLacComplete(rc, ledgerId, addr, ctx);
+                        }
+                    };
+                    try {
+                        executor.submitOrdered(ledgerId, failureNotifier);
+                    } catch (RejectedExecutionException re) {
+                        // The executor refused the task; notify the caller inline.
+                        cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
+                    }
+                }
+            });
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+
     public void addEntry(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
             final long entryId,
             final ChannelBuffer toSend, final WriteCallback cb, final Object ctx, final int options) {
@@ -243,6 +281,39 @@ public class BookieClient implements PerChannelBookieClientFactory {
         }
     }
 
+    /**
+     * Requests the LAC of a ledger from a single bookie.
+     *
+     * <p>The whole call runs under the read side of {@code closeLock}. If no
+     * client pool is available for the bookie, or no channel can be obtained
+     * from it, the failure is reported through {@code cb} with both buffers
+     * set to {@code null}.
+     *
+     * @param addr
+     *          address of the target bookie
+     * @param ledgerId
+     *          ledger identifier
+     * @param cb
+     *          callback notified of the outcome
+     * @param ctx
+     *          context object handed back to {@code cb}
+     */
+    public void readLac(final BookieSocketAddress addr, final long ledgerId, final ReadLacCallback cb, final Object ctx) {
+        closeLock.readLock().lock();
+        try {
+            final PerChannelBookieClientPool pool = lookupClient(addr, BookieProtocol.LAST_ADD_CONFIRMED);
+            if (null == pool) {
+                final int unavailableRc = getRc(BKException.Code.BookieHandleNotAvailableException);
+                cb.readLacComplete(unavailableRc, ledgerId, null, null, ctx);
+                return;
+            }
+
+            pool.obtain(new GenericCallback<PerChannelBookieClient>() {
+                @Override
+                public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
+                    if (BKException.Code.OK == rc) {
+                        pcbc.readLac(ledgerId, cb, ctx);
+                        return;
+                    }
+
+                    // No usable channel: report the failure through the
+                    // executor, keyed by ledger id.
+                    final SafeRunnable failureNotifier = new SafeRunnable() {
+                        @Override
+                        public void safeRun() {
+                            cb.readLacComplete(rc, ledgerId, null, null, ctx);
+                        }
+                    };
+                    try {
+                        executor.submitOrdered(ledgerId, failureNotifier);
+                    } catch (RejectedExecutionException re) {
+                        // The executor refused the task; notify the caller inline.
+                        cb.readLacComplete(getRc(BKException.Code.InterruptedException),
+                                ledgerId, null, null, ctx);
+                    }
+                }
+            });
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+
     public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
                           final ReadEntryCallback cb, final Object ctx) {
         closeLock.readLock().lock();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 7b227fa..210bc72 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -41,6 +41,9 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC;
+
 
 public class BookieRequestProcessor implements RequestProcessor {
 
@@ -73,6 +76,8 @@ public class BookieRequestProcessor implements RequestProcessor {
     final OpStatsLogger addEntryStats;
     final OpStatsLogger readRequestStats;
     final OpStatsLogger readEntryStats;
+    final OpStatsLogger writeLacStats;
+    final OpStatsLogger readLacStats;
 
     public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
                                   StatsLogger statsLogger) {
@@ -86,6 +91,8 @@ public class BookieRequestProcessor implements RequestProcessor {
         this.addRequestStats = statsLogger.getOpStatsLogger(ADD_ENTRY_REQUEST);
         this.readEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY);
         this.readRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_REQUEST);
+        this.writeLacStats = statsLogger.getOpStatsLogger(WRITE_LAC);
+        this.readLacStats = statsLogger.getOpStatsLogger(READ_LAC);
     }
 
     @Override
@@ -135,6 +142,12 @@ public class BookieRequestProcessor implements RequestProcessor {
                             .setAuthResponse(message);
                     c.write(authResponse.build());
                     break;
+                case WRITE_LAC:
+                    processWriteLacRequestV3(r,c);
+                    break;
+                case READ_LAC:
+                    processReadLacRequestV3(r,c);
+                    break;
                 default:
                     LOG.info("Unknown operation type {}", header.getOperation());
                     BookkeeperProtocol.Response.Builder response =
@@ -185,6 +198,24 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
     }
 
+    private void processWriteLacRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
+        WriteLacProcessorV3 writeLac = new WriteLacProcessorV3(r, c, this);
+        if (null == writeThreadPool) {
+            writeLac.run();
+        } else {
+            writeThreadPool.submit(writeLac);
+        }
+    }
+
+    private void processReadLacRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
+        ReadLacProcessorV3 readLac = new ReadLacProcessorV3(r, c, this);
+        if (null == readThreadPool) {
+            readLac.run();
+        } else {
+            readThreadPool.submit(readLac);
+        }
+    }
+
     private void processAddRequest(final BookieProtocol.Request r, final Channel c) {
         WriteEntryProcessor write = new WriteEntryProcessor(r, c, this);
         if (null == writeThreadPool) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
index e6e7802..261c93d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
@@ -65,6 +65,14 @@ public class BookkeeperInternalCallbacks {
         void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx);
     }
 
+    /**
+     * Callback for a ReadLac request sent to a single bookie.
+     */
+    public interface ReadLacCallback {
+        /**
+         * @param rc
+         *          return code of the request
+         * @param ledgerId
+         *          ledger identifier
+         * @param lac
+         *          buffer carrying the explicit LAC; null when the request
+         *          could not be sent to the bookie
+         * @param buffer
+         *          buffer carrying the last entry (with its piggy-backed LAC);
+         *          null when the request could not be sent to the bookie
+         * @param ctx
+         *          context object supplied with the request
+         */
+        void readLacComplete(int rc, long ledgerId, ChannelBuffer lac, ChannelBuffer buffer, Object ctx);
+    }
+
+    /**
+     * Callback for a WriteLac request sent to a single bookie.
+     */
+    public interface WriteLacCallback {
+        /**
+         * @param rc
+         *          return code of the request
+         * @param ledgerId
+         *          ledger identifier
+         * @param addr
+         *          address of the bookie the request was addressed to
+         * @param ctx
+         *          context object supplied with the request
+         */
+        void writeLacComplete(int rc, long ledgerId, BookieSocketAddress addr, Object ctx);
+    }
+
     public interface GenericCallback<T> {
         void operationComplete(int rc, T result);
     }


[2/3] bookkeeper git commit: BOOKKEEPER-874: Explict LAC from Writer to Bookies

Posted by si...@apache.org.
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
index 5fedfff..b5c0008 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
@@ -304,6 +304,14 @@ public final class BookkeeperProtocol {
      * <code>AUTH = 5;</code>
      */
     AUTH(4, 5),
+    /**
+     * <code>WRITE_LAC = 6;</code>
+     */
+    WRITE_LAC(5, 6),
+    /**
+     * <code>READ_LAC = 7;</code>
+     */
+    READ_LAC(6, 7),
     ;
 
     /**
@@ -330,6 +338,14 @@ public final class BookkeeperProtocol {
      * <code>AUTH = 5;</code>
      */
     public static final int AUTH_VALUE = 5;
+    /**
+     * <code>WRITE_LAC = 6;</code>
+     */
+    public static final int WRITE_LAC_VALUE = 6;
+    /**
+     * <code>READ_LAC = 7;</code>
+     */
+    public static final int READ_LAC_VALUE = 7;
 
 
     public final int getNumber() { return value; }
@@ -341,6 +357,8 @@ public final class BookkeeperProtocol {
         case 3: return RANGE_READ_ENTRY;
         case 4: return RANGE_ADD_ENTRY;
         case 5: return AUTH;
+        case 6: return WRITE_LAC;
+        case 7: return READ_LAC;
         default: return null;
       }
     }
@@ -1064,6 +1082,32 @@ public final class BookkeeperProtocol {
      * <code>optional .AuthMessage authRequest = 102;</code>
      */
     org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthRequestOrBuilder();
+
+    /**
+     * <code>optional .WriteLacRequest writeLacRequest = 103;</code>
+     */
+    boolean hasWriteLacRequest();
+    /**
+     * <code>optional .WriteLacRequest writeLacRequest = 103;</code>
+     */
+    org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest getWriteLacRequest();
+    /**
+     * <code>optional .WriteLacRequest writeLacRequest = 103;</code>
+     */
+    org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequestOrBuilder getWriteLacRequestOrBuilder();
+
+    /**
+     * <code>optional .ReadLacRequest readLacRequest = 104;</code>
+     */
+    boolean hasReadLacRequest();
+    /**
+     * <code>optional .ReadLacRequest readLacRequest = 104;</code>
+     */
+    org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest getReadLacRequest();
+    /**
+     * <code>optional .ReadLacRequest readLacRequest = 104;</code>
+     */
+    org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequestOrBuilder getReadLacRequestOrBuilder();
   }
   /**
    * Protobuf type {@code Request}
@@ -1169,6 +1213,32 @@ public final class BookkeeperProtocol {
               bitField0_ |= 0x00000008;
               break;
             }
+            case 826: {
+              org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.Builder subBuilder = null;
+              if (((bitField0_ & 0x00000010) == 0x00000010)) {
+                subBuilder = writeLacRequest_.toBuilder();
+              }
+              writeLacRequest_ = input.readMessage(org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.PARSER, extensionRegistry);
+              if (subBuilder != null) {
+                subBuilder.mergeFrom(writeLacRequest_);
+                writeLacRequest_ = subBuilder.buildPartial();
+              }
+              bitField0_ |= 0x00000010;
+              break;
+            }
+            case 834: {
+              org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.Builder subBuilder = null;
+              if (((bitField0_ & 0x00000020) == 0x00000020)) {
+                subBuilder = readLacRequest_.toBuilder();
+              }
+              readLacRequest_ = input.readMessage(org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.PARSER, extensionRegistry);
+              if (subBuilder != null) {
+                subBuilder.mergeFrom(readLacRequest_);
+                readLacRequest_ = subBuilder.buildPartial();
+              }
+              bitField0_ |= 0x00000020;
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -1305,11 +1375,55 @@ public final class BookkeeperProtocol {
       return authRequest_;
     }
 
+    public static final int WRITELACREQUEST_FIELD_NUMBER = 103;
+    private org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest writeLacRequest_;
+    /**
+     * <code>optional .WriteLacRequest writeLacRequest = 103;</code>
+     */
+    public boolean hasWriteLacRequest() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional .WriteLacRequest writeLacRequest = 103;</code>
+     */
+    public org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest getWriteLacRequest() {
+      return writeLacRequest_;
+    }
+    /**
+     * <code>optional .WriteLacRequest writeLacRequest = 103;</code>
+     */
+    public org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequestOrBuilder getWriteLacRequestOrBuilder() {
+      return writeLacRequest_;
+    }
+
+    public static final int READLACREQUEST_FIELD_NUMBER = 104;
+    private org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest readLacRequest_;
+    /**
+     * <code>optional .ReadLacRequest readLacRequest = 104;</code>
+     */
+    public boolean hasReadLacRequest() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>optional .ReadLacRequest readLacRequest = 104;</code>
+     */
+    public org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest getReadLacRequest() {
+      return readLacRequest_;
+    }
+    /**
+     * <code>optional .ReadLacRequest readLacRequest = 104;</code>
+     */
+    public org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequestOrBuilder getReadLacRequestOrBuilder() {
+      return readLacRequest_;
+    }
+
     private void initFields() {
       header_ = org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.getDefaultInstance();
       readRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest.getDefaultInstance();
       addRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest.getDefaultInstance();
       authRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+      writeLacRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.getDefaultInstance();
+      readLacRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1343,6 +1457,18 @@ public final class BookkeeperProtocol {
           return false;
         }
       }
+      if (hasWriteLacRequest()) {
+        if (!getWriteLacRequest().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
+      if (hasReadLacRequest()) {
+        if (!getReadLacRequest().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -1362,6 +1488,12 @@ public final class BookkeeperProtocol {
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
         output.writeMessage(102, authRequest_);
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeMessage(103, writeLacRequest_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeMessage(104, readLacRequest_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1387,6 +1519,14 @@ public final class BookkeeperProtocol {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(102, authRequest_);
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(103, writeLacRequest_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(104, readLacRequest_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1500,6 +1640,8 @@ public final class BookkeeperProtocol {
           getReadRequestFieldBuilder();
           getAddRequestFieldBuilder();
           getAuthRequestFieldBuilder();
+          getWriteLacRequestFieldBuilder();
+          getReadLacRequestFieldBuilder();
         }
       }
       private static Builder create() {
@@ -1532,6 +1674,18 @@ public final class BookkeeperProtocol {
           authRequestBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000008);
+        if (writeLacRequestBuilder_ == null) {
+          writeLacRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.getDefaultInstance();
+        } else {
+          writeLacRequestBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000010);
+        if (readLacRequestBuilder_ == null) {
+          readLacRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.getDefaultInstance();
+        } else {
+          readLacRequestBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
 
@@ -1592,6 +1746,22 @@ public final class BookkeeperProtocol {
         } else {
           result.authRequest_ = authRequestBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        if (writeLacRequestBuilder_ == null) {
+          result.writeLacRequest_ = writeLacRequest_;
+        } else {
+          result.writeLacRequest_ = writeLacRequestBuilder_.build();
+        }
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        if (readLacRequestBuilder_ == null) {
+          result.readLacRequest_ = readLacRequest_;
+        } else {
+          result.readLacRequest_ = readLacRequestBuilder_.build();
+        }
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1620,6 +1790,12 @@ public final class BookkeeperProtocol {
         if (other.hasAuthRequest()) {
           mergeAuthRequest(other.getAuthRequest());
         }
+        if (other.hasWriteLacRequest()) {
+          mergeWriteLacRequest(other.getWriteLacRequest());
+        }
+        if (other.hasReadLacRequest()) {
+          mergeReadLacRequest(other.getReadLacRequest());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1651,6 +1827,18 @@ public final class BookkeeperProtocol {
             return false;
           }
         }
+        if (hasWriteLacRequest()) {
+          if (!getWriteLacRequest().isInitialized()) {
+            
+            return false;
+          }
+        }
+        if (hasReadLacRequest()) {
+          if (!getReadLacRequest().isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
 
@@ -2173,6 +2361,238 @@ public final class BookkeeperProtocol {
         return authRequestBuilder_;
       }
 
+      private org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest writeLacRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.getDefaultInstance();
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest, org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequestOrBuilder> writeLacRequestBuilder_;
+      /**
+       * <code>optional .WriteLacRequest writeLacRequest = 103;</code>
+       */
+      public boolean hasWriteLacRequest() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      /**
+       * <code>optional .WriteLacRequest writeLacRequest = 103;</code>
+       */
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest getWriteLacRequest() {
+        if (writeLacRequestBuilder_ == null) {
+          return writeLacRequest_;
+        } else {
+          return writeLacRequestBuilder_.getMessage();
+        }
+      }
+      /**
+       * <code>optional .WriteLacRequest writeLacRequest = 103;</code>
+       */
+      public Builder setWriteLacRequest(org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest value) {
+        if (writeLacRequestBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          writeLacRequest_ = value;
+          onChanged();
+        } else {
+          writeLacRequestBuilder_.setMessage(value);
+        }
+        bitField0_ |= 0x00000010;
+        return this;
+      }
+      /**
+       * <code>optional .WriteLacRequest writeLacRequest = 103;</code>
+       */
+      public Builder setWriteLacRequest(
+          org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.Builder builderForValue) {
+        if (writeLacRequestBuilder_ == null) {
+          writeLacRequest_ = builderForValue.build();
+          onChanged();
+        } else {
+          writeLacRequestBuilder_.setMessage(builderForValue.build());
+        }
+        bitField0_ |= 0x00000010;
+        return this;
+      }
+      /**
+       * <code>optional .WriteLacRequest writeLacRequest = 103;</code>
+       */
+      public Builder mergeWriteLacRequest(org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest value) {
+        if (writeLacRequestBuilder_ == null) {
+          if (((bitField0_ & 0x00000010) == 0x00000010) &&
+              writeLacRequest_ != org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.getDefaultInstance()) {
+            writeLacRequest_ =
+              org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.newBuilder(writeLacRequest_).mergeFrom(value).buildPartial();
+          } else {
+            writeLacRequest_ = value;
+          }
+          onChanged();
+        } else {
+          writeLacRequestBuilder_.mergeFrom(value);
+        }
+        bitField0_ |= 0x00000010;
+        return this;
+      }
+      /**
+       * <code>optional .WriteLacRequest writeLacRequest = 103;</code>
+       */
+      public Builder clearWriteLacRequest() {
+        if (writeLacRequestBuilder_ == null) {
+          writeLacRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.getDefaultInstance();
+          onChanged();
+        } else {
+          writeLacRequestBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000010);
+        return this;
+      }
+      /**
+       * <code>optional .WriteLacRequest writeLacRequest = 103;</code>
+       */
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.Builder getWriteLacRequestBuilder() {
+        bitField0_ |= 0x00000010;
+        onChanged();
+        return getWriteLacRequestFieldBuilder().getBuilder();
+      }
+      /**
+       * <code>optional .WriteLacRequest writeLacRequest = 103;</code>
+       */
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequestOrBuilder getWriteLacRequestOrBuilder() {
+        if (writeLacRequestBuilder_ != null) {
+          return writeLacRequestBuilder_.getMessageOrBuilder();
+        } else {
+          return writeLacRequest_;
+        }
+      }
+      /**
+       * <code>optional .WriteLacRequest writeLacRequest = 103;</code>
+       */
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest, org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequestOrBuilder> 
+          getWriteLacRequestFieldBuilder() {
+        if (writeLacRequestBuilder_ == null) {
+          writeLacRequestBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+              org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest, org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequestOrBuilder>(
+                  getWriteLacRequest(),
+                  getParentForChildren(),
+                  isClean());
+          writeLacRequest_ = null;
+        }
+        return writeLacRequestBuilder_;
+      }
+
+      private org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest readLacRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.getDefaultInstance();
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest, org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequestOrBuilder> readLacRequestBuilder_;
+      /**
+       * <code>optional .ReadLacRequest readLacRequest = 104;</code>
+       */
+      public boolean hasReadLacRequest() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      /**
+       * <code>optional .ReadLacRequest readLacRequest = 104;</code>
+       */
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest getReadLacRequest() {
+        if (readLacRequestBuilder_ == null) {
+          return readLacRequest_;
+        } else {
+          return readLacRequestBuilder_.getMessage();
+        }
+      }
+      /**
+       * <code>optional .ReadLacRequest readLacRequest = 104;</code>
+       */
+      public Builder setReadLacRequest(org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest value) {
+        if (readLacRequestBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          readLacRequest_ = value;
+          onChanged();
+        } else {
+          readLacRequestBuilder_.setMessage(value);
+        }
+        bitField0_ |= 0x00000020;
+        return this;
+      }
+      /**
+       * <code>optional .ReadLacRequest readLacRequest = 104;</code>
+       */
+      public Builder setReadLacRequest(
+          org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.Builder builderForValue) {
+        if (readLacRequestBuilder_ == null) {
+          readLacRequest_ = builderForValue.build();
+          onChanged();
+        } else {
+          readLacRequestBuilder_.setMessage(builderForValue.build());
+        }
+        bitField0_ |= 0x00000020;
+        return this;
+      }
+      /**
+       * <code>optional .ReadLacRequest readLacRequest = 104;</code>
+       */
+      public Builder mergeReadLacRequest(org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest value) {
+        if (readLacRequestBuilder_ == null) {
+          if (((bitField0_ & 0x00000020) == 0x00000020) &&
+              readLacRequest_ != org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.getDefaultInstance()) {
+            readLacRequest_ =
+              org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.newBuilder(readLacRequest_).mergeFrom(value).buildPartial();
+          } else {
+            readLacRequest_ = value;
+          }
+          onChanged();
+        } else {
+          readLacRequestBuilder_.mergeFrom(value);
+        }
+        bitField0_ |= 0x00000020;
+        return this;
+      }
+      /**
+       * <code>optional .ReadLacRequest readLacRequest = 104;</code>
+       */
+      public Builder clearReadLacRequest() {
+        if (readLacRequestBuilder_ == null) {
+          readLacRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.getDefaultInstance();
+          onChanged();
+        } else {
+          readLacRequestBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000020);
+        return this;
+      }
+      /**
+       * <code>optional .ReadLacRequest readLacRequest = 104;</code>
+       */
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.Builder getReadLacRequestBuilder() {
+        bitField0_ |= 0x00000020;
+        onChanged();
+        return getReadLacRequestFieldBuilder().getBuilder();
+      }
+      /**
+       * <code>optional .ReadLacRequest readLacRequest = 104;</code>
+       */
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequestOrBuilder getReadLacRequestOrBuilder() {
+        if (readLacRequestBuilder_ != null) {
+          return readLacRequestBuilder_.getMessageOrBuilder();
+        } else {
+          return readLacRequest_;
+        }
+      }
+      /**
+       * <code>optional .ReadLacRequest readLacRequest = 104;</code>
+       */
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest, org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequestOrBuilder> 
+          getReadLacRequestFieldBuilder() {
+        if (readLacRequestBuilder_ == null) {
+          readLacRequestBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+              org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest, org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequestOrBuilder>(
+                  getReadLacRequest(),
+                  getParentForChildren(),
+                  isClean());
+          readLacRequest_ = null;
+        }
+        return readLacRequestBuilder_;
+      }
+
       // @@protoc_insertion_point(builder_scope:Request)
     }
 
@@ -3809,113 +4229,66 @@ public final class BookkeeperProtocol {
     // @@protoc_insertion_point(class_scope:AddRequest)
   }
 
-  public interface ResponseOrBuilder extends
-      // @@protoc_insertion_point(interface_extends:Response)
+  public interface WriteLacRequestOrBuilder extends
+      // @@protoc_insertion_point(interface_extends:WriteLacRequest)
       com.google.protobuf.MessageOrBuilder {
 
     /**
-     * <code>required .BKPacketHeader header = 1;</code>
-     */
-    boolean hasHeader();
-    /**
-     * <code>required .BKPacketHeader header = 1;</code>
+     * <code>required int64 ledgerId = 1;</code>
      */
-    org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader getHeader();
+    boolean hasLedgerId();
     /**
-     * <code>required .BKPacketHeader header = 1;</code>
+     * <code>required int64 ledgerId = 1;</code>
      */
-    org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeaderOrBuilder getHeaderOrBuilder();
+    long getLedgerId();
 
     /**
-     * <code>required .StatusCode status = 2;</code>
-     *
-     * <pre>
-     * EOK if the underlying request succeeded. Each individual response
-     * has a more meaningful status. EBADREQ if we have an unsupported request.
-     * </pre>
+     * <code>required int64 lac = 2;</code>
      */
-    boolean hasStatus();
+    boolean hasLac();
     /**
-     * <code>required .StatusCode status = 2;</code>
-     *
-     * <pre>
-     * EOK if the underlying request succeeded. Each individual response
-     * has a more meaningful status. EBADREQ if we have an unsupported request.
-     * </pre>
+     * <code>required int64 lac = 2;</code>
      */
-    org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode getStatus();
+    long getLac();
 
     /**
-     * <code>optional .ReadResponse readResponse = 100;</code>
-     *
-     * <pre>
-     * Response
-     * </pre>
+     * <code>required bytes masterKey = 3;</code>
      */
-    boolean hasReadResponse();
+    boolean hasMasterKey();
     /**
-     * <code>optional .ReadResponse readResponse = 100;</code>
-     *
-     * <pre>
-     * Response
-     * </pre>
-     */
-    org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse getReadResponse();
-    /**
-     * <code>optional .ReadResponse readResponse = 100;</code>
-     *
-     * <pre>
-     * Response
-     * </pre>
-     */
-    org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponseOrBuilder getReadResponseOrBuilder();
-
-    /**
-     * <code>optional .AddResponse addResponse = 101;</code>
-     */
-    boolean hasAddResponse();
-    /**
-     * <code>optional .AddResponse addResponse = 101;</code>
-     */
-    org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse getAddResponse();
-    /**
-     * <code>optional .AddResponse addResponse = 101;</code>
+     * <code>required bytes masterKey = 3;</code>
      */
-    org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponseOrBuilder getAddResponseOrBuilder();
+    com.google.protobuf.ByteString getMasterKey();
 
     /**
-     * <code>optional .AuthMessage authResponse = 102;</code>
-     */
-    boolean hasAuthResponse();
-    /**
-     * <code>optional .AuthMessage authResponse = 102;</code>
+     * <code>required bytes body = 4;</code>
      */
-    org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthResponse();
+    boolean hasBody();
     /**
-     * <code>optional .AuthMessage authResponse = 102;</code>
+     * <code>required bytes body = 4;</code>
      */
-    org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthResponseOrBuilder();
+    com.google.protobuf.ByteString getBody();
   }
   /**
-   * Protobuf type {@code Response}
+   * Protobuf type {@code WriteLacRequest}
    */
-  public static final class Response extends
+  public static final class WriteLacRequest extends
       com.google.protobuf.GeneratedMessage implements
-      // @@protoc_insertion_point(message_implements:Response)
-      ResponseOrBuilder {
-    // Use Response.newBuilder() to construct.
-    private Response(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      // @@protoc_insertion_point(message_implements:WriteLacRequest)
+      WriteLacRequestOrBuilder {
+    // Use WriteLacRequest.newBuilder() to construct.
+    private WriteLacRequest(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
       super(builder);
       this.unknownFields = builder.getUnknownFields();
     }
-    private Response(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+    private WriteLacRequest(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
 
-    private static final Response defaultInstance;
-    public static Response getDefaultInstance() {
+    private static final WriteLacRequest defaultInstance;
+    public static WriteLacRequest getDefaultInstance() {
       return defaultInstance;
     }
 
-    public Response getDefaultInstanceForType() {
+    public WriteLacRequest getDefaultInstanceForType() {
       return defaultInstance;
     }
 
@@ -3925,7 +4298,7 @@ public final class BookkeeperProtocol {
         getUnknownFields() {
       return this.unknownFields;
     }
-    private Response(
+    private WriteLacRequest(
         com.google.protobuf.CodedInputStream input,
         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
         throws com.google.protobuf.InvalidProtocolBufferException {
@@ -3948,67 +4321,24 @@ public final class BookkeeperProtocol {
               }
               break;
             }
-            case 10: {
-              org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.Builder subBuilder = null;
-              if (((bitField0_ & 0x00000001) == 0x00000001)) {
-                subBuilder = header_.toBuilder();
-              }
-              header_ = input.readMessage(org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.PARSER, extensionRegistry);
-              if (subBuilder != null) {
-                subBuilder.mergeFrom(header_);
-                header_ = subBuilder.buildPartial();
-              }
+            case 8: {
               bitField0_ |= 0x00000001;
+              ledgerId_ = input.readInt64();
               break;
             }
             case 16: {
-              int rawValue = input.readEnum();
-              org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode value = org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode.valueOf(rawValue);
-              if (value == null) {
-                unknownFields.mergeVarintField(2, rawValue);
-              } else {
-                bitField0_ |= 0x00000002;
-                status_ = value;
-              }
+              bitField0_ |= 0x00000002;
+              lac_ = input.readInt64();
               break;
             }
-            case 802: {
-              org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse.Builder subBuilder = null;
-              if (((bitField0_ & 0x00000004) == 0x00000004)) {
-                subBuilder = readResponse_.toBuilder();
-              }
-              readResponse_ = input.readMessage(org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse.PARSER, extensionRegistry);
-              if (subBuilder != null) {
-                subBuilder.mergeFrom(readResponse_);
-                readResponse_ = subBuilder.buildPartial();
-              }
+            case 26: {
               bitField0_ |= 0x00000004;
+              masterKey_ = input.readBytes();
               break;
             }
-            case 810: {
-              org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse.Builder subBuilder = null;
-              if (((bitField0_ & 0x00000008) == 0x00000008)) {
-                subBuilder = addResponse_.toBuilder();
-              }
-              addResponse_ = input.readMessage(org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse.PARSER, extensionRegistry);
-              if (subBuilder != null) {
-                subBuilder.mergeFrom(addResponse_);
-                addResponse_ = subBuilder.buildPartial();
-              }
+            case 34: {
               bitField0_ |= 0x00000008;
-              break;
-            }
-            case 818: {
-              org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder subBuilder = null;
-              if (((bitField0_ & 0x00000010) == 0x00000010)) {
-                subBuilder = authResponse_.toBuilder();
-              }
-              authResponse_ = input.readMessage(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.PARSER, extensionRegistry);
-              if (subBuilder != null) {
-                subBuilder.mergeFrom(authResponse_);
-                authResponse_ = subBuilder.buildPartial();
-              }
-              bitField0_ |= 0x00000010;
+              body_ = input.readBytes();
               break;
             }
           }
@@ -4025,159 +4355,97 @@ public final class BookkeeperProtocol {
     }
     public static final com.google.protobuf.Descriptors.Descriptor
         getDescriptor() {
-      return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_Response_descriptor;
+      return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_WriteLacRequest_descriptor;
     }
 
     protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
         internalGetFieldAccessorTable() {
-      return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_Response_fieldAccessorTable
+      return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_WriteLacRequest_fieldAccessorTable
           .ensureFieldAccessorsInitialized(
-              org.apache.bookkeeper.proto.BookkeeperProtocol.Response.class, org.apache.bookkeeper.proto.BookkeeperProtocol.Response.Builder.class);
+              org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.class, org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.Builder.class);
     }
 
-    public static com.google.protobuf.Parser<Response> PARSER =
-        new com.google.protobuf.AbstractParser<Response>() {
-      public Response parsePartialFrom(
+    public static com.google.protobuf.Parser<WriteLacRequest> PARSER =
+        new com.google.protobuf.AbstractParser<WriteLacRequest>() {
+      public WriteLacRequest parsePartialFrom(
           com.google.protobuf.CodedInputStream input,
           com.google.protobuf.ExtensionRegistryLite extensionRegistry)
           throws com.google.protobuf.InvalidProtocolBufferException {
-        return new Response(input, extensionRegistry);
+        return new WriteLacRequest(input, extensionRegistry);
       }
     };
 
     @java.lang.Override
-    public com.google.protobuf.Parser<Response> getParserForType() {
+    public com.google.protobuf.Parser<WriteLacRequest> getParserForType() {
       return PARSER;
     }
 
     private int bitField0_;
-    public static final int HEADER_FIELD_NUMBER = 1;
-    private org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader header_;
+    public static final int LEDGERID_FIELD_NUMBER = 1;
+    private long ledgerId_;
     /**
-     * <code>required .BKPacketHeader header = 1;</code>
+     * <code>required int64 ledgerId = 1;</code>
      */
-    public boolean hasHeader() {
+    public boolean hasLedgerId() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
-     * <code>required .BKPacketHeader header = 1;</code>
-     */
-    public org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader getHeader() {
-      return header_;
-    }
-    /**
-     * <code>required .BKPacketHeader header = 1;</code>
+     * <code>required int64 ledgerId = 1;</code>
      */
-    public org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeaderOrBuilder getHeaderOrBuilder() {
-      return header_;
+    public long getLedgerId() {
+      return ledgerId_;
     }
 
-    public static final int STATUS_FIELD_NUMBER = 2;
-    private org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode status_;
+    public static final int LAC_FIELD_NUMBER = 2;
+    private long lac_;
     /**
-     * <code>required .StatusCode status = 2;</code>
-     *
-     * <pre>
-     * EOK if the underlying request succeeded. Each individual response
-     * has a more meaningful status. EBADREQ if we have an unsupported request.
-     * </pre>
+     * <code>required int64 lac = 2;</code>
      */
-    public boolean hasStatus() {
+    public boolean hasLac() {
       return ((bitField0_ & 0x00000002) == 0x00000002);
     }
     /**
-     * <code>required .StatusCode status = 2;</code>
-     *
-     * <pre>
-     * EOK if the underlying request succeeded. Each individual response
-     * has a more meaningful status. EBADREQ if we have an unsupported request.
-     * </pre>
+     * <code>required int64 lac = 2;</code>
      */
-    public org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode getStatus() {
-      return status_;
+    public long getLac() {
+      return lac_;
     }
 
-    public static final int READRESPONSE_FIELD_NUMBER = 100;
-    private org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse readResponse_;
+    public static final int MASTERKEY_FIELD_NUMBER = 3;
+    private com.google.protobuf.ByteString masterKey_;
     /**
-     * <code>optional .ReadResponse readResponse = 100;</code>
-     *
-     * <pre>
-     * Response
-     * </pre>
+     * <code>required bytes masterKey = 3;</code>
      */
-    public boolean hasReadResponse() {
+    public boolean hasMasterKey() {
       return ((bitField0_ & 0x00000004) == 0x00000004);
     }
     /**
-     * <code>optional .ReadResponse readResponse = 100;</code>
-     *
-     * <pre>
-     * Response
-     * </pre>
-     */
-    public org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse getReadResponse() {
-      return readResponse_;
-    }
-    /**
-     * <code>optional .ReadResponse readResponse = 100;</code>
-     *
-     * <pre>
-     * Response
-     * </pre>
+     * <code>required bytes masterKey = 3;</code>
      */
-    public org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponseOrBuilder getReadResponseOrBuilder() {
-      return readResponse_;
+    public com.google.protobuf.ByteString getMasterKey() {
+      return masterKey_;
     }
 
-    public static final int ADDRESPONSE_FIELD_NUMBER = 101;
-    private org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse addResponse_;
+    public static final int BODY_FIELD_NUMBER = 4;
+    private com.google.protobuf.ByteString body_;
     /**
-     * <code>optional .AddResponse addResponse = 101;</code>
+     * <code>required bytes body = 4;</code>
      */
-    public boolean hasAddResponse() {
+    public boolean hasBody() {
       return ((bitField0_ & 0x00000008) == 0x00000008);
     }
     /**
-     * <code>optional .AddResponse addResponse = 101;</code>
-     */
-    public org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse getAddResponse() {
-      return addResponse_;
-    }
-    /**
-     * <code>optional .AddResponse addResponse = 101;</code>
-     */
-    public org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponseOrBuilder getAddResponseOrBuilder() {
-      return addResponse_;
-    }
-
-    public static final int AUTHRESPONSE_FIELD_NUMBER = 102;
-    private org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage authResponse_;
-    /**
-     * <code>optional .AuthMessage authResponse = 102;</code>
-     */
-    public boolean hasAuthResponse() {
-      return ((bitField0_ & 0x00000010) == 0x00000010);
-    }
-    /**
-     * <code>optional .AuthMessage authResponse = 102;</code>
-     */
-    public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthResponse() {
-      return authResponse_;
-    }
-    /**
-     * <code>optional .AuthMessage authResponse = 102;</code>
+     * <code>required bytes body = 4;</code>
      */
-    public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthResponseOrBuilder() {
-      return authResponse_;
+    public com.google.protobuf.ByteString getBody() {
+      return body_;
     }
 
     private void initFields() {
-      header_ = org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.getDefaultInstance();
-      status_ = org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode.EOK;
-      readResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse.getDefaultInstance();
-      addResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse.getDefaultInstance();
-      authResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+      ledgerId_ = 0L;
+      lac_ = 0L;
+      masterKey_ = com.google.protobuf.ByteString.EMPTY;
+      body_ = com.google.protobuf.ByteString.EMPTY;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4185,35 +4453,21 @@ public final class BookkeeperProtocol {
       if (isInitialized == 1) return true;
       if (isInitialized == 0) return false;
 
-      if (!hasHeader()) {
+      if (!hasLedgerId()) {
         memoizedIsInitialized = 0;
         return false;
       }
-      if (!hasStatus()) {
+      if (!hasLac()) {
         memoizedIsInitialized = 0;
         return false;
       }
-      if (!getHeader().isInitialized()) {
+      if (!hasMasterKey()) {
         memoizedIsInitialized = 0;
         return false;
       }
-      if (hasReadResponse()) {
-        if (!getReadResponse().isInitialized()) {
-          memoizedIsInitialized = 0;
-          return false;
-        }
-      }
-      if (hasAddResponse()) {
-        if (!getAddResponse().isInitialized()) {
-          memoizedIsInitialized = 0;
-          return false;
-        }
-      }
-      if (hasAuthResponse()) {
-        if (!getAuthResponse().isInitialized()) {
-          memoizedIsInitialized = 0;
-          return false;
-        }
+      if (!hasBody()) {
+        memoizedIsInitialized = 0;
+        return false;
       }
       memoizedIsInitialized = 1;
       return true;
@@ -4223,19 +4477,16 @@ public final class BookkeeperProtocol {
                         throws java.io.IOException {
       getSerializedSize();
       if (((bitField0_ & 0x00000001) == 0x00000001)) {
-        output.writeMessage(1, header_);
+        output.writeInt64(1, ledgerId_);
       }
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
-        output.writeEnum(2, status_.getNumber());
+        output.writeInt64(2, lac_);
       }
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
-        output.writeMessage(100, readResponse_);
+        output.writeBytes(3, masterKey_);
       }
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
-        output.writeMessage(101, addResponse_);
-      }
-      if (((bitField0_ & 0x00000010) == 0x00000010)) {
-        output.writeMessage(102, authResponse_);
+        output.writeBytes(4, body_);
       }
       getUnknownFields().writeTo(output);
     }
@@ -4248,23 +4499,19 @@ public final class BookkeeperProtocol {
       size = 0;
       if (((bitField0_ & 0x00000001) == 0x00000001)) {
         size += com.google.protobuf.CodedOutputStream
-          .computeMessageSize(1, header_);
+          .computeInt64Size(1, ledgerId_);
       }
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         size += com.google.protobuf.CodedOutputStream
-          .computeEnumSize(2, status_.getNumber());
+          .computeInt64Size(2, lac_);
       }
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         size += com.google.protobuf.CodedOutputStream
-          .computeMessageSize(100, readResponse_);
+          .computeBytesSize(3, masterKey_);
       }
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
         size += com.google.protobuf.CodedOutputStream
-          .computeMessageSize(101, addResponse_);
-      }
-      if (((bitField0_ & 0x00000010) == 0x00000010)) {
-        size += com.google.protobuf.CodedOutputStream
-          .computeMessageSize(102, authResponse_);
+          .computeBytesSize(4, body_);
       }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
@@ -4278,53 +4525,53 @@ public final class BookkeeperProtocol {
       return super.writeReplace();
     }
 
-    public static org.apache.bookkeeper.proto.BookkeeperProtocol.Response parseFrom(
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest parseFrom(
         com.google.protobuf.ByteString data)
         throws com.google.protobuf.InvalidProtocolBufferException {
       return PARSER.parseFrom(data);
     }
-    public static org.apache.bookkeeper.proto.BookkeeperProtocol.Response parseFrom(
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest parseFrom(
         com.google.protobuf.ByteString data,
         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
         throws com.google.protobuf.InvalidProtocolBufferException {
       return PARSER.parseFrom(data, extensionRegistry);
     }
-    public static org.apache.bookkeeper.proto.BookkeeperProtocol.Response parseFrom(byte[] data)
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest parseFrom(byte[] data)
         throws com.google.protobuf.InvalidProtocolBufferException {
       return PARSER.parseFrom(data);
     }
-    public static org.apache.bookkeeper.proto.BookkeeperProtocol.Response parseFrom(
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest parseFrom(
         byte[] data,
         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
         throws com.google.protobuf.InvalidProtocolBufferException {
       return PARSER.parseFrom(data, extensionRegistry);
     }
-    public static org.apache.bookkeeper.proto.BookkeeperProtocol.Response parseFrom(java.io.InputStream input)
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest parseFrom(java.io.InputStream input)
         throws java.io.IOException {
       return PARSER.parseFrom(input);
     }
-    public static org.apache.bookkeeper.proto.BookkeeperProtocol.Response parseFrom(
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest parseFrom(
         java.io.InputStream input,
         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
         throws java.io.IOException {
       return PARSER.parseFrom(input, extensionRegistry);
     }
-    public static org.apache.bookkeeper.proto.BookkeeperProtocol.Response parseDelimitedFrom(java.io.InputStream input)
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest parseDelimitedFrom(java.io.InputStream input)
         throws java.io.IOException {
       return PARSER.parseDelimitedFrom(input);
     }
-    public static org.apache.bookkeeper.proto.BookkeeperProtocol.Response parseDelimitedFrom(
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest parseDelimitedFrom(
         java.io.InputStream input,
         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
         throws java.io.IOException {
       return PARSER.parseDelimitedFrom(input, extensionRegistry);
     }
-    public static org.apache.bookkeeper.proto.BookkeeperProtocol.Response parseFrom(
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest parseFrom(
         com.google.protobuf.CodedInputStream input)
         throws java.io.IOException {
       return PARSER.parseFrom(input);
     }
-    public static org.apache.bookkeeper.proto.BookkeeperProtocol.Response parseFrom(
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest parseFrom(
         com.google.protobuf.CodedInputStream input,
         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
         throws java.io.IOException {
@@ -4333,7 +4580,7 @@ public final class BookkeeperProtocol {
 
     public static Builder newBuilder() { return Builder.create(); }
     public Builder newBuilderForType() { return newBuilder(); }
-    public static Builder newBuilder(org.apache.bookkeeper.proto.BookkeeperProtocol.Response prototype) {
+    public static Builder newBuilder(org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest prototype) {
       return newBuilder().mergeFrom(prototype);
     }
     public Builder toBuilder() { return newBuilder(this); }
@@ -4345,25 +4592,25 @@ public final class BookkeeperProtocol {
       return builder;
     }
     /**
-     * Protobuf type {@code Response}
+     * Protobuf type {@code WriteLacRequest}
      */
     public static final class Builder extends
         com.google.protobuf.GeneratedMessage.Builder<Builder> implements
-        // @@protoc_insertion_point(builder_implements:Response)
-        org.apache.bookkeeper.proto.BookkeeperProtocol.ResponseOrBuilder {
+        // @@protoc_insertion_point(builder_implements:WriteLacRequest)
+        org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequestOrBuilder {
       public static final com.google.protobuf.Descriptors.Descriptor
           getDescriptor() {
-        return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_Response_descriptor;
+        return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_WriteLacRequest_descriptor;
       }
 
       protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
           internalGetFieldAccessorTable() {
-        return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_Response_fieldAccessorTable
+        return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_WriteLacRequest_fieldAccessorTable
             .ensureFieldAccessorsInitialized(
-                org.apache.bookkeeper.proto.BookkeeperProtocol.Response.class, org.apache.bookkeeper.proto.BookkeeperProtocol.Response.Builder.class);
+                org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.class, org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.Builder.class);
       }
 
-      // Construct using org.apache.bookkeeper.proto.BookkeeperProtocol.Response.newBuilder()
+      // Construct using org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.newBuilder()
       private Builder() {
         maybeForceBuilderInitialization();
       }
@@ -4375,10 +4622,6 @@ public final class BookkeeperProtocol {
       }
       private void maybeForceBuilderInitialization() {
         if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
-          getHeaderFieldBuilder();
-          getReadResponseFieldBuilder();
-          getAddResponseFieldBuilder();
-          getAuthResponseFieldBuilder();
         }
       }
       private static Builder create() {
@@ -4387,32 +4630,14 @@ public final class BookkeeperProtocol {
 
       public Builder clear() {
         super.clear();
-        if (headerBuilder_ == null) {
-          header_ = org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.getDefaultInstance();
-        } else {
-          headerBuilder_.clear();
-        }
+        ledgerId_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000001);
-        status_ = org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode.EOK;
+        lac_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000002);
-        if (readResponseBuilder_ == null) {
-          readResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse.getDefaultInstance();
-        } else {
-          readResponseBuilder_.clear();
-        }
+        masterKey_ = com.google.protobuf.ByteString.EMPTY;
         bitField0_ = (bitField0_ & ~0x00000004);
-        if (addResponseBuilder_ == null) {
-          addResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse.getDefaultInstance();
-        } else {
-          addResponseBuilder_.clear();
-        }
+        body_ = com.google.protobuf.ByteString.EMPTY;
         bitField0_ = (bitField0_ & ~0x00000008);
-        if (authResponseBuilder_ == null) {
-          authResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
-        } else {
-          authResponseBuilder_.clear();
-        }
-        bitField0_ = (bitField0_ & ~0x00000010);
         return this;
       }
 
@@ -4422,126 +4647,89 @@ public final class BookkeeperProtocol {
 
       public com.google.protobuf.Descriptors.Descriptor
           getDescriptorForType() {
-        return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_Response_descriptor;
+        return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_WriteLacRequest_descriptor;
       }
 
-      public org.apache.bookkeeper.proto.BookkeeperProtocol.Response getDefaultInstanceForType() {
-        return org.apache.bookkeeper.proto.BookkeeperProtocol.Response.getDefaultInstance();
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest getDefaultInstanceForType() {
+        return org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.getDefaultInstance();
       }
 
-      public org.apache.bookkeeper.proto.BookkeeperProtocol.Response build() {
-        org.apache.bookkeeper.proto.BookkeeperProtocol.Response result = buildPartial();
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest build() {
+        org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest result = buildPartial();
         if (!result.isInitialized()) {
           throw newUninitializedMessageException(result);
         }
         return result;
       }
 
-      public org.apache.bookkeeper.proto.BookkeeperProtocol.Response buildPartial() {
-        org.apache.bookkeeper.proto.BookkeeperProtocol.Response result = new org.apache.bookkeeper.proto.BookkeeperProtocol.Response(this);
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest buildPartial() {
+        org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest result = new org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest(this);
         int from_bitField0_ = bitField0_;
         int to_bitField0_ = 0;
         if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
           to_bitField0_ |= 0x00000001;
         }
-        if (headerBuilder_ == null) {
-          result.header_ = header_;
-        } else {
-          result.header_ = headerBuilder_.build();
-        }
+        result.ledgerId_ = ledgerId_;
         if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
           to_bitField0_ |= 0x00000002;
         }
-        result.status_ = status_;
+        result.lac_ = lac_;
         if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
           to_bitField0_ |= 0x00000004;
         }
-        if (readResponseBuilder_ == null) {
-          result.readResponse_ = readResponse_;
-        } else {
-          result.readResponse_ = readResponseBuilder_.build();
-        }
+        result.masterKey_ = masterKey_;
         if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
           to_bitField0_ |= 0x00000008;
         }
-        if (addResponseBuilder_ == null) {
-          result.addResponse_ = addResponse_;
-        } else {
-          result.addResponse_ = addResponseBuilder_.build();
-        }
-        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
-          to_bitField0_ |= 0x00000010;
-        }
-        if (authResponseBuilder_ == null) {
-          result.authResponse_ = authResponse_;
-        } else {
-          result.authResponse_ = authResponseBuilder_.build();
-        }
+        result.body_ = body_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
       }
 
       public Builder mergeFrom(com.google.protobuf.Message other) {
-        if (other instanceof org.apache.bookkeeper.proto.BookkeeperProtocol.Response) {
-          return mergeFrom((org.apache.bookkeeper.proto.BookkeeperProtocol.Response)other);
+        if (other instanceof org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest) {
+          return mergeFrom((org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest)other);
         } else {
           super.mergeFrom(other);
           return this;
         }
       }
 
-      public Builder mergeFrom(org.apache.bookkeeper.proto.BookkeeperProtocol.Response other) {
-        if (other == org.apache.bookkeeper.proto.BookkeeperProtocol.Response.getDefaultInstance()) return this;
-        if (other.hasHeader()) {
-          mergeHeader(other.getHeader());
-        }
-        if (other.hasStatus()) {
-          setStatus(other.getStatus());
+      public Builder mergeFrom(org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest other) {
+        if (other == org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest.getDefaultInstance()) return this;
+        if (other.hasLedgerId()) {
+          setLedgerId(other.getLedgerId());
         }
-        if (other.hasReadResponse()) {
-          mergeReadResponse(other.getReadResponse());
+        if (other.hasLac()) {
+          setLac(other.getLac());
         }
-        if (other.hasAddResponse()) {
-          mergeAddResponse(other.getAddResponse());
+        if (other.hasMasterKey()) {
+          setMasterKey(other.getMasterKey());
         }
-        if (other.hasAuthResponse()) {
-          mergeAuthResponse(other.getAuthResponse());
+        if (other.hasBody()) {
+          setBody(other.getBody());
         }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
 
       public final boolean isInitialized() {
-        if (!hasHeader()) {
+        if (!hasLedgerId()) {
           
           return false;
         }
-        if (!hasStatus()) {
+        if (!hasLac()) {
           
           return false;
         }
-        if (!getHeader().isInitialized()) {
+        if (!hasMasterKey()) {
           
           return false;
         }
-        if (hasReadResponse()) {
-          if (!getReadResponse().isInitialized()) {
-            
-            return false;
-          }
-        }
-        if (hasAddResponse()) {
-          if (!getAddResponse().isInitialized()) {
-            
-            return false;
-          }
-        }
-        if (hasAuthResponse()) {
-          if (!getAuthResponse().isInitialized()) {
-            
-            return false;
-          }
+        if (!hasBody()) {
+          
+          return false;
         }
         return true;
       }
@@ -4550,11 +4738,11 @@ public final class BookkeeperProtocol {
           com.google.protobuf.CodedInputStream input,
           com.google.protobuf.ExtensionRegistryLite extensionRegistry)
           throws java.io.IOException {
-        org.apache.bookkeeper.proto.BookkeeperProtocol.Response parsedMessage = null;
+        org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest parsedMessage = null;
         try {
           parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
         } catch (com.google.protobuf.InvalidProtocolBufferException e) {
-          parsedMessage = (org.apache.bookkeeper.proto.BookkeeperProtocol.Response) e.getUnfinishedMessage();
+          parsedMessage = (org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest) e.getUnfinishedMessage();
           throw e;
         } finally {
           if (parsedMessage != null) {
@@ -4565,632 +4753,3580 @@ public final class BookkeeperProtocol {
       }
       private int bitField0_;
 
-      private org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader header_ = org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.getDefaultInstance();
-      private com.google.protobuf.SingleFieldBuilder<
-          org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader, org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeaderOrBuilder> headerBuilder_;
+      private long ledgerId_ ;
       /**
-       * <code>required .BKPacketHeader header = 1;</code>
+       * <code>required int64 ledgerId = 1;</code>
        */
-      public boolean hasHeader() {
+      public boolean hasLedgerId() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
-       * <code>required .BKPacketHeader header = 1;</code>
+       * <code>required int64 ledgerId = 1;</code>
        */
-      public org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader getHeader() {
-        if (headerBuilder_ == null) {
-          return header_;
-        } else {
-          return headerBuilder_.getMessage();
-        }
+      public long getLedgerId() {
+        return ledgerId_;
       }
       /**
-       * <code>required .BKPacketHeader header = 1;</code>
+       * <code>required int64 ledgerId = 1;</code>
        */
-      public Builder setHeader(org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader value) {
-        if (headerBuilder_ == null) {
-          if (value == null) {
-            throw new NullPointerException();
-          }
-          header_ = value;
-          onChanged();
-        } else {
-          headerBuilder_.setMessage(value);
-        }
+      public Builder setLedgerId(long value) {
         bitField0_ |= 0x00000001;
+        ledgerId_ = value;
+        onChanged();
         return this;
       }
       /**
-       * <code>required .BKPacketHeader header = 1;</code>
+       * <code>required int64 ledgerId = 1;</code>
        */
-      public Builder setHeader(
-          org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.Builder builderForValue) {
-        if (headerBuilder_ == null) {
-          header_ = builderForValue.build();
-          onChanged();
-        } else {
-          headerBuilder_.setMessage(builderForValue.build());
-        }
-        bitField0_ |= 0x00000001;
+      public Builder clearLedgerId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        ledgerId_ = 0L;
+        onChanged();
         return this;
       }
+
+      private long lac_ ;
       /**
-       * <code>required .BKPacketHeader header = 1;</code>
+       * <code>required int64 lac = 2;</code>
        */
-      public Builder mergeHeader(org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader value) {
-        if (headerBuilder_ == null) {
-          if (((bitField0_ & 0x00000001) == 0x00000001) &&
-              header_ != org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.getDefaultInstance()) {
-            header_ =
-              org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.newBuilder(header_).mergeFrom(value).buildPartial();
-          } else {
-            header_ = value;
-          }
-          onChanged();
-        } else {
-          headerBuilder_.mergeFrom(value);
-        }
-        bitField0_ |= 0x00000001;
-        return this;
+      public boolean hasLac() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
       }
       /**
-       * <code>required .BKPacketHeader header = 1;</code>
+       * <code>required int64 lac = 2;</code>
        */
-      public Builder clearHeader() {
-        if (headerBuilder_ == null) {
-          header_ = org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.getDefaultInstance();
-          onChanged();
-        } else {
-          headerBuilder_.clear();
-        }
-        bitField0_ = (bitField0_ & ~0x00000001);
+      public long getLac() {
+        return lac_;
+      }
+      /**
+       * <code>required int64 lac = 2;</code>
+       */
+      public Builder setLac(long value) {
+        bitField0_ |= 0x00000002;
+        lac_ = value;
+        onChanged();
         return this;
       }
       /**
-       * <code>required .BKPacketHeader header = 1;</code>
+       * <code>required int64 lac = 2;</code>
        */
-      public org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.Builder getHeaderBuilder() {
-        bitField0_ |= 0x00000001;
+      public Builder clearLac() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        lac_ = 0L;
         onChanged();
-        return getHeaderFieldBuilder().getBuilder();
+        return this;
       }
+
+      private com.google.protobuf.ByteString masterKey_ = com.google.protobuf.ByteString.EMPTY;
       /**
-       * <code>required .BKPacketHeader header = 1;</code>
+       * <code>required bytes masterKey = 3;</code>
        */
-      public org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeaderOrBuilder getHeaderOrBuilder() {
-        if (headerBuilder_ != null) {
-          return headerBuilder_.getMessageOrBuilder();
-        } else {
-          return header_;
-        }
+      public boolean hasMasterKey() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
       }
       /**
-       * <code>required .BKPacketHeader header = 1;</code>
+       * <code>required bytes masterKey = 3;</code>
        */
-      private com.google.protobuf.SingleFieldBuilder<
-          org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader, org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeaderOrBuilder> 
-          getHeaderFieldBuilder() {
-        if (headerBuilder_ == null) {
-          headerBuilder_ = new com.google.protobuf.SingleFieldBuilder<
-              org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader, org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeaderOrBuilder>(
-                  getHeader(),
-                  getParentForChildren(),
-                  isClean());
-          header_ = null;
-        }
-        return headerBuilder_;
-      }
-
-      private org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode status_ = org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode.EOK;
-      /**
-       * <code>required .StatusCode status = 2;</code>
-       *
-       * <pre>
-       * EOK if the underlying request succeeded. Each individual response
-       * has a more meaningful status. EBADREQ if we have an unsupported request.
-       * </pre>
-       */
-      public boolean hasStatus() {
-        return ((bitField0_ & 0x00000002) == 0x00000002);
-      }
-      /**
-       * <code>required .StatusCode status = 2;</code>
-       *
-       * <pre>
-       * EOK if the underlying request succeeded. Each individual response
-       * has a more meaningful status. EBADREQ if we have an unsupported request.
-       * </pre>
-       */
-      public org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode getStatus() {
-        return status_;
+      public com.google.protobuf.ByteString getMasterKey() {
+        return masterKey_;
       }
       /**
-       * <code>required .StatusCode status = 2;</code>
-       *
-       * <pre>
-       * EOK if the underlying request succeeded. Each individual response
-       * has a more meaningful status. EBADREQ if we have an unsupported request.
-       * </pre>
+       * <code>required bytes masterKey = 3;</code>
        */
-      public Builder setStatus(org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode value) {
+      public Builder setMasterKey(com.google.protobuf.ByteString value) {
         if (value == null) {
-          throw new NullPointerException();
-        }
-        bitField0_ |= 0x00000002;
-        status_ = value;
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        masterKey_ = value;
         onChanged();
         return this;
       }
       /**
-       * <code>required .StatusCode status = 2;</code>
-       *
-       * <pre>
-       * EOK if the underlying request succeeded. Each individual response
-       * has a more meaningful status. EBADREQ if we have an unsupported request.
-       * </pre>
+       * <code>required bytes masterKey = 3;</code>
        */
-      public Builder clearStatus() {
-        bitField0_ = (bitField0_ & ~0x00000002);
-        status_ = org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode.EOK;
+      public Builder clearMasterKey() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        masterKey_ = getDefaultInstance().getMasterKey();
         onChanged();
         return this;
       }
 
-      private org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse readResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse.getDefaultInstance();
-      private com.google.protobuf.SingleFieldBuilder<
-          org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse, org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponseOrBuilder> readResponseBuilder_;
+      private com.google.protobuf.ByteString body_ = com.google.protobuf.ByteString.EMPTY;
       /**
-       * <code>optional .ReadResponse readResponse = 100;</code>
-       *
-       * <pre>
-       * Response
-       * </pre>
+       * <code>required bytes body = 4;</code>
        */
-      public boolean hasReadResponse() {
-        return ((bitField0_ & 0x00000004) == 0x00000004);
+      public boolean hasBody() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
       }
       /**
-       * <code>optional .ReadResponse readResponse = 100;</code>
-       *
-       * <pre>
-       * Response
-       * </pre>
+       * <code>required bytes body = 4;</code>
        */
-      public org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse getReadResponse() {
-        if (readResponseBuilder_ == null) {
-          return readResponse_;
-        } else {
-          return readResponseBuilder_.getMessage();
-        }
+      public com.google.protobuf.ByteString getBody() {
+        return body_;
       }
       /**
-       * <code>optional .ReadResponse readResponse = 100;</code>
-       *
-       * <pre>
-       * Response
-       * </pre>
+       * <code>required bytes body = 4;</code>
        */
-      public Builder setReadResponse(org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse value) {
-        if (readResponseBuilder_ == null) {
-          if (value == null) {
-            throw new NullPointerException();
-          }
-          readResponse_ = value;
-          onChanged();
-        } else {
-          readResponseBuilder_.setMessage(value);
-        }
-        bitField0_ |= 0x00000004;
+      public Builder setBody(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000008;
+        body_ = value;
+        onChanged();
         return this;
       }
       /**
-       * <code>optional .ReadResponse readResponse = 100;</code>
-       *
-       * <pre>
-       * Response
-       * </pre>
+       * <code>required bytes body = 4;</code>
        */
-      public Builder setReadResponse(
-          org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse.Builder builderForValue) {
-        if (readResponseBuilder_ == null) {
-          readResponse_ = builderForValue.build();
-          onChanged();
-        } else {
-          readResponseBuilder_.setMessage(builderForValue.build());
-        }
-        bitField0_ |= 0x00000004;
+      public Builder clearBody() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        body_ = getDefaultInstance().getBody();
+        onChanged();
         return this;
       }
-      /**
-       * <code>optional .ReadResponse readResponse = 100;</code>
-       *
-       * <pre>
-       * Response
-       * </pre>
-       */
-      public Builder mergeReadResponse(org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse value) {
-        if (readResponseBuilder_ == null) {
-          if (((bitField0_ & 0x00000004) == 0x00000004) &&
-              readResponse_ != org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse.getDefaultInstance()) {
-            readResponse_ =
-              org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse.newBuilder(readResponse_).mergeFrom(value).buildPartial();
-          } else {
-            readResponse_ = value;
+
+      // @@protoc_insertion_point(builder_scope:WriteLacRequest)
+    }
+
+    static {
+      defaultInstance = new WriteLacRequest(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:WriteLacRequest)
+  }
+
+  public interface ReadLacRequestOrBuilder extends
+      // @@protoc_insertion_point(interface_extends:ReadLacRequest)
+      com.google.protobuf.MessageOrBuilder {
+
+    /**
+     * <code>required int64 ledgerId = 1;</code>
+     */
+    boolean hasLedgerId();
+    /**
+     * <code>required int64 ledgerId = 1;</code>
+     */
+    long getLedgerId();
+  }
+  /**
+   * Protobuf type {@code ReadLacRequest}
+   */
+  public static final class ReadLacRequest extends
+      com.google.protobuf.GeneratedMessage implements
+      // @@protoc_insertion_point(message_implements:ReadLacRequest)
+      ReadLacRequestOrBuilder {
+    // Use ReadLacRequest.newBuilder() to construct.
+    private ReadLacRequest(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private ReadLacRequest(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final ReadLacRequest defaultInstance;
+    public static ReadLacRequest getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public ReadLacRequest getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private ReadLacRequest(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              ledgerId_ = input.readInt64();
+              break;
+            }
           }
-          onChanged();
-        } else {
-          readResponseBuilder_.mergeFrom(value);
-        }
-        bitField0_ |= 0x00000004;
-        return this;
-      }
-      /**
-       * <code>optional .ReadResponse readResponse = 100;</code>
-       *
-       * <pre>
-       * Response
-       * </pre>
-       */
-      public Builder clearReadResponse() {
-        if (readResponseBuilder_ == null) {
-          readResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse.getDefaultInstance();
-          onChanged();
-        } else {
-          readResponseBuilder_.clear();
         }
-        bitField0_ = (bitField0_ & ~0x00000004);
-        return this;
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
       }
-      /**
-       * <code>optional .ReadResponse readResponse = 100;</code>
-       *
-       * <pre>
-       * Response
-       * </pre>
-       */
-      public org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse.Builder getReadResponseBuilder() {
-        bitField0_ |= 0x00000004;
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_ReadLacRequest_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_ReadLacRequest_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.class, org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<ReadLacRequest> PARSER =
+        new com.google.protobuf.AbstractParser<ReadLacRequest>() {
+      public ReadLacRequest parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new ReadLacRequest(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<ReadLacRequest> getParserForType() {
+      return PARSER;
+    }
+
+    private int bitField0_;
+    public static final int LEDGERID_FIELD_NUMBER = 1;
+    private long ledgerId_;
+    /**
+     * <code>required int64 ledgerId = 1;</code>
+     */
+    public boolean hasLedgerId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>required int64 ledgerId = 1;</code>
+     */
+    public long getLedgerId() {
+      return ledgerId_;
+    }
+
+    private void initFields() {
+      ledgerId_ = 0L;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized == 1) return true;
+      if (isInitialized == 0) return false;
+
+      if (!hasLedgerId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeInt64(1, ledgerId_);
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(1, ledgerId_);
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code ReadLacRequest}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder> implements
+        // @@protoc_insertion_point(builder_implements:ReadLacRequest)
+        org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequestOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_ReadLacRequest_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_ReadLacRequest_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.class, org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.Builder.class);
+      }
+
+      // Construct using org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        ledgerId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_ReadLacRequest_descriptor;
+      }
+
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest getDefaultInstanceForType() {
+        return org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.getDefaultInstance();
+      }
+
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest build() {
+        org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest buildPartial() {
+        org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest result = new org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.ledgerId_ = ledgerId_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest) {
+          return mergeFrom((org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder mergeFrom(org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest other) {
+        if (other == org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest.getDefaultInstance()) return this;
+        if (other.hasLedgerId()) {
+          setLedgerId(other.getLedgerId());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        if (!hasLedgerId()) {
+          
+          return false;
+        }
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = (org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest) e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      private long ledgerId_ ;
+      /**
+       * <code>required int64 ledgerId = 1;</code>
+       */
+      public boolean hasLedgerId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>required int64 ledgerId = 1;</code>
+       */
+      public long getLedgerId() {
+        return ledgerId_;
+      }
+      /**
+       * <code>required int64 ledgerId = 1;</code>
+       */
+      public Builder setLedgerId(long value) {
+        bitField0_ |= 0x00000001;
+        ledgerId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required int64 ledgerId = 1;</code>
+       */
+      public Builder clearLedgerId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        ledgerId_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:ReadLacRequest)
+    }
+
+    static {
+      defaultInstance = new ReadLacRequest(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:ReadLacRequest)
+  }
+
+  public interface ResponseOrBuilder extends
+      // @@protoc_insertion_point(interface_extends:Response)
+      com.google.protobuf.MessageOrBuilder {
+
+    /**
+     * <code>required .BKPacketHeader header = 1;</code>
+     */
+    boolean hasHeader();
+    /**
+     * <code>required .BKPacketHeader header = 1;</code>
+     */
+    org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader getHeader();
+    /**
+     * <code>required .BKPacketHeader header = 1;</code>
+     */
+    org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeaderOrBuilder getHeaderOrBuilder();
+
+    /**
+     * <code>required .StatusCode status = 2;</code>
+     *
+     * <pre>
+     * EOK if the underlying request succeeded. Each individual response
+     * has a more meaningful status. EBADREQ if we have an unsupported request.
+     * </pre>
+     */
+    boolean hasStatus();
+    /**
+     * <code>required .StatusCode status = 2;</code>
+     *
+     * <pre>
+     * EOK if the underlying request succeeded. Each individual response
+     * has a more meaningful status. EBADREQ if we have an unsupported request.
+     * </pre>
+     */
+    org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode getStatus();
+
+    /**
+     * <code>optional .ReadResponse readResponse = 100;</code>
+     *
+     * <pre>
+     * Response
+     * </pre>
+     */
+    boolean hasReadResponse();
+    /**
+     * <code>optional .ReadResponse readResponse = 100;</code>
+     *
+     * <pre>
+     * Response
+     * </pre>
+     */
+    org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse getReadResponse();
+    /**
+     * <code>optional .ReadResponse readResponse = 100;</code>
+     *
+     * <pre>
+     * Response
+     * </pre>
+     */
+    org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponseOrBuilder getReadResponseOrBuilder();
+
+    /**
+     * <code>optional .AddResponse addResponse = 101;</code>
+     */
+    boolean hasAddResponse();
+    /**
+     * <code>optional .AddResponse addResponse = 101;</code>
+     */
+    org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse getAddResponse();
+    /**
+     * <code>optional .AddResponse addResponse = 101;</code>
+     */
+    org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponseOrBuilder getAddResponseOrBuilder();
+
+    /**
+     * <code>optional .AuthMessage authResponse = 102;</code>
+     */
+    boolean hasAuthResponse();
+    /**
+     * <code>optional .AuthMessage authResponse = 102;</code>
+     */
+    org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthResponse();
+    /**
+     * <code>optional .AuthMessage authResponse = 102;</code>
+     */
+    org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthResponseOrBuilder();
+
+    /**
+     * <code>option

<TRUNCATED>