You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2012/10/17 11:19:50 UTC
svn commit: r1399159 - in /zookeeper/bookkeeper/trunk: CHANGES.txt
hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
Author: fpj
Date: Wed Oct 17 09:19:49 2012
New Revision: 1399159
URL: http://svn.apache.org/viewvc?rev=1399159&view=rev
Log:
BOOKKEEPER-422: Simplify AbstractSubscriptionManager (stu via fpj)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1399159&r1=1399158&r2=1399159&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Oct 17 09:19:49 2012
@@ -182,6 +182,8 @@ Trunk (unreleased changes)
BOOKKEEPER-415: Rename DeliveryThrottle to MessageWindowSize (ivank via sijie)
+ BOOKKEEPER-422: Simplify AbstractSubscriptionManager (stu via fpj)
+
hedwig-client:
BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1399159&r1=1399158&r2=1399159&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Wed Oct 17 09:19:49 2012
@@ -52,13 +52,13 @@ import org.apache.hedwig.util.Concurrenc
public abstract class AbstractSubscriptionManager implements SubscriptionManager, TopicOwnershipChangeListener {
- ServerConfiguration cfg;
- ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>> top2sub2seq = new ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>>();
static Logger logger = LoggerFactory.getLogger(AbstractSubscriptionManager.class);
- TopicOpQueuer queuer;
+ protected final ServerConfiguration cfg;
+ protected final ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>> top2sub2seq =
+ new ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>>();
+ protected final TopicOpQueuer queuer;
private final ArrayList<SubscriptionEventListener> listeners = new ArrayList<SubscriptionEventListener>();
- private final ConcurrentHashMap<ByteString, AtomicInteger> topic2LocalCounts = new ConcurrentHashMap<ByteString, AtomicInteger>();
// Handle to the DeliveryManager for the server so we can stop serving subscribers
// when losing topics
@@ -75,7 +75,7 @@ public abstract class AbstractSubscripti
// for all subscribers to the topic.
private final ConcurrentHashMap<ByteString, Long> topic2MinConsumedMessagesMap = new ConcurrentHashMap<ByteString, Long>();
- Callback<Void> noopCallback = new NoopCallback<Void>();
+ protected final Callback<Void> noopCallback = new NoopCallback<Void>();
static class NoopCallback<T> implements Callback<T> {
@Override
@@ -177,15 +177,6 @@ public abstract class AbstractSubscripti
// number for bookkeeping so that future
// subscribes/unsubscribes can efficiently notify listeners.
- // Count the number of local subscribers we just inherited.
- // This loop is OK since the number of subscribers per topic
- // is expected to be small.
- int localCount = 0;
- for (ByteString subscriberId : resultOfOperation.keySet())
- if (!SubscriptionStateUtils.isHubSubscriber(subscriberId))
- localCount++;
- topic2LocalCounts.put(topic, new AtomicInteger(localCount));
-
// The final "commit" (and "abort") operations.
final Callback<Void> cb2 = new Callback<Void>() {
@@ -206,8 +197,8 @@ public abstract class AbstractSubscripti
};
// Notify listeners if necessary.
- if (localCount > 0) {
- notifySubscribe(topic, false, cb2, ctx);
+ if (hasLocalSubscriptions(resultOfOperation)) {
+ notifyFirstLocalSubscribe(topic, false, cb2, ctx);
} else {
cb2.operationFinished(ctx, null);
}
@@ -221,7 +212,7 @@ public abstract class AbstractSubscripti
}
- private void notifySubscribe(ByteString topic, boolean synchronous, final Callback<Void> cb, final Object ctx) {
+ private void notifyFirstLocalSubscribe(ByteString topic, boolean synchronous, final Callback<Void> cb, final Object ctx) {
Callback<Void> mcb = CallbackUtils.multiCallback(listeners.size(), cb, ctx);
for (SubscriptionEventListener listener : listeners) {
listener.onFirstLocalSubscribe(topic, synchronous, mcb);
@@ -292,11 +283,10 @@ public abstract class AbstractSubscripti
if (logger.isDebugEnabled()) {
logger.debug("Stop serving topic " + topic.toStringUtf8());
}
- topic2LocalCounts.remove(topic);
// Since we decrement local count when some of remote subscriptions failed,
// while we don't unsubscribe those succeed subscriptions. so we can't depends
// on local count, just try to notify unsubscribe.
- notifyUnsubcribe(topic);
+ notifyLastLocalUnsubscribe(topic);
cb.operationFinished(ctx, null);
}
};
@@ -333,7 +323,7 @@ public abstract class AbstractSubscripti
queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null, true));
}
- private void notifyUnsubcribe(ByteString topic) {
+ private void notifyLastLocalUnsubscribe(ByteString topic) {
for (SubscriptionEventListener listener : listeners)
listener.onLastLocalUnsubscribe(topic);
}
@@ -454,7 +444,6 @@ public abstract class AbstractSubscripti
@Override
public void operationFinished(Object ctx, Void resultOfOperation) {
Callback<Void> cb2 = new Callback<Void>() {
-
@Override
public void operationFailed(final Object ctx, final PubSubException exception) {
logger.error("subscription for subscriber " + subscriberId.toStringUtf8() + " to topic "
@@ -474,15 +463,6 @@ public abstract class AbstractSubscripti
finish();
}
private void finish() {
- // we should decrement local count when remote subscription failed
- if (!SubscriptionStateUtils.isHubSubscriber(subRequest.getSubscriberId())) {
- // since the subscribe op is executed one by one in queue order,
- // so the following codes only happened when remote subscription failed.
- // it is safe to decrement the local count so next subscribe op
- // could have the chance to subscribe remote.
- AtomicInteger count = topic2LocalCounts.get(topic);
- if (count != null) { count.decrementAndGet(); }
- }
cb.operationFailed(ctx, exception);
}
}, ctx);
@@ -499,11 +479,10 @@ public abstract class AbstractSubscripti
};
- AtomicInteger count = topic2LocalCounts.get(topic);
+ // if this will be the first local subscription, notifyFirstLocalSubscribe
if (!SubscriptionStateUtils.isHubSubscriber(subRequest.getSubscriberId())
- && count != null
- && count.incrementAndGet() == 1)
- notifySubscribe(topic, subRequest.getSynchronous(), cb2, ctx);
+ && !hasLocalSubscriptions(topicSubscriptions))
+ notifyFirstLocalSubscribe(topic, subRequest.getSynchronous(), cb2, ctx);
else
cb2.operationFinished(ctx, resultOfOperation);
}
@@ -511,6 +490,17 @@ public abstract class AbstractSubscripti
}
}
+ /**
+ * @return True if the given subscriberId-to-subscriberState map contains a local subscription:
+ * the vast majority of subscriptions are local, so we will quickly encounter one if it exists.
+ */
+ private static boolean hasLocalSubscriptions(Map<ByteString, InMemorySubscriptionState> topicSubscriptions) {
+ for (ByteString subId : topicSubscriptions.keySet())
+ if (!SubscriptionStateUtils.isHubSubscriber(subId))
+ return true;
+ return false;
+ }
+
public void updateMessageBound(ByteString topic) {
final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
if (topicSubscriptions == null) {
@@ -619,10 +609,9 @@ public abstract class AbstractSubscripti
public void operationFinished(Object ctx, Void resultOfOperation) {
topicSubscriptions.remove(subscriberId);
// Notify listeners if necessary.
- AtomicInteger count = topic2LocalCounts.get(topic);
if (!SubscriptionStateUtils.isHubSubscriber(subscriberId)
- && count != null && count.decrementAndGet() == 0)
- notifyUnsubcribe(topic);
+ && !hasLocalSubscriptions(topicSubscriptions))
+ notifyLastLocalUnsubscribe(topic);
updateMessageBound(topic);
cb.operationFinished(ctx, null);