You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:20 UTC

[10/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
deleted file mode 100644
index eaed39d..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
+++ /dev/null
@@ -1,798 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.protobuf.ByteString;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protoextensions.MessageIdUtils;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.common.TopicOpQueuer;
-import org.apache.hedwig.server.delivery.DeliveryManager;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.CallbackUtils;
-import org.apache.hedwig.util.ConcurrencyUtils;
-
-public abstract class AbstractSubscriptionManager implements SubscriptionManager, TopicOwnershipChangeListener {
-
-    private static final Logger logger = LoggerFactory.getLogger(AbstractSubscriptionManager.class);
-
-    protected final ServerConfiguration cfg;
-    protected final ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>> top2sub2seq =
-      new ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>>();
-    protected final TopicOpQueuer queuer;
-    private final ArrayList<SubscriptionEventListener> listeners = new ArrayList<SubscriptionEventListener>();
-
-    // Handle to the DeliveryManager for the server so we can stop serving subscribers
-    // when losing topics
-    private final DeliveryManager dm;
-    // Handle to the PersistenceManager for the server so we can pass along the
-    // message consume pointers for each topic.
-    private final PersistenceManager pm;
-    // Timer for running a recurring thread task to get the minimum message
-    // sequence ID for each topic that all subscribers for it have consumed
-    // already. With that information, we can call the PersistenceManager to
-    // update it on the messages that are safe to be garbage collected.
-    private final Timer timer = new Timer(true);
-    // In memory mapping of topics to the minimum consumed message sequence ID
-    // for all subscribers to the topic.
-    private final ConcurrentHashMap<ByteString, Long> topic2MinConsumedMessagesMap = new ConcurrentHashMap<ByteString, Long>();
-
-    protected final Callback<Void> noopCallback = new NoopCallback<Void>();
-
-    static class NoopCallback<T> implements Callback<T> {
-        @Override
-        public void operationFailed(Object ctx, PubSubException exception) {
-            logger.warn("Exception found in AbstractSubscriptionManager : ", exception);
-        }
-
-        public void operationFinished(Object ctx, T resultOfOperation) {
-        };
-    }
-
-    public AbstractSubscriptionManager(ServerConfiguration cfg, TopicManager tm,
-                                       PersistenceManager pm, DeliveryManager dm,
-                                       ScheduledExecutorService scheduler) {
-        this.cfg = cfg;
-        queuer = new TopicOpQueuer(scheduler);
-        tm.addTopicOwnershipChangeListener(this);
-        this.pm = pm;
-        this.dm = dm;
-        // Schedule the recurring MessagesConsumedTask only if a
-        // PersistenceManager is passed.
-        if (pm != null) {
-            timer.schedule(new MessagesConsumedTask(), 0, cfg.getMessagesConsumedThreadRunInterval());
-        }
-    }
-
-    /**
-     * This is the Timer Task for finding out for each topic, what the minimum
-     * consumed message by the subscribers are. This information is used to pass
-     * along to the server's PersistenceManager so it can garbage collect older
-     * topic messages that are no longer needed by the subscribers.
-     */
-    class MessagesConsumedTask extends TimerTask {
-        /**
-         * Implement the TimerTask's abstract run method.
-         */
-        @Override
-        public void run() {
-            // We are looping through relatively small in memory data structures
-            // so it should be safe to run this fairly often.
-            for (ByteString topic : top2sub2seq.keySet()) {
-                final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
-                if (topicSubscriptions == null) {
-                    continue;
-                }
-
-                long minConsumedMessage = Long.MAX_VALUE;
-                boolean hasBound = true;
-                // Loop through all subscribers on the current topic to find the
-                // minimum persisted message id. The reason not using in-memory
-                // consumed message id is LedgerRangs and InMemorySubscriptionState
-                // may be inconsistent in case of a server crash.
-                for (InMemorySubscriptionState curSubscription : topicSubscriptions.values()) {
-                    if (curSubscription.getLastPersistedSeqId() < minConsumedMessage) {
-                        minConsumedMessage = curSubscription.getLastPersistedSeqId();
-                    }
-                    hasBound = hasBound && curSubscription.getSubscriptionPreferences().hasMessageBound();
-                }
-                boolean callPersistenceManager = true;
-                // Call the PersistenceManager if nobody subscribes to the topic
-                // yet, or the consume pointer has moved ahead since the last
-                // time, or if this is the initial subscription.
-                Long minConsumedFromMap = topic2MinConsumedMessagesMap.get(topic);
-                if (topicSubscriptions.isEmpty()
-                    || (minConsumedFromMap != null && minConsumedFromMap < minConsumedMessage)
-                    || (minConsumedFromMap == null && minConsumedMessage != 0)) {
-                    // Replace or put the new min consumed value. If it has changed
-                    // do nothing, as another thread has updated the min consumed message
-                    if ((minConsumedFromMap != null
-                         && (topic2MinConsumedMessagesMap.replace(topic, minConsumedFromMap,
-                                                                  minConsumedMessage)))
-                        || (topic2MinConsumedMessagesMap.putIfAbsent(topic, minConsumedMessage) == null)) {
-                        pm.consumedUntil(topic, minConsumedMessage);
-                    }
-                } else if (hasBound) {
-                    pm.consumeToBound(topic);
-                }
-            }
-        }
-    }
-
-    private class AcquireOp extends TopicOpQueuer.AsynchronousOp<Void> {
-        public AcquireOp(ByteString topic, Callback<Void> callback, Object ctx) {
-            queuer.super(topic, callback, ctx);
-        }
-
-        @Override
-        public void run() {
-            if (top2sub2seq.containsKey(topic)) {
-                cb.operationFinished(ctx, null);
-                return;
-            }
-
-            readSubscriptions(topic, new Callback<Map<ByteString, InMemorySubscriptionState>>() {
-                @Override
-                public void operationFailed(Object ctx, PubSubException exception) {
-                    cb.operationFailed(ctx, exception);
-                }
-
-                @Override
-                public void operationFinished(final Object ctx,
-                final Map<ByteString, InMemorySubscriptionState> resultOfOperation) {
-                    // We've just inherited a bunch of subscriber for this
-                    // topic, some of which may be local. If they are, then we
-                    // need to (1) notify listeners of this and (2) record the
-                    // number for bookkeeping so that future
-                    // subscribes/unsubscribes can efficiently notify listeners.
-
-                    // The final "commit" (and "abort") operations.
-                    final Callback<Void> cb2 = new Callback<Void>() {
-
-                        @Override
-                        public void operationFailed(Object ctx, PubSubException exception) {
-                            logger.error("Subscription manager failed to acquired topic " + topic.toStringUtf8(),
-                                         exception);
-                            cb.operationFailed(ctx, null);
-                        }
-
-                        @Override
-                        public void operationFinished(Object ctx, Void voidObj) {
-                            top2sub2seq.put(topic, resultOfOperation);
-                            logger.info("Subscription manager successfully acquired topic: " + topic.toStringUtf8());
-                            cb.operationFinished(ctx, null);
-                        }
-
-                    };
-
-                    // Notify listeners if necessary.
-                    if (hasLocalSubscriptions(resultOfOperation)) {
-                        notifyFirstLocalSubscribe(topic, false, cb2, ctx);
-                    } else {
-                        cb2.operationFinished(ctx, null);
-                    }
-
-                    updateMessageBound(topic);
-                }
-
-            }, ctx);
-
-        }
-
-    }
-
-    private void notifyFirstLocalSubscribe(ByteString topic, boolean synchronous, final Callback<Void> cb, final Object ctx) {
-        Callback<Void> mcb = CallbackUtils.multiCallback(listeners.size(), cb, ctx);
-        for (SubscriptionEventListener listener : listeners) {
-            listener.onFirstLocalSubscribe(topic, synchronous, mcb);
-        }
-    }
-
-    /**
-     * Figure out who is subscribed. Do nothing if already acquired. If there's
-     * an error reading the subscribers' sequence IDs, then the topic is not
-     * acquired.
-     *
-     * @param topic
-     * @param callback
-     * @param ctx
-     */
-    @Override
-    public void acquiredTopic(final ByteString topic, final Callback<Void> callback, Object ctx) {
-        queuer.pushAndMaybeRun(topic, new AcquireOp(topic, callback, ctx));
-    }
-
-    class ReleaseOp extends TopicOpQueuer.AsynchronousOp<Void> {
-
-        public ReleaseOp(final ByteString topic, final Callback<Void> cb, Object ctx) {
-            queuer.super(topic, cb, ctx);
-        }
-
-        @Override
-        public void run() {
-            Callback<Void> finalCb = new Callback<Void>() {
-                @Override
-                public void operationFinished(Object ctx, Void resultOfOperation) {
-                    logger.info("Finished update subscription states when losting topic "
-                              + topic.toStringUtf8());
-                    finish();
-                }
-
-                @Override
-                public void operationFailed(Object ctx,
-                        PubSubException exception) {
-                    logger.warn("Error when releasing topic : " + topic.toStringUtf8(), exception);
-                    finish();
-                }
-
-                private void finish() {
-                    // tell delivery manager to stop delivery for subscriptions of this topic
-                    final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.remove(topic);
-                    // no subscriptions now, it may be removed by other release ops
-                    if (null != topicSubscriptions) {
-                        for (ByteString subId : topicSubscriptions.keySet()) {
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("Stop serving subscriber (" + topic.toStringUtf8() + ", "
-                                           + subId.toStringUtf8() + ") when losing topic");
-                            }
-                            if (null != dm) {
-                                dm.stopServingSubscriber(topic, subId, SubscriptionEvent.TOPIC_MOVED,
-                                                         noopCallback, null);
-                            }
-                        }
-                    }
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Stop serving topic " + topic.toStringUtf8());
-                    }
-                    // Since we decrement local count when some of remote subscriptions failed,
-                    // while we don't unsubscribe those succeed subscriptions. so we can't depends
-                    // on local count, just try to notify unsubscribe.
-                    notifyLastLocalUnsubscribe(topic);
-                    cb.operationFinished(ctx, null);
-                }
-            };
-            if (logger.isDebugEnabled()) {
-                logger.debug("Try to update subscription states when losing topic " + topic.toStringUtf8());
-            }
-            updateSubscriptionStates(topic, finalCb, ctx);
-        }
-    }
-
-    void updateSubscriptionStates(ByteString topic, Callback<Void> finalCb, Object ctx) {
-        // Try to update subscription states of a specified topic
-        Map<ByteString, InMemorySubscriptionState> states = top2sub2seq.get(topic);
-        if (null == states) {
-            finalCb.operationFinished(ctx, null);
-        } else {
-            Callback<Void> mcb = CallbackUtils.multiCallback(states.size(), finalCb, ctx);
-            for (Entry<ByteString, InMemorySubscriptionState> entry : states.entrySet()) {
-                InMemorySubscriptionState memState = entry.getValue();
-                if (memState.setLastConsumeSeqIdImmediately()) {
-                    updateSubscriptionState(topic, entry.getKey(), memState, mcb, ctx);
-                } else {
-                    mcb.operationFinished(ctx, null);
-                }
-            }
-        }
-    }
-
-    /**
-     * Remove the local mapping.
-     */
-    @Override
-    public void lostTopic(ByteString topic) {
-        queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null));
-    }
-
-    private void notifyLastLocalUnsubscribe(ByteString topic) {
-        for (SubscriptionEventListener listener : listeners)
-            listener.onLastLocalUnsubscribe(topic);
-    }
-
-    protected abstract void readSubscriptions(final ByteString topic,
-            final Callback<Map<ByteString, InMemorySubscriptionState>> cb, final Object ctx);
-    
-    protected abstract void readSubscriptionData(final ByteString topic, final ByteString subscriberId, 
-            final Callback<InMemorySubscriptionState> cb, Object ctx);
-    
-    private class SubscribeOp extends TopicOpQueuer.AsynchronousOp<SubscriptionData> {
-        SubscribeRequest subRequest;
-        MessageSeqId consumeSeqId;
-
-        public SubscribeOp(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
-                           Callback<SubscriptionData> callback, Object ctx) {
-            queuer.super(topic, callback, ctx);
-            this.subRequest = subRequest;
-            this.consumeSeqId = consumeSeqId;
-        }
-
-        @Override
-        public void run() {
-
-            final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
-            if (topicSubscriptions == null) {
-                cb.operationFailed(ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
-                return;
-            }
-
-            final ByteString subscriberId = subRequest.getSubscriberId();
-            final InMemorySubscriptionState subscriptionState = topicSubscriptions.get(subscriberId);
-            CreateOrAttach createOrAttach = subRequest.getCreateOrAttach();
-
-            if (subscriptionState != null) {
-
-                if (createOrAttach.equals(CreateOrAttach.CREATE)) {
-                    String msg = "Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
-                                 + " requested creating a subscription but it is already subscribed with state: "
-                                 + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState());
-                    logger.error(msg);
-                    cb.operationFailed(ctx, new PubSubException.ClientAlreadySubscribedException(msg));
-                    return;
-                }
-
-                // Subscription existed before, check whether new preferences provided
-                // if new preferences provided, merged the subscription data and updated them
-                // TODO: needs ACL mechanism when changing preferences
-                if (subRequest.hasPreferences() &&
-                    subscriptionState.updatePreferences(subRequest.getPreferences())) {
-                    updateSubscriptionPreferences(topic, subscriberId, subscriptionState, new Callback<Void>() {
-                        @Override
-                        public void operationFailed(Object ctx, PubSubException exception) {
-                            cb.operationFailed(ctx, exception);
-                        }
-
-                        @Override
-                        public void operationFinished(Object ctx, Void resultOfOperation) {
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
-                                             + " attaching to subscription with state: "
-                                             + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState())
-                                             + ", with preferences: "
-                                             + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionPreferences()));
-                            }
-                            // update message bound if necessary
-                            updateMessageBound(topic);
-                            cb.operationFinished(ctx, subscriptionState.toSubscriptionData());
-                        }
-                    }, ctx);
-                    return;
-                }
-
-                // otherwise just attach
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
-                                 + " attaching to subscription with state: "
-                                 + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState())
-                                 + ", with preferences: "
-                                 + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionPreferences()));
-                }
-
-                cb.operationFinished(ctx, subscriptionState.toSubscriptionData());
-                return;
-            }
-
-            // we don't have a mapping for this subscriber
-            if (createOrAttach.equals(CreateOrAttach.ATTACH)) {
-                String msg = "Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
-                             + " requested attaching to an existing subscription but it is not subscribed";
-                logger.error(msg);
-                cb.operationFailed(ctx, new PubSubException.ClientNotSubscribedException(msg));
-                return;
-            }
-
-            // now the hard case, this is a brand new subscription, must record
-            SubscriptionState.Builder stateBuilder = SubscriptionState.newBuilder().setMsgId(consumeSeqId);
-
-            SubscriptionPreferences.Builder preferencesBuilder;
-            if (subRequest.hasPreferences()) {
-                preferencesBuilder = SubscriptionPreferences.newBuilder(subRequest.getPreferences());
-            } else {
-                preferencesBuilder = SubscriptionPreferences.newBuilder();
-            }
-
-            // backward compability
-            if (subRequest.hasMessageBound()) {
-                preferencesBuilder = preferencesBuilder.setMessageBound(subRequest.getMessageBound());
-            }
-
-            SubscriptionData.Builder subDataBuilder =
-                SubscriptionData.newBuilder().setState(stateBuilder).setPreferences(preferencesBuilder);
-            final SubscriptionData subData = subDataBuilder.build();
-
-            createSubscriptionData(topic, subscriberId, subData, new Callback<Version>() {
-                @Override
-                public void operationFailed(Object ctx, PubSubException exception) {
-                    cb.operationFailed(ctx, exception);
-                }
-
-                @Override
-                public void operationFinished(Object ctx, final Version version) {
-                    Callback<Void> cb2 = new Callback<Void>() {
-                        @Override
-                        public void operationFailed(final Object ctx, final PubSubException exception) {
-                            logger.error("subscription for subscriber " + subscriberId.toStringUtf8() + " to topic "
-                                         + topic.toStringUtf8() + " failed due to failed listener callback", exception);
-                            // should remove subscription when synchronized cross-region subscription failed
-                            deleteSubscriptionData(topic, subscriberId, version, new Callback<Void>() {
-                                @Override
-                                public void operationFinished(Object context,
-                                        Void resultOfOperation) {
-                                    finish();
-                                }
-                                @Override
-                                public void operationFailed(Object context,
-                                        PubSubException ex) {
-                                    logger.error("Remove subscription for subscriber " + subscriberId.toStringUtf8() + " to topic "
-                                                 + topic.toStringUtf8() + " failed : ", ex);
-                                    finish();
-                                }
-                                private void finish() {
-                                    cb.operationFailed(ctx, exception);
-                                }
-                            }, ctx);
-                        }
-
-                        @Override
-                        public void operationFinished(Object ctx, Void resultOfOperation) {
-                            topicSubscriptions.put(subscriberId, new InMemorySubscriptionState(subData, version));
-
-                            updateMessageBound(topic);
-
-                            cb.operationFinished(ctx, subData);
-                        }
-
-                    };
-
-                    // if this will be the first local subscription, notifyFirstLocalSubscribe
-                    if (!SubscriptionStateUtils.isHubSubscriber(subRequest.getSubscriberId())
-                        && !hasLocalSubscriptions(topicSubscriptions))
-                        notifyFirstLocalSubscribe(topic, subRequest.getSynchronous(), cb2, ctx);
-                    else
-                        cb2.operationFinished(ctx, null);
-                }
-            }, ctx);
-        }
-    }
-
-    /**
-     * @return True if the given subscriberId-to-subscriberState map contains a local subscription:
-     * the vast majority of subscriptions are local, so we will quickly encounter one if it exists.
-     */
-    private static boolean hasLocalSubscriptions(Map<ByteString, InMemorySubscriptionState> topicSubscriptions) {
-      for (ByteString subId : topicSubscriptions.keySet())
-        if (!SubscriptionStateUtils.isHubSubscriber(subId))
-          return true;
-      return false;
-    }
-
-    public void updateMessageBound(ByteString topic) {
-        final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
-        if (topicSubscriptions == null) {
-            return;
-        }
-        int maxBound = Integer.MIN_VALUE;
-        for (Map.Entry<ByteString, InMemorySubscriptionState> e : topicSubscriptions.entrySet()) {
-            if (!e.getValue().getSubscriptionPreferences().hasMessageBound()) {
-                maxBound = Integer.MIN_VALUE;
-                break;
-            } else {
-                maxBound = Math.max(maxBound, e.getValue().getSubscriptionPreferences().getMessageBound());
-            }
-        }
-        if (maxBound == Integer.MIN_VALUE) {
-            pm.clearMessageBound(topic);
-        } else {
-            pm.setMessageBound(topic, maxBound);
-        }
-    }
-
-    @Override
-    public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
-                                      Callback<SubscriptionData> callback, Object ctx) {
-        queuer.pushAndMaybeRun(topic, new SubscribeOp(topic, subRequest, consumeSeqId, callback, ctx));
-    }
-
-    private class ConsumeOp extends TopicOpQueuer.AsynchronousOp<Void> {
-        ByteString subscriberId;
-        MessageSeqId consumeSeqId;
-
-        public ConsumeOp(ByteString topic, ByteString subscriberId, MessageSeqId consumeSeqId, Callback<Void> callback,
-                         Object ctx) {
-            queuer.super(topic, callback, ctx);
-            this.subscriberId = subscriberId;
-            this.consumeSeqId = consumeSeqId;
-        }
-
-        @Override
-        public void run() {
-            Map<ByteString, InMemorySubscriptionState> topicSubs = top2sub2seq.get(topic);
-            if (topicSubs == null) {
-                cb.operationFinished(ctx, null);
-                return;
-            }
-
-            final InMemorySubscriptionState subState = topicSubs.get(subscriberId);
-            if (subState == null) {
-                cb.operationFinished(ctx, null);
-                return;
-            }
-
-            if (subState.setLastConsumeSeqId(consumeSeqId, cfg.getConsumeInterval())) {
-                updateSubscriptionState(topic, subscriberId, subState, new Callback<Void>() {
-                    @Override
-                    public void operationFinished(Object ctx, Void resultOfOperation) {
-                        subState.setLastPersistedSeqId(consumeSeqId.getLocalComponent());
-                        cb.operationFinished(ctx, resultOfOperation);
-                    }
-
-                    @Override
-                    public void operationFailed(Object ctx, PubSubException exception) {
-                        cb.operationFailed(ctx, exception);
-                    }
-                }, ctx);
-            } else {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Only advanced consume pointer in memory, will persist later, topic: "
-                                 + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
-                                 + " persistentState: " + SubscriptionStateUtils.toString(subState.getSubscriptionState())
-                                 + " in-memory consume-id: "
-                                 + MessageIdUtils.msgIdToReadableString(subState.getLastConsumeSeqId()));
-                }
-                cb.operationFinished(ctx, null);
-            }
-            // tell delivery manage about the consume event
-            if (null != dm) {
-                dm.messageConsumed(topic, subscriberId, consumeSeqId);
-            }
-        }
-    }
-
-    @Override
-    public void setConsumeSeqIdForSubscriber(ByteString topic, ByteString subscriberId, MessageSeqId consumeSeqId,
-            Callback<Void> callback, Object ctx) {
-        queuer.pushAndMaybeRun(topic, new ConsumeOp(topic, subscriberId, consumeSeqId, callback, ctx));
-    }
-
-    private class CloseSubscriptionOp extends TopicOpQueuer.AsynchronousOp<Void> {
-
-        public CloseSubscriptionOp(ByteString topic, ByteString subscriberId,
-                                   Callback<Void> callback, Object ctx) {
-            queuer.super(topic, callback, ctx);
-        }
-
-        @Override
-        public void run() {
-            // TODO: BOOKKEEPER-412: we might need to move the loaded subscription
-            //                       to reclaim memory
-            // But for now we do nothing
-            cb.operationFinished(ctx, null);
-        }
-    }
-
-    @Override
-    public void closeSubscription(ByteString topic, ByteString subscriberId,
-                                  Callback<Void> callback, Object ctx) {
-        queuer.pushAndMaybeRun(topic, new CloseSubscriptionOp(topic, subscriberId, callback, ctx));
-    }
-
-    private class UnsubscribeOp extends TopicOpQueuer.AsynchronousOp<Void> {
-        ByteString subscriberId;
-
-        public UnsubscribeOp(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object ctx) {
-            queuer.super(topic, callback, ctx);
-            this.subscriberId = subscriberId;
-        }
-
-        @Override
-        public void run() {
-            final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
-            if (topicSubscriptions == null) {
-                cb.operationFailed(ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
-                return;
-            }
-
-            if (!topicSubscriptions.containsKey(subscriberId)) {
-                cb.operationFailed(ctx, new PubSubException.ClientNotSubscribedException(""));
-                return;
-            }
-            
-            deleteSubscriptionData(topic, subscriberId, topicSubscriptions.get(subscriberId).getVersion(),
-                    new Callback<Void>() {
-                @Override
-                public void operationFailed(Object ctx, PubSubException exception) {
-                    cb.operationFailed(ctx, exception);
-                }
-
-                @Override
-                public void operationFinished(Object ctx, Void resultOfOperation) {
-                    topicSubscriptions.remove(subscriberId);
-                    // Notify listeners if necessary.
-                    if (!SubscriptionStateUtils.isHubSubscriber(subscriberId)
-                        && !hasLocalSubscriptions(topicSubscriptions))
-                        notifyLastLocalUnsubscribe(topic);
-
-                    updateMessageBound(topic);
-                    cb.operationFinished(ctx, null);
-                }
-            }, ctx);
-
-        }
-
-    }
-
-    @Override
-    public void unsubscribe(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object ctx) {
-        queuer.pushAndMaybeRun(topic, new UnsubscribeOp(topic, subscriberId, callback, ctx));
-    }
-
-    /**
-     * Not thread-safe.
-     */
-    @Override
-    public void addListener(SubscriptionEventListener listener) {
-        listeners.add(listener);
-    }
-
-    /**
-     * Method to stop this class gracefully including releasing any resources
-     * used and stopping all threads spawned.
-     */
-    public void stop() {
-        timer.cancel();
-        try {
-            final LinkedBlockingQueue<Boolean> queue = new LinkedBlockingQueue<Boolean>();
-            // update dirty subscriptions
-            for (ByteString topic : top2sub2seq.keySet()) {
-                Callback<Void> finalCb = new Callback<Void>() {
-                    @Override
-                    public void operationFinished(Object ctx, Void resultOfOperation) {
-                        ConcurrencyUtils.put(queue, true);
-                    }
-                    @Override
-                    public void operationFailed(Object ctx,
-                            PubSubException exception) {
-                        ConcurrencyUtils.put(queue, false);
-                    }
-                };
-                updateSubscriptionStates(topic, finalCb, null);
-                queue.take();
-            }
-        } catch (InterruptedException ie) {
-            logger.warn("Error during updating subscription states : ", ie);
-        }
-    }
-
-    private void updateSubscriptionState(final ByteString topic, final ByteString subscriberId,
-                                         final InMemorySubscriptionState state,
-                                         final Callback<Void> callback, Object ctx) {
-        SubscriptionData subData;
-        Callback<Version> cb = new Callback<Version>() {
-            @Override
-            public void operationFinished(Object ctx, Version version) {
-                state.setVersion(version);
-                callback.operationFinished(ctx, null);
-            }
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                if (exception instanceof PubSubException.BadVersionException) {
-                    readSubscriptionData(topic, subscriberId, new Callback<InMemorySubscriptionState>() {
-                        @Override
-                        public void operationFinished(Object ctx,
-                                InMemorySubscriptionState resultOfOperation) {
-                            state.setVersion(resultOfOperation.getVersion());
-                            updateSubscriptionState(topic, subscriberId, state, callback, ctx);
-                        }
-                        @Override
-                        public void operationFailed(Object ctx,
-                                PubSubException exception) {
-                            callback.operationFailed(ctx, exception);
-                        }
-                    }, ctx);
-                    
-                    return;
-                } 
-                callback.operationFailed(ctx, exception);
-            }
-        };
-        if (isPartialUpdateSupported()) {
-            subData = SubscriptionData.newBuilder().setState(state.getSubscriptionState()).build();
-            updateSubscriptionData(topic, subscriberId, subData, state.getVersion(), cb, ctx);
-        } else {
-            subData = state.toSubscriptionData();
-            replaceSubscriptionData(topic, subscriberId, subData, state.getVersion(), cb, ctx);
-        }
-    }
-
-    private void updateSubscriptionPreferences(final ByteString topic, final ByteString subscriberId,
-                                               final InMemorySubscriptionState state,
-                                               final Callback<Void> callback, Object ctx) {
-        SubscriptionData subData;
-        Callback<Version> cb = new Callback<Version>() {
-            @Override
-            public void operationFinished(Object ctx, Version version) {
-                state.setVersion(version);
-                callback.operationFinished(ctx, null);
-            }
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                if (exception instanceof PubSubException.BadVersionException) {
-                    readSubscriptionData(topic, subscriberId, new Callback<InMemorySubscriptionState>() {
-                        @Override
-                        public void operationFinished(Object ctx,
-                                InMemorySubscriptionState resultOfOperation) {
-                            state.setVersion(resultOfOperation.getVersion());
-                            updateSubscriptionPreferences(topic, subscriberId, state, callback, ctx);
-                        }
-                        @Override
-                        public void operationFailed(Object ctx,
-                                PubSubException exception) {
-                            callback.operationFailed(ctx, exception);
-                        }
-                    }, ctx);
-                    
-                    return;
-                } 
-                callback.operationFailed(ctx, exception);
-            }
-        };
-        if (isPartialUpdateSupported()) {
-            subData = SubscriptionData.newBuilder().setPreferences(state.getSubscriptionPreferences()).build();
-            updateSubscriptionData(topic, subscriberId, subData, state.getVersion(), cb, ctx);
-        } else {
-            subData = state.toSubscriptionData();
-            replaceSubscriptionData(topic, subscriberId, subData, state.getVersion(), cb, ctx);
-        }
-    }
-
-    protected abstract boolean isPartialUpdateSupported();
-
-    protected abstract void createSubscriptionData(final ByteString topic, ByteString subscriberId,
-            SubscriptionData data, Callback<Version> callback, Object ctx);
-
-    protected abstract void updateSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data,
-            Version version, Callback<Version> callback, Object ctx);
-
-    protected abstract void replaceSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data,
-            Version version, Callback<Version> callback, Object ctx);
-
-    protected abstract void deleteSubscriptionData(ByteString topic, ByteString subscriberId, Version version, Callback<Void> callback,
-            Object ctx);
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java
deleted file mode 100644
index 389ccc9..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-import java.io.IOException;
-
-import com.google.protobuf.ByteString;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.hedwig.filter.MessageFilterBase;
-import org.apache.hedwig.filter.ServerMessageFilter;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-
-public class AllToAllTopologyFilter implements ServerMessageFilter {
-
-    ByteString subscriberRegion;
-    boolean isHubSubscriber;
-
-    @Override
-    public ServerMessageFilter initialize(Configuration conf)
-    throws ConfigurationException, IOException {
-        String region = conf.getString(ServerConfiguration.REGION, "standalone");
-        if (null == region) {
-            throw new IOException("No region found to run " + getClass().getName());
-        }
-        subscriberRegion = ByteString.copyFromUtf8(region);
-        return this;
-    }
-
-    @Override
-    public void uninitialize() {
-        // do nothing now
-    }
-
-    @Override
-    public MessageFilterBase setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
-                                                        SubscriptionPreferences preferences) {
-        isHubSubscriber = SubscriptionStateUtils.isHubSubscriber(subscriberId);
-        return this;
-    }
-
-    @Override
-    public boolean testMessage(Message message) {
-        // We're using a simple all-to-all network topology, so no region
-        // should ever need to forward messages to any other region.
-        // Otherwise, with the current logic, messages will end up
-        // ping-pong-ing back and forth between regions with subscriptions
-        // to each other without termination (or in any other cyclic
-        // configuration).
-        if (isHubSubscriber && !message.getSrcRegion().equals(subscriberRegion)) {
-            return false;
-        } else {
-            return true;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
deleted file mode 100644
index 4adbf1c..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.delivery.DeliveryManager;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class InMemorySubscriptionManager extends AbstractSubscriptionManager {
-	 private static final Logger logger = LoggerFactory.getLogger(InMemorySubscriptionManager.class);
-	// Backup for top2sub2seq
-    final ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>> top2sub2seqBackup =
-        new ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>>();
-
-    public InMemorySubscriptionManager(ServerConfiguration conf,
-                                       TopicManager tm, PersistenceManager pm,
-                                       DeliveryManager dm,
-                                       ScheduledExecutorService scheduler) {
-        super(conf, tm, pm, dm, scheduler);
-    }
-
-    @Override
-    protected void createSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData subData,
-                                           Callback<Version> callback, Object ctx) {
-        // nothing to do, in-memory info is already recorded by base class
-        callback.operationFinished(ctx, null);
-    }
-
-    @Override
-    protected void deleteSubscriptionData(ByteString topic, ByteString subscriberId, Version version, Callback<Void> callback,
-                                          Object ctx) {
-        // nothing to do, in-memory info is already deleted by base class
-        callback.operationFinished(ctx, null);
-    }
-
-    @Override
-    protected boolean isPartialUpdateSupported() {
-        return false;
-    }
-
-    @Override
-    protected void updateSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data,
-                                          Version version, Callback<Version> callback, Object ctx) {
-        throw new UnsupportedOperationException("Doesn't support partial update");
-    }
-
-    @Override
-    protected void replaceSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data,
-                                           Version version, Callback<Version> callback, Object ctx) {
-        // nothing to do, in-memory info is already updated by base class
-        callback.operationFinished(ctx, null);
-    }
-
-    @Override
-    public void lostTopic(ByteString topic) {
-        // Backup topic-sub2seq map for readSubscriptions
-        final Map<ByteString, InMemorySubscriptionState> sub2seq = top2sub2seq.get(topic);
-        if (null != sub2seq)
-            top2sub2seqBackup.put(topic, sub2seq);
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("InMemorySubscriptionManager is losing topic " + topic.toStringUtf8());
-        }
-        queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null));
-    }
-
-    @Override
-    protected void readSubscriptions(ByteString topic,
-                                     Callback<Map<ByteString, InMemorySubscriptionState>> cb, Object ctx) {
-        // Since we backed up in-memory information on lostTopic, we can just return that back
-        Map<ByteString, InMemorySubscriptionState> topicSubs = top2sub2seqBackup.remove(topic);
-
-        if (topicSubs != null) {
-            cb.operationFinished(ctx, topicSubs);
-        } else {
-            cb.operationFinished(ctx, new ConcurrentHashMap<ByteString, InMemorySubscriptionState>());
-        }
-
-    }
-
-    @Override
-    protected void readSubscriptionData(ByteString topic,
-            ByteString subscriberId, Callback<InMemorySubscriptionState> cb, Object ctx) {
-        // Since we backed up in-memory information on lostTopic, we can just return that back
-        Map<ByteString, InMemorySubscriptionState> sub2seqBackup = top2sub2seqBackup.get(topic);
-        if (sub2seqBackup == null) {
-            cb.operationFinished(ctx, new InMemorySubscriptionState(
-                    SubscriptionData.getDefaultInstance(), Version.NEW));
-            return;
-        }
-        InMemorySubscriptionState subState = sub2seqBackup.remove(subscriberId);
-        
-        if (subState != null) {
-            cb.operationFinished(ctx, subState);
-        } else {
-            cb.operationFinished(ctx, new InMemorySubscriptionState(
-                    SubscriptionData.getDefaultInstance(), Version.NEW));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
deleted file mode 100644
index ea74599..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-
-import java.util.Map;
-import com.google.protobuf.ByteString;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
-import org.apache.hedwig.protoextensions.MapUtils;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-
-public class InMemorySubscriptionState {
-    SubscriptionState subscriptionState;
-    SubscriptionPreferences subscriptionPreferences;
-    MessageSeqId lastConsumeSeqId;
-    Version version;
-    long lastPersistedSeqId;
-
-    public InMemorySubscriptionState(SubscriptionData subscriptionData, Version version, MessageSeqId lastConsumeSeqId) {
-        this.subscriptionState = subscriptionData.getState();
-        if (subscriptionData.hasPreferences()) {
-            this.subscriptionPreferences = subscriptionData.getPreferences();
-        } else {
-            // set initial subscription preferences
-            SubscriptionPreferences.Builder prefsBuilder = SubscriptionPreferences.newBuilder();
-            // progate the old system preferences from subscription state to preferences
-            prefsBuilder.setMessageBound(subscriptionState.getMessageBound());
-            this.subscriptionPreferences = prefsBuilder.build();
-
-        }
-        this.lastConsumeSeqId = lastConsumeSeqId;
-        this.version = version;
-        this.lastPersistedSeqId = subscriptionState.getMsgId().getLocalComponent();
-    }
-
-    public InMemorySubscriptionState(SubscriptionData subscriptionData, Version version) {
-        this(subscriptionData, version, subscriptionData.getState().getMsgId());
-    }
-
-    public SubscriptionData toSubscriptionData() {
-        SubscriptionState.Builder stateBuilder =
-            SubscriptionState.newBuilder(subscriptionState).setMsgId(lastConsumeSeqId);
-        return SubscriptionData.newBuilder().setState(stateBuilder)
-                                            .setPreferences(subscriptionPreferences)
-                                            .build();
-    }
-
-    public SubscriptionState getSubscriptionState() {
-        return subscriptionState;
-    }
-
-    public SubscriptionPreferences getSubscriptionPreferences() {
-        return subscriptionPreferences;
-    }
-
-    public MessageSeqId getLastConsumeSeqId() {
-        return lastConsumeSeqId;
-    }
-     
-    public Version getVersion() {
-        return version;
-    }
-    
-    public void setVersion(Version version) {
-        this.version = version;
-    }
-
-    /**
-     *
-     * @param lastConsumeSeqId
-     * @param consumeInterval
-     *            The amount of laziness we want in persisting the consume
-     *            pointers
-     * @return true if the resulting structure needs to be persisted, false
-     *         otherwise
-     */
-    public boolean setLastConsumeSeqId(MessageSeqId lastConsumeSeqId, int consumeInterval) {
-        long interval = lastConsumeSeqId.getLocalComponent() - subscriptionState.getMsgId().getLocalComponent();
-        if (interval <= 0) {
-            return false;
-        }
-
-        // set consume seq id when it is larger
-        this.lastConsumeSeqId = lastConsumeSeqId;
-        if (interval < consumeInterval) {
-            return false;
-        }
-
-        // subscription state will be updated, marked it as clean
-        subscriptionState = SubscriptionState.newBuilder(subscriptionState).setMsgId(lastConsumeSeqId).build();
-        return true;
-    }
-
-    /**
-     * Set lastConsumeSeqId Immediately
-     *
-     * @return true if the resulting structure needs to be persisted, false otherwise
-     */
-    public boolean setLastConsumeSeqIdImmediately() {
-        long interval = lastConsumeSeqId.getLocalComponent() - subscriptionState.getMsgId().getLocalComponent();
-        // no need to set
-        if (interval <= 0) {
-            return false;
-        }
-        subscriptionState = SubscriptionState.newBuilder(subscriptionState).setMsgId(lastConsumeSeqId).build();
-        return true;
-    }
-
-    public long getLastPersistedSeqId() {
-        return lastPersistedSeqId;
-    }
-
-    public void setLastPersistedSeqId(long lastPersistedSeqId) {
-        this.lastPersistedSeqId = lastPersistedSeqId;
-    }
-
-    /**
-     * Update preferences.
-     *
-     * @return true if preferences is updated, which needs to be persisted, false otherwise.
-     */
-    public boolean updatePreferences(SubscriptionPreferences preferences) {
-        boolean changed = false;
-        SubscriptionPreferences.Builder newPreferencesBuilder = SubscriptionPreferences.newBuilder(subscriptionPreferences);
-        if (preferences.hasMessageBound()) {
-            if (!subscriptionPreferences.hasMessageBound() ||
-                subscriptionPreferences.getMessageBound() != preferences.getMessageBound()) {
-                newPreferencesBuilder.setMessageBound(preferences.getMessageBound());
-                changed = true;
-            }
-        }
-        if (preferences.hasMessageFilter()) {
-            if (!subscriptionPreferences.hasMessageFilter() ||
-                !subscriptionPreferences.getMessageFilter().equals(preferences.getMessageFilter())) {
-                newPreferencesBuilder.setMessageFilter(preferences.getMessageFilter());
-                changed = true;
-            }
-        }
-        if (preferences.hasMessageWindowSize()) {
-            if (!subscriptionPreferences.hasMessageWindowSize() ||
-                subscriptionPreferences.getMessageWindowSize() !=
-                preferences.getMessageWindowSize()) {
-                newPreferencesBuilder.setMessageWindowSize(preferences.getMessageWindowSize());
-                changed = true;
-            }
-        }
-        if (preferences.hasOptions()) {
-            Map<String, ByteString> userOptions = SubscriptionStateUtils.buildUserOptions(subscriptionPreferences);
-            Map<String, ByteString> optUpdates = SubscriptionStateUtils.buildUserOptions(preferences);
-            boolean optChanged = false;
-            for (Map.Entry<String, ByteString> entry : optUpdates.entrySet()) {
-                String key = entry.getKey();
-                if (userOptions.containsKey(key)) {
-                    if (null == entry.getValue()) {
-                        userOptions.remove(key);
-                        optChanged = true;
-                    } else {
-                        if (!entry.getValue().equals(userOptions.get(key))) {
-                            userOptions.put(key, entry.getValue());
-                            optChanged = true;
-                        }
-                    }
-                } else {
-                    userOptions.put(key, entry.getValue());
-                    optChanged = true;
-                }
-            }
-            if (optChanged) {
-                changed = true;
-                newPreferencesBuilder.setOptions(MapUtils.buildMapBuilder(userOptions));
-            }
-        }
-        if (changed) {
-            subscriptionPreferences = newPreferencesBuilder.build();
-        }
-        return changed;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
deleted file mode 100644
index 47fdfd2..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.delivery.DeliveryManager;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.server.meta.SubscriptionDataManager;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * MetaManager-based subscription manager.
- */
-public class MMSubscriptionManager extends AbstractSubscriptionManager {
-
-	private static final Logger logger = LoggerFactory.getLogger(MMSubscriptionManager.class);
-	SubscriptionDataManager subManager;
-
-    public MMSubscriptionManager(ServerConfiguration cfg,
-                                 MetadataManagerFactory metaManagerFactory,
-                                 TopicManager topicMgr, PersistenceManager pm,
-                                 DeliveryManager dm,
-                                 ScheduledExecutorService scheduler) {
-        super(cfg, topicMgr, pm, dm, scheduler);
-        this.subManager = metaManagerFactory.newSubscriptionDataManager();
-    }
-
-    @Override
-    protected void readSubscriptions(final ByteString topic,
-                                     final Callback<Map<ByteString, InMemorySubscriptionState>> cb, final Object ctx) {
-        subManager.readSubscriptions(topic, new Callback<Map<ByteString, Versioned<SubscriptionData>>>() {
-            @Override
-            public void operationFailed(Object ctx, PubSubException pse) {
-                cb.operationFailed(ctx, pse);
-            }
-            @Override
-            public void operationFinished(Object ctx, Map<ByteString, Versioned<SubscriptionData>> subs) {
-                Map<ByteString, InMemorySubscriptionState> results = new ConcurrentHashMap<ByteString, InMemorySubscriptionState>();
-                for (Map.Entry<ByteString, Versioned<SubscriptionData>> subEntry : subs.entrySet()) {
-                    Versioned<SubscriptionData> vv = subEntry.getValue();
-                    results.put(subEntry.getKey(), new InMemorySubscriptionState(vv.getValue(), vv.getVersion()));
-                }
-                cb.operationFinished(ctx, results);
-            }
-        }, ctx);
-    }
-
-    @Override
-    protected void readSubscriptionData(final ByteString topic, final ByteString subscriberId,
-                                        final Callback<InMemorySubscriptionState> cb, final Object ctx) {
-        subManager.readSubscriptionData(topic, subscriberId, new Callback<Versioned<SubscriptionData>>() {
-            @Override
-            public void operationFinished(Object ctx,
-                    Versioned<SubscriptionData> subData) {
-                if (null != subData) {
-                    cb.operationFinished(ctx, 
-                            new InMemorySubscriptionState(subData.getValue(), subData.getVersion()));
-                } else {
-                    cb.operationFinished(ctx, new InMemorySubscriptionState(
-                            SubscriptionData.getDefaultInstance(), Version.NEW));
-                }
-            }
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                cb.operationFailed(ctx, exception);
-            }
-        }, ctx);
-    }
-
-    @Override
-    protected boolean isPartialUpdateSupported() {
-        return subManager.isPartialUpdateSupported();
-    }
-
-    @Override
-    protected void createSubscriptionData(final ByteString topic, final ByteString subscriberId,
-                                          final SubscriptionData subData, final Callback<Version> callback, final Object ctx) {
-        subManager.createSubscriptionData(topic, subscriberId, subData, callback, ctx);
-    }
-
-    @Override
-    protected void replaceSubscriptionData(final ByteString topic, final ByteString subscriberId, final SubscriptionData subData, 
-                                           final Version version, final Callback<Version> callback, final Object ctx) {
-        subManager.replaceSubscriptionData(topic, subscriberId, subData, version, callback, ctx);
-    }
-
-    @Override
-    protected void updateSubscriptionData(final ByteString topic, final ByteString subscriberId, final SubscriptionData subData, 
-                                          final Version version, final Callback<Version> callback, final Object ctx) {
-        subManager.updateSubscriptionData(topic, subscriberId, subData, version, callback, ctx);
-    }
-
-    @Override
-    protected void deleteSubscriptionData(final ByteString topic, final ByteString subscriberId, Version version,
-                                          final Callback<Void> callback, final Object ctx) {
-        subManager.deleteSubscriptionData(topic, subscriberId, version, callback, ctx);
-    }
-
-    @Override
-    public void stop() {
-        super.stop();
-        try {
-            subManager.close();
-        } catch (IOException ioe) {
-            logger.warn("Exception closing subscription data manager : ", ioe);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java
deleted file mode 100644
index 6c6e626..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.util.Callback;
-
-/**
- * For listening to events that are issued by a SubscriptionManager.
- *
- */
-public interface SubscriptionEventListener {
-
-    /**
-     * Called by the subscription manager when it previously had zero local
-     * subscribers for a topic and is currently accepting its first local
-     * subscriber.
-     *
-     * @param topic
-     *            The topic of interest.
-     * @param synchronous
-     *            Whether this request was actually initiated by a new local
-     *            subscriber, or whether it was an existing subscription
-     *            inherited by the hub (e.g. when recovering the state from ZK).
-     * @param cb
-     *            The subscription will not complete until success is called on
-     *            this callback. An error on cb will result in a subscription
-     *            error.
-     */
-    public void onFirstLocalSubscribe(ByteString topic, boolean synchronous, Callback<Void> cb);
-
-    /**
-     * Called by the SubscriptionManager when it previously had non-zero local
-     * subscribers for a topic and is currently dropping its last local
-     * subscriber. This is fully asynchronous so there is no callback.
-     *
-     * @param topic
-     *            The topic of interest.
-     */
-    public void onLastLocalUnsubscribe(ByteString topic);
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java
deleted file mode 100644
index eadebcb..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.util.Callback;
-
-/**
- * All methods are thread-safe.
- */
-public interface SubscriptionManager {
-
-    /**
-     *
-     * Register a new subscription for the given subscriber for the given topic.
-     * This method should reliably persist the existence of the subscription in
-     * a way that it can't be lost. If the subscription already exists,
-     * depending on the create or attach flag in the subscribe request, an
-     * exception may be returned.
-     *
-     * This is an asynchronous method.
-     *
-     * @param topic
-     * @param subRequest
-     * @param consumeSeqId
-     *            The seqId to start serving the subscription from, if this is a
-     *            brand new subscription
-     * @param callback
-     *            The subscription data returned by the callback.
-     * @param ctx
-     */
-    public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
-                                      Callback<SubscriptionData> callback, Object ctx);
-
-    /**
-     * Set the consume position of a given subscriber on a given topic. Note
-     * that this method need not persist the consume position immediately but
-     * can be lazy and persist it later asynchronously, if that is more
-     * efficient.
-     *
-     * @param topic
-     * @param subscriberId
-     * @param consumeSeqId
-     */
-    public void setConsumeSeqIdForSubscriber(ByteString topic, ByteString subscriberId, MessageSeqId consumeSeqId,
-            Callback<Void> callback, Object ctx);
-
-    /**
-     * Close a particular subscription
-     *
-     * @param topic
-     *          Topic Name
-     * @param subscriberId
-     *          Subscriber Id
-     * @param callback
-     *          Callback
-     * @param ctx
-     *          Callback context
-     */
-    public void closeSubscription(ByteString topic, ByteString subscriberId,
-                                  Callback<Void> callback, Object ctx);
-
-    /**
-     * Delete a particular subscription
-     *
-     * @param topic
-     * @param subscriberId
-     */
-    public void unsubscribe(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object ctx);
-
-    // Management API methods that we will fill in later
-    // /**
-    // * Get the ids of all subscribers for a given topic
-    // *
-    // * @param topic
-    // * @return A list of subscriber ids that are currently subscribed to the
-    // * given topic
-    // */
-    // public List<ByteString> getSubscriptionsForTopic(ByteString topic);
-    //
-    // /**
-    // * Get the topics to which a given subscriber is subscribed to
-    // *
-    // * @param subscriberId
-    // * @return A list of the topics to which the given subscriber is
-    // subscribed
-    // * to
-    // * @throws ServiceDownException
-    // * If there is an error in looking up the subscription
-    // * information
-    // */
-    // public List<ByteString> getTopicsForSubscriber(ByteString subscriberId)
-    // throws ServiceDownException;
-
-    /**
-     * Add a listener that is notified when topic-subscription pairs are added
-     * or removed.
-     */
-    public void addListener(SubscriptionEventListener listener);
-
-    /**
-     * Stop Subscription Manager
-     */
-    public void stop();
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
deleted file mode 100644
index 2d9aba2..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.common.TopicOpQueuer;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.CallbackUtils;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-
-public abstract class AbstractTopicManager implements TopicManager {
-
-    /**
-     * Stats for a topic. For now it just an empty stub class.
-     */
-    static class TopicStats {
-    }
-
-    final static TopicStats STUB_TOPIC_STATS = new TopicStats();
-
-    /**
-     * My name.
-     */
-    protected HedwigSocketAddress addr;
-
-    /**
-     * Topic change listeners.
-     */
-    protected ArrayList<TopicOwnershipChangeListener> listeners = new ArrayList<TopicOwnershipChangeListener>();
-
-    /**
-     * List of topics I believe I am responsible for.
-     */
-    protected Cache<ByteString, TopicStats> topics;
-
-    protected TopicOpQueuer queuer;
-    protected ServerConfiguration cfg;
-    protected ScheduledExecutorService scheduler;
-
-    private static final Logger logger = LoggerFactory.getLogger(AbstractTopicManager.class);
-
-    private class GetOwnerOp extends TopicOpQueuer.AsynchronousOp<HedwigSocketAddress> {
-        public boolean shouldClaim;
-
-        public GetOwnerOp(final ByteString topic, boolean shouldClaim,
-                          final Callback<HedwigSocketAddress> cb, Object ctx) {
-            queuer.super(topic, cb, ctx);
-            this.shouldClaim = shouldClaim;
-        }
-
-        @Override
-        public void run() {
-            realGetOwner(topic, shouldClaim, cb, ctx);
-        }
-    }
-
-    private class ReleaseOp extends TopicOpQueuer.AsynchronousOp<Void> {
-        final boolean checkExistence;
-
-        public ReleaseOp(ByteString topic, Callback<Void> cb, Object ctx) {
-            this(topic, cb, ctx, true);
-        }
-
-        ReleaseOp(ByteString topic, Callback<Void> cb, Object ctx,
-                  boolean checkExistence) {
-            queuer.super(topic, cb, ctx);
-            this.checkExistence = checkExistence;
-        }
-
-        @Override
-        public void run() {
-            if (checkExistence) {
-                TopicStats stats = topics.getIfPresent(topic);
-                if (null == stats) {
-                    cb.operationFinished(ctx, null);
-                    return;
-                }
-            }
-            realReleaseTopic(topic, cb, ctx);
-        }
-    }
-
-    /**
-     * Release topic when the topic is removed from topics cache.
-     */
-    class ReleaseTopicListener implements RemovalListener<ByteString, TopicStats> {
-        @Override
-        public void onRemoval(RemovalNotification<ByteString, TopicStats> notification) {
-            if (notification.wasEvicted()) {
-                logger.info("topic {} is evicted", notification.getKey().toStringUtf8());
-                // if the topic is evicted, we need to release the topic.
-                releaseTopicInternally(notification.getKey(), false);
-            }
-        }
-    }
-
-    public AbstractTopicManager(ServerConfiguration cfg, ScheduledExecutorService scheduler)
-            throws UnknownHostException {
-        this.cfg = cfg;
-        this.queuer = new TopicOpQueuer(scheduler);
-        this.scheduler = scheduler;
-        addr = cfg.getServerAddr();
-
-        // build the topic cache
-        CacheBuilder<ByteString, TopicStats> cacheBuilder = CacheBuilder.newBuilder()
-            .maximumSize(cfg.getMaxNumTopics())
-            .initialCapacity(cfg.getInitNumTopics())
-            // TODO: change to same number as topic op queuer threads
-            .concurrencyLevel(Runtime.getRuntime().availableProcessors())
-            .removalListener(new ReleaseTopicListener());
-        if (cfg.getRetentionSecsAfterAccess() > 0) {
-            cacheBuilder.expireAfterAccess(cfg.getRetentionSecsAfterAccess(), TimeUnit.SECONDS);
-        }
-        topics = cacheBuilder.build();
-    }
-
-    @Override
-    public void incrementTopicAccessTimes(ByteString topic) {
-        // let guava cache handle hits counting
-        topics.getIfPresent(topic);
-    }
-
-    @Override
-    public synchronized void addTopicOwnershipChangeListener(TopicOwnershipChangeListener listener) {
-        listeners.add(listener);
-    }
-
-    private void releaseTopicInternally(final ByteString topic, boolean checkExistence) {
-        // Enqueue a release operation. (Recall that release
-        // doesn't "fail" even if the topic is missing.)
-        queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, new Callback<Void>() {
-
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                logger.error("failure that should never happen when releasing topic "
-                             + topic, exception);
-            }
-
-            @Override
-            public void operationFinished(Object ctx, Void resultOfOperation) {
-                    logger.info("successfully release of topic "
-                        + topic.toStringUtf8());
-                if (logger.isDebugEnabled()) {
-                    logger.debug("successfully release of topic "
-                        + topic.toStringUtf8());
-                }
-            }
-
-        }, null, checkExistence));
-    }
-
-    protected final synchronized void notifyListenersAndAddToOwnedTopics(final ByteString topic,
-            final Callback<HedwigSocketAddress> originalCallback, final Object originalContext) {
-
-        Callback<Void> postCb = new Callback<Void>() {
-
-            @Override
-            public void operationFinished(Object ctx, Void resultOfOperation) {
-                topics.put(topic, STUB_TOPIC_STATS);
-                if (cfg.getRetentionSecs() > 0) {
-                    scheduler.schedule(new Runnable() {
-                        @Override
-                        public void run() {
-                            releaseTopicInternally(topic, true);
-                        }
-                    }, cfg.getRetentionSecs(), TimeUnit.SECONDS);
-                }
-                originalCallback.operationFinished(originalContext, addr);
-            }
-
-            @Override
-            public void operationFailed(final Object ctx, final PubSubException exception) {
-                // TODO: optimization: we can release this as soon as we experience the first error.
-                Callback<Void> cb = new Callback<Void>() {
-                    @Override
-                    public void operationFinished(Object _ctx, Void _resultOfOperation) {
-                        originalCallback.operationFailed(ctx, exception);
-                    }
-                    @Override
-                    public void operationFailed(Object _ctx, PubSubException _exception) {
-                        logger.error("Exception releasing topic", _exception);
-                        originalCallback.operationFailed(ctx, exception);
-                    }
-                };
-
-                realReleaseTopic(topic, cb, originalContext);
-            }
-        };
-
-        Callback<Void> mcb = CallbackUtils.multiCallback(listeners.size(), postCb, null);
-        for (TopicOwnershipChangeListener listener : listeners) {
-            listener.acquiredTopic(topic, mcb, null);
-        }
-    }
-
-    private void realReleaseTopic(ByteString topic, Callback<Void> callback, Object ctx) {
-        for (TopicOwnershipChangeListener listener : listeners)
-            listener.lostTopic(topic);
-        topics.invalidate(topic);
-        postReleaseCleanup(topic, callback, ctx);
-    }
-
-    @Override
-    public final void getOwner(ByteString topic, boolean shouldClaim,
-                               Callback<HedwigSocketAddress> cb, Object ctx) {
-        queuer.pushAndMaybeRun(topic, new GetOwnerOp(topic, shouldClaim, cb, ctx));
-    }
-
-    @Override
-    public final void releaseTopic(ByteString topic, Callback<Void> cb, Object ctx) {
-        queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, cb, ctx));
-    }
-
-    @Override
-    public final void releaseTopics(int numTopics, final Callback<Long> callback, final Object ctx) {
-        // This is a best effort function. We sacrifice accuracy to not hold a lock on the topics set.
-        List<ByteString> topicList = getTopicList();
-        // Make sure we release only as many topics as we own.
-        final long numTopicsToRelease = Math.min(topicList.size(), numTopics);
-        // Shuffle the list of topics we own, so that we release a random subset.
-        Collections.shuffle(topicList);
-        Callback<Void> mcb = CallbackUtils.multiCallback((int)numTopicsToRelease, new Callback<Void>() {
-            @Override
-            public void operationFinished(Object ctx, Void ignoreVal) {
-                callback.operationFinished(ctx, numTopicsToRelease);
-            }
-
-            @Override
-            public void operationFailed(Object ctx, PubSubException e) {
-                long notReleased = 0;
-                if (e instanceof PubSubException.CompositeException) {
-                    notReleased = ((PubSubException.CompositeException)e).getExceptions().size();
-                }
-                callback.operationFinished(ctx, numTopicsToRelease - notReleased);
-            }
-        }, ctx);
-
-        // Try to release "numTopicsToRelease" topics. It's okay if we're not
-        // able to release some topics. We signal that we tried by invoking the callback's
-        // operationFinished() with the actual number of topics released.
-        logger.info("This hub is releasing {} topics", numTopicsToRelease);
-        long releaseCount = 0;
-        for (ByteString topic : topicList) {
-            if (++releaseCount > numTopicsToRelease) {
-                break;
-            }
-            releaseTopic(topic, mcb, ctx);
-        }
-    }
-
-    @Override
-    public List<ByteString> getTopicList() {
-        List<ByteString> topicList;
-        synchronized (this.topics) {
-            topicList = Lists.newArrayList(this.topics.asMap().keySet());
-        }
-        return topicList;
-    }
-
-    /**
-     * This method should "return" the owner of the topic if one has been chosen
-     * already. If there is no pre-chosen owner, either this hub or some other
-     * should be chosen based on the shouldClaim parameter. If its ends up
-     * choosing this hub as the owner, the {@code
-     * AbstractTopicManager#notifyListenersAndAddToOwnedTopics(ByteString,
-     * OperationCallback, Object)} method must be called.
-     *
-     */
-    protected abstract void realGetOwner(ByteString topic, boolean shouldClaim,
-                                         Callback<HedwigSocketAddress> cb, Object ctx);
-
-    /**
-     * The method should do any cleanup necessary to indicate to other hubs that
-     * this topic has been released
-     */
-    protected abstract void postReleaseCleanup(ByteString topic, Callback<Void> cb, Object ctx);
-
-    @Override
-    public void stop() {
-        // do nothing now
-    }
-}