You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/26 11:55:51 UTC

svn commit: r1402459 - in /zookeeper/bookkeeper/trunk: ./ hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/

Author: ivank
Date: Fri Oct 26 09:55:51 2012
New Revision: 1402459

URL: http://svn.apache.org/viewvc?rev=1402459&view=rev
Log:
BOOKKEEPER-441: InMemorySubscriptionManager should back up top2sub2seq before change it (Yixue via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1402459&r1=1402458&r2=1402459&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Oct 26 09:55:51 2012
@@ -196,6 +196,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-411: Add CloseSubscription Request for multiplexing support (sijie via ivank)
 
+        BOOKKEEPER-441: InMemorySubscriptionManager should back up top2sub2seq before change it (Yixue via ivank)
+
       hedwig-client:
 
         BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1402459&r1=1402458&r2=1402459&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Fri Oct 26 09:55:51 2012
@@ -236,12 +236,8 @@ public abstract class AbstractSubscripti
 
     class ReleaseOp extends TopicOpQueuer.AsynchronousOp<Void> {
 
-        final boolean removeStates;
-
-        public ReleaseOp(final ByteString topic, final Callback<Void> cb, Object ctx,
-                         boolean removeStates) {
+        public ReleaseOp(final ByteString topic, final Callback<Void> cb, Object ctx) {
             queuer.super(topic, cb, ctx);
-            this.removeStates = removeStates;
         }
 
         @Override
@@ -263,12 +259,7 @@ public abstract class AbstractSubscripti
 
                 private void finish() {
                     // tell delivery manager to stop delivery for subscriptions of this topic
-                    final Map<ByteString, InMemorySubscriptionState> topicSubscriptions;
-                    if (removeStates) {
-                        topicSubscriptions = top2sub2seq.remove(topic);
-                    } else {
-                        topicSubscriptions = top2sub2seq.get(topic);
-                    }
+                    final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.remove(topic);
                     // no subscriptions now, it may be removed by other release ops
                     if (null != topicSubscriptions) {
                         for (ByteString subId : topicSubscriptions.keySet()) {
@@ -322,7 +313,7 @@ public abstract class AbstractSubscripti
      */
     @Override
     public void lostTopic(ByteString topic) {
-        queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null, true));
+        queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null));
     }
 
     private void notifyLastLocalUnsubscribe(ByteString topic) {

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java?rev=1402459&r1=1402458&r2=1402459&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java Fri Oct 26 09:55:51 2012
@@ -30,6 +30,9 @@ import org.apache.hedwig.server.topics.T
 import org.apache.hedwig.util.Callback;
 
 public class InMemorySubscriptionManager extends AbstractSubscriptionManager {
+    // Backup for top2sub2seq
+    final ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>> top2sub2seqBackup =
+        new ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>>();
 
     public InMemorySubscriptionManager(ServerConfiguration conf,
                                        TopicManager tm, PersistenceManager pm,
@@ -72,19 +75,22 @@ public class InMemorySubscriptionManager
 
     @Override
     public void lostTopic(ByteString topic) {
-        // Intentionally do nothing, so that we dont lose in-memory information
+        // Backup topic-sub2seq map for readSubscriptions
+        final Map<ByteString, InMemorySubscriptionState> sub2seq = top2sub2seq.get(topic);
+        if (null != sub2seq)
+            top2sub2seqBackup.put(topic, sub2seq);
+
         if (logger.isDebugEnabled()) {
             logger.debug("InMemorySubscriptionManager is losing topic " + topic.toStringUtf8());
         }
-        queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null, false));
+        queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null));
     }
 
     @Override
     protected void readSubscriptions(ByteString topic,
                                      Callback<Map<ByteString, InMemorySubscriptionState>> cb, Object ctx) {
-        // Since we don't lose in-memory information on lostTopic, we can just
-        // return that back
-        Map<ByteString, InMemorySubscriptionState> topicSubs = top2sub2seq.get(topic);
+        // Since we backed up in-memory information on lostTopic, we can just return that back
+        Map<ByteString, InMemorySubscriptionState> topicSubs = top2sub2seqBackup.remove(topic);
 
         if (topicSubs != null) {
             cb.operationFinished(ctx, topicSubs);