You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:23 UTC

[13/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java
deleted file mode 100644
index 00a52bf..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.netty;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelHandler.Sharable;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.ssl.SslHandler;
-
-import org.apache.hedwig.exceptions.PubSubException.MalformedRequestException;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.handlers.ChannelDisconnectListener;
-import org.apache.hedwig.server.handlers.Handler;
-
-@Sharable
-public class UmbrellaHandler extends SimpleChannelHandler {
-    private static final Logger logger = LoggerFactory.getLogger(UmbrellaHandler.class);
-
-    private final Map<OperationType, Handler> handlers;
-    private final ChannelGroup allChannels;
-    private final ChannelDisconnectListener channelDisconnectListener;
-    private final boolean isSSLEnabled; 
-
-    public UmbrellaHandler(ChannelGroup allChannels, Map<OperationType, Handler> handlers,
-                           ChannelDisconnectListener channelDisconnectListener,
-                           boolean isSSLEnabled) {
-        this.allChannels = allChannels;
-        this.isSSLEnabled = isSSLEnabled;
-        this.handlers = handlers;
-        this.channelDisconnectListener = channelDisconnectListener;
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
-        Throwable throwable = e.getCause();
-
-        // Add here if there are more exceptions we need to be able to tolerate.
-        // 1. IOException may be thrown when a channel is forcefully closed by
-        // the other end, or by the ProtobufDecoder when an invalid protobuf is
-        // received
-        // 2. TooLongFrameException is thrown by the LengthBasedDecoder if it
-        // receives a packet that is too big
-        // 3. CorruptedFramException is thrown by the LengthBasedDecoder when
-        // the length is negative etc.
-        if (throwable instanceof IOException || throwable instanceof TooLongFrameException
-                || throwable instanceof CorruptedFrameException) {
-            e.getChannel().close();
-            logger.debug("Uncaught exception", throwable);
-        } else {
-            // call our uncaught exception handler, which might decide to
-            // shutdown the system
-            Thread thread = Thread.currentThread();
-            thread.getUncaughtExceptionHandler().uncaughtException(thread, throwable);
-        }
-
-    }
-
-    @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        // If SSL is NOT enabled, then we can add this channel to the
-        // ChannelGroup. Otherwise, that is done when the channel is connected
-        // and the SSL handshake has completed successfully.
-        if (!isSSLEnabled) {
-            allChannels.add(ctx.getChannel());
-        }
-    }
-
-    @Override
-    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        if (isSSLEnabled) {
-            ctx.getPipeline().get(SslHandler.class).handshake().addListener(new ChannelFutureListener() {
-                public void operationComplete(ChannelFuture future) throws Exception {
-                    if (future.isSuccess()) {
-                        logger.debug("SSL handshake has completed successfully!");
-                        allChannels.add(future.getChannel());
-                    } else {
-                        future.getChannel().close();
-                    }
-                }
-            });
-        }
-    }
-
-    @Override
-    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        Channel channel = ctx.getChannel();
-        // subscribe handler needs to know about channel disconnects
-        channelDisconnectListener.channelDisconnected(channel);
-        channel.close();
-    }
-
-    public static void sendErrorResponseToMalformedRequest(Channel channel, long txnId, String msg) {
-        logger.debug("Malformed request from {}, msg = {}", channel.getRemoteAddress(), msg);
-        MalformedRequestException mre = new MalformedRequestException(msg);
-        PubSubResponse response = PubSubResponseUtils.getResponseForException(mre, txnId);
-        channel.write(response);
-    }
-
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-
-        if (!(e.getMessage() instanceof PubSubProtocol.PubSubRequest)) {
-            ctx.sendUpstream(e);
-            return;
-        }
-
-        PubSubProtocol.PubSubRequest request = (PubSubProtocol.PubSubRequest) e.getMessage();
-
-        Handler handler = handlers.get(request.getType());
-        Channel channel = ctx.getChannel();
-        long txnId = request.getTxnId();
-
-        if (handler == null) {
-            sendErrorResponseToMalformedRequest(channel, txnId, "Request type " + request.getType().getNumber()
-                                                + " unknown");
-            return;
-        }
-
-        handler.handleRequest(request, channel);
-        ServerStats.getInstance().incrementRequestsReceived();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
deleted file mode 100644
index b0b5a80..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
+++ /dev/null
@@ -1,1263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.io.IOException;
-import java.util.Enumeration;
-import java.util.Iterator;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
-import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protoextensions.MessageIdUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.common.TopicOpQueuer;
-import org.apache.hedwig.server.common.UnexpectedError;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.server.meta.TopicPersistenceManager;
-import org.apache.hedwig.server.persistence.ScanCallback.ReasonForFinish;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.zookeeper.SafeAsynBKCallback;
-import static org.apache.hedwig.util.VarArgs.va;
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * This persistence manager uses zookeeper and bookkeeper to store messages.
- *
- * Information about topics are stored in zookeeper with a znode named after the
- * topic that contains an ASCII encoded list with records of the following form:
- *
- * <pre>
- * startSeqId(included)\tledgerId\n
- * </pre>
- *
- */
-
-public class BookkeeperPersistenceManager implements PersistenceManagerWithRangeScan, TopicOwnershipChangeListener {
-    private static final Logger logger = LoggerFactory.getLogger(BookkeeperPersistenceManager.class);
-    static byte[] passwd = "sillysecret".getBytes(UTF_8);
-    private BookKeeper bk;
-    private TopicPersistenceManager tpManager;
-    private ServerConfiguration cfg;
-    private TopicManager tm;
-
-    private static final long START_SEQ_ID = 1L;
-    // max number of entries allowed in a ledger
-    private static final long UNLIMITED_ENTRIES = 0L;
-    private final long maxEntriesPerLedger;
-
-    static class InMemoryLedgerRange {
-        LedgerRange range;
-        LedgerHandle handle;
-
-        public InMemoryLedgerRange(LedgerRange range, LedgerHandle handle) {
-            this.range = range;
-            this.handle = handle;
-        }
-
-        public InMemoryLedgerRange(LedgerRange range) {
-            this(range, null);
-        }
-
-        public long getStartSeqIdIncluded() {
-            assert range.hasStartSeqIdIncluded();
-            return range.getStartSeqIdIncluded();
-        }
-    }
-
-    static class TopicInfo {
-        /**
-         * stores the last message-seq-id vector that has been pushed to BK for
-         * persistence (but not necessarily acked yet by BK)
-         *
-         */
-        MessageSeqId lastSeqIdPushed;
-
-        /**
-         * stores the last message-id that has been acked by BK. This number is
-         * basically used for limiting scans to not read past what has been
-         * persisted by BK
-         */
-        long lastEntryIdAckedInCurrentLedger = -1; // because BK ledgers starts
-        // at 0
-
-        /**
-         * stores a sorted structure of the ledgers for a topic, mapping from
-         * the endSeqIdIncluded to the ledger info. This structure does not
-         * include the current ledger
-         */
-        TreeMap<Long, InMemoryLedgerRange> ledgerRanges = new TreeMap<Long, InMemoryLedgerRange>();
-        Version ledgerRangesVersion = Version.NEW;
-
-        /**
-         * This is the handle of the current ledger that is being used to write
-         * messages
-         */
-        InMemoryLedgerRange currentLedgerRange;
-
-        /**
-         * Flag to release topic when encountering unrecoverable exceptions
-         */
-        AtomicBoolean doRelease = new AtomicBoolean(false);
-
-        /**
-         * Flag indicats the topic is changing ledger
-         */
-        AtomicBoolean doChangeLedger = new AtomicBoolean(false);
-        /**
-         * Last seq id to change ledger.
-         */
-        long lastSeqIdBeforeLedgerChange = -1;
-        /**
-         * List to buffer all persist requests during changing ledger.
-         */
-        LinkedList<PersistRequest> deferredRequests = null;
-
-        final static int UNLIMITED = 0;
-        int messageBound = UNLIMITED;
-    }
-
-    Map<ByteString, TopicInfo> topicInfos = new ConcurrentHashMap<ByteString, TopicInfo>();
-
-    TopicOpQueuer queuer;
-
-    /**
-     * Instantiates a BookKeeperPersistence manager.
-     *
-     * @param bk
-     *            a reference to bookkeeper to use.
-     * @param metaManagerFactory
-     *            a metadata manager factory handle to use.
-     * @param tm
-     *            a reference to topic manager.
-     * @param cfg
-     *            Server configuration object
-     * @param executor
-     *            A executor
-     */
-    public BookkeeperPersistenceManager(BookKeeper bk, MetadataManagerFactory metaManagerFactory,
-                                        TopicManager tm, ServerConfiguration cfg,
-                                        ScheduledExecutorService executor) {
-        this.bk = bk;
-        this.tpManager = metaManagerFactory.newTopicPersistenceManager();
-        this.cfg = cfg;
-        this.tm = tm;
-        this.maxEntriesPerLedger = cfg.getMaxEntriesPerLedger();
-        queuer = new TopicOpQueuer(executor);
-        tm.addTopicOwnershipChangeListener(this);
-    }
-
-    private static LedgerRange buildLedgerRange(long ledgerId, long startOfLedger,
-                                                MessageSeqId endOfLedger) {
-        LedgerRange.Builder builder =
-            LedgerRange.newBuilder().setLedgerId(ledgerId).setStartSeqIdIncluded(startOfLedger)
-                       .setEndSeqIdIncluded(endOfLedger);
-        return builder.build();
-    }
-
-    class RangeScanOp extends TopicOpQueuer.SynchronousOp {
-        RangeScanRequest request;
-        int numMessagesRead = 0;
-        long totalSizeRead = 0;
-        TopicInfo topicInfo;
-        long startSeqIdToScan;
-
-        public RangeScanOp(RangeScanRequest request) {
-            this(request, -1L, 0, 0L);
-        }
-
-        public RangeScanOp(RangeScanRequest request, long startSeqId, int numMessagesRead, long totalSizeRead) {
-            queuer.super(request.topic);
-            this.request = request;
-            this.startSeqIdToScan = startSeqId;
-            this.numMessagesRead = numMessagesRead;
-            this.totalSizeRead = totalSizeRead;
-        }
-
-        @Override
-        protected void runInternal() {
-            topicInfo = topicInfos.get(topic);
-
-            if (topicInfo == null) {
-                request.callback.scanFailed(request.ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
-                return;
-            }
-
-            // if startSeqIdToScan is less than zero, which means it is an unfinished scan request
-            // we continue the scan from the provided position
-            startReadingFrom(startSeqIdToScan < 0 ? request.startSeqId : startSeqIdToScan);
-        }
-
-        protected void read(final InMemoryLedgerRange imlr, final long startSeqId, final long endSeqId) {
-            // Verify whether startSeqId falls in ledger range.
-            // Only the left endpoint of range needs to be checked.
-            if (imlr.getStartSeqIdIncluded() > startSeqId) {
-                logger.error(
-                        "Invalid RangeScan read, startSeqId {} doesn't fall in ledger range [{} ~ {}]",
-                        va(startSeqId, imlr.getStartSeqIdIncluded(), imlr.range.hasEndSeqIdIncluded() ? imlr.range
-                                .getEndSeqIdIncluded().getLocalComponent() : ""));
-                request.callback.scanFailed(request.ctx, new PubSubException.UnexpectedConditionException("Scan request is out of range"));
-
-                // try release topic to reset the state
-                lostTopic(topic);
-                return;
-            }
-
-            if (imlr.handle == null) {
-
-                bk.asyncOpenLedger(imlr.range.getLedgerId(), DigestType.CRC32, passwd,
-                new SafeAsynBKCallback.OpenCallback() {
-                    @Override
-                    public void safeOpenComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
-                        if (rc == BKException.Code.OK) {
-                            imlr.handle = ledgerHandle;
-                            read(imlr, startSeqId, endSeqId);
-                            return;
-                        }
-                        BKException bke = BKException.create(rc);
-                        logger.error("Could not open ledger: " + imlr.range.getLedgerId() + " for topic: "
-                                     + topic);
-                        request.callback.scanFailed(ctx, new PubSubException.ServiceDownException(bke));
-                        return;
-                    }
-                }, request.ctx);
-                return;
-            }
-
-            // ledger handle is not null, we can read from it
-            long correctedEndSeqId = Math.min(startSeqId + request.messageLimit - numMessagesRead - 1, endSeqId);
-
-            if (logger.isDebugEnabled()) {
-                logger.debug("Issuing a bk read for ledger: " + imlr.handle.getId() + " from entry-id: "
-                             + (startSeqId - imlr.getStartSeqIdIncluded()) + " to entry-id: "
-                             + (correctedEndSeqId - imlr.getStartSeqIdIncluded()));
-            }
-
-            imlr.handle.asyncReadEntries(startSeqId - imlr.getStartSeqIdIncluded(), correctedEndSeqId
-            - imlr.getStartSeqIdIncluded(), new SafeAsynBKCallback.ReadCallback() {
-
-                long expectedEntryId = startSeqId - imlr.getStartSeqIdIncluded();
-
-                @Override
-                public void safeReadComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
-                    if (rc != BKException.Code.OK || !seq.hasMoreElements()) {
-                        if (rc == BKException.Code.OK) {
-                            // means that there is no entries read, provide a meaningful exception
-                            rc = BKException.Code.NoSuchEntryException;
-                        }
-                        BKException bke = BKException.create(rc);
-                        logger.error("Error while reading from ledger: " + imlr.range.getLedgerId() + " for topic: "
-                                     + topic.toStringUtf8(), bke);
-                        request.callback.scanFailed(request.ctx, new PubSubException.ServiceDownException(bke));
-                        return;
-                    }
-
-                    LedgerEntry entry = null;
-                    while (seq.hasMoreElements()) {
-                        entry = seq.nextElement();
-                        Message message;
-                        try {
-                            message = Message.parseFrom(entry.getEntryInputStream());
-                        } catch (IOException e) {
-                            String msg = "Unreadable message found in ledger: " + imlr.range.getLedgerId()
-                                         + " for topic: " + topic.toStringUtf8();
-                            logger.error(msg, e);
-                            request.callback.scanFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
-                            return;
-                        }
-
-                        logger.debug("Read response from ledger: {} entry-id: {}",
-                                     lh.getId(), entry.getEntryId());
-
-                        assert expectedEntryId == entry.getEntryId() : "expectedEntryId (" + expectedEntryId
-                        + ") != entry.getEntryId() (" + entry.getEntryId() + ")";
-                        assert (message.getMsgId().getLocalComponent() - imlr.getStartSeqIdIncluded()) == expectedEntryId;
-
-                        expectedEntryId++;
-                        request.callback.messageScanned(ctx, message);
-                        numMessagesRead++;
-                        totalSizeRead += message.getBody().size();
-
-                        if (numMessagesRead >= request.messageLimit) {
-                            request.callback.scanFinished(ctx, ReasonForFinish.NUM_MESSAGES_LIMIT_EXCEEDED);
-                            return;
-                        }
-
-                        if (totalSizeRead >= request.sizeLimit) {
-                            request.callback.scanFinished(ctx, ReasonForFinish.SIZE_LIMIT_EXCEEDED);
-                            return;
-                        }
-                    }
-
-                    // continue scanning messages
-                    scanMessages(request, imlr.getStartSeqIdIncluded() + entry.getEntryId() + 1, numMessagesRead, totalSizeRead);
-                }
-            }, request.ctx);
-        }
-
-        protected void startReadingFrom(long startSeqId) {
-
-            Map.Entry<Long, InMemoryLedgerRange> entry = topicInfo.ledgerRanges.ceilingEntry(startSeqId);
-
-            if (entry == null) {
-                // None of the old ledgers have this seq-id, we must use the
-                // current ledger
-                long endSeqId = topicInfo.currentLedgerRange.getStartSeqIdIncluded()
-                                + topicInfo.lastEntryIdAckedInCurrentLedger;
-
-                if (endSeqId < startSeqId) {
-                    request.callback.scanFinished(request.ctx, ReasonForFinish.NO_MORE_MESSAGES);
-                    return;
-                }
-
-                read(topicInfo.currentLedgerRange, startSeqId, endSeqId);
-            } else {
-                read(entry.getValue(), startSeqId, entry.getValue().range.getEndSeqIdIncluded().getLocalComponent());
-            }
-
-        }
-
-    }
-
-    @Override
-    public void scanMessages(RangeScanRequest request) {
-        queuer.pushAndMaybeRun(request.topic, new RangeScanOp(request));
-    }
-
-    protected void scanMessages(RangeScanRequest request, long scanSeqId, int numMsgsRead, long totalSizeRead) {
-        queuer.pushAndMaybeRun(request.topic, new RangeScanOp(request, scanSeqId, numMsgsRead, totalSizeRead));
-    }
-
-    public void deliveredUntil(ByteString topic, Long seqId) {
-        // Nothing to do here. this is just a hint that we cannot use.
-    }
-
-    class UpdateLedgerOp extends TopicOpQueuer.AsynchronousOp<Void> {
-        private Set<Long> ledgersDeleted;
-
-        public UpdateLedgerOp(ByteString topic, final Callback<Void> cb, final Object ctx,
-                              Set<Long> ledgersDeleted) {
-            queuer.super(topic, cb, ctx);
-            this.ledgersDeleted = ledgersDeleted;
-        }
-
-        @Override
-        public void run() {
-            final TopicInfo topicInfo = topicInfos.get(topic);
-            if (topicInfo == null) {
-                logger.error("Server is not responsible for topic!");
-                cb.operationFailed(ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
-                return;
-            }
-            LedgerRanges.Builder builder = LedgerRanges.newBuilder();
-            final Set<Long> keysToRemove = new HashSet<Long>();
-            boolean foundUnconsumedLedger = false;
-            for (Map.Entry<Long, InMemoryLedgerRange> e : topicInfo.ledgerRanges.entrySet()) {
-                LedgerRange lr = e.getValue().range;
-                long ledgerId = lr.getLedgerId();
-                if (!foundUnconsumedLedger && ledgersDeleted.contains(ledgerId)) {
-                    keysToRemove.add(e.getKey());
-                    if (!lr.hasEndSeqIdIncluded()) {
-                        String msg = "Should not remove unclosed ledger " + ledgerId + " for topic " + topic.toStringUtf8();
-                        logger.error(msg);
-                        cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
-                        return;
-                    }
-                } else {
-                    foundUnconsumedLedger = true;
-                    builder.addRanges(lr);
-                }
-            }
-            builder.addRanges(topicInfo.currentLedgerRange.range);
-
-            if (!keysToRemove.isEmpty()) {
-                final LedgerRanges newRanges = builder.build();
-                tpManager.writeTopicPersistenceInfo(
-                topic, newRanges, topicInfo.ledgerRangesVersion, new Callback<Version>() {
-                    public void operationFinished(Object ctx, Version newVersion) {
-                        // Finally, all done
-                        for (Long k : keysToRemove) {
-                            topicInfo.ledgerRanges.remove(k);
-                        }
-                        topicInfo.ledgerRangesVersion = newVersion;
-                        cb.operationFinished(ctx, null);
-                    }
-                    public void operationFailed(Object ctx, PubSubException exception) {
-                        cb.operationFailed(ctx, exception);
-                    }
-                }, ctx);
-            } else {
-                cb.operationFinished(ctx, null);
-            }
-        }
-    }
-
-    class ConsumeUntilOp extends TopicOpQueuer.SynchronousOp {
-        private final long seqId;
-
-        public ConsumeUntilOp(ByteString topic, long seqId) {
-            queuer.super(topic);
-            this.seqId = seqId;
-        }
-
-        @Override
-        public void runInternal() {
-            TopicInfo topicInfo = topicInfos.get(topic);
-            if (topicInfo == null) {
-                logger.error("Server is not responsible for topic!");
-                return;
-            }
-
-            final LinkedList<Long> ledgersToDelete = new LinkedList<Long>();
-            for (Long endSeqIdIncluded : topicInfo.ledgerRanges.keySet()) {
-                if (endSeqIdIncluded <= seqId) {
-                    // This ledger's message entries have all been consumed already
-                    // so it is safe to delete it from BookKeeper.
-                    long ledgerId = topicInfo.ledgerRanges.get(endSeqIdIncluded).range.getLedgerId();
-                    ledgersToDelete.add(ledgerId);
-                } else {
-                    break;
-                }
-            }
-
-            // no ledgers need to delete
-            if (ledgersToDelete.isEmpty()) {
-                return;
-            }
-
-            Set<Long> ledgersDeleted = new HashSet<Long>();
-            deleteLedgersAndUpdateLedgersRange(topic, ledgersToDelete, ledgersDeleted);
-        }
-    }
-
-    private void deleteLedgersAndUpdateLedgersRange(final ByteString topic,
-                                                    final LinkedList<Long> ledgersToDelete,
-                                                    final Set<Long> ledgersDeleted) {
-        if (ledgersToDelete.isEmpty()) {
-            Callback<Void> cb = new Callback<Void>() {
-                public void operationFinished(Object ctx, Void result) {
-                    // do nothing, op is async to stop other ops
-                    // occurring on the topic during the update
-                }
-                public void operationFailed(Object ctx, PubSubException exception) {
-                    logger.error("Failed to update ledger znode for topic {} deleting ledgers {} : {}",
-                                 va(topic.toStringUtf8(), ledgersDeleted, exception.getMessage()));
-                }
-            };
-            queuer.pushAndMaybeRun(topic, new UpdateLedgerOp(topic, cb, null, ledgersDeleted));
-            return;
-        }
-
-        final Long ledger = ledgersToDelete.poll();
-        if (null == ledger) {
-            deleteLedgersAndUpdateLedgersRange(topic, ledgersToDelete, ledgersDeleted);
-            return;
-        }
-
-        bk.asyncDeleteLedger(ledger, new DeleteCallback() {
-            @Override
-            public void deleteComplete(int rc, Object ctx) {
-                if (BKException.Code.NoSuchLedgerExistsException == rc ||
-                    BKException.Code.OK == rc) {
-                    ledgersDeleted.add(ledger);
-                    deleteLedgersAndUpdateLedgersRange(topic, ledgersToDelete, ledgersDeleted);
-                    return;
-                } else {
-                    logger.warn("Exception while deleting consumed ledger {}, stop deleting other ledgers {} "
-                                + "and update ledger ranges with deleted ledgers {} : {}",
-                                va(ledger, ledgersToDelete, ledgersDeleted, BKException.create(rc)));
-                    // We should not continue when failed to delete ledger
-                    Callback<Void> cb = new Callback<Void>() {
-                        public void operationFinished(Object ctx, Void result) {
-                            // do nothing, op is async to stop other ops
-                            // occurring on the topic during the update
-                        }
-                        public void operationFailed(Object ctx, PubSubException exception) {
-                            logger.error("Failed to update ledger znode for topic {} deleting ledgers {} : {}",
-                                         va(topic, ledgersDeleted, exception.getMessage()));
-                        }
-                    };
-                    queuer.pushAndMaybeRun(topic, new UpdateLedgerOp(topic, cb, null, ledgersDeleted));
-                    return;
-                }
-            }
-        }, null);
-    }
-
-    public void consumedUntil(ByteString topic, Long seqId) {
-        queuer.pushAndMaybeRun(topic, new ConsumeUntilOp(topic, Math.max(seqId, getMinSeqIdForTopic(topic))));
-    }
-
-    public void consumeToBound(ByteString topic) {
-        TopicInfo topicInfo = topicInfos.get(topic);
-
-        if (topicInfo == null || topicInfo.messageBound == topicInfo.UNLIMITED) {
-            return;
-        }
-        queuer.pushAndMaybeRun(topic, new ConsumeUntilOp(topic, getMinSeqIdForTopic(topic)));
-    }
-
-    public long getMinSeqIdForTopic(ByteString topic) {
-        TopicInfo topicInfo = topicInfos.get(topic);
-
-        if (topicInfo == null || topicInfo.messageBound == topicInfo.UNLIMITED) {
-            return Long.MIN_VALUE;
-        } else {
-            return (topicInfo.lastSeqIdPushed.getLocalComponent() - topicInfo.messageBound) + 1;
-        }
-    }
-
-    public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) throws ServerNotResponsibleForTopicException {
-        TopicInfo topicInfo = topicInfos.get(topic);
-
-        if (topicInfo == null) {
-            throw new PubSubException.ServerNotResponsibleForTopicException("");
-        }
-
-        return topicInfo.lastSeqIdPushed;
-    }
-
-    public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) {
-        return Math.max(seqId + skipAmount, getMinSeqIdForTopic(topic));
-    }
-
-    /**
-     * Release topic on failure
-     *
-     * @param topic
-     *          Topic Name
-     * @param e
-     *          Failure Exception
-     * @param ctx
-     *          Callback context
-     */
-    protected void releaseTopicIfRequested(final ByteString topic, Exception e, Object ctx) {
-        TopicInfo topicInfo = topicInfos.get(topic);
-        if (topicInfo == null) {
-            logger.warn("No topic found when trying to release ownership of topic " + topic.toStringUtf8()
-                      + " on failure.");
-            return;
-        }
-        // do release owner ship of topic
-        if (topicInfo.doRelease.compareAndSet(false, true)) {
-            logger.info("Release topic " + topic.toStringUtf8() + " when bookkeeper persistence mananger encounters failure :",
-                        e);
-            tm.releaseTopic(topic, new Callback<Void>() {
-                @Override
-                public void operationFailed(Object ctx, PubSubException exception) {
-                    logger.error("Exception found on releasing topic " + topic.toStringUtf8()
-                               + " when encountering exception from bookkeeper:", exception);
-                }
-                @Override
-                public void operationFinished(Object ctx, Void resultOfOperation) {
-                    logger.info("successfully releasing topic {} when encountering"
-                              + " exception from bookkeeper", topic.toStringUtf8());
-                }
-            }, null);
-        }
-        // if release happens when the topic is changing ledger
-        // we need to fail all queued persist requests
-        if (topicInfo.doChangeLedger.get()) {
-            for (PersistRequest pr : topicInfo.deferredRequests) {
-                pr.getCallback().operationFailed(ctx, new PubSubException.ServiceDownException(e));
-            }
-            topicInfo.deferredRequests.clear();
-            topicInfo.lastSeqIdBeforeLedgerChange = -1;
-        }
-    }
-
-    public class PersistOp extends TopicOpQueuer.SynchronousOp {
-        PersistRequest request;
-
-        public PersistOp(PersistRequest request) {
-            queuer.super(request.topic);
-            this.request = request;
-        }
-
-        @Override
-        public void runInternal() {
-            doPersistMessage(request);
-        }
-    }
-
-    /**
-     * Persist a message by executing a persist request.
-     */
-    protected void doPersistMessage(final PersistRequest request) {
-        final ByteString topic = request.topic;
-        final TopicInfo topicInfo = topicInfos.get(topic);
-
-        if (topicInfo == null) {
-            request.getCallback().operationFailed(request.ctx,
-                                             new PubSubException.ServerNotResponsibleForTopicException(""));
-            return;
-        }
-
-        if (topicInfo.doRelease.get()) {
-            request.getCallback().operationFailed(request.ctx, new PubSubException.ServiceDownException(
-                "The ownership of the topic is releasing due to unrecoverable issue."));
-            return;
-        }
-
-        // if the topic is changing ledger, queue following persist requests until ledger is changed
-        if (topicInfo.doChangeLedger.get()) {
-            logger.info("Topic {} is changing ledger, so queue persist request for message.",
-                        topic.toStringUtf8());
-            topicInfo.deferredRequests.add(request);
-            return;
-        }
-
-        final long localSeqId = topicInfo.lastSeqIdPushed.getLocalComponent() + 1;
-        MessageSeqId.Builder builder = MessageSeqId.newBuilder();
-        if (request.message.hasMsgId()) {
-            MessageIdUtils.takeRegionMaximum(builder, topicInfo.lastSeqIdPushed, request.message.getMsgId());
-        } else {
-            builder.addAllRemoteComponents(topicInfo.lastSeqIdPushed.getRemoteComponentsList());
-        }
-        builder.setLocalComponent(localSeqId);
-
-        // check whether reach the threshold of a ledger, if it does,
-        // open a ledger to write
-        long entriesInThisLedger = localSeqId - topicInfo.currentLedgerRange.getStartSeqIdIncluded() + 1;
-        if (UNLIMITED_ENTRIES != maxEntriesPerLedger &&
-            entriesInThisLedger >= maxEntriesPerLedger) {
-            if (topicInfo.doChangeLedger.compareAndSet(false, true)) {
-                // for order guarantees, we should wait until all the adding operations for current ledger
-                // are succeed. so we just mark it as lastSeqIdBeforeLedgerChange
-                // when the lastSeqIdBeforeLedgerChange acked, we do changing the ledger
-                if (null == topicInfo.deferredRequests) {
-                    topicInfo.deferredRequests = new LinkedList<PersistRequest>();
-                }
-                topicInfo.lastSeqIdBeforeLedgerChange = localSeqId;
-            }
-        }
-
-        topicInfo.lastSeqIdPushed = builder.build();
-        Message msgToSerialize = Message.newBuilder(request.message).setMsgId(topicInfo.lastSeqIdPushed).build();
-
-        final MessageSeqId responseSeqId = msgToSerialize.getMsgId();
-        topicInfo.currentLedgerRange.handle.asyncAddEntry(msgToSerialize.toByteArray(),
-        new SafeAsynBKCallback.AddCallback() {
-            AtomicBoolean processed = new AtomicBoolean(false);
-            @Override
-            public void safeAddComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
-
-                // avoid double callback by mistake, since we may do change ledger in this callback.
-                if (!processed.compareAndSet(false, true)) {
-                    return;
-                }
-                if (rc != BKException.Code.OK) {
-                    BKException bke = BKException.create(rc);
-                    logger.error("Error while persisting entry to ledger: " + lh.getId() + " for topic: "
-                                 + topic.toStringUtf8(), bke);
-                    request.getCallback().operationFailed(ctx, new PubSubException.ServiceDownException(bke));
-
-                    // To preserve ordering guarantees, we
-                    // should give up the topic and not let
-                    // other operations through
-                    releaseTopicIfRequested(request.topic, bke, ctx);
-                    return;
-                }
-
-                if (entryId + topicInfo.currentLedgerRange.getStartSeqIdIncluded() != localSeqId) {
-                    String msg = "Expected BK to assign entry-id: "
-                                 + (localSeqId - topicInfo.currentLedgerRange.getStartSeqIdIncluded())
-                                 + " but it instead assigned entry-id: " + entryId + " topic: "
-                                 + topic.toStringUtf8() + "ledger: " + lh.getId();
-                    logger.error(msg);
-                    throw new UnexpectedError(msg);
-                }
-
-                topicInfo.lastEntryIdAckedInCurrentLedger = entryId;
-                request.getCallback().operationFinished(ctx, responseSeqId);
-                // if this acked entry is the last entry of current ledger
-                // we can add a ChangeLedgerOp to execute to change ledger
-                if (topicInfo.doChangeLedger.get() &&
-                    entryId + topicInfo.currentLedgerRange.getStartSeqIdIncluded() == topicInfo.lastSeqIdBeforeLedgerChange) {
-                    // change ledger
-                    changeLedger(topic, new Callback<Void>() {
-                        @Override
-                        public void operationFailed(Object ctx, PubSubException exception) {
-                            logger.error("Failed to change ledger for topic " + topic.toStringUtf8(), exception);
-                            // change ledger failed, we should give up topic
-                            releaseTopicIfRequested(request.topic, exception, ctx);
-                        }
-                        @Override
-                        public void operationFinished(Object ctx, Void resultOfOperation) {
-                            topicInfo.doChangeLedger.set(false);
-                            topicInfo.lastSeqIdBeforeLedgerChange = -1;
-                            // the ledger is changed, persist queued requests
-                            // if the number of queued persist requests is more than maxEntriesPerLedger
-                            // we just persist maxEntriesPerLedger requests, other requests are still queued
-                            // until next ledger changed.
-                            int numRequests = 0;
-                            while (!topicInfo.deferredRequests.isEmpty() &&
-                                   numRequests < maxEntriesPerLedger) {
-                                PersistRequest pr = topicInfo.deferredRequests.removeFirst();
-                                doPersistMessage(pr);
-                                ++numRequests;
-                            }
-                            logger.debug("Finished persisting {} queued requests, but there are still {} requests in queue.",
-                                         numRequests, topicInfo.deferredRequests.size());
-                        }
-                    }, ctx);
-                }
-            }
-        }, request.ctx);
-    }
-
-    public void persistMessage(PersistRequest request) {
-        queuer.pushAndMaybeRun(request.topic, new PersistOp(request));
-    }
-
-    public void scanSingleMessage(ScanRequest request) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    static SafeAsynBKCallback.CloseCallback noOpCloseCallback = new SafeAsynBKCallback.CloseCallback() {
-        @Override
-        public void safeCloseComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
-        };
-    };
-
-    class AcquireOp extends TopicOpQueuer.AsynchronousOp<Void> {
-        public AcquireOp(ByteString topic, Callback<Void> cb, Object ctx) {
-            queuer.super(topic, cb, ctx);
-        }
-
-        @Override
-        public void run() {
-            if (topicInfos.containsKey(topic)) {
-                // Already acquired, do nothing
-                cb.operationFinished(ctx, null);
-                return;
-            }
-
-            // read persistence info
-            tpManager.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() {
-                @Override
-                public void operationFinished(Object ctx, Versioned<LedgerRanges> ranges) {
-                    if (null != ranges) {
-                        processTopicLedgerRanges(ranges.getValue(), ranges.getVersion());
-                    } else {
-                        processTopicLedgerRanges(LedgerRanges.getDefaultInstance(), Version.NEW);
-                    }
-                }
-                @Override
-                public void operationFailed(Object ctx, PubSubException exception) {
-                    cb.operationFailed(ctx, exception);
-                }
-            }, ctx);
-        }
-
-        void processTopicLedgerRanges(final LedgerRanges ranges, final Version version) {
-            final List<LedgerRange> rangesList = ranges.getRangesList();
-            if (!rangesList.isEmpty()) {
-                LedgerRange range = rangesList.get(0);
-                if (range.hasStartSeqIdIncluded()) {
-                    // we already have start seq id
-                    processTopicLedgerRanges(rangesList, version, range.getStartSeqIdIncluded());
-                    return;
-                }
-                getStartSeqIdToProcessTopicLedgerRanges(rangesList, version);
-                return;
-            }
-            // process topic ledger ranges directly
-            processTopicLedgerRanges(rangesList, version, START_SEQ_ID);
-        }
-
-        /**
-         * Process old version ledger ranges to fetch start seq id.
-         */
-        void getStartSeqIdToProcessTopicLedgerRanges(
-            final List<LedgerRange> rangesList, final Version version) {
-
-            final LedgerRange range = rangesList.get(0);
-
-            if (!range.hasEndSeqIdIncluded()) {
-                // process topic ledger ranges directly
-                processTopicLedgerRanges(rangesList, version, START_SEQ_ID);
-                return;
-            }
-
-            final long ledgerId = range.getLedgerId();
-            // open the first ledger to compute right start seq id
-            bk.asyncOpenLedger(ledgerId, DigestType.CRC32, passwd,
-            new SafeAsynBKCallback.OpenCallback() {
-
-                @Override
-                public void safeOpenComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
-
-                    if (rc == BKException.Code.NoSuchLedgerExistsException) {
-                        // process next ledger 
-                        processTopicLedgerRanges(rangesList, version, START_SEQ_ID);
-                        return;
-                    } else if (rc != BKException.Code.OK) {
-                        BKException bke = BKException.create(rc);
-                        logger.error("Could not open ledger {} to get start seq id while acquiring topic {} : {}",
-                                     va(ledgerId, topic.toStringUtf8(), bke));
-                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
-                        return;
-                    }
-
-                    final long numEntriesInLastLedger = ledgerHandle.getLastAddConfirmed() + 1;
-
-                    // the ledger is closed before, calling close is just a nop operation.
-                    try {
-                        ledgerHandle.close();
-                    } catch (InterruptedException ie) {
-                        // the exception would never be thrown for a read only ledger handle.
-                    } catch (BKException bke) {
-                        // the exception would never be thrown for a read only ledger handle.
-                    }
-
-                    if (numEntriesInLastLedger <= 0) {
-                        String msg = "No entries found in a have-end-seq-id ledger " + ledgerId
-                                     + " when acquiring topic " + topic.toStringUtf8() + ".";
-                        logger.error(msg);
-                        cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
-                        return;
-                    }
-                    long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
-                    long startOfLedger = endOfLedger - numEntriesInLastLedger + 1;
-
-                    processTopicLedgerRanges(rangesList, version, startOfLedger);
-                }
-
-            }, ctx);
-        }
-
-        void processTopicLedgerRanges(final List<LedgerRange> rangesList, final Version version,
-                                      long startOfLedger) {
-            logger.info("Process {} ledgers for topic {} starting from seq id {}.",
-                        va(rangesList.size(), topic.toStringUtf8(), startOfLedger));
-
-            Iterator<LedgerRange> lrIterator = rangesList.iterator();
-
-            TopicInfo topicInfo = new TopicInfo();
-            while (lrIterator.hasNext()) {
-                LedgerRange range = lrIterator.next();
-
-                if (range.hasEndSeqIdIncluded()) {
-                    // this means it was a valid and completely closed ledger
-                    long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
-                    if (range.hasStartSeqIdIncluded()) {
-                        startOfLedger = range.getStartSeqIdIncluded();
-                    } else {
-                        range = buildLedgerRange(range.getLedgerId(), startOfLedger,
-                                                 range.getEndSeqIdIncluded());
-                    }
-                    topicInfo.ledgerRanges.put(endOfLedger, new InMemoryLedgerRange(range));
-                    if (startOfLedger < endOfLedger + 1) {
-                        startOfLedger = endOfLedger + 1;
-                    }
-                    continue;
-                }
-
-                // If it doesn't have a valid end, it must be the last ledger
-                if (lrIterator.hasNext()) {
-                    String msg = "Ledger-id: " + range.getLedgerId() + " for topic: " + topic.toStringUtf8()
-                                 + " is not the last one but still does not have an end seq-id";
-                    logger.error(msg);
-                    cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
-                    return;
-                }
-
-                if (range.hasStartSeqIdIncluded()) {
-                    startOfLedger = range.getStartSeqIdIncluded();
-                }
-
-                // The last ledger does not have a valid seq-id, lets try to
-                // find it out
-                recoverLastTopicLedgerAndOpenNewOne(range.getLedgerId(), startOfLedger,
-                                                    version, topicInfo);
-                return;
-            }
-
-            // All ledgers were found properly closed, just start a new one
-            openNewTopicLedger(topic, version, topicInfo, startOfLedger, false, cb, ctx);
-        }
-
-        /**
-         * Recovers the last ledger, opens a new one, and persists the new
-         * information to ZK
-         *
-         * @param ledgerId
-         *            Ledger to be recovered
-         * @param expectedStartSeqId 
-         *            Start seq id of the ledger to recover
-         * @param expectedVersionOfLedgerNode
-         *            Expected version to update ledgers range
-         * @param topicInfo
-         *            Topic info
-         */
-        private void recoverLastTopicLedgerAndOpenNewOne(final long ledgerId, final long expectedStartSeqId,
-                final Version expectedVersionOfLedgerNode, final TopicInfo topicInfo) {
-
-            bk.asyncOpenLedger(ledgerId, DigestType.CRC32, passwd, new SafeAsynBKCallback.OpenCallback() {
-                @Override
-                public void safeOpenComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
-
-                    if (rc != BKException.Code.OK) {
-                        BKException bke = BKException.create(rc);
-                        logger.error("While acquiring topic: " + topic.toStringUtf8()
-                                     + ", could not open unrecovered ledger: " + ledgerId, bke);
-                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
-                        return;
-                    }
-
-                    final long numEntriesInLastLedger = ledgerHandle.getLastAddConfirmed() + 1;
-
-                    if (numEntriesInLastLedger <= 0) {
-                        // this was an empty ledger that someone created but
-                        // couldn't write to, so just ignore it
-                        logger.info("Pruning empty ledger: " + ledgerId + " for topic: " + topic.toStringUtf8());
-                        closeLedger(ledgerHandle);
-                        openNewTopicLedger(topic, expectedVersionOfLedgerNode, topicInfo,
-                                           expectedStartSeqId, false, cb, ctx);
-                        return;
-                    }
-
-                    // we have to read the last entry of the ledger to find
-                    // out the last seq-id
-
-                    ledgerHandle.asyncReadEntries(numEntriesInLastLedger - 1, numEntriesInLastLedger - 1,
-                    new SafeAsynBKCallback.ReadCallback() {
-                        @Override
-                        public void safeReadComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq,
-                        Object ctx) {
-                            if (rc != BKException.Code.OK || !seq.hasMoreElements()) {
-                                if (rc == BKException.Code.OK) {
-                                    // means that there is no entries read, provide a meaningful exception
-                                    rc = BKException.Code.NoSuchEntryException;
-                                }
-                                logger.info("Received error code {}", rc);
-                                BKException bke = BKException.create(rc);
-                                logger.error("While recovering ledger: " + ledgerId + " for topic: "
-                                             + topic.toStringUtf8() + ", could not read last entry", bke);
-                                cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
-                                return;
-                            }
-
-                            Message lastMessage;
-                            try {
-                                lastMessage = Message.parseFrom(seq.nextElement().getEntry());
-                            } catch (InvalidProtocolBufferException e) {
-                                String msg = "While recovering ledger: " + ledgerId + " for topic: "
-                                             + topic.toStringUtf8() + ", could not deserialize last message";
-                                logger.error(msg, e);
-                                cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
-                                return;
-                            }
-
-                            long endOfLedger  = lastMessage.getMsgId().getLocalComponent();
-                            long startOfLedger = endOfLedger - numEntriesInLastLedger + 1;
-
-                            if (startOfLedger != expectedStartSeqId) {
-                                // gap would be introduced by old version when gc consumed ledgers
-                                String msg = "Expected start seq id of recovered ledger " + ledgerId
-                                             + " to be " + expectedStartSeqId + " but it was "
-                                             + startOfLedger + ".";
-                                logger.warn(msg);
-                            }
-
-                            LedgerRange lr = buildLedgerRange(ledgerId, startOfLedger, lastMessage.getMsgId());
-                            topicInfo.ledgerRanges.put(endOfLedger,
-                                    new InMemoryLedgerRange(lr, lh));
-
-                            logger.info("Recovered unclosed ledger: {} for topic: {} with {} entries starting from seq id {}",
-                                        va(ledgerId, topic.toStringUtf8(), numEntriesInLastLedger, startOfLedger));
-
-                            openNewTopicLedger(topic, expectedVersionOfLedgerNode, topicInfo, endOfLedger + 1, false, cb, ctx);
-                        }
-                    }, ctx);
-
-                }
-
-            }, ctx);
-        }
-    }
-
-    /**
-     * Open New Ledger to write for a topic.
-     *
-     * @param topic
-     *          Topic Name
-     * @param expectedVersionOfLedgersNode
-     *          Expected Version to Update Ledgers Node.
-     * @param topicInfo
-     *          Topic Information
-     * @param startSeqId
-     *          Start of sequence id for new ledger
-     * @param changeLedger
-     *          Whether is it called when changing ledger
-     * @param cb
-     *          Callback to trigger after opening new ledger.
-     * @param ctx
-     *          Callback context.
-     */
-    void openNewTopicLedger(final ByteString topic,
-                            final Version expectedVersionOfLedgersNode, final TopicInfo topicInfo,
-                            final long startSeqId, final boolean changeLedger,
-                            final Callback<Void> cb, final Object ctx) {
-        bk.asyncCreateLedger(cfg.getBkEnsembleSize(), cfg.getBkWriteQuorumSize(),
-                             cfg.getBkAckQuorumSize(), DigestType.CRC32, passwd,
-        new SafeAsynBKCallback.CreateCallback() {
-            AtomicBoolean processed = new AtomicBoolean(false);
-
-            @Override
-            public void safeCreateComplete(int rc, LedgerHandle lh, Object ctx) {
-                if (!processed.compareAndSet(false, true)) {
-                    return;
-                }
-
-                if (rc != BKException.Code.OK) {
-                    BKException bke = BKException.create(rc);
-                    logger.error("Could not create new ledger while acquiring topic: "
-                                 + topic.toStringUtf8(), bke);
-                    cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
-                    return;
-                }
-
-                // compute last seq id
-                if (!changeLedger) {
-                    topicInfo.lastSeqIdPushed = topicInfo.ledgerRanges.isEmpty() ? MessageSeqId.newBuilder()
-                                                .setLocalComponent(startSeqId - 1).build() : topicInfo.ledgerRanges.lastEntry().getValue().range
-                                                .getEndSeqIdIncluded();
-                }
-
-                LedgerRange lastRange = LedgerRange.newBuilder().setLedgerId(lh.getId())
-                                        .setStartSeqIdIncluded(startSeqId).build();
-                topicInfo.currentLedgerRange = new InMemoryLedgerRange(lastRange, lh);
-                topicInfo.lastEntryIdAckedInCurrentLedger = -1;
-
-                // Persist the fact that we started this new
-                // ledger to ZK
-
-                LedgerRanges.Builder builder = LedgerRanges.newBuilder();
-                for (InMemoryLedgerRange imlr : topicInfo.ledgerRanges.values()) {
-                    builder.addRanges(imlr.range);
-                }
-                builder.addRanges(lastRange);
-
-                tpManager.writeTopicPersistenceInfo(
-                topic, builder.build(), expectedVersionOfLedgersNode, new Callback<Version>() {
-                    @Override
-                    public void operationFinished(Object ctx, Version newVersion) {
-                        // Finally, all done
-                        topicInfo.ledgerRangesVersion = newVersion;
-                        topicInfos.put(topic, topicInfo);
-                        cb.operationFinished(ctx, null);
-                    }
-                    @Override
-                    public void operationFailed(Object ctx, PubSubException exception) {
-                        cb.operationFailed(ctx, exception);
-                    }
-                }, ctx);
-                return;
-            }
-        }, ctx);
-    }
-
-    /**
-     * acquire ownership of a topic, doing whatever is needed to be able to
-     * perform reads and writes on that topic from here on
-     *
-     * @param topic
-     * @param callback
-     * @param ctx
-     */
-    @Override
-    public void acquiredTopic(ByteString topic, Callback<Void> callback, Object ctx) {
-        queuer.pushAndMaybeRun(topic, new AcquireOp(topic, callback, ctx));
-    }
-
-    /**
-     * Change ledger to write for a topic.
-     */
-    class ChangeLedgerOp extends TopicOpQueuer.AsynchronousOp<Void> {
-
-        public ChangeLedgerOp(ByteString topic, Callback<Void> cb, Object ctx) {
-            queuer.super(topic, cb, ctx);
-        }
-
-        @Override
-        public void run() {
-            TopicInfo topicInfo = topicInfos.get(topic);
-            if (null == topicInfo) {
-                logger.error("Weired! hub server doesn't own topic " + topic.toStringUtf8()
-                           + " when changing ledger to write.");
-                cb.operationFailed(ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
-                return;
-            }
-            closeLastTopicLedgerAndOpenNewOne(topicInfo);
-        }
-
-        private void closeLastTopicLedgerAndOpenNewOne(final TopicInfo topicInfo) {
-            final long ledgerId = topicInfo.currentLedgerRange.handle.getId();
-            topicInfo.currentLedgerRange.handle.asyncClose(new CloseCallback() {
-                AtomicBoolean processed = new AtomicBoolean(false);
-                @Override
-                public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
-                    if (!processed.compareAndSet(false, true)) {
-                        return;
-                    }
-                    if (BKException.Code.OK != rc) {
-                        BKException bke = BKException.create(rc);
-                        logger.error("Could not close ledger " + ledgerId
-                                   + " while changing ledger of topic " + topic.toStringUtf8(), bke);
-                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
-                        return;
-                    }
-                    long endSeqId = topicInfo.lastSeqIdPushed.getLocalComponent();
-                    // update last range
-                    LedgerRange lastRange =
-                        buildLedgerRange(ledgerId, topicInfo.currentLedgerRange.getStartSeqIdIncluded(),
-                                         topicInfo.lastSeqIdPushed);
-
-                    topicInfo.currentLedgerRange.range = lastRange;
-                    // put current ledger to ledger ranges
-                    topicInfo.ledgerRanges.put(endSeqId, topicInfo.currentLedgerRange);
-                    logger.info("Closed written ledger " + ledgerId + " for topic "
-                              + topic.toStringUtf8() + " to change ledger.");
-                    openNewTopicLedger(topic, topicInfo.ledgerRangesVersion,
-                                       topicInfo, endSeqId + 1, true, cb, ctx);
-                }
-            }, ctx);
-        }
-
-    }
-
-    /**
-     * Change ledger to write for a topic.
-     *
-     * @param topic
-     *          Topic Name
-     */
-    protected void changeLedger(ByteString topic, Callback<Void> cb, Object ctx) {
-        queuer.pushAndMaybeRun(topic, new ChangeLedgerOp(topic, cb, ctx));
-    }
-
-    public void closeLedger(LedgerHandle lh) {
-        // try {
-        // lh.asyncClose(noOpCloseCallback, null);
-        // } catch (InterruptedException e) {
-        // logger.error(e);
-        // Thread.currentThread().interrupt();
-        // }
-    }
-
-    class ReleaseOp extends TopicOpQueuer.SynchronousOp {
-
-        public ReleaseOp(ByteString topic) {
-            queuer.super(topic);
-        }
-
-        @Override
-        public void runInternal() {
-            TopicInfo topicInfo = topicInfos.remove(topic);
-
-            if (topicInfo == null) {
-                return;
-            }
-
-            for (InMemoryLedgerRange imlr : topicInfo.ledgerRanges.values()) {
-                if (imlr.handle != null) {
-                    closeLedger(imlr.handle);
-                }
-            }
-
-            if (topicInfo.currentLedgerRange != null && topicInfo.currentLedgerRange.handle != null) {
-                closeLedger(topicInfo.currentLedgerRange.handle);
-            }
-        }
-    }
-
-    /**
-     * Release any resources for the topic that might be currently held. There
-     * wont be any subsequent reads or writes on that topic coming
-     *
-     * @param topic
-     */
-    @Override
-    public void lostTopic(ByteString topic) {
-        queuer.pushAndMaybeRun(topic, new ReleaseOp(topic));
-    }
-
-    class SetMessageBoundOp extends TopicOpQueuer.SynchronousOp {
-        final int bound;
-
-        public SetMessageBoundOp(ByteString topic, int bound) {
-            queuer.super(topic);
-            this.bound = bound;
-        }
-
-        @Override
-        public void runInternal() {
-            TopicInfo topicInfo = topicInfos.get(topic);
-            if (topicInfo != null) {
-                topicInfo.messageBound = bound;
-            }
-        }
-    }
-
-    public void setMessageBound(ByteString topic, Integer bound) {
-        queuer.pushAndMaybeRun(topic, new SetMessageBoundOp(topic, bound));
-    }
-
-    public void clearMessageBound(ByteString topic) {
-        queuer.pushAndMaybeRun(topic, new SetMessageBoundOp(topic, TopicInfo.UNLIMITED));
-    }
-
-    @Override
-    public void stop() {
-        try {
-            tpManager.close();
-        } catch (IOException ioe) {
-            logger.warn("Exception closing topic persistence manager : ", ioe);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CacheKey.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CacheKey.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CacheKey.java
deleted file mode 100644
index 26bdb94..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CacheKey.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.server.common.ByteStringInterner;
-
-public class CacheKey {
-
-    ByteString topic;
-    long seqId;
-
-    public CacheKey(ByteString topic, long seqId) {
-        this.topic = ByteStringInterner.intern(topic);
-        this.seqId = seqId;
-    }
-
-    public ByteString getTopic() {
-        return topic;
-    }
-
-    public long getSeqId() {
-        return seqId;
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + (int) (seqId ^ (seqId >>> 32));
-        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        CacheKey other = (CacheKey) obj;
-        if (seqId != other.seqId)
-            return false;
-        if (topic == null) {
-            if (other.topic != null)
-                return false;
-        } else if (!topic.equals(other.topic))
-            return false;
-        return true;
-    }
-
-    @Override
-    public String toString() {
-        return "(" + topic.toStringUtf8() + "," + seqId + ")";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CacheValue.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CacheValue.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CacheValue.java
deleted file mode 100644
index 992ff11..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CacheValue.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.server.common.UnexpectedError;
-
-/**
- * This class is NOT thread safe. It need not be thread-safe because our
- * read-ahead cache will operate with only 1 thread
- *
- */
-public class CacheValue {
-
-    private static final Logger logger = LoggerFactory.getLogger(ReadAheadCache.class);
-
-    // Actually we don't care the order of callbacks
-    // when a scan callback, it should be delivered to both callbacks
-    Set<ScanCallbackWithContext> callbacks = new HashSet<ScanCallbackWithContext>();
-    Message message;
-    long timeOfAddition = 0;
-
-    public CacheValue() {
-    }
-
-    public boolean isStub() {
-        return message == null;
-    }
-
-    public long getTimeOfAddition() {
-        if (message == null) {
-            throw new UnexpectedError("Time of add requested from a stub");
-        }
-        return timeOfAddition;
-    }
-
-    public void setMessageAndInvokeCallbacks(Message message, long currTime) {
-        if (this.message != null) {
-            // Duplicate read for the same message coming back
-            return;
-        }
-
-        this.message = message;
-        this.timeOfAddition = currTime;
-
-        logger.debug("Invoking {} callbacks for {} message added to cache", callbacks.size(), message);
-        for (ScanCallbackWithContext callbackWithCtx : callbacks) {
-            if (null != callbackWithCtx) {
-                callbackWithCtx.getScanCallback().messageScanned(callbackWithCtx.getCtx(), message);
-            }
-        }
-    }
-
-    public boolean removeCallback(ScanCallback callback, Object ctx) {
-        return callbacks.remove(new ScanCallbackWithContext(callback, ctx));
-    }
-
-    public void addCallback(ScanCallback callback, Object ctx) {
-        if (!isStub()) {
-            // call the callback right away
-            callback.messageScanned(ctx, message);
-            return;
-        }
-
-        callbacks.add(new ScanCallbackWithContext(callback, ctx));
-    }
-
-    public Message getMessage() {
-        return message;
-    }
-
-    public void setErrorAndInvokeCallbacks(Exception exception) {
-        for (ScanCallbackWithContext callbackWithCtx : callbacks) {
-            if (null != callbackWithCtx) {
-                callbackWithCtx.getScanCallback().scanFailed(callbackWithCtx.getCtx(), exception);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CancelScanRequest.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CancelScanRequest.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CancelScanRequest.java
deleted file mode 100644
index c3b5214..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CancelScanRequest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-public interface CancelScanRequest {
-
-    /**
-     * @return the scan request to cancel
-     */
-    public ScanRequest getScanRequest();
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/Factory.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/Factory.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/Factory.java
deleted file mode 100644
index c1ee24c..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/Factory.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-public interface Factory<T> {
-    public T newInstance();
-}