You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/03/14 12:29:04 UTC

svn commit: r1300510 [3/3] - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/cpp/ hedwig-client/src/main/cpp/inc/hedwig/ hedwig-client/src/main/cpp/lib/ hedwig-client/src/main/cpp/scripts/ hedwig-client/src/main/cpp/test/ hedwig-client/src/ma...

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto Wed Mar 14 11:29:03 2012
@@ -96,6 +96,12 @@ message SubscribeRequest{
 	
 	// wait for cross-regional subscriptions to be established before returning
 	optional bool synchronous = 4 [default = false];
+	optional uint32 messageBound = 5;
+}
+
+message SubscriptionOptions {
+    optional SubscribeRequest.CreateOrAttach createOrAttach = 2 [default = CREATE_OR_ATTACH];
+    optional uint32 messageBound = 3 [default = 0];
 }
 
 message ConsumeRequest{
@@ -162,6 +168,7 @@ enum StatusCode{
     
 message SubscriptionState {
     required MessageSeqId msgId = 1;
+    optional uint32 messageBound = 2;
 }
 
 message LedgerRange{

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java Wed Mar 14 11:29:03 2012
@@ -20,6 +20,8 @@ package org.apache.hedwig.server.persist
 import java.io.IOException;
 import java.util.Enumeration;
 import java.util.Iterator;
+import java.util.Set;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -120,6 +122,7 @@ public class BookkeeperPersistenceManage
          * include the current ledger
          */
         TreeMap<Long, InMemoryLedgerRange> ledgerRanges = new TreeMap<Long, InMemoryLedgerRange>();
+        int ledgerRangesZnodeVersion = -1;
 
         /**
          * This is the handle of the current ledger that is being used to write
@@ -131,6 +134,9 @@ public class BookkeeperPersistenceManage
          * Flag to release topic when encountering unrecoverable exceptions
          */
         AtomicBoolean doRelease = new AtomicBoolean(false);
+
+        final static int UNLIMITED = 0;
+        int messageBound = UNLIMITED;
     }
 
     Map<ByteString, TopicInfo> topicInfos = new ConcurrentHashMap<ByteString, TopicInfo>();
@@ -308,29 +314,123 @@ public class BookkeeperPersistenceManage
         // Nothing to do here. this is just a hint that we cannot use.
     }
 
+    class UpdateLedgerOp extends TopicOpQueuer.AsynchronousOp<Void> {
+        private long ledgerDeleted;
+
+        public UpdateLedgerOp(ByteString topic, final Callback<Void> cb, final Object ctx, final long ledgerDeleted) {
+            queuer.super(topic, cb, ctx);
+            this.ledgerDeleted = ledgerDeleted;
+        }
+
+        @Override
+        public void run() {
+            final TopicInfo topicInfo = topicInfos.get(topic);
+            if (topicInfo == null) {
+                logger.error("Server is not responsible for topic!");
+                return;
+            }
+            boolean needsUpdate = false;
+            LedgerRanges.Builder builder = LedgerRanges.newBuilder();
+            final Set<Long> keysToRemove = new HashSet<Long>();
+            for (Map.Entry<Long, InMemoryLedgerRange> e : topicInfo.ledgerRanges.entrySet()) {
+                if (e.getValue().range.getLedgerId() == ledgerDeleted) {
+                    needsUpdate = true;
+                    keysToRemove.add(e.getKey());
+                } else {
+                    builder.addRanges(e.getValue().range);
+                }
+            }
+            builder.addRanges(topicInfo.currentLedgerRange.range);
+
+            if (needsUpdate) {
+                final LedgerRanges newRanges = builder.build();
+                updateLedgerRangesNode(topic, newRanges, topicInfo.ledgerRangesZnodeVersion,
+                                       new Callback<Integer>() {
+                                           public void operationFinished(Object ctx, Integer newVersion) {
+                                               // Finally, all done
+                                               for (Long k : keysToRemove) {
+                                                   topicInfo.ledgerRanges.remove(k);
+                                               }
+                                               topicInfo.ledgerRangesZnodeVersion = newVersion;
+                                               cb.operationFinished(ctx, null);
+                                           }
+                                           public void operationFailed(Object ctx, PubSubException exception) {
+                                               cb.operationFailed(ctx, exception);
+                                           }
+                                       }, ctx);
+            } else {
+                cb.operationFinished(ctx, null);
+            }
+        }
+    }
+
+    class ConsumeUntilOp extends TopicOpQueuer.SynchronousOp {
+        private final long seqId;
+
+        public ConsumeUntilOp(ByteString topic, long seqId) {
+            queuer.super(topic);
+            this.seqId = seqId;
+        }
+
+        @Override
+        public void runInternal() {
+            TopicInfo topicInfo = topicInfos.get(topic);
+            if (topicInfo == null) {
+                logger.error("Server is not responsible for topic!");
+                return;
+            }
+
+            for (Long endSeqIdIncluded : topicInfo.ledgerRanges.keySet()) {
+                if (endSeqIdIncluded <= seqId) {
+                    // This ledger's message entries have all been consumed already
+                    // so it is safe to delete it from BookKeeper.
+                    long ledgerId = topicInfo.ledgerRanges.get(endSeqIdIncluded).range.getLedgerId();
+                    try {
+                        bk.deleteLedger(ledgerId);
+                        Callback<Void> cb = new Callback<Void>() {
+                            public void operationFinished(Object ctx, Void result) {
+                                // do nothing, op is async to stop other ops
+                                // occurring on the topic during the update
+                            }
+                            public void operationFailed(Object ctx, PubSubException exception) {
+                                logger.error("Failed to update ledger znode", exception);
+                            }
+                        };
+                        queuer.pushAndMaybeRun(topic, new UpdateLedgerOp(topic, cb, null, ledgerId));
+                    } catch (Exception e) {
+                        // For now, just log an exception error message. In the
+                        // future, we can have more complicated retry logic to
+                        // delete a consumed ledger. The next time the ledger
+                        // garbage collection job runs, we'll once again try to
+                        // delete this ledger.
+                        logger.error("Exception while deleting consumed ledgerId: " + ledgerId, e);
+                    }
+                } else
+                    break;
+            }
+        }
+    }
+
     public void consumedUntil(ByteString topic, Long seqId) {
+        queuer.pushAndMaybeRun(topic, new ConsumeUntilOp(topic, Math.max(seqId, getMinSeqIdForTopic(topic))));
+    }
+
+    public void consumeToBound(ByteString topic) {
         TopicInfo topicInfo = topicInfos.get(topic);
-        if (topicInfo == null) {
-            logger.error("Server is not responsible for topic!");
+
+        if (topicInfo == null || topicInfo.messageBound == topicInfo.UNLIMITED) {
             return;
         }
-        for (Long endSeqIdIncluded : topicInfo.ledgerRanges.keySet()) {
-            if (endSeqIdIncluded <= seqId) {
-                // This ledger's message entries have all been consumed already
-                // so it is safe to delete it from BookKeeper.
-                long ledgerId = topicInfo.ledgerRanges.get(endSeqIdIncluded).range.getLedgerId();
-                try {
-                    bk.deleteLedger(ledgerId);
-                } catch (Exception e) {
-                    // For now, just log an exception error message. In the
-                    // future, we can have more complicated retry logic to
-                    // delete a consumed ledger. The next time the ledger
-                    // garbage collection job runs, we'll once again try to
-                    // delete this ledger.
-                    logger.error("Exception while deleting consumed ledgerId: " + ledgerId, e);
-                }
-            } else
-                break;
+        queuer.pushAndMaybeRun(topic, new ConsumeUntilOp(topic, getMinSeqIdForTopic(topic)));
+    }
+
+    public long getMinSeqIdForTopic(ByteString topic) {
+        TopicInfo topicInfo = topicInfos.get(topic);
+
+        if (topicInfo == null || topicInfo.messageBound == topicInfo.UNLIMITED) {
+            return Long.MIN_VALUE;
+        } else {
+            return (topicInfo.lastSeqIdPushed.getLocalComponent() - topicInfo.messageBound) + 1;
         }
     }
 
@@ -345,7 +445,7 @@ public class BookkeeperPersistenceManage
     }
 
     public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) {
-        return seqId + skipAmount;
+        return Math.max(seqId + skipAmount, getMinSeqIdForTopic(topic));
     }
 
     public class PersistOp extends TopicOpQueuer.SynchronousOp {
@@ -615,7 +715,7 @@ public class BookkeeperPersistenceManage
                             LedgerRange lr = LedgerRange.newBuilder().setLedgerId(ledgerId)
                                              .setEndSeqIdIncluded(lastMessage.getMsgId()).build();
                             topicInfo.ledgerRanges.put(lr.getEndSeqIdIncluded().getLocalComponent(),
-                                                       new InMemoryLedgerRange(lr, prevLedgerEnd + 1, lh));
+                                    new InMemoryLedgerRange(lr, prevLedgerEnd + 1, lh));
 
                             logger.info("Recovered unclosed ledger: " + ledgerId + " for topic: "
                                         + topic.toStringUtf8() + " with " + numEntriesInLastLedger + " entries");
@@ -673,33 +773,40 @@ public class BookkeeperPersistenceManage
                     }
                     builder.addRanges(lastRange);
 
-                    writeTopicLedgersNode(topic, builder.build().toByteArray(), expectedVersionOfLedgersNode,
-                                          topicInfo);
+                    updateLedgerRangesNode(topic, builder.build(), expectedVersionOfLedgersNode,
+                                           new Callback<Integer>() {
+                                               public void operationFinished(Object ctx, Integer newVersion) {
+                                                   // Finally, all done
+                                                   topicInfo.ledgerRangesZnodeVersion = newVersion;
+                                                   topicInfos.put(topic, topicInfo);
+                                                   cb.operationFinished(ctx, null);
+                                               }
+                                               public void operationFailed(Object ctx, PubSubException exception) {
+                                                   cb.operationFailed(ctx, exception);
+                                               }
+                                           }, ctx);
                     return;
                 }
             }, ctx);
         }
+    }
 
-        void writeTopicLedgersNode(final ByteString topic, byte[] data, int expectedVersion, final TopicInfo topicInfo) {
-            final String zNodePath = ledgersPath(topic);
+    public void updateLedgerRangesNode(final ByteString topic, LedgerRanges ranges,
+                                       int version, final Callback<Integer> callback, Object ctx) {
+        final String zNodePath = ledgersPath(topic);
 
-            zk.setData(zNodePath, data, expectedVersion, new SafeAsyncZKCallback.StatCallback() {
+        zk.setData(zNodePath, ranges.toByteArray(), version, new SafeAsyncZKCallback.StatCallback() {
                 @Override
                 public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
                     if (rc != KeeperException.Code.OK.intValue()) {
                         KeeperException ke = ZkUtils.logErrorAndCreateZKException(
-                                                 "Could not write ledgers node for topic: " + topic.toStringUtf8(), path, rc);
-                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+                                "Could not write ledgers node for topic: " + topic.toStringUtf8(), path, rc);
+                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
                         return;
                     }
-
-                    // Finally, all done
-                    topicInfos.put(topic, topicInfo);
-                    cb.operationFinished(ctx, null);
+                    callback.operationFinished(ctx, stat.getVersion());
                 }
             }, ctx);
-
-        }
     }
 
     /**
@@ -761,4 +868,28 @@ public class BookkeeperPersistenceManage
         queuer.pushAndMaybeRun(topic, new ReleaseOp(topic));
     }
 
+    class SetMessageBoundOp extends TopicOpQueuer.SynchronousOp {
+        final int bound;
+
+        public SetMessageBoundOp(ByteString topic, int bound) {
+            queuer.super(topic);
+            this.bound = bound;
+        }
+
+        @Override
+        public void runInternal() {
+            TopicInfo topicInfo = topicInfos.get(topic);
+            if (topicInfo != null) {
+                topicInfo.messageBound = bound;
+            }
+        }
+    }
+
+    public void setMessageBound(ByteString topic, Integer bound) {
+        queuer.pushAndMaybeRun(topic, new SetMessageBoundOp(topic, bound));
+    }
+
+    public void clearMessageBound(ByteString topic) {
+        queuer.pushAndMaybeRun(topic, new SetMessageBoundOp(topic, TopicInfo.UNLIMITED));
+    }
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java Wed Mar 14 11:29:03 2012
@@ -398,6 +398,18 @@ public class LocalDBPersistenceManager i
         }
     }
 
+    public void setMessageBound(ByteString topic, Integer bound) {
+        // noop; Maybe implement later
+    }
+
+    public void clearMessageBound(ByteString topic) {
+        // noop; Maybe implement later
+    }
+
+    public void consumeToBound(ByteString topic) {
+        // noop; Maybe implement later
+    }
+
     @Override
     protected void finalize() throws Throwable {
         if (driver.equals("org.apache.derby.jdbc.EmbeddedDriver")) {

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java Wed Mar 14 11:29:03 2012
@@ -88,4 +88,7 @@ public interface PersistenceManager {
      */
     public void consumedUntil(ByteString topic, Long seqId);
 
+    public void setMessageBound(ByteString topic, Integer bound);
+    public void clearMessageBound(ByteString topic);
+    public void consumeToBound(ByteString topic);
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java Wed Mar 14 11:29:03 2012
@@ -250,6 +250,18 @@ public class ReadAheadCache implements P
         realPersistenceManager.consumedUntil(topic, seqId);
     }
 
+    public void setMessageBound(ByteString topic, Integer bound) {
+        realPersistenceManager.setMessageBound(topic, bound);
+    }
+
+    public void clearMessageBound(ByteString topic) {
+        realPersistenceManager.clearMessageBound(topic);
+    }
+
+    public void consumeToBound(ByteString topic) {
+        realPersistenceManager.consumeToBound(topic);
+    }
+
     /**
      * ========================================================================
      * BEGINNING OF CODE FOR THE CACHE MAINTAINER THREAD

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java Wed Mar 14 11:29:03 2012
@@ -26,6 +26,7 @@ import org.apache.hedwig.exceptions.PubS
 import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
 import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
 import org.apache.hedwig.util.Callback;
 
 /**
@@ -46,13 +47,28 @@ public class HedwigHubSubscriber extends
     public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
             throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
         InvalidSubscriberIdException {
-        subscribe(topic, subscriberId, mode, true);
+        SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
+        subscribe(topic, subscriberId, options);
     }
 
     @Override
     public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
                                Object context) {
-        asyncSubscribe(topic, subscriberId, mode, callback, context, true);
+        SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
+        asyncSubscribe(topic, subscriberId, options, callback, context);
+    }
+
+    @Override
+    public void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options)
+            throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
+        InvalidSubscriberIdException {
+        subscribe(topic, subscriberId, options, true);
+    }
+
+    @Override
+    public void asyncSubscribe(ByteString topic, ByteString subscriberId,
+                               SubscriptionOptions options, Callback<Void> callback, Object context) {
+        asyncSubscribe(topic, subscriberId, options, callback, context, true);
     }
 
     @Override

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Wed Mar 14 11:29:03 2012
@@ -111,6 +111,7 @@ public abstract class AbstractSubscripti
             for (ByteString topic : top2sub2seq.keySet()) {
                 final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
                 long minConsumedMessage = Long.MAX_VALUE;
+                boolean hasBound = true;
                 // Loop through all subscribers to the current topic to find the
                 // minimum consumed message id. The consume pointers are
                 // persisted lazily so we'll use the stale in-memory value
@@ -119,20 +120,20 @@ public abstract class AbstractSubscripti
                 for (InMemorySubscriptionState curSubscription : topicSubscriptions.values()) {
                     if (curSubscription.getSubscriptionState().getMsgId().getLocalComponent() < minConsumedMessage)
                         minConsumedMessage = curSubscription.getSubscriptionState().getMsgId().getLocalComponent();
+                    hasBound = hasBound && curSubscription.getSubscriptionState().hasMessageBound();
                 }
                 boolean callPersistenceManager = true;
                 // Don't call the PersistenceManager if nobody is subscribed to
                 // the topic yet, or the consume pointer has not changed since
                 // the last time, or if this is the initial subscription.
                 if (topicSubscriptions.isEmpty()
-                        || (topic2MinConsumedMessagesMap.containsKey(topic) && topic2MinConsumedMessagesMap.get(topic) == minConsumedMessage)
-                        || minConsumedMessage == 0) {
-                    callPersistenceManager = false;
-                }
-                // Pass the new consume pointers to the PersistenceManager.
-                if (callPersistenceManager) {
+                    || (topic2MinConsumedMessagesMap.containsKey(topic)
+                        && topic2MinConsumedMessagesMap.get(topic) == minConsumedMessage)
+                    || minConsumedMessage == 0) {
                     topic2MinConsumedMessagesMap.put(topic, minConsumedMessage);
                     pm.consumedUntil(topic, minConsumedMessage);
+                } else if (hasBound) {
+                    pm.consumeToBound(topic);
                 }
             }
         }
@@ -198,6 +199,8 @@ public abstract class AbstractSubscripti
                     } else {
                         cb2.operationFinished(ctx, null);
                     }
+
+                    updateMessageBound(topic);
                 }
 
             }, ctx);
@@ -360,7 +363,12 @@ public abstract class AbstractSubscripti
             }
 
             // now the hard case, this is a brand new subscription, must record
-            final SubscriptionState newState = SubscriptionState.newBuilder().setMsgId(consumeSeqId).build();
+            SubscriptionState.Builder stateBuilder = SubscriptionState.newBuilder().setMsgId(consumeSeqId);
+            if (subRequest.hasMessageBound()) {
+                stateBuilder = stateBuilder.setMessageBound(subRequest.getMessageBound());
+            }
+            final SubscriptionState newState = stateBuilder.build();
+
             createSubscriptionState(topic, subscriberId, newState, new Callback<Void>() {
                 @Override
                 public void operationFailed(Object ctx, PubSubException exception) {
@@ -406,6 +414,9 @@ public abstract class AbstractSubscripti
                         @Override
                         public void operationFinished(Object ctx, Void resultOfOperation) {
                             topicSubscriptions.put(subscriberId, new InMemorySubscriptionState(newState));
+
+                            updateMessageBound(topic);
+
                             cb.operationFinished(ctx, consumeSeqId);
                         }
 
@@ -421,6 +432,27 @@ public abstract class AbstractSubscripti
         }
     }
 
+    public void updateMessageBound(ByteString topic) {
+        final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
+        if (topicSubscriptions == null) {
+            return;
+        }
+        int maxBound = Integer.MIN_VALUE;
+        for (Map.Entry<ByteString, InMemorySubscriptionState> e : topicSubscriptions.entrySet()) {
+            if (!e.getValue().getSubscriptionState().hasMessageBound()) {
+                maxBound = Integer.MIN_VALUE;
+                break;
+            } else {
+                maxBound = Math.max(maxBound, e.getValue().getSubscriptionState().getMessageBound());
+            }
+        }
+        if (maxBound == Integer.MIN_VALUE) {
+            pm.clearMessageBound(topic);
+        } else {
+            pm.setMessageBound(topic, maxBound);
+        }
+    }
+
     @Override
     public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
                                       Callback<MessageSeqId> callback, Object ctx) {
@@ -508,6 +540,8 @@ public abstract class AbstractSubscripti
                     if (!SubscriptionStateUtils.isHubSubscriber(subscriberId)
                             && topic2LocalCounts.get(topic).decrementAndGet() == 0)
                         notifyUnsubcribe(topic);
+
+                    updateMessageBound(topic);
                     cb.operationFinished(ctx, null);
                 }
             }, ctx);

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java?rev=1300510&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java Wed Mar 14 11:29:03 2012
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hedwig.client.HedwigClient;
+import org.apache.hedwig.client.api.MessageHandler;
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
+
+import org.apache.hedwig.client.api.Client;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.util.Callback;
+
+import org.apache.hedwig.server.HedwigHubTestBase;
+import org.apache.hedwig.server.common.ServerConfiguration;
+
+/**
+ * End-to-end tests for message-bounded subscriptions: a subscriber created with
+ * a message bound is expected to receive only the most recent messages of a
+ * topic, and the first ledger of such a topic is expected to be removed after a
+ * hub restart and further publishing (see {@link #testLedgerGC()}).
+ */
+public class MessageBoundedPersistenceTest extends HedwigHubTestBase {
+    protected static Logger logger = LoggerFactory.getLogger(MessageBoundedPersistenceTest.class);
+
+    /**
+     * Hub configuration that returns 1 for both the maximum cache size and the
+     * read-ahead count, and 1000 for the messages-consumed thread run interval.
+     */
+    protected class SmallReadAheadServerConfiguration
+        extends HedwigHubTestBase.HubServerConfiguration {
+        SmallReadAheadServerConfiguration(int serverPort, int sslServerPort) {
+            super(serverPort, sslServerPort);
+        }
+
+        public long getMaximumCacheSize() {
+            return 1;
+        }
+
+        public int getReadAheadCount() {
+            return 1;
+        }
+
+        public int getMessagesConsumedThreadRunInterval() {
+            return 1000; // run every second
+        }
+    }
+
+    /**
+     * Supplies a {@link SmallReadAheadServerConfiguration} for the given ports;
+     * presumably consulted by HedwigHubTestBase when it starts the hubs.
+     */
+    protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort) {
+        return new SmallReadAheadServerConfiguration(serverPort, sslServerPort);
+    }
+
+    /**
+     * Client configuration reporting a fixed subscription message bound
+     * (5 when none is given). Static because it needs no state from the test.
+     */
+    private static class MessageBoundClientConfiguration extends ClientConfiguration {
+        final int messageBound;
+
+        public MessageBoundClientConfiguration(int bound) {
+            this.messageBound = bound;
+        }
+
+        public MessageBoundClientConfiguration() {
+            this(5);
+        }
+
+        public int getSubscriptionMessageBound() {
+            return messageBound;
+        }
+    }
+
+    /**
+     * Publishes X messages whose bodies are the decimal strings "0" .. "X-1",
+     * attaches to the existing subscription and checks that the delivered values
+     * run X-Y, X-Y+1, ... X-1 in order, waiting at most 10 seconds. On success,
+     * delivery is stopped and the subscription is closed before returning.
+     *
+     * @param pub publisher used to send the messages
+     * @param sub subscriber used to attach and receive
+     * @param topic topic to publish to and consume from
+     * @param subid subscriber id to attach with
+     * @param X number of messages to publish
+     * @param Y number of trailing messages expected to be delivered
+     * @throws Exception if publishing, subscribing or waiting fails
+     */
+    private void sendXExpectLastY(Publisher pub, Subscriber sub,
+                                  ByteString topic, ByteString subid,
+                                  final int X, final int Y) throws Exception {
+        for (int i = 0; i < X; i++) {
+            pub.publish(topic, Message.newBuilder().setBody(
+                                ByteString.copyFromUtf8(String.valueOf(i))).build());
+        }
+        sub.subscribe(topic, subid, CreateOrAttach.ATTACH);
+
+        // next value the handler expects; reaching X means every message arrived
+        final AtomicInteger expected = new AtomicInteger(X - Y);
+        final CountDownLatch latch = new CountDownLatch(1);
+        sub.startDelivery(topic, subid, new MessageHandler () {
+                public synchronized void deliver(ByteString topic, ByteString subscriberId,
+                                    Message msg, Callback<Void> callback,
+                                    Object context) {
+                    try {
+                        // parseInt yields the primitive directly, no Integer boxing
+                        int value = Integer.parseInt(msg.getBody().toStringUtf8());
+
+                        if (value == expected.get()) {
+                            expected.incrementAndGet();
+                        } else {
+                            // error condition
+                            logger.error("Did not receive expected value, expected {}, got {}",
+                                         expected.get(), value);
+                            expected.set(0);
+                            latch.countDown();
+                        }
+                        if (expected.get() == X) {
+                            latch.countDown();
+                        }
+                        callback.operationFinished(context, null);
+                    } catch (Exception e) {
+                        logger.error("Received bad message", e);
+                        latch.countDown();// will error on match
+                    }
+                }
+            });
+        assertTrue("Timed out waiting for messages Y is " + Y
+                + " expected is currently " + expected.get(), latch.await(10, TimeUnit.SECONDS));
+        assertEquals("Should be expected message with " + X, X, expected.get());
+
+        sub.stopDelivery(topic, subid);
+        sub.closeSubscription(topic, subid);
+    }
+
+    /**
+     * With a client configured for a bound of 5, a subscriber that was detached
+     * while 1000 messages were published should only be delivered the last 5.
+     */
+    @Test
+    public void testBasicBounding() throws Exception {
+        Client client = new HedwigClient(new MessageBoundClientConfiguration(5));
+        try {
+            Publisher pub = client.getPublisher();
+            Subscriber sub = client.getSubscriber();
+
+            ByteString topic = ByteString.copyFromUtf8("basicBoundingTopic");
+            ByteString subid = ByteString.copyFromUtf8("basicBoundingSubId");
+            sub.subscribe(topic, subid, CreateOrAttach.CREATE);
+            sub.closeSubscription(topic, subid);
+
+            sendXExpectLastY(pub, sub, topic, subid, 1000, 5);
+        } finally {
+            // release the client even if an assertion above failed
+            client.close();
+        }
+    }
+
+    /**
+     * Subscribers with bounds of 5, 20 and no bound share one topic; each is
+     * expected to see only its own bound's worth of messages, both while the
+     * others exist and after the less restrictive ones have unsubscribed.
+     */
+    @Test
+    public void testMultipleSubscribers() throws Exception {
+        ByteString topic = ByteString.copyFromUtf8("multiSubTopic");
+
+        Client client = new HedwigClient(new ClientConfiguration());
+        try {
+            Publisher pub = client.getPublisher();
+            Subscriber sub = client.getSubscriber();
+
+            SubscriptionOptions options5 = SubscriptionOptions.newBuilder()
+                .setCreateOrAttach(CreateOrAttach.CREATE).setMessageBound(5).build();
+            SubscriptionOptions options20 = SubscriptionOptions.newBuilder()
+                .setCreateOrAttach(CreateOrAttach.CREATE).setMessageBound(20).build();
+            SubscriptionOptions optionsUnbounded = SubscriptionOptions.newBuilder()
+                .setCreateOrAttach(CreateOrAttach.CREATE).build();
+
+            ByteString subid5 = ByteString.copyFromUtf8("bound5SubId");
+            ByteString subid20 = ByteString.copyFromUtf8("bound20SubId");
+            ByteString subidUnbounded = ByteString.copyFromUtf8("noboundSubId");
+
+            sub.subscribe(topic, subid5, options5);
+            sub.closeSubscription(topic, subid5);
+            sendXExpectLastY(pub, sub, topic, subid5, 1000, 5);
+
+            sub.subscribe(topic, subid20, options20);
+            sub.closeSubscription(topic, subid20);
+            sendXExpectLastY(pub, sub, topic, subid20, 1000, 20);
+
+            sub.subscribe(topic, subidUnbounded, optionsUnbounded);
+            sub.closeSubscription(topic, subidUnbounded);
+
+            sendXExpectLastY(pub, sub, topic, subidUnbounded, 10000, 10000);
+            sub.unsubscribe(topic, subidUnbounded);
+
+            sendXExpectLastY(pub, sub, topic, subid20, 1000, 20);
+            sub.unsubscribe(topic, subid20);
+
+            sendXExpectLastY(pub, sub, topic, subid5, 1000, 5);
+            sub.unsubscribe(topic, subid5);
+        } finally {
+            // release the client even if an assertion above failed
+            client.close();
+        }
+    }
+
+    /**
+     * Publishes to a topic whose subscriber comes from a client configured with
+     * the default bound of 5, restarts the hubs (a second ledger is expected
+     * afterwards), publishes again and then expects only the second ledger to
+     * remain, both in the topic's ledger ranges and under /ledgers in ZooKeeper.
+     */
+    @Test
+    public void testLedgerGC() throws Exception {
+        Client client = new HedwigClient(new MessageBoundClientConfiguration());
+        try {
+            Publisher pub = client.getPublisher();
+            Subscriber sub = client.getSubscriber();
+
+            String ledgersPath = "/hedwig/standalone/topics/testGCTopic/ledgers";
+            ByteString topic = ByteString.copyFromUtf8("testGCTopic");
+            ByteString subid = ByteString.copyFromUtf8("testGCSubId");
+            sub.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+            sub.closeSubscription(topic, subid);
+
+            for (int i = 1; i <= 100; i++) {
+                pub.publish(topic, Message.newBuilder().setBody(
+                                    ByteString.copyFromUtf8(String.valueOf(i))).build());
+            }
+            LedgerRanges r = LedgerRanges.parseFrom(bktb.getZooKeeperClient().getData(ledgersPath, false, null));
+            assertEquals("Should only have 1 ledger yet", 1, r.getRangesList().size());
+            long firstLedger = r.getRangesList().get(0).getLedgerId();
+
+            stopHubServers();
+            startHubServers();
+
+            pub.publish(topic, Message.newBuilder().setBody(
+                                ByteString.copyFromUtf8(String.valueOf(0xdeadbeef))).build());
+
+            r = LedgerRanges.parseFrom(bktb.getZooKeeperClient().getData(ledgersPath, false, null));
+            assertEquals("Should have 2 ledgers after restart", 2, r.getRangesList().size());
+
+            for (int i = 100; i <= 200; i++) {
+                pub.publish(topic, Message.newBuilder().setBody(
+                                    ByteString.copyFromUtf8(String.valueOf(i))).build());
+            }
+            Thread.sleep(5000); // give GC a chance to happen
+
+            r = LedgerRanges.parseFrom(bktb.getZooKeeperClient().getData(ledgersPath, false, null));
+            // check the count before indexing, so an empty list fails with this
+            // message rather than an IndexOutOfBoundsException
+            assertEquals("Should only have 1 ledger after GC", 1, r.getRangesList().size());
+            long secondLedger = r.getRangesList().get(0).getLedgerId();
+
+            // ensure original ledger doesn't exist
+            String firstLedgerPath = String.format("/ledgers/L%010d", firstLedger);
+            String secondLedgerPath = String.format("/ledgers/L%010d", secondLedger);
+            assertNull("Ledger should not exist", bktb.getZooKeeperClient().exists(firstLedgerPath, false));
+            assertNotNull("Ledger should exist", bktb.getZooKeeperClient().exists(secondLedgerPath, false));
+        } finally {
+            // release the client even if an assertion above failed
+            client.close();
+        }
+    }
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java Wed Mar 14 11:29:03 2012
@@ -42,6 +42,18 @@ public class StubPersistenceManager impl
         // noop
     }
 
+    public void setMessageBound(ByteString topic, Integer bound) {
+        // noop: the stub ignores the topic and bound it is given
+    }
+
+    public void clearMessageBound(ByteString topic) {
+        // noop: nothing is done for the given topic
+    }
+
+    public void consumeToBound(ByteString topic) {
+        // noop: the stub consumes nothing for the given topic
+    }
+
     protected static class ArrayListMessageFactory implements Factory<List<Message>> {
         static ArrayListMessageFactory instance = new ArrayListMessageFactory();