Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/08/19 23:25:22 UTC
svn commit: r987314 [11/16] - in /hadoop/zookeeper/trunk: ./
src/contrib/hedwig/ src/contrib/hedwig/client/
src/contrib/hedwig/client/src/ src/contrib/hedwig/client/src/main/
src/contrib/hedwig/client/src/main/cpp/
src/contrib/hedwig/client/src/main/cp...
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,739 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protoextensions.MessageIdUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.common.TopicOpQueuer;
+import org.apache.hedwig.server.common.UnexpectedError;
+import org.apache.hedwig.server.persistence.ScanCallback.ReasonForFinish;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.zookeeper.SafeAsynBKCallback;
+import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
+import org.apache.hedwig.zookeeper.ZkUtils;
+
+/**
+ * This persistence manager uses ZooKeeper and BookKeeper to store messages.
+ *
+ * Information about a topic is stored in ZooKeeper in a znode named after the
+ * topic. The znode holds a serialized {@link LedgerRanges} message listing, in
+ * order, the ledgers that store the topic's messages; each closed ledger
+ * carries the last seq-id it contains (endSeqIdIncluded), and the start of a
+ * range is implied by the end of the previous one (the very first ledger
+ * starts at seq-id 1).
+ */
+
+public class BookkeeperPersistenceManager implements PersistenceManagerWithRangeScan, TopicOwnershipChangeListener {
+ static Logger logger = Logger.getLogger(BookkeeperPersistenceManager.class);
+ static byte[] passwd = "sillysecret".getBytes();
+ private BookKeeper bk;
+ private ZooKeeper zk;
+ private ServerConfiguration cfg;
+
+ static class InMemoryLedgerRange {
+ LedgerRange range;
+ long startSeqIdIncluded; // included, for the very first ledger, this
+ // value is 1
+ LedgerHandle handle;
+
+ public InMemoryLedgerRange(LedgerRange range, long startSeqId, LedgerHandle handle) {
+ this.range = range;
+ this.startSeqIdIncluded = startSeqId;
+ this.handle = handle;
+ }
+
+ public InMemoryLedgerRange(LedgerRange range, long startSeqId) {
+ this(range, startSeqId, null);
+ }
+
+ }
+
+ static class TopicInfo {
+ /**
+ * stores the last message-seq-id vector that has been pushed to BK for
+ * persistence (but not necessarily acked yet by BK)
+ *
+ */
+ MessageSeqId lastSeqIdPushed;
+
+ /**
+ * stores the last message-id that has been acked by BK. This number is
+ * basically used for limiting scans to not read past what has been
+ * persisted by BK
+ */
+ long lastEntryIdAckedInCurrentLedger = -1; // -1 because entry-ids within
+ // a BK ledger start at 0
+
+ /**
+ * stores a sorted structure of the ledgers for a topic, mapping from
+ * the endSeqIdIncluded to the ledger info. This structure does not
+ * include the current ledger
+ */
+ TreeMap<Long, InMemoryLedgerRange> ledgerRanges = new TreeMap<Long, InMemoryLedgerRange>();
+
+ /**
+ * This is the handle of the current ledger that is being used to write
+ * messages
+ */
+ InMemoryLedgerRange currentLedgerRange;
+
+ }
+
+ Map<ByteString, TopicInfo> topicInfos = new ConcurrentHashMap<ByteString, TopicInfo>();
+
+ TopicOpQueuer queuer;
+
+ /**
+ * Instantiates a BookkeeperPersistenceManager.
+ *
+ * @param bk
+ * a reference to the BookKeeper client to use.
+ * @param zk
+ * a ZooKeeper handle to use.
+ * @param tm
+ * the topic manager with which this persistence manager registers
+ * for topic ownership changes.
+ * @param cfg
+ * the server configuration; it provides the ZooKeeper subtree that
+ * stores the topic-to-ledger information, which is created if it
+ * does not exist.
+ * @param executor
+ * the executor used to serialize per-topic operations.
+ */
+ public BookkeeperPersistenceManager(BookKeeper bk, ZooKeeper zk, TopicManager tm, ServerConfiguration cfg,
+ ScheduledExecutorService executor) {
+ this.bk = bk;
+ this.zk = zk;
+ this.cfg = cfg;
+ queuer = new TopicOpQueuer(executor);
+ tm.addTopicOwnershipChangeListener(this);
+ }
+
+ class RangeScanOp extends TopicOpQueuer.SynchronousOp {
+ RangeScanRequest request;
+ int numMessagesRead = 0;
+ long totalSizeRead = 0;
+ TopicInfo topicInfo;
+
+ public RangeScanOp(RangeScanRequest request) {
+ queuer.super(request.topic);
+ this.request = request;
+ }
+
+ @Override
+ protected void runInternal() {
+ topicInfo = topicInfos.get(topic);
+
+ if (topicInfo == null) {
+ request.callback.scanFailed(request.ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
+ return;
+ }
+
+ startReadingFrom(request.startSeqId);
+
+ }
+
+ protected void read(final InMemoryLedgerRange imlr, final long startSeqId, final long endSeqId) {
+
+ if (imlr.handle == null) {
+
+ bk.asyncOpenLedger(imlr.range.getLedgerId(), DigestType.CRC32, passwd,
+ new SafeAsynBKCallback.OpenCallback() {
+ @Override
+ public void safeOpenComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
+ if (rc == BKException.Code.OK) {
+ imlr.handle = ledgerHandle;
+ read(imlr, startSeqId, endSeqId);
+ return;
+ }
+ BKException bke = BKException.create(rc);
+ logger.error("Could not open ledger: " + imlr.range.getLedgerId() + " for topic: "
+ + topic.toStringUtf8());
+ request.callback.scanFailed(ctx, new PubSubException.ServiceDownException(bke));
+ return;
+ }
+ }, request.ctx);
+ return;
+ }
+
+ // ledger handle is not null, we can read from it
+ long correctedEndSeqId = Math.min(startSeqId + request.messageLimit - numMessagesRead - 1, endSeqId);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Issuing a bk read for ledger: " + imlr.handle.getId() + " from entry-id: "
+ + (startSeqId - imlr.startSeqIdIncluded) + " to entry-id: "
+ + (correctedEndSeqId - imlr.startSeqIdIncluded));
+ }
+
+ imlr.handle.asyncReadEntries(startSeqId - imlr.startSeqIdIncluded, correctedEndSeqId
+ - imlr.startSeqIdIncluded, new SafeAsynBKCallback.ReadCallback() {
+
+ long expectedEntryId = startSeqId - imlr.startSeqIdIncluded;
+
+ @Override
+ public void safeReadComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+ if (rc != BKException.Code.OK || !seq.hasMoreElements()) {
+ BKException bke = BKException.create(rc);
+ logger.error("Error while reading from ledger: " + imlr.range.getLedgerId() + " for topic: "
+ + topic.toStringUtf8(), bke);
+ request.callback.scanFailed(request.ctx, new PubSubException.ServiceDownException(bke));
+ return;
+ }
+
+ LedgerEntry entry = null;
+ while (seq.hasMoreElements()) {
+ entry = seq.nextElement();
+ Message message;
+ try {
+ message = Message.parseFrom(entry.getEntryInputStream());
+ } catch (IOException e) {
+ String msg = "Unreadable message found in ledger: " + imlr.range.getLedgerId()
+ + " for topic: " + topic.toStringUtf8();
+ logger.error(msg, e);
+ request.callback.scanFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+ return;
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Read response from ledger: " + lh.getId() + " entry-id: "
+ + entry.getEntryId());
+ }
+
+ assert expectedEntryId == entry.getEntryId() : "expectedEntryId (" + expectedEntryId
+ + ") != entry.getEntryId() (" + entry.getEntryId() + ")";
+ assert (message.getMsgId().getLocalComponent() - imlr.startSeqIdIncluded) == expectedEntryId;
+
+ expectedEntryId++;
+ request.callback.messageScanned(ctx, message);
+ numMessagesRead++;
+ totalSizeRead += message.getBody().size();
+
+ if (numMessagesRead >= request.messageLimit) {
+ request.callback.scanFinished(ctx, ReasonForFinish.NUM_MESSAGES_LIMIT_EXCEEDED);
+ return;
+ }
+
+ if (totalSizeRead >= request.sizeLimit) {
+ request.callback.scanFinished(ctx, ReasonForFinish.SIZE_LIMIT_EXCEEDED);
+ return;
+ }
+ }
+
+ startReadingFrom(imlr.startSeqIdIncluded + entry.getEntryId() + 1);
+
+ }
+ }, request.ctx);
+ }
+
+ protected void startReadingFrom(long startSeqId) {
+
+ Map.Entry<Long, InMemoryLedgerRange> entry = topicInfo.ledgerRanges.ceilingEntry(startSeqId);
+
+ if (entry == null) {
+ // None of the old ledgers have this seq-id, we must use the
+ // current ledger
+ long endSeqId = topicInfo.currentLedgerRange.startSeqIdIncluded
+ + topicInfo.lastEntryIdAckedInCurrentLedger;
+
+ if (endSeqId < startSeqId) {
+ request.callback.scanFinished(request.ctx, ReasonForFinish.NO_MORE_MESSAGES);
+ return;
+ }
+
+ read(topicInfo.currentLedgerRange, startSeqId, endSeqId);
+ } else {
+ read(entry.getValue(), startSeqId, entry.getValue().range.getEndSeqIdIncluded().getLocalComponent());
+ }
+
+ }
+
+ }
+
+ @Override
+ public void scanMessages(RangeScanRequest request) {
+ queuer.pushAndMaybeRun(request.topic, new RangeScanOp(request));
+ }
+
+ public void deliveredUntil(ByteString topic, Long seqId) {
+ // Nothing to do here. This is just a hint that we cannot use.
+ }
+
+ public void consumedUntil(ByteString topic, Long seqId) {
+ TopicInfo topicInfo = topicInfos.get(topic);
+ if (topicInfo == null) {
+ logger.error("Server is not responsible for topic!");
+ return;
+ }
+ for (Long endSeqIdIncluded : topicInfo.ledgerRanges.keySet()) {
+ if (endSeqIdIncluded <= seqId) {
+ // This ledger's message entries have all been consumed already
+ // so it is safe to delete it from BookKeeper.
+ long ledgerId = topicInfo.ledgerRanges.get(endSeqIdIncluded).range.getLedgerId();
+ try {
+ bk.deleteLedger(ledgerId);
+ } catch (Exception e) {
+ // For now, just log an exception error message. In the
+ // future, we can have more complicated retry logic to
+ // delete a consumed ledger. The next time the ledger
+ // garbage collection job runs, we'll once again try to
+ // delete this ledger.
+ logger.error("Exception while deleting consumed ledgerId: " + ledgerId, e);
+ }
+ } else
+ break;
+ }
+ }
+
+ public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) throws ServerNotResponsibleForTopicException {
+ TopicInfo topicInfo = topicInfos.get(topic);
+
+ if (topicInfo == null) {
+ throw new PubSubException.ServerNotResponsibleForTopicException("");
+ }
+
+ return topicInfo.lastSeqIdPushed;
+ }
+
+ public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) {
+ return seqId + skipAmount;
+ }
+
+ public class PersistOp extends TopicOpQueuer.SynchronousOp {
+ PersistRequest request;
+
+ public PersistOp(PersistRequest request) {
+ queuer.super(request.topic);
+ this.request = request;
+ }
+
+ @Override
+ public void runInternal() {
+ final TopicInfo topicInfo = topicInfos.get(topic);
+
+ if (topicInfo == null) {
+ request.callback.operationFailed(request.ctx,
+ new PubSubException.ServerNotResponsibleForTopicException(""));
+ return;
+ }
+
+ final long localSeqId = topicInfo.lastSeqIdPushed.getLocalComponent() + 1;
+ MessageSeqId.Builder builder = MessageSeqId.newBuilder();
+ if (request.message.hasMsgId()) {
+ MessageIdUtils.takeRegionMaximum(builder, topicInfo.lastSeqIdPushed, request.message.getMsgId());
+ } else {
+ builder.addAllRemoteComponents(topicInfo.lastSeqIdPushed.getRemoteComponentsList());
+ }
+ builder.setLocalComponent(localSeqId);
+
+ topicInfo.lastSeqIdPushed = builder.build();
+ Message msgToSerialize = Message.newBuilder(request.message).setMsgId(topicInfo.lastSeqIdPushed).build();
+
+ topicInfo.currentLedgerRange.handle.asyncAddEntry(msgToSerialize.toByteArray(),
+ new SafeAsynBKCallback.AddCallback() {
+ @Override
+ public void safeAddComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+ if (rc != BKException.Code.OK) {
+ BKException bke = BKException.create(rc);
+ logger.error("Error while persisting entry to ledger: " + lh.getId() + " for topic: "
+ + topic.toStringUtf8(), bke);
+
+ // To preserve ordering guarantees, we
+ // should give up the topic and not let
+ // other operations through
+ request.callback.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
+ return;
+ }
+
+ if (entryId + topicInfo.currentLedgerRange.startSeqIdIncluded != localSeqId) {
+ String msg = "Expected BK to assign entry-id: "
+ + (localSeqId - topicInfo.currentLedgerRange.startSeqIdIncluded)
+ + " but it instead assigned entry-id: " + entryId + " topic: "
+ + topic.toStringUtf8() + " ledger: " + lh.getId();
+ logger.fatal(msg);
+ throw new UnexpectedError(msg);
+ }
+
+ topicInfo.lastEntryIdAckedInCurrentLedger = entryId;
+ request.callback.operationFinished(ctx, localSeqId);
+ }
+ }, request.ctx);
+
+ }
+ }
+
+ public void persistMessage(PersistRequest request) {
+ queuer.pushAndMaybeRun(request.topic, new PersistOp(request));
+ }
+
+ public void scanSingleMessage(ScanRequest request) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ static SafeAsynBKCallback.CloseCallback noOpCloseCallback = new SafeAsynBKCallback.CloseCallback() {
+ @Override
+ public void safeCloseComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
+ };
+ };
+
+ String ledgersPath(ByteString topic) {
+ return cfg.getZkTopicPath(new StringBuilder(), topic).append("/ledgers").toString();
+ }
+
+ class AcquireOp extends TopicOpQueuer.AsynchronousOp<Void> {
+ public AcquireOp(ByteString topic, Callback<Void> cb, Object ctx) {
+ queuer.super(topic, cb, ctx);
+ }
+
+ @Override
+ public void run() {
+ if (topicInfos.containsKey(topic)) {
+ // Already acquired, do nothing
+ cb.operationFinished(ctx, null);
+ return;
+ }
+ // read topic ledgers node data
+ final String zNodePath = ledgersPath(topic);
+
+ zk.getData(zNodePath, false, new SafeAsyncZKCallback.DataCallback() {
+ @Override
+ public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+ if (rc == Code.OK.intValue()) {
+ processTopicLedgersNodeData(data, stat.getVersion());
+ return;
+ }
+
+ if (rc == Code.NONODE.intValue()) {
+ // create it
+ final byte[] initialData = LedgerRanges.getDefaultInstance().toByteArray();
+ ZkUtils.createFullPathOptimistic(zk, zNodePath, initialData, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {
+ @Override
+ public void safeProcessResult(int rc, String path, Object ctx, String name) {
+ if (rc != Code.OK.intValue()) {
+ KeeperException ke = ZkUtils.logErrorAndCreateZKException(
+ "Could not create ledgers node for topic: " + topic.toStringUtf8(),
+ path, rc);
+ cb.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+ return;
+ }
+ // a newly created znode starts at version 0
+ processTopicLedgersNodeData(initialData, 0);
+ }
+ }, ctx);
+ return;
+ }
+
+ // otherwise some other error
+ KeeperException ke = ZkUtils.logErrorAndCreateZKException("Could not read ledgers node for topic: "
+ + topic.toStringUtf8(), path, rc);
+ cb.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+
+ }
+ }, ctx);
+ }
+
+ void processTopicLedgersNodeData(byte[] data, int version) {
+
+ final LedgerRanges ranges;
+ try {
+ ranges = LedgerRanges.parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ String msg = "Ledger ranges for topic:" + topic.toStringUtf8() + " could not be deserialized";
+ logger.fatal(msg, e);
+ cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+ return;
+ }
+
+ Iterator<LedgerRange> lrIterator = ranges.getRangesList().iterator();
+ TopicInfo topicInfo = new TopicInfo();
+
+ long startOfLedger = 1;
+
+ while (lrIterator.hasNext()) {
+ LedgerRange range = lrIterator.next();
+
+ if (range.hasEndSeqIdIncluded()) {
+ // this means it was a valid and completely closed ledger
+ long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
+ topicInfo.ledgerRanges.put(endOfLedger, new InMemoryLedgerRange(range, startOfLedger));
+ startOfLedger = endOfLedger + 1;
+ continue;
+ }
+
+ // If it doesn't have a valid end, it must be the last ledger
+ if (lrIterator.hasNext()) {
+ String msg = "Ledger-id: " + range.getLedgerId() + " for topic: " + topic.toStringUtf8()
+ + " is not the last one but still does not have an end seq-id";
+ logger.fatal(msg);
+ cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+ return;
+ }
+
+ // The last ledger does not have a valid end seq-id; let's try to
+ // find it out
+ recoverLastTopicLedgerAndOpenNewOne(range.getLedgerId(), version, topicInfo);
+ return;
+ }
+
+ // All ledgers were found properly closed, just start a new one
+ openNewTopicLedger(version, topicInfo);
+ }
+
+ /**
+ * Recovers the last ledger, opens a new one, and persists the new
+ * information to ZK
+ *
+ * @param ledgerId
+ * Ledger to be recovered
+ */
+ private void recoverLastTopicLedgerAndOpenNewOne(final long ledgerId, final int expectedVersionOfLedgerNode,
+ final TopicInfo topicInfo) {
+
+ bk.asyncOpenLedger(ledgerId, DigestType.CRC32, passwd, new SafeAsynBKCallback.OpenCallback() {
+ @Override
+ public void safeOpenComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
+
+ if (rc != BKException.Code.OK) {
+ BKException bke = BKException.create(rc);
+ logger.error("While acquiring topic: " + topic.toStringUtf8()
+ + ", could not open unrecovered ledger: " + ledgerId, bke);
+ cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
+ return;
+ }
+
+ final long numEntriesInLastLedger = ledgerHandle.getLastAddConfirmed() + 1;
+
+ if (numEntriesInLastLedger <= 0) {
+ // this was an empty ledger that someone created but
+ // couldn't write to, so just ignore it
+ logger.info("Pruning empty ledger: " + ledgerId + " for topic: " + topic.toStringUtf8());
+ closeLedger(ledgerHandle);
+ openNewTopicLedger(expectedVersionOfLedgerNode, topicInfo);
+ return;
+ }
+
+ // we have to read the last entry of the ledger to find
+ // out the last seq-id
+
+ ledgerHandle.asyncReadEntries(numEntriesInLastLedger - 1, numEntriesInLastLedger - 1,
+ new SafeAsynBKCallback.ReadCallback() {
+ @Override
+ public void safeReadComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq,
+ Object ctx) {
+ if (rc != BKException.Code.OK || !seq.hasMoreElements()) {
+ BKException bke = BKException.create(rc);
+ logger.error("While recovering ledger: " + ledgerId + " for topic: "
+ + topic.toStringUtf8() + ", could not read last entry", bke);
+ cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
+ return;
+ }
+
+ Message lastMessage;
+ try {
+ lastMessage = Message.parseFrom(seq.nextElement().getEntry());
+ } catch (InvalidProtocolBufferException e) {
+ String msg = "While recovering ledger: " + ledgerId + " for topic: "
+ + topic.toStringUtf8() + ", could not deserialize last message";
+ logger.error(msg, e);
+ cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+ return;
+ }
+
+ long prevLedgerEnd = topicInfo.ledgerRanges.isEmpty() ? 0 : topicInfo.ledgerRanges
+ .lastKey();
+ LedgerRange lr = LedgerRange.newBuilder().setLedgerId(ledgerId)
+ .setEndSeqIdIncluded(lastMessage.getMsgId()).build();
+ topicInfo.ledgerRanges.put(lr.getEndSeqIdIncluded().getLocalComponent(),
+ new InMemoryLedgerRange(lr, prevLedgerEnd + 1, lh));
+
+ logger.info("Recovered unclosed ledger: " + ledgerId + " for topic: "
+ + topic.toStringUtf8() + " with " + numEntriesInLastLedger + " entries");
+
+ openNewTopicLedger(expectedVersionOfLedgerNode, topicInfo);
+ }
+ }, ctx);
+
+ }
+
+ }, ctx);
+ }
+
+ /**
+ *
+ * @param expectedVersionOfLedgersNode
+ * the version of the ledgers node when we read it; it must be
+ * unchanged when we write the node back
+ */
+ private void openNewTopicLedger(final int expectedVersionOfLedgersNode, final TopicInfo topicInfo) {
+ final int ENSEMBLE_SIZE = 3;
+ final int QUORUM_SIZE = 2;
+
+ bk.asyncCreateLedger(ENSEMBLE_SIZE, QUORUM_SIZE, DigestType.CRC32, passwd,
+ new SafeAsynBKCallback.CreateCallback() {
+ boolean processed = false;
+
+ @Override
+ public void safeCreateComplete(int rc, LedgerHandle lh, Object ctx) {
+ if (processed) {
+ return;
+ } else {
+ processed = true;
+ }
+
+ if (rc != BKException.Code.OK) {
+ BKException bke = BKException.create(rc);
+ logger.error("Could not create new ledger while acquiring topic: "
+ + topic.toStringUtf8(), bke);
+ cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
+ return;
+ }
+
+ topicInfo.lastSeqIdPushed = topicInfo.ledgerRanges.isEmpty() ? MessageSeqId.newBuilder()
+ .setLocalComponent(0).build() : topicInfo.ledgerRanges.lastEntry().getValue().range
+ .getEndSeqIdIncluded();
+
+ LedgerRange lastRange = LedgerRange.newBuilder().setLedgerId(lh.getId()).build();
+ topicInfo.currentLedgerRange = new InMemoryLedgerRange(lastRange, topicInfo.lastSeqIdPushed
+ .getLocalComponent() + 1, lh);
+
+ // Persist the fact that we started this new
+ // ledger to ZK
+
+ LedgerRanges.Builder builder = LedgerRanges.newBuilder();
+ for (InMemoryLedgerRange imlr : topicInfo.ledgerRanges.values()) {
+ builder.addRanges(imlr.range);
+ }
+ builder.addRanges(lastRange);
+
+ writeTopicLedgersNode(topic, builder.build().toByteArray(), expectedVersionOfLedgersNode,
+ topicInfo);
+ return;
+ }
+ }, ctx);
+ }
+
+ void writeTopicLedgersNode(final ByteString topic, byte[] data, int expectedVersion, final TopicInfo topicInfo) {
+ final String zNodePath = ledgersPath(topic);
+
+ zk.setData(zNodePath, data, expectedVersion, new SafeAsyncZKCallback.StatCallback() {
+ @Override
+ public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
+ if (rc != KeeperException.Code.OK.intValue()) {
+ KeeperException ke = ZkUtils.logErrorAndCreateZKException(
+ "Could not write ledgers node for topic: " + topic.toStringUtf8(), path, rc);
+ cb.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+ return;
+ }
+
+ // Finally, all done
+ topicInfos.put(topic, topicInfo);
+ cb.operationFinished(ctx, null);
+ }
+ }, ctx);
+
+ }
+ }
+
+ /**
+ * Acquires ownership of a topic, doing whatever is needed so that reads
+ * and writes on that topic can be served from here on.
+ *
+ * @param topic
+ * @param callback
+ * @param ctx
+ */
+ @Override
+ public void acquiredTopic(ByteString topic, Callback<Void> callback, Object ctx) {
+ queuer.pushAndMaybeRun(topic, new AcquireOp(topic, callback, ctx));
+ }
+
+ public void closeLedger(LedgerHandle lh) {
+ // try {
+ // lh.asyncClose(noOpCloseCallback, null);
+ // } catch (InterruptedException e) {
+ // logger.error(e);
+ // Thread.currentThread().interrupt();
+ // }
+ }
+
+ class ReleaseOp extends TopicOpQueuer.SynchronousOp {
+
+ public ReleaseOp(ByteString topic) {
+ queuer.super(topic);
+ }
+
+ @Override
+ public void runInternal() {
+ TopicInfo topicInfo = topicInfos.remove(topic);
+
+ if (topicInfo == null) {
+ return;
+ }
+
+ for (InMemoryLedgerRange imlr : topicInfo.ledgerRanges.values()) {
+ if (imlr.handle != null) {
+ closeLedger(imlr.handle);
+ }
+ }
+
+ if (topicInfo.currentLedgerRange != null && topicInfo.currentLedgerRange.handle != null) {
+ closeLedger(topicInfo.currentLedgerRange.handle);
+ }
+ }
+ }
+
+ /**
+ * Releases any resources currently held for the topic. No subsequent
+ * reads or writes on that topic will arrive.
+ *
+ * @param topic
+ */
+ @Override
+ public void lostTopic(ByteString topic) {
+ queuer.pushAndMaybeRun(topic, new ReleaseOp(topic));
+ }
+
+}
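
Illustrative sketch (not part of the commit above, simplified standalone types): the seq-id to (ledger, entry-id) mapping that startReadingFrom() and read() perform keeps closed ledgers in a TreeMap keyed by the last seq-id each contains, so ceilingEntry(seqId) finds the closed ledger covering a seq-id, and anything past the closed ranges falls into the ledger currently being written.

    import java.util.Map;
    import java.util.TreeMap;

    // Sketch of the lookup in BookkeeperPersistenceManager.startReadingFrom().
    // BK entry-ids are 0-based within a ledger, hence the subtraction of the
    // range's startSeqIdIncluded.
    public class LedgerRangeLookupSketch {
        static class Range {
            final long ledgerId;
            final long startSeqIdIncluded; // first seq-id stored in this ledger
            Range(long ledgerId, long startSeqIdIncluded) {
                this.ledgerId = ledgerId;
                this.startSeqIdIncluded = startSeqIdIncluded;
            }
        }

        // Closed ledgers, keyed by the last seq-id (endSeqIdIncluded) they contain.
        final TreeMap<Long, Range> closedLedgers = new TreeMap<Long, Range>();
        Range currentLedger;                       // ledger currently open for writes
        long lastEntryIdAckedInCurrentLedger = -1; // -1: nothing acked yet

        /** Returns "ledgerId:entryId" for seqId, or null if not yet persisted. */
        String locate(long seqId) {
            Map.Entry<Long, Range> e = closedLedgers.ceilingEntry(seqId);
            if (e != null) {
                Range r = e.getValue();
                return r.ledgerId + ":" + (seqId - r.startSeqIdIncluded);
            }
            long lastPersisted = currentLedger.startSeqIdIncluded + lastEntryIdAckedInCurrentLedger;
            if (seqId > lastPersisted) {
                return null; // beyond what BookKeeper has acknowledged so far
            }
            return currentLedger.ledgerId + ":" + (seqId - currentLedger.startSeqIdIncluded);
        }
    }
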
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/CacheKey.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/CacheKey.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/CacheKey.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/CacheKey.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.server.common.ByteStringInterner;
+
+public class CacheKey {
+
+ ByteString topic;
+ long seqId;
+
+ public CacheKey(ByteString topic, long seqId) {
+ this.topic = ByteStringInterner.intern(topic);
+ this.seqId = seqId;
+ }
+
+ public ByteString getTopic() {
+ return topic;
+ }
+
+ public long getSeqId() {
+ return seqId;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (seqId ^ (seqId >>> 32));
+ result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ CacheKey other = (CacheKey) obj;
+ if (seqId != other.seqId)
+ return false;
+ if (topic == null) {
+ if (other.topic != null)
+ return false;
+ } else if (!topic.equals(other.topic))
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "(" + topic.toStringUtf8() + "," + seqId + ")";
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/CacheValue.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/CacheValue.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/CacheValue.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/CacheValue.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.log4j.Logger;
+
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.server.common.UnexpectedError;
+
+/**
+ * This class is NOT thread-safe. It does not need to be, because the
+ * read-ahead cache operates with only one thread.
+ *
+ */
+public class CacheValue {
+
+ static Logger logger = Logger.getLogger(ReadAheadCache.class);
+
+ Queue<ScanCallbackWithContext> callbacks = new LinkedList<ScanCallbackWithContext>();
+ Message message;
+ long timeOfAddition = 0;
+
+ public CacheValue() {
+ }
+
+ public boolean isStub() {
+ return message == null;
+ }
+
+ public long getTimeOfAddition() {
+ if (message == null) {
+ throw new UnexpectedError("Time of add requested from a stub");
+ }
+ return timeOfAddition;
+ }
+
+ public void setMessageAndInvokeCallbacks(Message message, long currTime) {
+ if (this.message != null) {
+ // Duplicate read for the same message coming back
+ return;
+ }
+
+ this.message = message;
+ this.timeOfAddition = currTime;
+ ScanCallbackWithContext callbackWithCtx;
+ if (logger.isDebugEnabled()) {
+ logger.debug("Invoking " + callbacks.size() + " callbacks for message added to cache");
+ }
+ while ((callbackWithCtx = callbacks.poll()) != null) {
+ callbackWithCtx.getScanCallback().messageScanned(callbackWithCtx.getCtx(), message);
+ }
+ }
+
+ public void addCallback(ScanCallback callback, Object ctx) {
+ if (!isStub()) {
+ // call the callback right away
+ callback.messageScanned(ctx, message);
+ return;
+ }
+
+ callbacks.add(new ScanCallbackWithContext(callback, ctx));
+ }
+
+ public Message getMessage() {
+ return message;
+ }
+
+ public void setErrorAndInvokeCallbacks(Exception exception) {
+ ScanCallbackWithContext callbackWithCtx;
+ while ((callbackWithCtx = callbacks.poll()) != null) {
+ callbackWithCtx.getScanCallback().scanFailed(callbackWithCtx.getCtx(), exception);
+ }
+ }
+
+}
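
The stub-then-fill pattern used by CacheValue above (queue callbacks while the entry is a stub, fire them all once the backing read returns) can be seen in isolation in this simplified, single-threaded sketch; the String payload and the small listener interface stand in for Message and ScanCallback and are not part of the commit.

    import java.util.LinkedList;
    import java.util.Queue;

    // Single-threaded sketch of the CacheValue stub pattern: readers register
    // listeners against a stub entry; when the backing read completes, the value
    // is filled in exactly once and every queued listener is notified.
    public class StubValueSketch {
        interface Listener { void onValue(String value); }

        private final Queue<Listener> waiters = new LinkedList<Listener>();
        private String payload; // null while this entry is still a stub

        public boolean isStub() { return payload == null; }

        public void addListener(Listener l) {
            if (!isStub()) {
                l.onValue(payload);  // already filled: deliver immediately
            } else {
                waiters.add(l);      // still a stub: queue until the read returns
            }
        }

        public void fill(String value) {
            if (payload != null) {
                return;              // duplicate read for the same entry: ignore
            }
            payload = value;
            Listener l;
            while ((l = waiters.poll()) != null) {
                l.onValue(value);
            }
        }
    }
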
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/Factory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/Factory.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/Factory.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/Factory.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+public interface Factory<T> {
+ public T newInstance();
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.sql.rowset.serial.SerialBlob;
+
+import org.apache.log4j.Logger;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protoextensions.MessageIdUtils;
+import org.apache.hedwig.server.persistence.ScanCallback.ReasonForFinish;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.FileUtils;
+
+public class LocalDBPersistenceManager implements PersistenceManagerWithRangeScan {
+ static Logger logger = Logger.getLogger(LocalDBPersistenceManager.class);
+
+ static String connectionURL;
+
+ static {
+ try {
+ File tempDir = FileUtils.createTempDirectory("derby", null);
+
+ // Derby needs to create the directory itself, so delete it first
+ if (!tempDir.delete()) {
+ throw new IOException("Could not delete dir: " + tempDir.getAbsolutePath());
+ }
+ connectionURL = "jdbc:derby:" + tempDir.getAbsolutePath() + ";create=true";
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ private static final ThreadLocal<Connection> threadLocalConnection = new ThreadLocal<Connection>() {
+ @Override
+ protected Connection initialValue() {
+ try {
+ return DriverManager.getConnection(connectionURL);
+ } catch (SQLException e) {
+ logger.error("Could not connect to derby", e);
+ return null;
+ }
+ }
+ };
+ static final String ID_FIELD_NAME = "id";
+ static final String MSG_FIELD_NAME = "msg";
+ static final String driver = "org.apache.derby.jdbc.EmbeddedDriver";
+
+ static final int SCAN_CHUNK = 1000;
+
+ /**
+ * Restarting the embedded database multiple times within the same JVM is
+ * problematic. To facilitate unit tests, we therefore append a version
+ * number to every table name. Instead of shutting the database down and
+ * restarting it, we increment this version number so that new table names
+ * are used and the store behaves like a brand-new database.
+ */
+ private int version = 0;
+
+ ConcurrentMap<ByteString, MessageSeqId> currTopicSeqIds = new ConcurrentHashMap<ByteString, MessageSeqId>();
+
+ static LocalDBPersistenceManager instance = new LocalDBPersistenceManager();
+
+ public static LocalDBPersistenceManager instance() {
+ return instance;
+ }
+
+ private LocalDBPersistenceManager() {
+
+ try {
+ Class.forName(driver).newInstance();
+ logger.info("Derby Driver loaded");
+ } catch (java.lang.ClassNotFoundException e) {
+ logger.error("Derby driver not found", e);
+ } catch (InstantiationException e) {
+ logger.error("Could not instantiate derby driver", e);
+ } catch (IllegalAccessException e) {
+ logger.error("Could not instantiate derby driver", e);
+ }
+ }
+
+ /**
+ * Ensures that at least the default seq-id exists in the map for the given
+ * topic. Handles the race where another thread inserts the default id
+ * before us, and returns the latest seq-id value in the map.
+ *
+ * @param topic
+ * @return
+ */
+ private MessageSeqId ensureSeqIdExistsForTopic(ByteString topic) {
+ MessageSeqId presentSeqIdInMap = currTopicSeqIds.get(topic);
+
+ if (presentSeqIdInMap != null) {
+ return presentSeqIdInMap;
+ }
+
+ presentSeqIdInMap = MessageSeqId.newBuilder().setLocalComponent(0).build();
+ MessageSeqId oldSeqIdInMap = currTopicSeqIds.putIfAbsent(topic, presentSeqIdInMap);
+
+ if (oldSeqIdInMap != null) {
+ return oldSeqIdInMap;
+ }
+ return presentSeqIdInMap;
+
+ }
+
+ /**
+ * Adjust the current seq id of the topic based on the message we are about
+ * to publish. The local component of the current seq-id is always
+ * incremented by 1. For the other components, there are two cases:
+ *
+ * 1. If the message to be published doesn't have a seq-id (locally
+ * published messages), the other components are left as is.
+ *
+ * 2. If the message to be published has a seq-id, we take the max of the
+ * current one we have, and that in the message to be published.
+ *
+ * @param topic
+ * @param messageToPublish
+ * @return The value of the local seq-id obtained after incrementing the
+ * local component. This value should be used as an id while
+ * persisting to Derby
+ * @throws UnexpectedConditionException
+ */
+ private long adjustTopicSeqIdForPublish(ByteString topic, Message messageToPublish)
+ throws UnexpectedConditionException {
+ long retValue = 0;
+ MessageSeqId oldId;
+ MessageSeqId.Builder newIdBuilder = MessageSeqId.newBuilder();
+
+ do {
+ oldId = ensureSeqIdExistsForTopic(topic);
+
+ // Increment our own component by 1
+ retValue = oldId.getLocalComponent() + 1;
+ newIdBuilder.setLocalComponent(retValue);
+
+ if (messageToPublish.hasMsgId()) {
+ // take a region-wise max
+ MessageIdUtils.takeRegionMaximum(newIdBuilder, messageToPublish.getMsgId(), oldId);
+
+ } else {
+ newIdBuilder.addAllRemoteComponents(oldId.getRemoteComponentsList());
+ }
+ } while (!currTopicSeqIds.replace(topic, oldId, newIdBuilder.build()));
+
+ return retValue;
+
+ }
+
+ public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) {
+ return seqId + skipAmount;
+ }
+
+ public void persistMessage(PersistRequest request) {
+
+ Connection conn = threadLocalConnection.get();
+
+ Callback<Long> callback = request.getCallback();
+ Object ctx = request.getCtx();
+ ByteString topic = request.getTopic();
+ Message message = request.getMessage();
+
+ if (conn == null) {
+ callback.operationFailed(ctx, new ServiceDownException("Not connected to derby"));
+ return;
+ }
+
+ long seqId;
+
+ try {
+ seqId = adjustTopicSeqIdForPublish(topic, message);
+ } catch (UnexpectedConditionException e) {
+ callback.operationFailed(ctx, e);
+ return;
+ }
+ PreparedStatement stmt;
+
+ boolean triedCreatingTable = false;
+ while (true) {
+ try {
+ message.getBody();
+ stmt = conn.prepareStatement("INSERT INTO " + getTableNameForTopic(topic) + " VALUES(?,?)");
+ stmt.setLong(1, seqId);
+ stmt.setBlob(2, new SerialBlob(message.toByteArray()));
+
+ int rowCount = stmt.executeUpdate();
+ stmt.close();
+ if (rowCount != 1) {
+ logger.error("Unexpected number of affected rows from derby");
+ callback.operationFailed(ctx, new ServiceDownException("Unexpected response from derby"));
+ return;
+ }
+ break;
+ } catch (SQLException sqle) {
+ String theError = (sqle).getSQLState();
+ if (theError.equals("42X05") && !triedCreatingTable) {
+ createTable(conn, topic);
+ triedCreatingTable = true;
+ continue;
+ }
+
+ logger.error("Error while executing derby insert", sqle);
+ callback.operationFailed(ctx, new ServiceDownException(sqle));
+ return;
+ }
+ }
+ callback.operationFinished(ctx, seqId);
+ }
+
+ /*
+ * This method does not throw an exception because another thread might
+ * sneak in and create the table before us
+ */
+ private void createTable(Connection conn, ByteString topic) {
+
+ try {
+ Statement stmt = conn.createStatement();
+ String tableName = getTableNameForTopic(topic);
+ stmt.execute("CREATE TABLE " + tableName + " (" + ID_FIELD_NAME + " BIGINT NOT NULL CONSTRAINT ID_PK_"
+ + tableName + " PRIMARY KEY," + MSG_FIELD_NAME + " BLOB(2M) NOT NULL)");
+ } catch (SQLException e) {
+ logger.debug("Could not create table", e);
+ }
+ }
+
+ public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) {
+ return ensureSeqIdExistsForTopic(topic);
+ }
+
+ public void scanSingleMessage(ScanRequest request) {
+ scanMessagesInternal(request.getTopic(), request.getStartSeqId(), 1, Long.MAX_VALUE, request.getCallback(),
+ request.getCtx(), 1);
+ return;
+ }
+
+ public void scanMessages(RangeScanRequest request) {
+ scanMessagesInternal(request.getTopic(), request.getStartSeqId(), request.getMessageLimit(), request
+ .getSizeLimit(), request.getCallback(), request.getCtx(), SCAN_CHUNK);
+ return;
+ }
+
+ private String getTableNameForTopic(ByteString topic) {
+ return (topic.toStringUtf8() + "_" + version);
+ }
+
+ private void scanMessagesInternal(ByteString topic, long startSeqId, int messageLimit, long sizeLimit,
+ ScanCallback callback, Object ctx, int scanChunk) {
+
+ Connection conn = threadLocalConnection.get();
+
+ if (conn == null) {
+ callback.scanFailed(ctx, new ServiceDownException("Not connected to derby"));
+ return;
+ }
+
+ long currentSeqId;
+ currentSeqId = startSeqId;
+
+ PreparedStatement stmt;
+ try {
+ try {
+ stmt = conn.prepareStatement("SELECT * FROM " + getTableNameForTopic(topic) + " WHERE " + ID_FIELD_NAME
+ + " >= ? AND " + ID_FIELD_NAME + " <= ?");
+
+ } catch (SQLException sqle) {
+ String theError = (sqle).getSQLState();
+ if (theError.equals("42X05")) {
+ // No table, scan is over
+ callback.scanFinished(ctx, ReasonForFinish.NO_MORE_MESSAGES);
+ return;
+ } else {
+ throw sqle;
+ }
+ }
+
+ int numMessages = 0;
+ long totalSize = 0;
+
+ while (true) {
+
+ stmt.setLong(1, currentSeqId);
+ stmt.setLong(2, currentSeqId + scanChunk);
+
+ if (!stmt.execute()) {
+ String errorMsg = "Select query did not return a result set";
+ logger.error(errorMsg);
+ stmt.close();
+ callback.scanFailed(ctx, new ServiceDownException(errorMsg));
+ return;
+ }
+
+ ResultSet resultSet = stmt.getResultSet();
+
+ if (!resultSet.next()) {
+ stmt.close();
+ callback.scanFinished(ctx, ReasonForFinish.NO_MORE_MESSAGES);
+ return;
+ }
+
+ do {
+
+ long localSeqId = resultSet.getLong(1);
+
+ Message.Builder messageBuilder = Message.newBuilder().mergeFrom(resultSet.getBinaryStream(2));
+
+ // Merge in the local seq-id since that is not stored with
+ // the message
+ Message message = MessageIdUtils.mergeLocalSeqId(messageBuilder, localSeqId);
+
+ callback.messageScanned(ctx, message);
+ numMessages++;
+ totalSize += message.getBody().size();
+
+ if (numMessages > messageLimit) {
+ stmt.close();
+ callback.scanFinished(ctx, ReasonForFinish.NUM_MESSAGES_LIMIT_EXCEEDED);
+ return;
+ } else if (totalSize > sizeLimit) {
+ stmt.close();
+ callback.scanFinished(ctx, ReasonForFinish.SIZE_LIMIT_EXCEEDED);
+ return;
+ }
+
+ } while (resultSet.next());
+
+ currentSeqId += scanChunk + 1; // advance past the inclusive upper bound of this chunk
+ }
+ } catch (SQLException e) {
+ logger.error("SQL Exception", e);
+ callback.scanFailed(ctx, new ServiceDownException(e));
+ return;
+ } catch (IOException e) {
+ logger.error("Message stored in derby is not parseable", e);
+ callback.scanFailed(ctx, new ServiceDownException(e));
+ return;
+ }
+
+ }
+
+ public void deliveredUntil(ByteString topic, Long seqId) {
+ // noop
+ }
+
+ public void consumedUntil(ByteString topic, Long seqId) {
+ Connection conn = threadLocalConnection.get();
+ if (conn == null) {
+ logger.error("Not connected to derby");
+ return;
+ }
+ PreparedStatement stmt;
+ try {
+ stmt = conn.prepareStatement("DELETE FROM " + getTableNameForTopic(topic) + " WHERE " + ID_FIELD_NAME
+ + " <= ?");
+ stmt.setLong(1, seqId);
+ int rowCount = stmt.executeUpdate();
+ logger.debug("Deleted " + rowCount + " records for topic: " + topic.toStringUtf8() + ", seqId: " + seqId);
+ stmt.close();
+ } catch (SQLException sqle) {
+ String theError = (sqle).getSQLState();
+ if (theError.equals("42X05")) {
+ logger.warn("Table for topic (" + topic.toStringUtf8() + ") does not exist so no consumed messages to delete!");
+ } else
+ logger.error("Error while executing derby delete for consumed messages", sqle);
+ }
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ if (driver.equals("org.apache.derby.jdbc.EmbeddedDriver")) {
+ boolean gotSQLExc = false;
+ // Derby signals a successful shutdown by throwing an SQLException with state XJ015
+ try {
+ DriverManager.getConnection("jdbc:derby:;shutdown=true").close();
+ } catch (SQLException se) {
+ if (se.getSQLState().equals("XJ015")) {
+ gotSQLExc = true;
+ }
+ }
+ if (!gotSQLExc) {
+ logger.error("Database did not shut down normally");
+ } else {
+ logger.info("Database shut down normally");
+ }
+ }
+ super.finalize();
+ }
+
+ public void reset() {
+ // just move the namespace over to the next one
+ version++;
+ currTopicSeqIds.clear();
+ }
+}
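
The optimistic update loop in adjustTopicSeqIdForPublish() above (read the current seq-id, build the next one, retry with ConcurrentMap.replace() until no other publisher raced us) is the core of how LocalDBPersistenceManager assigns seq-ids. A minimal sketch, with a plain Long standing in for MessageSeqId, illustrative only and not part of the commit:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Sketch of the compare-and-set loop used to assign per-topic seq-ids:
    // replace(key, expected, next) only succeeds if nobody changed the value
    // between our read and our write; otherwise we re-read and try again.
    public class SeqIdCounterSketch {
        private final ConcurrentMap<String, Long> currSeqIds =
                new ConcurrentHashMap<String, Long>();

        public long nextSeqId(String topic) {
            Long oldId;
            long newId;
            do {
                oldId = currSeqIds.get(topic);
                if (oldId == null) {
                    // First publish on this topic: install the default value,
                    // keeping whichever value won the race.
                    Long raced = currSeqIds.putIfAbsent(topic, Long.valueOf(0L));
                    oldId = (raced != null) ? raced : Long.valueOf(0L);
                }
                newId = oldId.longValue() + 1;
            } while (!currSeqIds.replace(topic, oldId, Long.valueOf(newId)));
            return newId;
        }
    }
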
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/MapMethods.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/MapMethods.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/MapMethods.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/MapMethods.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class MapMethods {
+
+ public static <K, V> V getAfterInsertingIfAbsent(Map<K, V> map, K key, Factory<V> valueFactory) {
+ V value = map.get(key);
+
+ if (value == null) {
+ value = valueFactory.newInstance();
+ map.put(key, value);
+ }
+
+ return value;
+ }
+
+ public static <K, V, Z extends Collection<V>> void addToMultiMap(Map<K, Z> map, K key, V value,
+ Factory<Z> valueFactory) {
+ Collection<V> collection = getAfterInsertingIfAbsent(map, key, valueFactory);
+
+ collection.add(value);
+
+ }
+
+ public static <K, V, Z extends Collection<V>> boolean removeFromMultiMap(Map<K, Z> map, K key, V value) {
+ Collection<V> collection = map.get(key);
+
+ if (collection == null) {
+ return false;
+ }
+
+ if (!collection.remove(value)) {
+ return false;
+ } else {
+ if (collection.isEmpty()) {
+ map.remove(key);
+ }
+ return true;
+ }
+
+ }
+
+}
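
A short usage example for the Factory and MapMethods helpers above (it assumes the two classes from this commit are on the classpath; the topic and subscriber names are made up): per-key collections are created lazily through the factory, and removing the last value for a key also drops the key.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import org.apache.hedwig.server.persistence.Factory;
    import org.apache.hedwig.server.persistence.MapMethods;

    // Usage example: a multi-map from topic to subscriber ids, with the
    // per-topic set created lazily the first time a topic is seen.
    public class MultiMapExample {
        public static void main(String[] args) {
            Map<String, Set<String>> subscribersByTopic = new HashMap<String, Set<String>>();
            Factory<Set<String>> setFactory = new Factory<Set<String>>() {
                public Set<String> newInstance() {
                    return new HashSet<String>();
                }
            };

            MapMethods.addToMultiMap(subscribersByTopic, "topicA", "sub-1", setFactory);
            MapMethods.addToMultiMap(subscribersByTopic, "topicA", "sub-2", setFactory);
            System.out.println(subscribersByTopic.get("topicA")); // [sub-1, sub-2]

            // Removing the last remaining value for a key removes the key itself.
            MapMethods.removeFromMultiMap(subscribersByTopic, "topicA", "sub-1");
            MapMethods.removeFromMultiMap(subscribersByTopic, "topicA", "sub-2");
            System.out.println(subscribersByTopic.containsKey("topicA")); // false
        }
    }
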
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistRequest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistRequest.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistRequest.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistRequest.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * Encapsulates a request to persist a given message on a given topic. The
+ * request is completed asynchronously, callback and context are provided
+ *
+ */
+public class PersistRequest {
+ ByteString topic;
+ Message message;
+ Callback<Long> callback;
+ Object ctx;
+
+ public PersistRequest(ByteString topic, Message message, Callback<Long> callback, Object ctx) {
+ this.topic = topic;
+ this.message = message;
+ this.callback = callback;
+ this.ctx = ctx;
+ }
+
+ public ByteString getTopic() {
+ return topic;
+ }
+
+ public Message getMessage() {
+ return message;
+ }
+
+ public Callback<Long> getCallback() {
+ return callback;
+ }
+
+ public Object getCtx() {
+ return ctx;
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+
+/**
+ * An implementation of this interface will persist messages in order and assign
+ * a seqId to each persisted message. SeqId need not be a single number in
+ * general. SeqId is opaque to all layers above {@link PersistenceManager}. Only
+ * the {@link PersistenceManager} needs to understand the format of the seqId
+ * and maintain it in such a way that there is a total order on the seqIds of a
+ * topic.
+ *
+ */
+public interface PersistenceManager {
+
+ /**
+ * Executes the given persist request asynchronously. When done, the
+ * callback specified in the request object is invoked with the local
+ * seq-id (a {@code Long}) assigned to the persisted message.
+ */
+ public void persistMessage(PersistRequest request);
+
+ /**
+ * Get the seqId of the last message that has been persisted to the given
+ * topic. The returned seqId will be set as the consume position of any
+ * brand new subscription on this topic.
+ *
+ * Note that the return value may quickly become invalid because a
+ * {@link #persistMessage(PersistRequest)} call from another
+ * thread succeeds. For us, the typical use case is choosing the consume
+ * position of a new subscriber. Since the subscriber need not receive all
+ * messages that are published while the subscribe call is in progress, such
+ * loose semantics from this method is acceptable.
+ *
+ * @param topic
+ * @return the seqId of the last persisted message.
+ * @throws ServerNotResponsibleForTopicException
+ */
+ public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) throws ServerNotResponsibleForTopicException;
+
+ /**
+ * Executes the given scan request
+ *
+ */
+ public void scanSingleMessage(ScanRequest request);
+
+ /**
+ * Gets the next seq-id. This method should never block.
+ */
+ public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount);
+
+ /**
+ * Hint that the messages up to the given seqId have been delivered and
+ * won't be needed unless there is a failure of some kind.
+ */
+ public void deliveredUntil(ByteString topic, Long seqId);
+
+ /**
+ * Hint that the messages up to the given seqId have been consumed by all
+ * subscribers to the topic and no longer need to be stored. The
+ * implementation classes can decide how and if they want to garbage collect
+ * and delete these older topic messages that are no longer needed.
+ *
+ * @param topic
+ * Topic
+ * @param seqId
+ * Message local sequence ID
+ */
+ public void consumedUntil(ByteString topic, Long seqId);
+
+}
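
As a hedged usage sketch (not part of this commit), publishing a single message through any PersistenceManager implementation looks roughly like the following. It assumes the Callback interface exposes the operationFinished/operationFailed pair used throughout this commit, and that the Message protobuf builder has a setBody() setter for the body field read elsewhere in the code.

    import com.google.protobuf.ByteString;

    import org.apache.hedwig.exceptions.PubSubException;
    import org.apache.hedwig.protocol.PubSubProtocol.Message;
    import org.apache.hedwig.server.persistence.PersistRequest;
    import org.apache.hedwig.server.persistence.PersistenceManager;
    import org.apache.hedwig.util.Callback;

    // Sketch: persist one message and react to the local seq-id assigned to it.
    public class PersistExample {
        static void publish(PersistenceManager pm, ByteString topic, ByteString body) {
            Message msg = Message.newBuilder().setBody(body).build();
            PersistRequest req = new PersistRequest(topic, msg, new Callback<Long>() {
                public void operationFinished(Object ctx, Long localSeqId) {
                    System.out.println("persisted as local seq-id " + localSeqId);
                }
                public void operationFailed(Object ctx, PubSubException exception) {
                    System.err.println("persist failed: " + exception.getMessage());
                }
            }, null /* ctx */);
            pm.persistMessage(req);
        }
    }
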
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+public interface PersistenceManagerWithRangeScan extends PersistenceManager {
+ /**
+ * Executes the given range scan request
+ *
+ * @param request
+ */
+ public void scanMessages(RangeScanRequest request);
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Encapsulates a request to scan messages on the given topic starting from the
+ * given seqId (included). A call-back {@link ScanCallback} is provided. As
+ * messages are scanned, the relevant methods of the {@link ScanCallback} are
+ * called. Two hints are provided as to when scanning should stop: in terms of
+ * number of messages scanned, or in terms of the total size of messages
+ * scanned. Scanning stops whenever one of these limits is exceeded. These
+ * checks, especially the one about message size, are only approximate. The
+ * {@link ScanCallback} used should be prepared to deal with more or less
+ * messages scanned. If an error occurs during scanning, the
+ * {@link ScanCallback} is notified of the error.
+ *
+ */
+public class RangeScanRequest {
+ ByteString topic;
+ long startSeqId;
+ int messageLimit;
+ long sizeLimit;
+ ScanCallback callback;
+ Object ctx;
+
+ public RangeScanRequest(ByteString topic, long startSeqId, int messageLimit, long sizeLimit, ScanCallback callback,
+ Object ctx) {
+ this.topic = topic;
+ this.startSeqId = startSeqId;
+ this.messageLimit = messageLimit;
+ this.sizeLimit = sizeLimit;
+ this.callback = callback;
+ this.ctx = ctx;
+ }
+
+ public ByteString getTopic() {
+ return topic;
+ }
+
+ public long getStartSeqId() {
+ return startSeqId;
+ }
+
+ public int getMessageLimit() {
+ return messageLimit;
+ }
+
+ public long getSizeLimit() {
+ return sizeLimit;
+ }
+
+ public ScanCallback getCallback() {
+ return callback;
+ }
+
+ public Object getCtx() {
+ return ctx;
+ }
+
+}
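
Finally, a hedged sketch (not part of this commit) of how a range scan is issued against a PersistenceManagerWithRangeScan. The ScanCallback methods shown are the three invoked elsewhere in this commit (messageScanned, scanFinished, scanFailed), but their exact declared signatures are assumed.

    import com.google.protobuf.ByteString;

    import org.apache.hedwig.protocol.PubSubProtocol.Message;
    import org.apache.hedwig.server.persistence.PersistenceManagerWithRangeScan;
    import org.apache.hedwig.server.persistence.RangeScanRequest;
    import org.apache.hedwig.server.persistence.ScanCallback;
    import org.apache.hedwig.server.persistence.ScanCallback.ReasonForFinish;

    // Sketch: scan a topic from seq-id 1, stopping after roughly 100 messages
    // or 1 MB of message bodies, whichever limit is exceeded first.
    public class RangeScanExample {
        static void scan(PersistenceManagerWithRangeScan pm, ByteString topic) {
            ScanCallback cb = new ScanCallback() {
                public void messageScanned(Object ctx, Message message) {
                    System.out.println("scanned seq-id "
                            + message.getMsgId().getLocalComponent());
                }
                public void scanFinished(Object ctx, ReasonForFinish reason) {
                    System.out.println("scan finished: " + reason);
                }
                public void scanFailed(Object ctx, Exception exception) {
                    System.err.println("scan failed: " + exception);
                }
            };
            pm.scanMessages(new RangeScanRequest(topic, 1L, 100, 1024 * 1024L, cb, null));
        }
    }
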