You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/26 11:55:51 UTC
svn commit: r1402459 - in /zookeeper/bookkeeper/trunk: ./
hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/
Author: ivank
Date: Fri Oct 26 09:55:51 2012
New Revision: 1402459
URL: http://svn.apache.org/viewvc?rev=1402459&view=rev
Log:
BOOKKEEPER-441: InMemorySubscriptionManager should back up top2sub2seq before change it (Yixue via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1402459&r1=1402458&r2=1402459&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Oct 26 09:55:51 2012
@@ -196,6 +196,8 @@ Trunk (unreleased changes)
BOOKKEEPER-411: Add CloseSubscription Request for multiplexing support (sijie via ivank)
+ BOOKKEEPER-441: InMemorySubscriptionManager should back up top2sub2seq before change it (Yixue via ivank)
+
hedwig-client:
BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1402459&r1=1402458&r2=1402459&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Fri Oct 26 09:55:51 2012
@@ -236,12 +236,8 @@ public abstract class AbstractSubscripti
class ReleaseOp extends TopicOpQueuer.AsynchronousOp<Void> {
- final boolean removeStates;
-
- public ReleaseOp(final ByteString topic, final Callback<Void> cb, Object ctx,
- boolean removeStates) {
+ public ReleaseOp(final ByteString topic, final Callback<Void> cb, Object ctx) {
queuer.super(topic, cb, ctx);
- this.removeStates = removeStates;
}
@Override
@@ -263,12 +259,7 @@ public abstract class AbstractSubscripti
private void finish() {
// tell delivery manager to stop delivery for subscriptions of this topic
- final Map<ByteString, InMemorySubscriptionState> topicSubscriptions;
- if (removeStates) {
- topicSubscriptions = top2sub2seq.remove(topic);
- } else {
- topicSubscriptions = top2sub2seq.get(topic);
- }
+ final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.remove(topic);
// no subscriptions now, it may be removed by other release ops
if (null != topicSubscriptions) {
for (ByteString subId : topicSubscriptions.keySet()) {
@@ -322,7 +313,7 @@ public abstract class AbstractSubscripti
*/
@Override
public void lostTopic(ByteString topic) {
- queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null, true));
+ queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null));
}
private void notifyLastLocalUnsubscribe(ByteString topic) {
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java?rev=1402459&r1=1402458&r2=1402459&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java Fri Oct 26 09:55:51 2012
@@ -30,6 +30,9 @@ import org.apache.hedwig.server.topics.T
import org.apache.hedwig.util.Callback;
public class InMemorySubscriptionManager extends AbstractSubscriptionManager {
+ // Backup for top2sub2seq
+ final ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>> top2sub2seqBackup =
+ new ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>>();
public InMemorySubscriptionManager(ServerConfiguration conf,
TopicManager tm, PersistenceManager pm,
@@ -72,19 +75,22 @@ public class InMemorySubscriptionManager
@Override
public void lostTopic(ByteString topic) {
- // Intentionally do nothing, so that we dont lose in-memory information
+ // Backup topic-sub2seq map for readSubscriptions
+ final Map<ByteString, InMemorySubscriptionState> sub2seq = top2sub2seq.get(topic);
+ if (null != sub2seq)
+ top2sub2seqBackup.put(topic, sub2seq);
+
if (logger.isDebugEnabled()) {
logger.debug("InMemorySubscriptionManager is losing topic " + topic.toStringUtf8());
}
- queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null, false));
+ queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null));
}
@Override
protected void readSubscriptions(ByteString topic,
Callback<Map<ByteString, InMemorySubscriptionState>> cb, Object ctx) {
- // Since we don't lose in-memory information on lostTopic, we can just
- // return that back
- Map<ByteString, InMemorySubscriptionState> topicSubs = top2sub2seq.get(topic);
+ // Since we backed up in-memory information on lostTopic, we can just return that back
+ Map<ByteString, InMemorySubscriptionState> topicSubs = top2sub2seqBackup.remove(topic);
if (topicSubs != null) {
cb.operationFinished(ctx, topicSubs);