You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/08/19 23:25:22 UTC

svn commit: r987314 [11/16] - in /hadoop/zookeeper/trunk: ./ src/contrib/hedwig/ src/contrib/hedwig/client/ src/contrib/hedwig/client/src/ src/contrib/hedwig/client/src/main/ src/contrib/hedwig/client/src/main/cpp/ src/contrib/hedwig/client/src/main/cp...

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,739 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protoextensions.MessageIdUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.common.TopicOpQueuer;
+import org.apache.hedwig.server.common.UnexpectedError;
+import org.apache.hedwig.server.persistence.ScanCallback.ReasonForFinish;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.zookeeper.SafeAsynBKCallback;
+import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
+import org.apache.hedwig.zookeeper.ZkUtils;
+
+/**
+ * This persistence manager uses zookeeper and bookkeeper to store messages.
+ * 
+ * Information about topics are stored in zookeeper with a znode named after the
+ * topic that contains an ASCII encoded list with records of the following form:
+ * 
+ * <pre>
+ * startSeqId(included)\tledgerId\n
+ * </pre>
+ * 
+ */
+
+public class BookkeeperPersistenceManager implements PersistenceManagerWithRangeScan, TopicOwnershipChangeListener {
+    static final Logger logger = Logger.getLogger(BookkeeperPersistenceManager.class);
+    // Password used both when creating and when opening every topic ledger.
+    static byte[] passwd = "sillysecret".getBytes();
+    private BookKeeper bk;
+    private ZooKeeper zk;
+    private ServerConfiguration cfg;
+
+    /**
+     * In-memory view of one ledger of a topic. A message with seq-id S that
+     * lives in this ledger is stored at BK entry-id (S - startSeqIdIncluded).
+     */
+    static class InMemoryLedgerRange {
+        LedgerRange range;
+        long startSeqIdIncluded; // included, for the very first ledger, this
+        // value is 1
+        LedgerHandle handle; // null until the ledger has been opened
+
+        public InMemoryLedgerRange(LedgerRange range, long startSeqId, LedgerHandle handle) {
+            this.range = range;
+            this.startSeqIdIncluded = startSeqId;
+            this.handle = handle;
+        }
+
+        public InMemoryLedgerRange(LedgerRange range, long startSeqId) {
+            this(range, startSeqId, null);
+        }
+
+    }
+
+    /**
+     * Per-topic state kept while this server owns the topic.
+     */
+    static class TopicInfo {
+        /**
+         * stores the last message-seq-id vector that has been pushed to BK for
+         * persistence (but not necessarily acked yet by BK)
+         * 
+         */
+        MessageSeqId lastSeqIdPushed;
+
+        /**
+         * stores the last message-id that has been acked by BK. This number is
+         * basically used for limiting scans to not read past what has been
+         * persisted by BK
+         */
+        long lastEntryIdAckedInCurrentLedger = -1; // because BK ledgers starts
+        // at 0
+
+        /**
+         * stores a sorted structure of the ledgers for a topic, mapping from
+         * the endSeqIdIncluded to the ledger info. This structure does not
+         * include the current ledger
+         */
+        TreeMap<Long, InMemoryLedgerRange> ledgerRanges = new TreeMap<Long, InMemoryLedgerRange>();
+
+        /**
+         * This is the handle of the current ledger that is being used to write
+         * messages
+         */
+        InMemoryLedgerRange currentLedgerRange;
+
+    }
+
+    // Topics currently owned by this server: added by AcquireOp once the
+    // ledgers znode has been written, removed by ReleaseOp.
+    Map<ByteString, TopicInfo> topicInfos = new ConcurrentHashMap<ByteString, TopicInfo>();
+
+    // Queue through which scan, persist, acquire and release operations are
+    // submitted per topic (see pushAndMaybeRun calls below).
+    TopicOpQueuer queuer;
+
+    /**
+     * Instantiates a BookKeeperPersistence manager and registers it as a
+     * topic-ownership listener with the given topic manager.
+     * 
+     * @param bk
+     *            a reference to bookkeeper to use.
+     * @param zk
+     *            a zookeeper handle to use.
+     * @param tm
+     *            topic manager this instance registers with as a
+     *            {@link TopicOwnershipChangeListener}.
+     * @param cfg
+     *            server configuration; used to build the path of each topic's
+     *            ledgers znode.
+     * @param executor
+     *            executor handed to the {@link TopicOpQueuer} used for
+     *            per-topic operations.
+     */
+    public BookkeeperPersistenceManager(BookKeeper bk, ZooKeeper zk, TopicManager tm, ServerConfiguration cfg,
+            ScheduledExecutorService executor) {
+        this.bk = bk;
+        this.zk = zk;
+        this.cfg = cfg;
+        queuer = new TopicOpQueuer(executor);
+        tm.addTopicOwnershipChangeListener(this);
+    }
+
+    /**
+     * Serves a {@link RangeScanRequest}: starting at request.startSeqId it reads
+     * forward through the topic's ledgers, handing each message to
+     * request.callback until the message-count limit or the size limit is
+     * reached, or no more acked messages remain. Reads are chained: each BK
+     * read completion issues the next read.
+     */
+    class RangeScanOp extends TopicOpQueuer.SynchronousOp {
+        RangeScanRequest request;
+        int numMessagesRead = 0;
+        long totalSizeRead = 0; // sum of body sizes of messages delivered so far
+        TopicInfo topicInfo;
+
+        public RangeScanOp(RangeScanRequest request) {
+            queuer.super(request.topic);
+            this.request = request;
+        }
+
+        @Override
+        protected void runInternal() {
+            topicInfo = topicInfos.get(topic);
+
+            if (topicInfo == null) {
+                request.callback.scanFailed(request.ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
+                return;
+            }
+
+            startReadingFrom(request.startSeqId);
+
+        }
+
+        /**
+         * Reads seq-ids [startSeqId, endSeqId] from the given ledger, capped so
+         * that no more than the remaining message limit is requested. If the
+         * ledger has no open handle yet it is opened first and the read is
+         * re-issued. After the returned entries are delivered, scanning resumes
+         * at the seq-id following the last entry read.
+         */
+        protected void read(final InMemoryLedgerRange imlr, final long startSeqId, final long endSeqId) {
+
+            if (imlr.handle == null) {
+
+                bk.asyncOpenLedger(imlr.range.getLedgerId(), DigestType.CRC32, passwd,
+                        new SafeAsynBKCallback.OpenCallback() {
+                            @Override
+                            public void safeOpenComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
+                                if (rc == BKException.Code.OK) {
+                                    imlr.handle = ledgerHandle;
+                                    read(imlr, startSeqId, endSeqId);
+                                    return;
+                                }
+                                BKException bke = BKException.create(rc);
+                                logger.error("Could not open ledger: " + imlr.range.getLedgerId() + " for topic: "
+                                        + topic.toStringUtf8(), bke);
+                                request.callback.scanFailed(ctx, new PubSubException.ServiceDownException(bke));
+                                return;
+                            }
+                        }, request.ctx);
+                return;
+            }
+
+            // ledger handle is not null, we can read from it
+            long correctedEndSeqId = Math.min(startSeqId + request.messageLimit - numMessagesRead - 1, endSeqId);
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Issuing a bk read for ledger: " + imlr.handle.getId() + " from entry-id: "
+                        + (startSeqId - imlr.startSeqIdIncluded) + " to entry-id: "
+                        + (correctedEndSeqId - imlr.startSeqIdIncluded));
+            }
+
+            imlr.handle.asyncReadEntries(startSeqId - imlr.startSeqIdIncluded, correctedEndSeqId
+                    - imlr.startSeqIdIncluded, new SafeAsynBKCallback.ReadCallback() {
+
+                long expectedEntryId = startSeqId - imlr.startSeqIdIncluded;
+
+                @Override
+                public void safeReadComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+                    // An OK read that returned nothing is treated as a failure too.
+                    if (rc != BKException.Code.OK || !seq.hasMoreElements()) {
+                        BKException bke = BKException.create(rc);
+                        logger.error("Error while reading from ledger: " + imlr.range.getLedgerId() + " for topic: "
+                                + topic.toStringUtf8(), bke);
+                        request.callback.scanFailed(request.ctx, new PubSubException.ServiceDownException(bke));
+                        return;
+                    }
+
+                    LedgerEntry entry = null;
+                    while (seq.hasMoreElements()) {
+                        entry = seq.nextElement();
+                        Message message;
+                        try {
+                            message = Message.parseFrom(entry.getEntryInputStream());
+                        } catch (IOException e) {
+                            String msg = "Unreadable message found in ledger: " + imlr.range.getLedgerId()
+                                    + " for topic: " + topic.toStringUtf8();
+                            logger.error(msg, e);
+                            request.callback.scanFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+                            return;
+                        }
+
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Read response from ledger: " + lh.getId() + " entry-id: "
+                                    + entry.getEntryId());
+                        }
+
+                        assert expectedEntryId == entry.getEntryId() : "expectedEntryId (" + expectedEntryId
+                                + ") != entry.getEntryId() (" + entry.getEntryId() + ")";
+                        assert (message.getMsgId().getLocalComponent() - imlr.startSeqIdIncluded) == expectedEntryId;
+
+                        expectedEntryId++;
+                        request.callback.messageScanned(ctx, message);
+                        numMessagesRead++;
+                        totalSizeRead += message.getBody().size();
+
+                        if (numMessagesRead >= request.messageLimit) {
+                            request.callback.scanFinished(ctx, ReasonForFinish.NUM_MESSAGES_LIMIT_EXCEEDED);
+                            return;
+                        }
+
+                        if (totalSizeRead >= request.sizeLimit) {
+                            request.callback.scanFinished(ctx, ReasonForFinish.SIZE_LIMIT_EXCEEDED);
+                            return;
+                        }
+                    }
+
+                    // entry is non-null here: the guard above ensured at least one element
+                    startReadingFrom(imlr.startSeqIdIncluded + entry.getEntryId() + 1);
+
+                }
+            }, request.ctx);
+        }
+
+        /**
+         * Picks the ledger holding startSeqId and reads from it. Closed ledgers
+         * are looked up by their endSeqIdIncluded key; a seq-id beyond every
+         * closed ledger is served from the current ledger, but only up to the
+         * last entry BK has acked. If nothing has been acked at or after
+         * startSeqId, the scan finishes with NO_MORE_MESSAGES.
+         */
+        protected void startReadingFrom(long startSeqId) {
+
+            Map.Entry<Long, InMemoryLedgerRange> entry = topicInfo.ledgerRanges.ceilingEntry(startSeqId);
+
+            if (entry == null) {
+                // None of the old ledgers have this seq-id, we must use the
+                // current ledger
+                long endSeqId = topicInfo.currentLedgerRange.startSeqIdIncluded
+                        + topicInfo.lastEntryIdAckedInCurrentLedger;
+
+                if (endSeqId < startSeqId) {
+                    request.callback.scanFinished(request.ctx, ReasonForFinish.NO_MORE_MESSAGES);
+                    return;
+                }
+
+                read(topicInfo.currentLedgerRange, startSeqId, endSeqId);
+            } else {
+                read(entry.getValue(), startSeqId, entry.getValue().range.getEndSeqIdIncluded().getLocalComponent());
+            }
+
+        }
+
+    }
+
+    @Override
+    public void scanMessages(RangeScanRequest request) {
+        queuer.pushAndMaybeRun(request.topic, new RangeScanOp(request));
+    }
+
+    public void deliveredUntil(ByteString topic, Long seqId) {
+        // Nothing to do here. this is just a hint that we cannot use.
+    }
+
+    /**
+     * Deletes from BookKeeper every closed ledger of the topic whose messages
+     * all have seq-ids at or below {@code seqId}. Deletion is best-effort:
+     * failures are logged and the remaining ledgers are still attempted.
+     * 
+     * NOTE(review): deleted ledgers are neither removed from the in-memory
+     * ledgerRanges nor from the ZK ledgers node, so later calls will try to
+     * delete them again.
+     */
+    public void consumedUntil(ByteString topic, Long seqId) {
+        TopicInfo topicInfo = topicInfos.get(topic);
+        if (topicInfo == null) {
+            logger.error("Server is not responsible for topic!");
+            return;
+        }
+        // ledgerRanges is keyed by endSeqIdIncluded, so the head map up to and
+        // including seqId holds exactly the ledgers whose message entries have
+        // all been consumed already; it is safe to delete them from BookKeeper.
+        for (InMemoryLedgerRange consumed : topicInfo.ledgerRanges.headMap(seqId, true).values()) {
+            long ledgerId = consumed.range.getLedgerId();
+            try {
+                bk.deleteLedger(ledgerId);
+            } catch (Exception e) {
+                // For now, just log an exception error message. In the
+                // future, we can have more complicated retry logic to
+                // delete a consumed ledger. The next time the ledger
+                // garbage collection job runs, we'll once again try to
+                // delete this ledger.
+                logger.error("Exception while deleting consumed ledgerId: " + ledgerId, e);
+            }
+        }
+    }
+
+    /**
+     * Returns the last seq-id handed out for the topic (pushed to BK, not
+     * necessarily acked yet).
+     * 
+     * @throws ServerNotResponsibleForTopicException
+     *             if this server has not acquired the topic
+     */
+    public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) throws ServerNotResponsibleForTopicException {
+        TopicInfo topicInfo = topicInfos.get(topic);
+
+        if (topicInfo == null) {
+            throw new PubSubException.ServerNotResponsibleForTopicException("");
+        }
+
+        return topicInfo.lastSeqIdPushed;
+    }
+
+    // Local seq-ids are assigned contiguously (last + 1 in PersistOp), so
+    // skipping is plain addition.
+    public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) {
+        return seqId + skipAmount;
+    }
+
+    /**
+     * Appends one message to the topic's current ledger. The message is
+     * stamped with the next local seq-id; its remote components are copied
+     * from lastSeqIdPushed, or merged with the message's own msg-id via
+     * MessageIdUtils.takeRegionMaximum when it carries one.
+     */
+    public class PersistOp extends TopicOpQueuer.SynchronousOp {
+        PersistRequest request;
+
+        public PersistOp(PersistRequest request) {
+            queuer.super(request.topic);
+            this.request = request;
+        }
+
+        @Override
+        public void runInternal() {
+            final TopicInfo topicInfo = topicInfos.get(topic);
+
+            if (topicInfo == null) {
+                request.callback.operationFailed(request.ctx,
+                        new PubSubException.ServerNotResponsibleForTopicException(""));
+                return;
+            }
+
+            final long localSeqId = topicInfo.lastSeqIdPushed.getLocalComponent() + 1;
+            MessageSeqId.Builder builder = MessageSeqId.newBuilder();
+            if (request.message.hasMsgId()) {
+                MessageIdUtils.takeRegionMaximum(builder, topicInfo.lastSeqIdPushed, request.message.getMsgId());
+            } else {
+                builder.addAllRemoteComponents(topicInfo.lastSeqIdPushed.getRemoteComponentsList());
+            }
+            builder.setLocalComponent(localSeqId);
+
+            // Advanced before BK acks the add, so the next op sees this seq-id.
+            topicInfo.lastSeqIdPushed = builder.build();
+            Message msgToSerialize = Message.newBuilder(request.message).setMsgId(topicInfo.lastSeqIdPushed).build();
+
+            topicInfo.currentLedgerRange.handle.asyncAddEntry(msgToSerialize.toByteArray(),
+                    new SafeAsynBKCallback.AddCallback() {
+                        @Override
+                        public void safeAddComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+                            if (rc != BKException.Code.OK) {
+                                BKException bke = BKException.create(rc);
+                                logger.error("Error while persisting entry to ledger: " + lh.getId() + " for topic: "
+                                        + topic.toStringUtf8(), bke);
+
+                                // To preserve ordering guarantees, we
+                                // should give up the topic and not let
+                                // other operations through
+                                // NOTE(review): currently only this request is
+                                // failed; lastSeqIdPushed was already advanced, so
+                                // a later add may trip the entry-id check below.
+                                request.callback.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
+                                return;
+                            }
+
+                            if (entryId + topicInfo.currentLedgerRange.startSeqIdIncluded != localSeqId) {
+                                String msg = "Expected BK to assign entry-id: "
+                                        + (localSeqId - topicInfo.currentLedgerRange.startSeqIdIncluded)
+                                        + " but it instead assigned entry-id: " + entryId + " topic: "
+                                        + topic.toStringUtf8() + " ledger: " + lh.getId();
+                                logger.fatal(msg);
+                                throw new UnexpectedError(msg);
+                            }
+
+                            topicInfo.lastEntryIdAckedInCurrentLedger = entryId;
+                            request.callback.operationFinished(ctx, localSeqId);
+                        }
+                    }, request.ctx);
+
+        }
+    }
+
+    public void persistMessage(PersistRequest request) {
+        queuer.pushAndMaybeRun(request.topic, new PersistOp(request));
+    }
+
+    public void scanSingleMessage(ScanRequest request) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    // Close callback that ignores the result. Within this class it is only
+    // referenced from the commented-out body of closeLedger.
+    static SafeAsynBKCallback.CloseCallback noOpCloseCallback = new SafeAsynBKCallback.CloseCallback() {
+        @Override
+        public void safeCloseComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
+        }
+    };
+
+    /** ZK path of the znode holding the serialized LedgerRanges of a topic. */
+    String ledgersPath(ByteString topic) {
+        return cfg.getZkTopicPath(new StringBuilder(), topic).append("/ledgers").toString();
+    }
+
+    /**
+     * Acquires a topic: reads (or creates) its ledgers znode, rebuilds the
+     * in-memory ledger ranges, recovers the last ledger if it was left without
+     * an end seq-id, opens a new ledger for writes and records it in ZK with a
+     * version-checked write. The topic is only published in topicInfos once
+     * that write succeeds.
+     */
+    class AcquireOp extends TopicOpQueuer.AsynchronousOp<Void> {
+        public AcquireOp(ByteString topic, Callback<Void> cb, Object ctx) {
+            queuer.super(topic, cb, ctx);
+        }
+
+        @Override
+        public void run() {
+            if (topicInfos.containsKey(topic)) {
+                // Already acquired, do nothing
+                cb.operationFinished(ctx, null);
+                return;
+            }
+            // read topic ledgers node data
+            final String zNodePath = ledgersPath(topic);
+
+            zk.getData(zNodePath, false, new SafeAsyncZKCallback.DataCallback() {
+                @Override
+                public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+                    if (rc == Code.OK.intValue()) {
+                        processTopicLedgersNodeData(data, stat.getVersion());
+                        return;
+                    }
+
+                    if (rc == Code.NONODE.intValue()) {
+                        // create it
+                        final byte[] initialData = LedgerRanges.getDefaultInstance().toByteArray();
+                        ZkUtils.createFullPathOptimistic(zk, zNodePath, initialData, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                                CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {
+                                    @Override
+                                    public void safeProcessResult(int rc, String path, Object ctx, String name) {
+                                        if (rc != Code.OK.intValue()) {
+                                            KeeperException ke = ZkUtils.logErrorAndCreateZKException(
+                                                    "Could not create ledgers node for topic: " + topic.toStringUtf8(),
+                                                    path, rc);
+                                            cb.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+                                            return;
+                                        }
+                                        // A freshly created znode has data
+                                        // version 0 in ZooKeeper
+                                        processTopicLedgersNodeData(initialData, 0);
+                                    }
+                                }, ctx);
+                        return;
+                    }
+
+                    // otherwise some other error
+                    KeeperException ke = ZkUtils.logErrorAndCreateZKException("Could not read ledgers node for topic: "
+                            + topic.toStringUtf8(), path, rc);
+                    cb.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+
+                }
+            }, ctx);
+        }
+
+        /**
+         * Parses the ledgers znode contents. Closed ledgers (those with an end
+         * seq-id) are added to topicInfo.ledgerRanges; only the last ledger may
+         * lack an end seq-id, in which case it is recovered before a new ledger
+         * is opened.
+         * 
+         * @param data
+         *            serialized LedgerRanges read from (or written to) ZK
+         * @param version
+         *            znode version the data corresponds to; the later write
+         *            of the ledgers node is conditioned on it
+         */
+        void processTopicLedgersNodeData(byte[] data, int version) {
+
+            final LedgerRanges ranges;
+            try {
+                ranges = LedgerRanges.parseFrom(data);
+            } catch (InvalidProtocolBufferException e) {
+                String msg = "Ledger ranges for topic:" + topic.toStringUtf8() + " could not be deserialized";
+                logger.fatal(msg, e);
+                cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+                return;
+            }
+
+            Iterator<LedgerRange> lrIterator = ranges.getRangesList().iterator();
+            TopicInfo topicInfo = new TopicInfo();
+
+            long startOfLedger = 1;
+
+            while (lrIterator.hasNext()) {
+                LedgerRange range = lrIterator.next();
+
+                if (range.hasEndSeqIdIncluded()) {
+                    // this means it was a valid and completely closed ledger
+                    long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
+                    topicInfo.ledgerRanges.put(endOfLedger, new InMemoryLedgerRange(range, startOfLedger));
+                    startOfLedger = endOfLedger + 1;
+                    continue;
+                }
+
+                // If it doesn't have a valid end, it must be the last ledger
+                if (lrIterator.hasNext()) {
+                    String msg = "Ledger-id: " + range.getLedgerId() + " for topic: " + topic.toStringUtf8()
+                            + " is not the last one but still does not have an end seq-id";
+                    logger.fatal(msg);
+                    cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+                    return;
+                }
+
+                // The last ledger does not have a valid seq-id, lets try to
+                // find it out
+                recoverLastTopicLedgerAndOpenNewOne(range.getLedgerId(), version, topicInfo);
+                return;
+            }
+
+            // All ledgers were found properly closed, just start a new one
+            openNewTopicLedger(version, topicInfo);
+        }
+
+        /**
+         * Recovers the last ledger, opens a new one, and persists the new
+         * information to ZK. The last ledger's end seq-id is taken from the
+         * msg-id of its last confirmed entry; an empty ledger is dropped.
+         * 
+         * @param ledgerId
+         *            Ledger to be recovered
+         * @param expectedVersionOfLedgerNode
+         *            version of the ledgers znode the ranges were read at
+         * @param topicInfo
+         *            state being rebuilt; the recovered ledger is added to its
+         *            ledgerRanges
+         */
+        private void recoverLastTopicLedgerAndOpenNewOne(final long ledgerId, final int expectedVersionOfLedgerNode,
+                final TopicInfo topicInfo) {
+
+            bk.asyncOpenLedger(ledgerId, DigestType.CRC32, passwd, new SafeAsynBKCallback.OpenCallback() {
+                @Override
+                public void safeOpenComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
+
+                    if (rc != BKException.Code.OK) {
+                        BKException bke = BKException.create(rc);
+                        logger.error("While acquiring topic: " + topic.toStringUtf8()
+                                + ", could not open unrecovered ledger: " + ledgerId, bke);
+                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
+                        return;
+                    }
+
+                    final long numEntriesInLastLedger = ledgerHandle.getLastAddConfirmed() + 1;
+
+                    if (numEntriesInLastLedger <= 0) {
+                        // this was an empty ledger that someone created but
+                        // couldn't write to, so just ignore it
+                        logger.info("Pruning empty ledger: " + ledgerId + " for topic: " + topic.toStringUtf8());
+                        closeLedger(ledgerHandle);
+                        openNewTopicLedger(expectedVersionOfLedgerNode, topicInfo);
+                        return;
+                    }
+
+                    // we have to read the last entry of the ledger to find
+                    // out the last seq-id
+
+                    ledgerHandle.asyncReadEntries(numEntriesInLastLedger - 1, numEntriesInLastLedger - 1,
+                            new SafeAsynBKCallback.ReadCallback() {
+                                @Override
+                                public void safeReadComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq,
+                                        Object ctx) {
+                                    if (rc != BKException.Code.OK || !seq.hasMoreElements()) {
+                                        BKException bke = BKException.create(rc);
+                                        logger.error("While recovering ledger: " + ledgerId + " for topic: "
+                                                + topic.toStringUtf8() + ", could not read last entry", bke);
+                                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
+                                        return;
+                                    }
+
+                                    Message lastMessage;
+                                    try {
+                                        lastMessage = Message.parseFrom(seq.nextElement().getEntry());
+                                    } catch (InvalidProtocolBufferException e) {
+                                        String msg = "While recovering ledger: " + ledgerId + " for topic: "
+                                                + topic.toStringUtf8() + ", could not deserialize last message";
+                                        logger.error(msg, e);
+                                        cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+                                        return;
+                                    }
+
+                                    // The recovered ledger starts right after the
+                                    // previous closed ledger (or at 1 if none).
+                                    long prevLedgerEnd = topicInfo.ledgerRanges.isEmpty() ? 0 : topicInfo.ledgerRanges
+                                            .lastKey();
+                                    LedgerRange lr = LedgerRange.newBuilder().setLedgerId(ledgerId)
+                                            .setEndSeqIdIncluded(lastMessage.getMsgId()).build();
+                                    topicInfo.ledgerRanges.put(lr.getEndSeqIdIncluded().getLocalComponent(),
+                                            new InMemoryLedgerRange(lr, prevLedgerEnd + 1, lh));
+
+                                    logger.info("Recovered unclosed ledger: " + ledgerId + " for topic: "
+                                            + topic.toStringUtf8() + " with " + numEntriesInLastLedger + " entries");
+
+                                    openNewTopicLedger(expectedVersionOfLedgerNode, topicInfo);
+                                }
+                            }, ctx);
+
+                }
+
+            }, ctx);
+        }
+
+        /**
+         * Creates the ledger that will receive new writes for the topic, sets
+         * it as topicInfo.currentLedgerRange (starting right after the last
+         * closed ledger), and writes the updated ledger list to ZK.
+         * 
+         * @param expectedVersionOfLedgersNode
+         *            The version of the ledgers node when we read it, should be
+         *            the same when we try to write
+         * @param topicInfo
+         *            state being rebuilt for the topic
+         */
+        private void openNewTopicLedger(final int expectedVersionOfLedgersNode, final TopicInfo topicInfo) {
+            final int ENSEMBLE_SIZE = 3;
+            final int QUORUM_SIZE = 2;
+
+            bk.asyncCreateLedger(ENSEMBLE_SIZE, QUORUM_SIZE, DigestType.CRC32, passwd,
+                    new SafeAsynBKCallback.CreateCallback() {
+                        // Guards against handling a create completion more than once.
+                        boolean processed = false;
+
+                        @Override
+                        public void safeCreateComplete(int rc, LedgerHandle lh, Object ctx) {
+                            if (processed) {
+                                return;
+                            } else {
+                                processed = true;
+                            }
+
+                            if (rc != BKException.Code.OK) {
+                                BKException bke = BKException.create(rc);
+                                logger.error("Could not create new ledger while acquiring topic: "
+                                        + topic.toStringUtf8(), bke);
+                                cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
+                                return;
+                            }
+
+                            topicInfo.lastSeqIdPushed = topicInfo.ledgerRanges.isEmpty() ? MessageSeqId.newBuilder()
+                                    .setLocalComponent(0).build() : topicInfo.ledgerRanges.lastEntry().getValue().range
+                                    .getEndSeqIdIncluded();
+
+                            LedgerRange lastRange = LedgerRange.newBuilder().setLedgerId(lh.getId()).build();
+                            topicInfo.currentLedgerRange = new InMemoryLedgerRange(lastRange, topicInfo.lastSeqIdPushed
+                                    .getLocalComponent() + 1, lh);
+
+                            // Persist the fact that we started this new
+                            // ledger to ZK
+
+                            LedgerRanges.Builder builder = LedgerRanges.newBuilder();
+                            for (InMemoryLedgerRange imlr : topicInfo.ledgerRanges.values()) {
+                                builder.addRanges(imlr.range);
+                            }
+                            builder.addRanges(lastRange);
+
+                            writeTopicLedgersNode(topic, builder.build().toByteArray(), expectedVersionOfLedgersNode,
+                                    topicInfo);
+                            return;
+                        }
+                    }, ctx);
+        }
+
+        /**
+         * Writes the ledgers znode only if it is still at expectedVersion; on
+         * success publishes topicInfo in topicInfos and completes the acquire,
+         * otherwise fails the acquire callback.
+         */
+        void writeTopicLedgersNode(final ByteString topic, byte[] data, int expectedVersion, final TopicInfo topicInfo) {
+            final String zNodePath = ledgersPath(topic);
+
+            zk.setData(zNodePath, data, expectedVersion, new SafeAsyncZKCallback.StatCallback() {
+                @Override
+                public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
+                    if (rc != KeeperException.Code.OK.intValue()) {
+                        KeeperException ke = ZkUtils.logErrorAndCreateZKException(
+                                "Could not write ledgers node for topic: " + topic.toStringUtf8(), path, rc);
+                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+                        return;
+                    }
+
+                    // Finally, all done
+                    topicInfos.put(topic, topicInfo);
+                    cb.operationFinished(ctx, null);
+                }
+            }, ctx);
+
+        }
+    }
+
+    /**
+     * acquire ownership of a topic, doing whatever is needed to be able to
+     * perform reads and writes on that topic from here on
+     * 
+     * @param topic
+     * @param callback
+     * @param ctx
+     */
+    @Override
+    public void acquiredTopic(ByteString topic, Callback<Void> callback, Object ctx) {
+        queuer.pushAndMaybeRun(topic, new AcquireOp(topic, callback, ctx));
+    }
+
+    /**
+     * Currently a no-op: the asynchronous close below is commented out, so
+     * ledger handles are never closed by this method.
+     */
+    public void closeLedger(LedgerHandle lh) {
+        // try {
+        // lh.asyncClose(noOpCloseCallback, null);
+        // } catch (InterruptedException e) {
+        // logger.error(e);
+        // Thread.currentThread().interrupt();
+        // }
+    }
+
+    /**
+     * Drops the topic's in-memory state and passes every open ledger handle
+     * (closed ledgers and the current one) to closeLedger.
+     */
+    class ReleaseOp extends TopicOpQueuer.SynchronousOp {
+
+        public ReleaseOp(ByteString topic) {
+            queuer.super(topic);
+        }
+
+        @Override
+        public void runInternal() {
+            TopicInfo topicInfo = topicInfos.remove(topic);
+
+            if (topicInfo == null) {
+                return;
+            }
+
+            for (InMemoryLedgerRange imlr : topicInfo.ledgerRanges.values()) {
+                if (imlr.handle != null) {
+                    closeLedger(imlr.handle);
+                }
+            }
+
+            if (topicInfo.currentLedgerRange != null && topicInfo.currentLedgerRange.handle != null) {
+                closeLedger(topicInfo.currentLedgerRange.handle);
+            }
+        }
+    }
+
+    /**
+     * Release any resources for the topic that might be currently held. There
+     * wont be any subsequent reads or writes on that topic coming
+     * 
+     * @param topic
+     */
+    @Override
+    public void lostTopic(ByteString topic) {
+        queuer.pushAndMaybeRun(topic, new ReleaseOp(topic));
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/CacheKey.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/CacheKey.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/CacheKey.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/CacheKey.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.server.common.ByteStringInterner;
+
+/**
+ * Identifies one message by (topic, seqId). The topic is canonicalized
+ * through ByteStringInterner on construction. Presumably used as the key of
+ * the read-ahead cache (CacheValue being the value) -- verify against callers.
+ */
+public class CacheKey {
+
+    ByteString topic;
+    long seqId;
+
+    public CacheKey(ByteString topic, long seqId) {
+        this.topic = ByteStringInterner.intern(topic);
+        this.seqId = seqId;
+    }
+
+    public ByteString getTopic() {
+        return topic;
+    }
+
+    public long getSeqId() {
+        return seqId;
+    }
+
+    /**
+     * Combines the seq-id and topic hashes; consistent with
+     * {@link #equals(Object)}. A null topic contributes 0.
+     */
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (int) (seqId ^ (seqId >>> 32));
+        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+        return result;
+    }
+
+    /**
+     * Two keys are equal when they are of the exact same class and have the
+     * same seqId and equal (or both null) topics.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        CacheKey other = (CacheKey) obj;
+        if (seqId != other.seqId) {
+            return false;
+        }
+        if (topic == null) {
+            return other.topic == null;
+        }
+        return topic.equals(other.topic);
+    }
+
+    /**
+     * Renders the key as "(topic,seqId)". A null topic is rendered as "null"
+     * rather than throwing, matching the null tolerance of equals/hashCode.
+     */
+    @Override
+    public String toString() {
+        String topicStr = (topic == null) ? "null" : topic.toStringUtf8();
+        return "(" + topicStr + "," + seqId + ")";
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/CacheValue.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/CacheValue.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/CacheValue.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/CacheValue.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.log4j.Logger;
+
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.server.common.UnexpectedError;
+
+/**
+ * One cache entry. Until a message is set the entry is a "stub" that only
+ * queues scan callbacks. setMessageAndInvokeCallbacks stores the message and
+ * drains the queue; callbacks added afterwards are invoked immediately, and
+ * setErrorAndInvokeCallbacks fails whatever is still queued.
+ *
+ * This class is NOT thread safe. It does not need to be, because the
+ * read-ahead cache that uses it operates with only one thread.
+ */
+public class CacheValue {
+
+    static Logger logger = Logger.getLogger(ReadAheadCache.class);
+
+    Queue<ScanCallbackWithContext> callbacks = new LinkedList<ScanCallbackWithContext>();
+    Message message;
+    long timeOfAddition = 0;
+
+    public CacheValue() {
+    }
+
+    /** @return true while no message has been stored in this entry */
+    public boolean isStub() {
+        return message == null;
+    }
+
+    /**
+     * @return the time supplied when the message was stored
+     * @throws UnexpectedError if the entry is still a stub
+     */
+    public long getTimeOfAddition() {
+        if (message != null) {
+            return timeOfAddition;
+        }
+        throw new UnexpectedError("Time of add requested from a stub");
+    }
+
+    /**
+     * Stores the message and its arrival time, then hands the message to every
+     * queued callback. A second call (a duplicate read of the same message) is
+     * ignored.
+     */
+    public void setMessageAndInvokeCallbacks(Message message, long currTime) {
+        if (this.message == null) {
+            this.message = message;
+            this.timeOfAddition = currTime;
+            if (logger.isDebugEnabled()) {
+                logger.debug("Invoking " + callbacks.size() + " callbacks for " + " message added to cache");
+            }
+            for (ScanCallbackWithContext pending = callbacks.poll(); pending != null; pending = callbacks.poll()) {
+                pending.getScanCallback().messageScanned(pending.getCtx(), message);
+            }
+        }
+    }
+
+    /**
+     * Delivers the message right away when it is already present; otherwise
+     * queues the callback until the message (or an error) arrives.
+     */
+    public void addCallback(ScanCallback callback, Object ctx) {
+        if (isStub()) {
+            callbacks.add(new ScanCallbackWithContext(callback, ctx));
+        } else {
+            callback.messageScanned(ctx, message);
+        }
+    }
+
+    public Message getMessage() {
+        return message;
+    }
+
+    /** Fails, and removes, every callback still waiting on this entry. */
+    public void setErrorAndInvokeCallbacks(Exception exception) {
+        for (ScanCallbackWithContext pending = callbacks.poll(); pending != null; pending = callbacks.poll()) {
+            pending.getScanCallback().scanFailed(pending.getCtx(), exception);
+        }
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/Factory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/Factory.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/Factory.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/Factory.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+/**
+ * Supplies new objects on demand, e.g. the value collections that
+ * MapMethods inserts for keys that are absent.
+ *
+ * @param <T> the type of object produced
+ */
+public interface Factory<T> {
+    /** @return a new instance of {@code T} */
+    T newInstance();
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.sql.rowset.serial.SerialBlob;
+
+import org.apache.log4j.Logger;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protoextensions.MessageIdUtils;
+import org.apache.hedwig.server.persistence.ScanCallback.ReasonForFinish;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.FileUtils;
+
+/**
+ * {@link PersistenceManagerWithRangeScan} backed by an embedded Derby database
+ * created under a fresh temporary directory. Each topic is stored in its own
+ * table of (id, msg) rows: id holds the local component of the message's
+ * seq-id and msg the serialized {@link Message}; on read the local seq-id is
+ * merged back into the message from the id column. The full current seq-id
+ * of every topic is tracked in memory only (currTopicSeqIds).
+ */
+public class LocalDBPersistenceManager implements PersistenceManagerWithRangeScan {
+    static Logger logger = Logger.getLogger(LocalDBPersistenceManager.class);
+
+    static String connectionURL;
+
+    static {
+        try {
+            File tempDir = FileUtils.createTempDirectory("derby", null);
+
+            // Derby wants to create the database directory itself, so the temp
+            // directory is deleted and only its unique path is reused.
+            if (!tempDir.delete()) {
+                throw new IOException("Could not delete dir: " + tempDir.getAbsolutePath());
+            }
+            connectionURL = "jdbc:derby:" + tempDir.getAbsolutePath() + ";create=true";
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    /**
+     * One Derby connection per thread, opened on first use. If opening fails,
+     * null is cached for that thread and its operations report
+     * "Not connected to derby".
+     */
+    private static final ThreadLocal<Connection> threadLocalConnection = new ThreadLocal<Connection>() {
+        @Override
+        protected Connection initialValue() {
+            try {
+                return DriverManager.getConnection(connectionURL);
+            } catch (SQLException e) {
+                logger.error("Could not connect to derby", e);
+                return null;
+            }
+        }
+    };
+    static final String ID_FIELD_NAME = "id";
+    static final String MSG_FIELD_NAME = "msg";
+    static final String driver = "org.apache.derby.jdbc.EmbeddedDriver";
+
+    /** Derby SQLState for "Table/View does not exist". */
+    private static final String TABLE_DOES_NOT_EXIST = "42X05";
+    /** Derby SQLState raised when the embedded engine shuts down normally. */
+    private static final String SHUTDOWN_OK = "XJ015";
+
+    /** Width of the id window fetched by each SELECT during range scans. */
+    static final int SCAN_CHUNK = 1000;
+
+    /**
+     * Restarting the database several times within the same JVM proved
+     * troublesome. To support unit tests, a version number is appended to
+     * every table name, and {@link #reset()} increments it instead of
+     * restarting the database. This yields fresh table names, so the manager
+     * behaves like a brand new database.
+     */
+    private int version = 0;
+
+    ConcurrentMap<ByteString, MessageSeqId> currTopicSeqIds = new ConcurrentHashMap<ByteString, MessageSeqId>();
+
+    static LocalDBPersistenceManager instance = new LocalDBPersistenceManager();
+
+    public static LocalDBPersistenceManager instance() {
+        return instance;
+    }
+
+    private LocalDBPersistenceManager() {
+
+        try {
+            Class.forName(driver).newInstance();
+            logger.info("Derby Driver loaded");
+        } catch (java.lang.ClassNotFoundException e) {
+            logger.error("Derby driver not found", e);
+        } catch (InstantiationException e) {
+            logger.error("Could not instantiate derby driver", e);
+        } catch (IllegalAccessException e) {
+            logger.error("Could not instantiate derby driver", e);
+        }
+    }
+
+    /**
+     * Ensures that at least the default seq-id (local component 0) exists in
+     * the map for the given topic. If another thread inserts a value first,
+     * that value wins.
+     *
+     * @param topic
+     * @return the seq-id currently stored for the topic
+     */
+    private MessageSeqId ensureSeqIdExistsForTopic(ByteString topic) {
+        MessageSeqId presentSeqIdInMap = currTopicSeqIds.get(topic);
+
+        if (presentSeqIdInMap != null) {
+            return presentSeqIdInMap;
+        }
+
+        presentSeqIdInMap = MessageSeqId.newBuilder().setLocalComponent(0).build();
+        MessageSeqId oldSeqIdInMap = currTopicSeqIds.putIfAbsent(topic, presentSeqIdInMap);
+
+        if (oldSeqIdInMap != null) {
+            return oldSeqIdInMap;
+        }
+        return presentSeqIdInMap;
+
+    }
+
+    /**
+     * Adjusts the current seq-id of the topic for the message about to be
+     * published. The local component is always incremented by 1. For the
+     * remote components there are two cases:
+     *
+     * 1. If the message has no seq-id (a locally published message), the
+     * current remote components are kept as they are.
+     *
+     * 2. If the message has a seq-id, the region-wise maximum of the current
+     * seq-id and the message's seq-id is taken.
+     *
+     * The update is a compare-and-swap loop on currTopicSeqIds.
+     *
+     * @param topic
+     * @param messageToPublish
+     * @return the new local component; it is used as the row id in Derby
+     * @throws UnexpectedConditionException
+     */
+    private long adjustTopicSeqIdForPublish(ByteString topic, Message messageToPublish)
+            throws UnexpectedConditionException {
+        long retValue;
+        MessageSeqId oldId;
+        MessageSeqId newId;
+
+        do {
+            oldId = ensureSeqIdExistsForTopic(topic);
+
+            // Use a fresh builder on every attempt. Reusing one across failed
+            // CAS rounds would re-append remote components from the previous
+            // attempt, or reuse a builder whose build() was already called.
+            MessageSeqId.Builder newIdBuilder = MessageSeqId.newBuilder();
+
+            // Increment our own component by 1
+            retValue = oldId.getLocalComponent() + 1;
+            newIdBuilder.setLocalComponent(retValue);
+
+            if (messageToPublish.hasMsgId()) {
+                // take a region-wise max
+                MessageIdUtils.takeRegionMaximum(newIdBuilder, messageToPublish.getMsgId(), oldId);
+            } else {
+                newIdBuilder.addAllRemoteComponents(oldId.getRemoteComponentsList());
+            }
+            newId = newIdBuilder.build();
+        } while (!currTopicSeqIds.replace(topic, oldId, newId));
+
+        return retValue;
+
+    }
+
+    /** Local seq-ids are consecutive counters, so skipping is plain addition. */
+    public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) {
+        return seqId + skipAmount;
+    }
+
+    /**
+     * Assigns the next seq-id to the message and inserts it into the topic's
+     * table. If the table does not exist yet, it is created once and the insert
+     * is retried. The request callback receives the assigned local seq-id, or a
+     * ServiceDownException/UnexpectedConditionException on failure.
+     */
+    public void persistMessage(PersistRequest request) {
+
+        Connection conn = threadLocalConnection.get();
+
+        Callback<Long> callback = request.getCallback();
+        Object ctx = request.getCtx();
+        ByteString topic = request.getTopic();
+        Message message = request.getMessage();
+
+        if (conn == null) {
+            callback.operationFailed(ctx, new ServiceDownException("Not connected to derby"));
+            return;
+        }
+
+        long seqId;
+
+        try {
+            seqId = adjustTopicSeqIdForPublish(topic, message);
+        } catch (UnexpectedConditionException e) {
+            callback.operationFailed(ctx, e);
+            return;
+        }
+
+        boolean triedCreatingTable = false;
+        while (true) {
+            PreparedStatement stmt = null;
+            try {
+                stmt = conn.prepareStatement("INSERT INTO " + getTableNameForTopic(topic) + " VALUES(?,?)");
+                stmt.setLong(1, seqId);
+                stmt.setBlob(2, new SerialBlob(message.toByteArray()));
+
+                int rowCount = stmt.executeUpdate();
+                if (rowCount != 1) {
+                    logger.error("Unexpected number of affected rows from derby");
+                    callback.operationFailed(ctx, new ServiceDownException("Unexpected response from derby"));
+                    return;
+                }
+                break;
+            } catch (SQLException sqle) {
+                if (isTableMissing(sqle) && !triedCreatingTable) {
+                    createTable(conn, topic);
+                    triedCreatingTable = true;
+                    continue;
+                }
+
+                logger.error("Error while executing derby insert", sqle);
+                callback.operationFailed(ctx, new ServiceDownException(sqle));
+                return;
+            } finally {
+                // Release the statement on every path (success, retry, failure).
+                closeQuietly(stmt);
+            }
+        }
+        callback.operationFinished(ctx, seqId);
+    }
+
+    /*
+     * This method does not throw an exception because another thread might
+     * sneak in and create the table before us
+     */
+    private void createTable(Connection conn, ByteString topic) {
+        Statement stmt = null;
+        try {
+            stmt = conn.createStatement();
+            String tableName = getTableNameForTopic(topic);
+            stmt.execute("CREATE TABLE " + tableName + " (" + ID_FIELD_NAME + " BIGINT NOT NULL CONSTRAINT ID_PK_"
+                    + tableName + " PRIMARY KEY," + MSG_FIELD_NAME + " BLOB(2M) NOT NULL)");
+        } catch (SQLException e) {
+            logger.debug("Could not create table", e);
+        } finally {
+            closeQuietly(stmt);
+        }
+    }
+
+    public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) {
+        return ensureSeqIdExistsForTopic(topic);
+    }
+
+    /** Scans exactly the message with the requested seq-id (if present). */
+    public void scanSingleMessage(ScanRequest request) {
+        scanMessagesInternal(request.getTopic(), request.getStartSeqId(), 1, Long.MAX_VALUE, request.getCallback(),
+                request.getCtx(), 1);
+        return;
+    }
+
+    public void scanMessages(RangeScanRequest request) {
+        scanMessagesInternal(request.getTopic(), request.getStartSeqId(), request.getMessageLimit(), request
+                .getSizeLimit(), request.getCallback(), request.getCtx(), SCAN_CHUNK);
+        return;
+    }
+
+    /*
+     * NOTE(review): the topic name is spliced into SQL unescaped. Topics come
+     * from clients, so a crafted topic name can break or alter the generated
+     * statements. Consider validating topic names or quoting the identifier.
+     */
+    private String getTableNameForTopic(ByteString topic) {
+        return (topic.toStringUtf8() + "_" + version);
+    }
+
+    /**
+     * Streams messages with ids >= startSeqId to the callback. Rows are read in
+     * half-open windows [id, id + scanChunk), so consecutive windows never
+     * overlap. The scan stops after messageLimit messages, once the
+     * accumulated body size exceeds sizeLimit, or at the first empty window.
+     */
+    private void scanMessagesInternal(ByteString topic, long startSeqId, int messageLimit, long sizeLimit,
+            ScanCallback callback, Object ctx, int scanChunk) {
+
+        Connection conn = threadLocalConnection.get();
+
+        if (conn == null) {
+            callback.scanFailed(ctx, new ServiceDownException("Not connected to derby"));
+            return;
+        }
+
+        PreparedStatement stmt = null;
+        try {
+            try {
+                stmt = conn.prepareStatement("SELECT * FROM " + getTableNameForTopic(topic) + " WHERE " + ID_FIELD_NAME
+                        + " >= ?  AND " + ID_FIELD_NAME + " < ?");
+            } catch (SQLException sqle) {
+                if (isTableMissing(sqle)) {
+                    // No table, scan is over
+                    callback.scanFinished(ctx, ReasonForFinish.NO_MORE_MESSAGES);
+                    return;
+                }
+                throw sqle;
+            }
+
+            long currentSeqId = startSeqId;
+            int numMessages = 0;
+            long totalSize = 0;
+
+            while (true) {
+
+                stmt.setLong(1, currentSeqId);
+                stmt.setLong(2, currentSeqId + scanChunk);
+
+                if (!stmt.execute()) {
+                    String errorMsg = "Select query did not return a result set";
+                    logger.error(errorMsg);
+                    callback.scanFailed(ctx, new ServiceDownException(errorMsg));
+                    return;
+                }
+
+                // Re-executing or closing stmt closes this result set as well.
+                ResultSet resultSet = stmt.getResultSet();
+
+                if (!resultSet.next()) {
+                    callback.scanFinished(ctx, ReasonForFinish.NO_MORE_MESSAGES);
+                    return;
+                }
+
+                do {
+
+                    long localSeqId = resultSet.getLong(1);
+
+                    Message.Builder messageBuilder = Message.newBuilder().mergeFrom(resultSet.getBinaryStream(2));
+
+                    // Merge in the local seq-id since that is not stored with
+                    // the message
+                    Message message = MessageIdUtils.mergeLocalSeqId(messageBuilder, localSeqId);
+
+                    callback.messageScanned(ctx, message);
+                    numMessages++;
+                    totalSize += message.getBody().size();
+
+                    // Stop once messageLimit messages have been delivered
+                    // (never deliver messageLimit + 1).
+                    if (numMessages >= messageLimit) {
+                        callback.scanFinished(ctx, ReasonForFinish.NUM_MESSAGES_LIMIT_EXCEEDED);
+                        return;
+                    } else if (totalSize > sizeLimit) {
+                        callback.scanFinished(ctx, ReasonForFinish.SIZE_LIMIT_EXCEEDED);
+                        return;
+                    }
+
+                } while (resultSet.next());
+
+                // Advance by the window actually queried, not by SCAN_CHUNK.
+                currentSeqId += scanChunk;
+            }
+        } catch (SQLException e) {
+            logger.error("SQL Exception", e);
+            callback.scanFailed(ctx, new ServiceDownException(e));
+            return;
+        } catch (IOException e) {
+            logger.error("Message stored in derby is not parseable", e);
+            callback.scanFailed(ctx, new ServiceDownException(e));
+            return;
+        } finally {
+            closeQuietly(stmt);
+        }
+
+    }
+
+    public void deliveredUntil(ByteString topic, Long seqId) {
+        // noop
+    }
+
+    /** Deletes every stored message of the topic with id <= seqId. */
+    public void consumedUntil(ByteString topic, Long seqId) {
+        Connection conn = threadLocalConnection.get();
+        if (conn == null) {
+            logger.error("Not connected to derby");
+            return;
+        }
+        PreparedStatement stmt = null;
+        try {
+            stmt = conn.prepareStatement("DELETE FROM " + getTableNameForTopic(topic) + " WHERE " + ID_FIELD_NAME
+                    + " <= ?");
+            stmt.setLong(1, seqId);
+            int rowCount = stmt.executeUpdate();
+            logger.debug("Deleted " + rowCount + " records for topic: " + topic.toStringUtf8() + ", seqId: " + seqId);
+        } catch (SQLException sqle) {
+            if (isTableMissing(sqle)) {
+                logger.warn("Table for topic (" + topic + ") does not exist so no consumed messages to delete!");
+            } else {
+                logger.error("Error while executing derby delete for consumed messages", sqle);
+            }
+        } finally {
+            closeQuietly(stmt);
+        }
+    }
+
+    /** True when the exception reports that the referenced table is missing. */
+    private static boolean isTableMissing(SQLException e) {
+        // Constant-first: getSQLState() may return null.
+        return TABLE_DOES_NOT_EXIST.equals(e.getSQLState());
+    }
+
+    /** Closes the statement (and thereby its result set), logging failures. */
+    private static void closeQuietly(Statement stmt) {
+        if (stmt == null) {
+            return;
+        }
+        try {
+            stmt.close();
+        } catch (SQLException e) {
+            logger.warn("Could not close derby statement", e);
+        }
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        if (driver.equals("org.apache.derby.jdbc.EmbeddedDriver")) {
+            boolean gotSQLExc = false;
+            // Derby signals a successful engine shutdown by throwing an
+            // SQLException with SQLState XJ015.
+            try {
+                DriverManager.getConnection("jdbc:derby:;shutdown=true").close();
+            } catch (SQLException se) {
+                if (SHUTDOWN_OK.equals(se.getSQLState())) {
+                    gotSQLExc = true;
+                }
+            }
+            if (!gotSQLExc) {
+                logger.error("Database did not shut down normally");
+            } else {
+                logger.info("Database shut down normally");
+            }
+        }
+        super.finalize();
+    }
+
+    /**
+     * Makes the manager behave like a brand new database: subsequent table
+     * names use the next version and all in-memory seq-ids are forgotten.
+     */
+    public void reset() {
+        // just move the namespace over to the next one
+        version++;
+        currTopicSeqIds.clear();
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/MapMethods.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/MapMethods.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/MapMethods.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/MapMethods.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class MapMethods {
+
+    public static <K, V> V getAfterInsertingIfAbsent(Map<K, V> map, K key, Factory<V> valueFactory) {
+        V value = map.get(key);
+
+        if (value == null) {
+            value = valueFactory.newInstance();
+            map.put(key, value);
+        }
+
+        return value;
+    }
+
+    public static <K, V, Z extends Collection<V>> void addToMultiMap(Map<K, Z> map, K key, V value,
+            Factory<Z> valueFactory) {
+        Collection<V> collection = getAfterInsertingIfAbsent(map, key, valueFactory);
+
+        collection.add(value);
+
+    }
+
+    public static <K, V, Z extends Collection<V>> boolean removeFromMultiMap(Map<K, Z> map, K key, V value) {
+        Collection<V> collection = map.get(key);
+
+        if (collection == null) {
+            return false;
+        }
+
+        if (!collection.remove(value)) {
+            return false;
+        } else {
+            if (collection.isEmpty()) {
+                map.remove(key);
+            }
+            return true;
+        }
+
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistRequest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistRequest.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistRequest.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistRequest.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * A request to persist one message on one topic. The request completes
+ * asynchronously: its outcome is reported through the supplied callback,
+ * together with the caller's context object.
+ */
+public class PersistRequest {
+    ByteString topic;
+    Message message;
+    Callback<Long> callback;
+    Object ctx;
+
+    public PersistRequest(ByteString topic, Message message, Callback<Long> callback, Object ctx) {
+        this.ctx = ctx;
+        this.callback = callback;
+        this.message = message;
+        this.topic = topic;
+    }
+
+    /** @return the topic the message is published on */
+    public ByteString getTopic() {
+        return topic;
+    }
+
+    /** @return the message to persist */
+    public Message getMessage() {
+        return message;
+    }
+
+    /** @return the completion callback (presumably receives the assigned seq-id) */
+    public Callback<Long> getCallback() {
+        return callback;
+    }
+
+    /** @return the opaque context passed back to the callback */
+    public Object getCtx() {
+        return ctx;
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+
+/**
+ * An implementation of this interface will persist messages in order and assign
+ * a seqId to each persisted message. SeqId need not be a single number in
+ * general. SeqId is opaque to all layers above {@link PersistenceManager}. Only
+ * the {@link PersistenceManager} needs to understand the format of the seqId
+ * and maintain it in such a way that there is a total order on the seqIds of a
+ * topic.
+ */
+public interface PersistenceManager {
+
+    /**
+     * Executes the given persist request asynchronously. When done, the
+     * callback specified in the request object is called with the result of the
+     * operation set to the local seqId (a {@code Long}) assigned to the
+     * persisted message.
+     *
+     * @param request
+     *            the topic, message, callback and context of the persist
+     *            operation
+     */
+    void persistMessage(PersistRequest request);
+
+    /**
+     * Get the seqId of the last message that has been persisted to the given
+     * topic. The returned seqId will be set as the consume position of any
+     * brand new subscription on this topic.
+     *
+     * Note that the return value may quickly become invalid because a
+     * {@link #persistMessage(PersistRequest)} call from another thread
+     * succeeds. For us, the typical use case is choosing the consume position
+     * of a new subscriber. Since the subscriber need not receive all messages
+     * that are published while the subscribe call is in progress, such loose
+     * semantics from this method is acceptable.
+     *
+     * @param topic
+     *            topic whose last persisted seqId is requested
+     * @return the seqId of the last persisted message.
+     * @throws ServerNotResponsibleForTopicException
+     *             if this server is not responsible for {@code topic}
+     */
+    MessageSeqId getCurrentSeqIdForTopic(ByteString topic) throws ServerNotResponsibleForTopicException;
+
+    /**
+     * Executes the given scan request.
+     *
+     * @param request
+     *            the scan request to execute
+     */
+    void scanSingleMessage(ScanRequest request);
+
+    /**
+     * Gets the next seq-id. This method should never block.
+     *
+     * NOTE(review): judging by the name, the result is the seqId reached by
+     * skipping {@code skipAmount} positions from {@code seqId} on
+     * {@code topic}; confirm the exact semantics against implementations.
+     *
+     * @param topic
+     *            topic the seqIds belong to
+     * @param seqId
+     *            seqId to start from
+     * @param skipAmount
+     *            number of positions to skip
+     * @return the resulting seqId
+     */
+    long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount);
+
+    /**
+     * Hint that the messages until the given seqId have been delivered and
+     * won't be needed unless there is a failure of some kind.
+     *
+     * @param topic
+     *            Topic
+     * @param seqId
+     *            Message local sequence ID
+     */
+    void deliveredUntil(ByteString topic, Long seqId);
+
+    /**
+     * Hint that the messages until the given seqId have been consumed by all
+     * subscribers to the topic and no longer need to be stored. The
+     * implementation classes can decide how and if they want to garbage collect
+     * and delete these older topic messages that are no longer needed.
+     *
+     * @param topic
+     *            Topic
+     * @param seqId
+     *            Message local sequence ID
+     */
+    void consumedUntil(ByteString topic, Long seqId);
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+/**
+ * A {@link PersistenceManager} that, in addition to scanning one message at a
+ * time, can scan a range of messages of a topic in a single request.
+ */
+public interface PersistenceManagerWithRangeScan extends PersistenceManager {
+
+    /**
+     * Executes the given range scan request.
+     *
+     * @param request
+     *            the topic, starting seqId, scan limits, callback and context
+     *            of the scan
+     */
+    void scanMessages(RangeScanRequest request);
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Encapsulates a request to scan messages on the given topic starting from the
+ * given seqId (included). A callback {@link ScanCallback} is provided. As
+ * messages are scanned, the relevant methods of the {@link ScanCallback} are
+ * called. Two hints are provided as to when scanning should stop: in terms of
+ * number of messages scanned, or in terms of the total size of messages
+ * scanned. Scanning stops whenever one of these limits is exceeded. These
+ * checks, especially the one about message size, are only approximate. The
+ * {@link ScanCallback} used should be prepared to deal with more or fewer
+ * messages scanned. If an error occurs during scanning, the
+ * {@link ScanCallback} is notified of the error.
+ *
+ * All fields are assigned once in the constructor and never reassigned.
+ */
+public class RangeScanRequest {
+    // Kept package-private so existing same-package readers keep compiling;
+    // final prevents reassignment and gives the values safe publication to
+    // whichever thread performs the scan.
+    final ByteString topic;
+    final long startSeqId;
+    final int messageLimit;
+    final long sizeLimit;
+    final ScanCallback callback;
+    final Object ctx;
+
+    /**
+     * Creates a range scan request.
+     *
+     * @param topic
+     *            topic to scan
+     * @param startSeqId
+     *            seqId of the first message to scan (included)
+     * @param messageLimit
+     *            approximate limit on the number of messages to scan
+     * @param sizeLimit
+     *            approximate limit on the total size of messages to scan
+     * @param callback
+     *            callback notified as messages are scanned and on error
+     * @param ctx
+     *            context object accompanying the callback
+     */
+    public RangeScanRequest(ByteString topic, long startSeqId, int messageLimit, long sizeLimit, ScanCallback callback,
+            Object ctx) {
+        this.topic = topic;
+        this.startSeqId = startSeqId;
+        this.messageLimit = messageLimit;
+        this.sizeLimit = sizeLimit;
+        this.callback = callback;
+        this.ctx = ctx;
+    }
+
+    /** @return the topic to scan */
+    public ByteString getTopic() {
+        return topic;
+    }
+
+    /** @return the seqId the scan starts from (included) */
+    public long getStartSeqId() {
+        return startSeqId;
+    }
+
+    /** @return the approximate limit on the number of messages to scan */
+    public int getMessageLimit() {
+        return messageLimit;
+    }
+
+    /** @return the approximate limit on the total size of messages to scan */
+    public long getSizeLimit() {
+        return sizeLimit;
+    }
+
+    /** @return the callback notified as messages are scanned and on error */
+    public ScanCallback getCallback() {
+        return callback;
+    }
+
+    /** @return the context object accompanying the callback */
+    public Object getCtx() {
+        return ctx;
+    }
+
+}