You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2012/10/17 11:19:50 UTC

svn commit: r1399159 - in /zookeeper/bookkeeper/trunk: CHANGES.txt hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java

Author: fpj
Date: Wed Oct 17 09:19:49 2012
New Revision: 1399159

URL: http://svn.apache.org/viewvc?rev=1399159&view=rev
Log:
BOOKKEEPER-422: Simplify AbstractSubscriptionManager (stu via fpj)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1399159&r1=1399158&r2=1399159&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Oct 17 09:19:49 2012
@@ -182,6 +182,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-415: Rename DeliveryThrottle to MessageWindowSize (ivank via sijie)
 
+	BOOKKEEPER-422: Simplify AbstractSubscriptionManager (stu via fpj)
+
       hedwig-client:
 
         BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1399159&r1=1399158&r2=1399159&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Wed Oct 17 09:19:49 2012
@@ -52,13 +52,13 @@ import org.apache.hedwig.util.Concurrenc
 
 public abstract class AbstractSubscriptionManager implements SubscriptionManager, TopicOwnershipChangeListener {
 
-    ServerConfiguration cfg;
-    ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>> top2sub2seq = new ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>>();
     static Logger logger = LoggerFactory.getLogger(AbstractSubscriptionManager.class);
 
-    TopicOpQueuer queuer;
+    protected final ServerConfiguration cfg;
+    protected final ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>> top2sub2seq =
+      new ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>>();
+    protected final TopicOpQueuer queuer;
     private final ArrayList<SubscriptionEventListener> listeners = new ArrayList<SubscriptionEventListener>();
-    private final ConcurrentHashMap<ByteString, AtomicInteger> topic2LocalCounts = new ConcurrentHashMap<ByteString, AtomicInteger>();
 
     // Handle to the DeliveryManager for the server so we can stop serving subscribers
     // when losing topics
@@ -75,7 +75,7 @@ public abstract class AbstractSubscripti
     // for all subscribers to the topic.
     private final ConcurrentHashMap<ByteString, Long> topic2MinConsumedMessagesMap = new ConcurrentHashMap<ByteString, Long>();
 
-    Callback<Void> noopCallback = new NoopCallback<Void>();
+    protected final Callback<Void> noopCallback = new NoopCallback<Void>();
 
     static class NoopCallback<T> implements Callback<T> {
         @Override
@@ -177,15 +177,6 @@ public abstract class AbstractSubscripti
                     // number for bookkeeping so that future
                     // subscribes/unsubscribes can efficiently notify listeners.
 
-                    // Count the number of local subscribers we just inherited.
-                    // This loop is OK since the number of subscribers per topic
-                    // is expected to be small.
-                    int localCount = 0;
-                    for (ByteString subscriberId : resultOfOperation.keySet())
-                        if (!SubscriptionStateUtils.isHubSubscriber(subscriberId))
-                            localCount++;
-                    topic2LocalCounts.put(topic, new AtomicInteger(localCount));
-
                     // The final "commit" (and "abort") operations.
                     final Callback<Void> cb2 = new Callback<Void>() {
 
@@ -206,8 +197,8 @@ public abstract class AbstractSubscripti
                     };
 
                     // Notify listeners if necessary.
-                    if (localCount > 0) {
-                        notifySubscribe(topic, false, cb2, ctx);
+                    if (hasLocalSubscriptions(resultOfOperation)) {
+                        notifyFirstLocalSubscribe(topic, false, cb2, ctx);
                     } else {
                         cb2.operationFinished(ctx, null);
                     }
@@ -221,7 +212,7 @@ public abstract class AbstractSubscripti
 
     }
 
-    private void notifySubscribe(ByteString topic, boolean synchronous, final Callback<Void> cb, final Object ctx) {
+    private void notifyFirstLocalSubscribe(ByteString topic, boolean synchronous, final Callback<Void> cb, final Object ctx) {
         Callback<Void> mcb = CallbackUtils.multiCallback(listeners.size(), cb, ctx);
         for (SubscriptionEventListener listener : listeners) {
             listener.onFirstLocalSubscribe(topic, synchronous, mcb);
@@ -292,11 +283,10 @@ public abstract class AbstractSubscripti
                     if (logger.isDebugEnabled()) {
                         logger.debug("Stop serving topic " + topic.toStringUtf8());
                     }
-                    topic2LocalCounts.remove(topic);
                     // Since we decrement local count when some of remote subscriptions failed,
                     // while we don't unsubscribe those succeed subscriptions. so we can't depends
                     // on local count, just try to notify unsubscribe.
-                    notifyUnsubcribe(topic);
+                    notifyLastLocalUnsubscribe(topic);
                     cb.operationFinished(ctx, null);
                 }
             };
@@ -333,7 +323,7 @@ public abstract class AbstractSubscripti
         queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null, true));
     }
 
-    private void notifyUnsubcribe(ByteString topic) {
+    private void notifyLastLocalUnsubscribe(ByteString topic) {
         for (SubscriptionEventListener listener : listeners)
             listener.onLastLocalUnsubscribe(topic);
     }
@@ -454,7 +444,6 @@ public abstract class AbstractSubscripti
                 @Override
                 public void operationFinished(Object ctx, Void resultOfOperation) {
                     Callback<Void> cb2 = new Callback<Void>() {
-
                         @Override
                         public void operationFailed(final Object ctx, final PubSubException exception) {
                             logger.error("subscription for subscriber " + subscriberId.toStringUtf8() + " to topic "
@@ -474,15 +463,6 @@ public abstract class AbstractSubscripti
                                     finish();
                                 }
                                 private void finish() {
-                                    // we should decrement local count when remote subscription failed
-                                    if (!SubscriptionStateUtils.isHubSubscriber(subRequest.getSubscriberId())) {
-                                        // since the subscribe op is executed one by one in queue order,
-                                        // so the following codes only happened when remote subscription failed.
-                                        // it is safe to decrement the local count so next subscribe op
-                                        // could have the chance to subscribe remote.
-                                        AtomicInteger count = topic2LocalCounts.get(topic);
-                                        if (count != null) { count.decrementAndGet(); }
-                                    }
                                     cb.operationFailed(ctx, exception);
                                 }
                             }, ctx);
@@ -499,11 +479,10 @@ public abstract class AbstractSubscripti
 
                     };
 
-                    AtomicInteger count = topic2LocalCounts.get(topic);
+                    // if this will be the first local subscription, notifyFirstLocalSubscribe
                     if (!SubscriptionStateUtils.isHubSubscriber(subRequest.getSubscriberId())
-                        && count != null
-                        && count.incrementAndGet() == 1)
-                        notifySubscribe(topic, subRequest.getSynchronous(), cb2, ctx);
+                        && !hasLocalSubscriptions(topicSubscriptions))
+                        notifyFirstLocalSubscribe(topic, subRequest.getSynchronous(), cb2, ctx);
                     else
                         cb2.operationFinished(ctx, resultOfOperation);
                 }
@@ -511,6 +490,17 @@ public abstract class AbstractSubscripti
         }
     }
 
+    /**
+     * @return True if the given subscriberId-to-subscriberState map contains a local subscription:
+     * the vast majority of subscriptions are local, so we will quickly encounter one if it exists.
+     */
+    private static boolean hasLocalSubscriptions(Map<ByteString, InMemorySubscriptionState> topicSubscriptions) {
+      for (ByteString subId : topicSubscriptions.keySet())
+        if (!SubscriptionStateUtils.isHubSubscriber(subId))
+          return true;
+      return false;
+    }
+
     public void updateMessageBound(ByteString topic) {
         final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
         if (topicSubscriptions == null) {
@@ -619,10 +609,9 @@ public abstract class AbstractSubscripti
                 public void operationFinished(Object ctx, Void resultOfOperation) {
                     topicSubscriptions.remove(subscriberId);
                     // Notify listeners if necessary.
-                    AtomicInteger count = topic2LocalCounts.get(topic);
                     if (!SubscriptionStateUtils.isHubSubscriber(subscriberId)
-                        && count != null && count.decrementAndGet() == 0)
-                        notifyUnsubcribe(topic);
+                        && !hasLocalSubscriptions(topicSubscriptions))
+                        notifyLastLocalUnsubscribe(topic);
 
                     updateMessageBound(topic);
                     cb.operationFinished(ctx, null);