You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:26 UTC

[16/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
deleted file mode 100644
index 0480b22..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
+++ /dev/null
@@ -1,978 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.delivery;
-
-import static org.apache.hedwig.util.VarArgs.va;
-
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Queue;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
-
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.filter.ServerMessageFilter;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.common.UnexpectedError;
-import org.apache.hedwig.server.handlers.SubscriptionChannelManager.SubChannelDisconnectedListener;
-import org.apache.hedwig.server.netty.ServerStats;
-import org.apache.hedwig.server.persistence.CancelScanRequest;
-import org.apache.hedwig.server.persistence.Factory;
-import org.apache.hedwig.server.persistence.MapMethods;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.persistence.ReadAheadCache;
-import org.apache.hedwig.server.persistence.ScanCallback;
-import org.apache.hedwig.server.persistence.ScanRequest;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-
-public class FIFODeliveryManager implements DeliveryManager, SubChannelDisconnectedListener {
-
-    protected static final Logger logger = LoggerFactory.getLogger(FIFODeliveryManager.class);
-
-    private static Callback<Void> NOP_CALLBACK = new Callback<Void>() {
-        @Override
-        public void operationFinished(Object ctx, Void result) {
-        }
-        @Override
-        public void operationFailed(Object ctx, PubSubException exception) {
-        }
-    };
-
-    /**
-     * A unit of work that is handed to a delivery worker's request queue and
-     * executed by that worker.
-     */
-    protected interface DeliveryManagerRequest {
-        /** Executes this request. */
-        void performRequest();
-    }
-
-    /**
-     * Stores a mapping from topic to the delivery pointers on the topic. The
-     * delivery pointers are stored in a sorted map from seq-id to the set of
-     * subscribers at that seq-id
-     */
-    ConcurrentMap<ByteString, SortedMap<Long, Set<ActiveSubscriberState>>> perTopicDeliveryPtrs;
-
-    /**
-     * Mapping from topic subscriber (topic plus subscriber id) to the
-     * subscriber state that we are serving for it. This prevents us e.g., from
-     * serving two subscriptions for the same topic subscriber
-     */
-    ConcurrentMap<TopicSubscriber, ActiveSubscriberState> subscriberStates;
-
-    private final ReadAheadCache cache; // persistenceMgr viewed as a ReadAheadCache, or null when it is not one
-    private final PersistenceManager persistenceMgr; // scanned for the messages to deliver
-    private TopicManager tm; // told about topic accesses when a message is scanned for delivery
-    private ServerConfiguration cfg; // supplies worker count, scan backoff period and default window size
-
-    private final int numDeliveryWorkers; // length of deliveryWorkers
-    private final DeliveryWorker[] deliveryWorkers; // a topic's worker is picked by the topic's hash code
-
-    /**
-     * One delivery thread together with the two queues it services: a queue of
-     * requests to execute and a queue of subscribers waiting to be retried
-     * after a transient error.
-     */
-    private class DeliveryWorker implements Runnable {
-
-        /** Requests waiting to be executed on this worker's thread. */
-        BlockingQueue<DeliveryManagerRequest> requestQueue =
-            new LinkedBlockingQueue<DeliveryManagerRequest>();
-
-        /**
-         * The queue of all subscriptions that are facing a transient error either
-         * in scanning from the persistence manager, or in sending to the consumer.
-         * Entries are ordered by their last scan error time, earliest first.
-         */
-        Queue<ActiveSubscriberState> retryQueue =
-            new PriorityBlockingQueue<ActiveSubscriberState>(32, new Comparator<ActiveSubscriberState>() {
-                @Override
-                public int compare(ActiveSubscriberState as1, ActiveSubscriberState as2) {
-                    // Compare directly instead of subtracting, so the result can
-                    // never be distorted by arithmetic overflow.
-                    long t1 = as1.lastScanErrorTime;
-                    long t2 = as2.lastScanErrorTime;
-                    return t1 > t2 ? 1 : (t1 < t2 ? -1 : 0);
-                }
-            });
-
-        // Boolean indicating if this thread should continue running. This is used
-        // when we want to stop the thread during a PubSubServer shutdown.
-        protected volatile boolean keepRunning = true;
-        private final Thread workerThread;
-        private final int idx;
-
-        // Guards 'suspended'; the worker thread waits on it while suspended.
-        private final Object suspensionLock = new Object();
-        private boolean suspended = false;
-
-        /**
-         * Creates the worker and its (not yet started) thread.
-         *
-         * @param index
-         *            position of this worker in the manager's worker array; used
-         *            in the thread name and in error messages
-         */
-        DeliveryWorker(int index) {
-            this.idx = index;
-            workerThread = new Thread(this, "DeliveryManagerThread-" + index);
-        }
-
-        /** Starts the worker thread. */
-        void start() {
-            workerThread.start();
-        }
-
-        /**
-         * Stop method which will enqueue a ShutdownDeliveryManagerRequest.
-         * Requests already queued ahead of it are executed first.
-         */
-        void stop() {
-            enqueueWithoutFailure(new ShutdownDeliveryManagerRequest());
-        }
-
-        /**
-         * Stop FIFO delivery worker from processing requests. (for testing)
-         */
-        void suspendProcessing() {
-            synchronized(suspensionLock) {
-                suspended = true;
-            }
-        }
-
-        /**
-         * Resume FIFO delivery worker. (for testing)
-         */
-        void resumeProcessing() {
-            synchronized(suspensionLock) {
-                suspended = false;
-                suspensionLock.notify();
-            }
-        }
-
-        /**
-         * Worker loop: wait up to a second for a request, retry any errored
-         * subscribers whose backoff has elapsed, then execute the request if
-         * one was dequeued. The loop ends once {@code keepRunning} is cleared,
-         * either by a shutdown request or because the thread was interrupted.
-         */
-        @Override
-        public void run() {
-            while (keepRunning) {
-                DeliveryManagerRequest request = null;
-
-                try {
-                    // We use a timeout of 1 second, so that we can wake up once in
-                    // a while to check if there is something in the retry queue.
-                    request = requestQueue.poll(1, TimeUnit.SECONDS);
-                    synchronized(suspensionLock) {
-                        while (suspended) {
-                            suspensionLock.wait();
-                        }
-                    }
-                } catch (InterruptedException e) {
-                    // Restore the interrupt status for whoever owns this thread
-                    // and stop after this iteration. With the flag set, every
-                    // further poll() would throw immediately, so carrying on
-                    // would spin without ever dequeuing another request.
-                    Thread.currentThread().interrupt();
-                    keepRunning = false;
-                }
-
-                // First retry any subscriptions that had failed and need a retry
-                retryErroredSubscribers();
-
-                if (request == null) {
-                    continue;
-                }
-
-                request.performRequest();
-
-            }
-        }
-
-        /**
-         * Adds a request to this worker's request queue.
-         *
-         * @param request
-         *            the request to execute on this worker's thread
-         * @throws UnexpectedError
-         *             if the queue refuses the request
-         */
-        protected void enqueueWithoutFailure(DeliveryManagerRequest request) {
-            if (!requestQueue.offer(request)) {
-                throw new UnexpectedError("Could not enqueue object: " + request
-                    + " to request queue for delivery worker ." + idx);
-            }
-        }
-
-        /**
-         * Stamps the subscriber with the current time as its last scan error
-         * time and places it on the retry queue.
-         *
-         * @param subscriber
-         *            the subscriber to retry once the backoff period has passed
-         * @throws UnexpectedError
-         *             if the retry queue refuses the subscriber
-         */
-        public void retryErroredSubscriberAfterDelay(ActiveSubscriberState subscriber) {
-            subscriber.setLastScanErrorTime(MathUtils.now());
-
-            if (!retryQueue.offer(subscriber)) {
-                throw new UnexpectedError("Could not enqueue to retry queue for delivery worker " + idx);
-            }
-        }
-
-        /**
-         * Clears the subscriber's last scan error time and places it on the
-         * retry queue, so that the next retry pass picks it up without waiting
-         * for a backoff.
-         *
-         * @param subscriber
-         *            the subscriber to retry
-         * @throws UnexpectedError
-         *             if the retry queue refuses the subscriber
-         */
-        public void clearRetryDelayForSubscriber(ActiveSubscriberState subscriber) {
-            subscriber.clearLastScanErrorTime();
-            if (!retryQueue.offer(subscriber)) {
-                throw new UnexpectedError("Could not enqueue to delivery manager retry queue");
-            }
-            // If the request queue is empty the worker may be blocked in poll();
-            // queue an empty request so that it wakes up and runs a retry pass
-            // instead of waiting for the poll timeout.
-            if (requestQueue.isEmpty()) {
-                enqueueWithoutFailure(new DeliveryManagerRequest() {
-                        @Override
-                        public void performRequest() {
-                        // do nothing
-                        }
-                        });
-            }
-        }
-
-        /**
-         * Retries, in queue order, every subscriber at the head of the retry
-         * queue whose last scan error is at least the configured scan backoff
-         * period old.
-         */
-        protected void retryErroredSubscribers() {
-            long lastInterestingFailureTime = MathUtils.now() - cfg.getScanBackoffPeriodMs();
-            ActiveSubscriberState subscriber;
-
-            while ((subscriber = retryQueue.peek()) != null) {
-                if (subscriber.getLastScanErrorTime() > lastInterestingFailureTime) {
-                    // Not enough time has elapsed yet, will retry later.
-                    // The queue is ordered by error time, so the items behind
-                    // this one failed no earlier; no need to check them.
-                    return;
-                }
-
-                // retry now
-                subscriber.deliverNextMessage();
-                retryQueue.poll();
-            }
-        }
-
-        protected class ShutdownDeliveryManagerRequest implements DeliveryManagerRequest {
-            // This is a simple type of Request we will enqueue when the
-            // PubSubServer is shut down and we want to stop the DeliveryManager
-            // thread.
-            @Override
-            public void performRequest() {
-                keepRunning = false;
-            }
-        }
-
-    }
-
-
-
-    /**
-     * Builds the delivery manager and its workers. The worker threads are
-     * created here but are not started.
-     *
-     * @param tm
-     *            topic manager
-     * @param persistenceMgr
-     *            persistence manager to scan messages from; when it is a
-     *            {@link ReadAheadCache} it is also kept as the cache
-     * @param cfg
-     *            server configuration; provides the number of delivery threads
-     */
-    public FIFODeliveryManager(TopicManager tm, PersistenceManager persistenceMgr,
-                               ServerConfiguration cfg) {
-        this.tm = tm;
-        this.cfg = cfg;
-        this.persistenceMgr = persistenceMgr;
-        this.cache = (persistenceMgr instanceof ReadAheadCache)
-                     ? (ReadAheadCache) persistenceMgr
-                     : null;
-
-        this.perTopicDeliveryPtrs =
-            new ConcurrentHashMap<ByteString, SortedMap<Long, Set<ActiveSubscriberState>>>();
-        this.subscriberStates =
-            new ConcurrentHashMap<TopicSubscriber, ActiveSubscriberState>();
-
-        // one worker per configured delivery thread
-        this.numDeliveryWorkers = cfg.getNumDeliveryThreads();
-        this.deliveryWorkers = new DeliveryWorker[numDeliveryWorkers];
-        int workerIdx = 0;
-        while (workerIdx < numDeliveryWorkers) {
-            deliveryWorkers[workerIdx] = new DeliveryWorker(workerIdx);
-            workerIdx++;
-        }
-    }
-
-    /** Starts every delivery worker. */
-    @Override
-    public void start() {
-        for (DeliveryWorker worker : deliveryWorkers) {
-            worker.start();
-        }
-    }
-
-    /**
-     * Suspends request processing on every delivery worker. (for testing)
-     */
-    @VisibleForTesting
-    public void suspendProcessing() {
-        for (DeliveryWorker worker : deliveryWorkers) {
-            worker.suspendProcessing();
-        }
-    }
-
-    /**
-     * Resumes request processing on every delivery worker. (for testing)
-     */
-    @VisibleForTesting
-    public void resumeProcessing() {
-        for (DeliveryWorker worker : deliveryWorkers) {
-            worker.resumeProcessing();
-        }
-    }
-
-    /**
-     * Asks every delivery worker to stop.
-     */
-    @Override
-    public void stop() {
-        for (DeliveryWorker worker : deliveryWorkers) {
-            worker.stop();
-        }
-    }
-
-    /**
-     * Picks the worker responsible for a topic from the topic's hash code, so
-     * that a given topic is always handled by the same worker.
-     */
-    private DeliveryWorker getDeliveryWorker(ByteString topic) {
-        int workerIdx = MathUtils.signSafeMod(topic.hashCode(), numDeliveryWorkers);
-        return deliveryWorkers[workerIdx];
-    }
-
-    /**
-     * Our usual enqueue function: hands the request to the worker that owns
-     * the topic. The worker raises an error if its queue refuses the request,
-     * which should never happen because the queue is unbounded.
-     *
-     * @param topic
-     *            topic whose worker should execute the request
-     * @param request
-     *            request to execute
-     */
-    protected void enqueueWithoutFailure(ByteString topic, DeliveryManagerRequest request) {
-        DeliveryWorker worker = getDeliveryWorker(topic);
-        worker.enqueueWithoutFailure(request);
-    }
-
-    /**
-     * Tells the delivery manager to start sending out messages for a particular
-     * subscription. The subscriber state is created here and queued to the
-     * topic's delivery worker.
-     *
-     * @param topic
-     *            Topic of the subscription
-     * @param subscriberId
-     *            Id of the subscriber
-     * @param preferences
-     *            Subscription preferences; the message window size is taken
-     *            from here when it is set
-     * @param seqIdToStartFrom
-     *            Message sequence-id from where delivery should be started
-     * @param endPoint
-     *            The delivery end point to send messages to
-     * @param filter
-     *            Only messages passing this filter should be sent to this
-     *            subscriber
-     * @param callback
-     *            Callback instance
-     * @param ctx
-     *            Callback context
-     */
-    @Override
-    public void startServingSubscription(ByteString topic, ByteString subscriberId,
-                                         SubscriptionPreferences preferences,
-                                         MessageSeqId seqIdToStartFrom,
-                                         DeliveryEndPoint endPoint, ServerMessageFilter filter,
-                                         Callback<Void> callback, Object ctx) {
-        // the subscriber state tracks the last *delivered* seq-id, which is the
-        // one just before the seq-id to start from
-        long lastDelivered = seqIdToStartFrom.getLocalComponent() - 1;
-        ActiveSubscriberState subscriber =
-            new ActiveSubscriberState(topic, subscriberId, preferences, lastDelivered,
-                                      endPoint, filter, callback, ctx);
-
-        enqueueWithoutFailure(topic, subscriber);
-    }
-
-    /**
-     * Queues a request to the topic's delivery worker to stop serving the
-     * given subscriber.
-     *
-     * @param topic
-     *            Topic of the subscription
-     * @param subscriberId
-     *            Id of the subscriber
-     * @param event
-     *            Subscription event handed to the stop request; may be null
-     * @param cb
-     *            Callback instance
-     * @param ctx
-     *            Callback context
-     */
-    public void stopServingSubscriber(ByteString topic, ByteString subscriberId,
-                                      SubscriptionEvent event,
-                                      Callback<Void> cb, Object ctx) {
-        StopServingSubscriber stopRequest =
-            new StopServingSubscriber(topic, subscriberId, event, cb, ctx);
-        enqueueWithoutFailure(topic, stopRequest);
-    }
-
-    /**
-     * Instructs the delivery manager to backoff on the given subscriber and
-     * retry sending after some time
-     *
-     * @param subscriber
-     */
-    public void retryErroredSubscriberAfterDelay(ActiveSubscriberState subscriber) {
-        getDeliveryWorker(subscriber.getTopic()).retryErroredSubscriberAfterDelay(subscriber);
-    }
-
-    public void clearRetryDelayForSubscriber(ActiveSubscriberState subscriber) {
-        getDeliveryWorker(subscriber.getTopic()).clearRetryDelayForSubscriber(subscriber);
-    }
-
-    // TODO: for now, the messageConsumed request is not moved to the delivery manager thread;
-    //       that is supposed to be fixed in https://issues.apache.org/jira/browse/BOOKKEEPER-503
-    // Forwards a consume notification to the active subscriber state, if there is one;
-    // notifications for subscribers that are not being served are ignored.
-    @Override
-    public void messageConsumed(ByteString topic, ByteString subscriberId,
-                                MessageSeqId consumedSeqId) {
-        TopicSubscriber key = new TopicSubscriber(topic, subscriberId);
-        ActiveSubscriberState subState = subscriberStates.get(key);
-        if (subState != null) {
-            subState.messageConsumed(consumedSeqId.getLocalComponent());
-        }
-    }
-
-    /**
-     * Instructs the delivery manager to move the delivery pointer for a given
-     * subscriber
-     *
-     * @param subscriber
-     * @param prevSeqId
-     * @param newSeqId
-     */
-    public void moveDeliveryPtrForward(ActiveSubscriberState subscriber, long prevSeqId, long newSeqId) {
-        enqueueWithoutFailure(subscriber.getTopic(),
-            new DeliveryPtrMove(subscriber, prevSeqId, newSeqId));
-    }
-
-    /**
-     * Removes a subscriber from the delivery pointers recorded for its topic.
-     *
-     * @param subscriber
-     *            the subscriber to remove
-     * @param seqId
-     *            seq-id the subscriber is expected to be recorded at
-     * @param isAbsenceOk
-     *            when false, a missing topic entry or a missing subscriber is
-     *            reported as an {@link UnexpectedError}
-     * @param pruneTopic
-     *            when true, the topic's entry is dropped once it has no
-     *            delivery pointers left
-     */
-    protected void removeDeliveryPtr(ActiveSubscriberState subscriber, Long seqId, boolean isAbsenceOk,
-                                     boolean pruneTopic) {
-
-        assert seqId != null;
-
-        ByteString topic = subscriber.getTopic();
-        SortedMap<Long, Set<ActiveSubscriberState>> deliveryPtrs = perTopicDeliveryPtrs.get(topic);
-
-        // nothing recorded for this topic at all
-        if (deliveryPtrs == null) {
-            if (isAbsenceOk) {
-                return;
-            }
-            throw new UnexpectedError("No delivery pointers found while disconnecting " + "channel for topic:" + topic);
-        }
-
-        boolean removed = MapMethods.removeFromMultiMap(deliveryPtrs, seqId, subscriber);
-        if (!removed && !isAbsenceOk) {
-            throw new UnexpectedError("Could not find subscriber:" + subscriber + " at the expected delivery pointer");
-        }
-
-        if (pruneTopic && deliveryPtrs.isEmpty()) {
-            perTopicDeliveryPtrs.remove(topic);
-        }
-
-    }
-
-    /**
-     * Returns the smallest seq-id that has a delivery pointer on the topic, or
-     * {@code Long.MAX_VALUE - 1} when the topic has no delivery pointers.
-     */
-    protected long getMinimumSeqId(ByteString topic) {
-        SortedMap<Long, Set<ActiveSubscriberState>> deliveryPtrs = perTopicDeliveryPtrs.get(topic);
-
-        if (deliveryPtrs != null && !deliveryPtrs.isEmpty()) {
-            return deliveryPtrs.firstKey();
-        }
-        return Long.MAX_VALUE - 1;
-    }
-
-    /**
-     * Records the subscriber at the given seq-id in the delivery pointers of
-     * its topic, creating the topic's entry first if there is none yet.
-     */
-    protected void addDeliveryPtr(ActiveSubscriberState subscriber, Long seqId) {
-
-        ByteString topic = subscriber.getTopic();
-
-        // fetch the topic's pointer table, inserting an empty one if absent
-        SortedMap<Long, Set<ActiveSubscriberState>> deliveryPtrs = MapMethods.getAfterInsertingIfAbsent(
-                    perTopicDeliveryPtrs, topic, TreeMapLongToSetSubscriberFactory.instance);
-
-        MapMethods.addToMultiMap(deliveryPtrs, seqId, subscriber, HashMapSubscriberFactory.instance);
-    }
-
-    public class ActiveSubscriberState
-        implements ScanCallback, DeliveryCallback, DeliveryManagerRequest, CancelScanRequest {
-
-        static final int UNLIMITED = 0; // messageWindowSize value meaning "no window limit"
-
-        ByteString topic;
-        ByteString subscriberId;
-        long lastLocalSeqIdDelivered; // advanced in sendingFinished() to the seq-id that was just being delivered
-        boolean connected = true; // cleared by setNotConnected(); accessed under connectedLock
-        ReentrantReadWriteLock connectedLock = new ReentrantReadWriteLock();
-        DeliveryEndPoint deliveryEndPoint;
-        long lastScanErrorTime = -1; // -1 means no error recorded; orders the worker's retry queue
-        long localSeqIdDeliveringNow; // seq-id of the scan issued by doDeliverNextMessage()
-        long lastSeqIdCommunicatedExternally; // last seq-id passed to moveDeliveryPtrForward()
-        long lastSeqIdConsumedUtil; // highest seq-id reported through messageConsumed()
-        boolean isThrottled = false; // set when delivery stops at the window limit, cleared by messageConsumed()
-        final int messageWindowSize; // max gap between delivered and consumed seq-ids, or UNLIMITED
-        ServerMessageFilter filter;
-        Callback<Void> cb;
-        Object ctx;
-
-        // track the outstanding scan request
-        // so that it can be cancelled
-        ScanRequest outstandingScanRequest;
-
-        final static int SEQ_ID_SLACK = 10; // delivery pointer is moved only once it lags by more than this
-
-        /**
-         * Creates the state for one subscriber. The message window size comes
-         * from the subscription preferences when they carry one, otherwise from
-         * the configured default when that is positive, otherwise it is
-         * unlimited.
-         *
-         * @param lastLocalSeqIdDelivered
-         *            seq-id treated as already delivered and consumed; delivery
-         *            continues after it
-         * @param cb
-         *            callback kept together with {@code ctx} on this state
-         */
-        public ActiveSubscriberState(ByteString topic, ByteString subscriberId,
-                                     SubscriptionPreferences preferences,
-                                     long lastLocalSeqIdDelivered,
-                                     DeliveryEndPoint deliveryEndPoint,
-                                     ServerMessageFilter filter,
-                                     Callback<Void> cb, Object ctx) {
-            this.topic = topic;
-            this.subscriberId = subscriberId;
-            this.deliveryEndPoint = deliveryEndPoint;
-            this.filter = filter;
-            this.cb = cb;
-            this.ctx = ctx;
-
-            this.lastLocalSeqIdDelivered = lastLocalSeqIdDelivered;
-            this.lastSeqIdConsumedUtil = lastLocalSeqIdDelivered;
-
-            int windowSize = UNLIMITED;
-            if (preferences.hasMessageWindowSize()) {
-                windowSize = preferences.getMessageWindowSize();
-            } else {
-                int defaultWindowSize = FIFODeliveryManager.this.cfg.getDefaultMessageWindowSize();
-                if (defaultWindowSize > 0) {
-                    windowSize = defaultWindowSize;
-                }
-            }
-            this.messageWindowSize = windowSize;
-        }
-
-        public void setNotConnected(SubscriptionEvent event) {
-            this.connectedLock.writeLock().lock();
-            try {
-                // have closed it.
-                if (!connected) {
-                    return;
-                }
-                this.connected = false;
-                // put itself in ReadAhead queue to cancel outstanding scan request
-                // if outstanding scan request callback before cancel op executed,
-                // nothing it would cancel.
-                if (null != cache && null != outstandingScanRequest) {
-                    cache.cancelScanRequest(topic, this);
-                }
-            } finally {
-                this.connectedLock.writeLock().unlock();
-            }
-
-            if (null != event &&
-                (SubscriptionEvent.TOPIC_MOVED == event ||
-                 SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED == event)) {
-                // we should not close the channel now after enabling multiplexing
-                PubSubResponse response = PubSubResponseUtils.getResponseForSubscriptionEvent(
-                    topic, subscriberId, event
-                );
-                deliveryEndPoint.send(response, new DeliveryCallback() {
-                    @Override
-                    public void sendingFinished() {
-                        // do nothing now
-                    }
-                    @Override
-                    public void transientErrorOnSend() {
-                        // do nothing now
-                    }
-                    @Override
-                    public void permanentErrorOnSend() {
-                        // if channel is broken, close the channel
-                        deliveryEndPoint.close();
-                    }
-                });
-            }
-            // uninitialize filter
-            this.filter.uninitialize();
-        }
-
-        /** Returns the topic this subscriber is subscribed to. */
-        public ByteString getTopic() {
-            return this.topic;
-        }
-
-        public synchronized long getLastScanErrorTime() {
-            return lastScanErrorTime;
-        }
-
-        public synchronized void setLastScanErrorTime(long lastScanErrorTime) {
-            this.lastScanErrorTime = lastScanErrorTime;
-        }
-
-        /**
-         * Resets the last scan error time to -1 so that the subscriber can be
-         * retried immediately.
-         */
-        protected synchronized void clearLastScanErrorTime() {
-            lastScanErrorTime = -1;
-        }
-
-        protected boolean isConnected() {
-            connectedLock.readLock().lock();
-            try {
-                return connected;
-            } finally {
-                connectedLock.readLock().unlock();
-            }
-        }
-
-        /**
-         * Advances the consumed pointer and, if the subscriber was throttled
-         * and is now back inside its message window, queues a request that puts
-         * it back on the retry queue without a delay. Seq-ids at or below the
-         * current consumed pointer are ignored.
-         *
-         * @param newSeqIdConsumed
-         *            seq-id reported as consumed
-         */
-        protected synchronized void messageConsumed(long newSeqIdConsumed) {
-            if (newSeqIdConsumed <= lastSeqIdConsumedUtil) {
-                return;
-            }
-            if (logger.isDebugEnabled()) {
-                logger.debug("Subscriber ({}) moved consumed ptr from {} to {}.",
-                             va(this, lastSeqIdConsumedUtil, newSeqIdConsumed));
-            }
-            lastSeqIdConsumedUtil = newSeqIdConsumed;
-
-            // Nothing to wake up if the window is still exceeded after the
-            // update, or if delivery was never throttled.
-            if (msgLimitExceeded() || !isThrottled) {
-                return;
-            }
-
-            isThrottled = false;
-            logger.info("Try to wake up subscriber ({}) to deliver messages again : last delivered {}, last consumed {}.",
-                        va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
-
-            DeliveryManagerRequest wakeUp = new DeliveryManagerRequest() {
-                @Override
-                public void performRequest() {
-                    // enqueue
-                    clearRetryDelayForSubscriber(ActiveSubscriberState.this);
-                }
-            };
-            enqueueWithoutFailure(topic, wakeUp);
-        }
-
-        /**
-         * Tells whether the number of delivered-but-not-consumed messages has
-         * reached the message window size. Always false for an unlimited window.
-         */
-        protected boolean msgLimitExceeded() {
-            if (messageWindowSize == UNLIMITED) {
-                return false;
-            }
-            long unconsumed = lastLocalSeqIdDelivered - lastSeqIdConsumedUtil;
-            return unconsumed >= messageWindowSize;
-        }
-
-        /**
-         * Starts delivery of the next message while holding the read side of
-         * the connected lock, so that a disconnect cannot happen in the middle.
-         */
-        public void deliverNextMessage() {
-            final ReentrantReadWriteLock.ReadLock readLock = connectedLock.readLock();
-            readLock.lock();
-            try {
-                doDeliverNextMessage();
-            } finally {
-                readLock.unlock();
-            }
-        }
-
-        private void doDeliverNextMessage() {
-            if (!connected) {
-                return;
-            }
-
-            synchronized (this) {
-                // check whether we have delivered enough messages without receiving their consumes
-                if (msgLimitExceeded()) {
-                    logger.info("Subscriber ({}) is throttled : last delivered {}, last consumed {}.",
-                                va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
-                    isThrottled = true;
-                    // do nothing, since the delivery process would be throttled.
-                    // After message consumed, it would be added back to retry queue.
-                    return;
-                }
-
-                localSeqIdDeliveringNow = persistenceMgr.getSeqIdAfterSkipping(topic, lastLocalSeqIdDelivered, 1);
-
-                outstandingScanRequest = new ScanRequest(topic, localSeqIdDeliveringNow,
-                        /* callback= */this, /* ctx= */null);
-            }
-
-            persistenceMgr.scanSingleMessage(outstandingScanRequest);
-        }
-
-        /**
-         * ===============================================================
-         * {@link CancelScanRequest} methods
-         *
-         * This method runs in the same thread as the ScanCallback. When it runs,
-         * it checks whether there is an outstanding scan request; if there is
-         * one, that request is the one to cancel.
-         */
-        @Override
-        public ScanRequest getScanRequest() {
-            // There is no race between the cancel request and the scan callback.
-            // The only race is between stopServing and deliverNextMessage:
-            // deliverNextMessage is executed in a netty callback, i.e. in a netty thread,
-            // while stopServing runs in the delivery thread. If stopServing ran first,
-            // deliverNextMessage could still put a stub in the ReadAheadCache
-            // which we would then have no chance to cancel.
-            // connectedLock is used to avoid that race.
-            return outstandingScanRequest;
-        }
-
-        private boolean checkConnected() {
-            connectedLock.readLock().lock();
-            try {
-                // message scanned means the outstanding request is executed
-                outstandingScanRequest = null;
-                return connected;
-            } finally {
-                connectedLock.readLock().unlock();
-            }
-        }
-
-        /**
-         * ===============================================================
-         * {@link ScanCallback} methods
-         */
-
-        /**
-         * Handles the message returned by the scan. If the subscriber is still
-         * connected, the message is either sent to the delivery end point or,
-         * when the filter rejects it, marked as consumed and skipped.
-         *
-         * @param ctx
-         *            scan context (not used here)
-         * @param message
-         *            the scanned message
-         */
-        @Override
-        public void messageScanned(Object ctx, Message message) {
-            if (!checkConnected()) {
-                return;
-            }
-
-            // only increment topic access times when we try to deliver a message.
-            // for subscribers that have just been waiting a long time for a publish
-            // we don't increment topic access times, so the topic can be evicted
-            // in future.
-            tm.incrementTopicAccessTimes(topic);
-
-            if (!filter.testMessage(message)) {
-                // for filtered out messages, we don't deliver the message to client, so we would not
-                // receive its consume request which moves the <i>lastSeqIdConsumedUtil</i> pointer.
-                // we move the <i>lastSeqIdConsumedUtil</i> here for filtered out messages, which would
-                // avoid a subscriber being throttled due to the message gap introduced by filtering.
-                //
-                // it is OK to move <i>lastSeqIdConsumedUtil</i> here, since this pointer is the subscriber's
-                // delivery state used for throttling delivery. changing <i>lastSeqIdConsumedUtil</i> would
-                // not affect the subscriber's consume pointer in zookeeper which is managed in subscription
-                // manager.
-                //
-                // And marking message consumed before calling sending finished, would avoid the subscriber
-                // being throttled first and released from throttled state later.
-                messageConsumed(message.getMsgId().getLocalComponent());
-                sendingFinished();
-                return;
-            }
-
-            // The send below will invoke our sendingFinished() method when
-            // done
-            PubSubResponse response = PubSubResponse.newBuilder()
-                                      .setProtocolVersion(ProtocolVersion.VERSION_ONE)
-                                      .setStatusCode(StatusCode.SUCCESS).setTxnId(0)
-                                      .setMessage(message).setTopic(topic)
-                                      .setSubscriberId(subscriberId).build();
-
-            deliveryEndPoint.send(response, //
-                                  // callback =
-                                  this);
-
-        }
-
-        /**
-         * Handles a failed scan: if the subscriber is still connected, it is
-         * scheduled for a retry after the backoff delay.
-         */
-        @Override
-        public void scanFailed(Object ctx, Exception exception) {
-            if (checkConnected()) {
-                // wait for some time and then retry
-                retryErroredSubscriberAfterDelay(this);
-            }
-        }
-
-        /**
-         * Handles the end of a scan. Only the outstanding scan request is
-         * cleared; the connected state returned by the check is not needed.
-         */
-        @Override
-        public void scanFinished(Object ctx, ReasonForFinish reason) {
-            checkConnected();
-        }
-
-        /**
-         * ===============================================================
-         * {@link DeliveryCallback} methods
-         *
-         * Called when a message has been sent (or skipped by the filter).
-         * Advances the last delivered seq-id, requests a delivery pointer move
-         * once the pointer lags by more than SEQ_ID_SLACK, counts the delivery
-         * and starts on the next message. Does nothing once disconnected.
-         */
-        @Override
-        public void sendingFinished() {
-            if (!isConnected()) {
-                return;
-            }
-
-            synchronized (this) {
-                lastLocalSeqIdDelivered = localSeqIdDeliveringNow;
-
-                if (lastLocalSeqIdDelivered > lastSeqIdCommunicatedExternally + SEQ_ID_SLACK) {
-                    // Note: The order of the next 2 statements is important. We should
-                    // submit a request to change our delivery pointer only *after* we
-                    // have actually changed it. Otherwise, there is a race condition
-                    // with removal of this channel, w.r.t, maintaining the deliveryPtrs
-                    // tree map.
-                    long prevId = lastSeqIdCommunicatedExternally;
-                    lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
-                    moveDeliveryPtrForward(this, prevId, lastLocalSeqIdDelivered);
-                }
-            }
-            // increment the delivered-messages counter
-            ServerStats.getInstance().incrementMessagesDelivered();
-            deliverNextMessage();
-        }
-
-        public synchronized long getLastSeqIdCommunicatedExternally() {
-            return lastSeqIdCommunicatedExternally;
-        }
-
-
-        @Override
-        public void permanentErrorOnSend() {
-            // the underlying channel is broken, the channel will
-            // be closed in UmbrellaHandler when exception happened.
-            // so we don't need to close the channel again
-            stopServingSubscriber(topic, subscriberId, null,
-                                  NOP_CALLBACK, null);
-        }
-
-        @Override
-        public void transientErrorOnSend() {
-            retryErroredSubscriberAfterDelay(this);
-        }
-
-        /**
-         * ===============================================================
-         * {@link DeliveryManagerRequest} methods
-         */
-        @Override
-        public void performRequest() {
-            // Put this subscriber in the channel to subscriber mapping
-            ActiveSubscriberState prevSubscriber =
-                subscriberStates.put(new TopicSubscriber(topic, subscriberId), this);
-
-            // after put the active subscriber in subscriber states mapping
-            // trigger the callback to tell it started to deliver the message
-            // should let subscriber response go first before first delivered message.
-            cb.operationFinished(ctx, (Void)null);
-
-            if (prevSubscriber != null) {
-                // we already in the delivery thread, we don't need to equeue a stop request
-                // just stop it now, since stop is not blocking operation.
-                // and also it cleans the old state of the active subscriber immediately.
-                SubscriptionEvent se;
-                if (deliveryEndPoint.equals(prevSubscriber.deliveryEndPoint)) {
-                    logger.debug("Subscriber {} replaced a duplicated subscriber {} at same delivery point {}.",
-                                 va(this, prevSubscriber, deliveryEndPoint));
-                    se = null;
-                } else {
-                    logger.debug("Subscriber {} from delivery point {} forcelly closed delivery point {}.",
-                                 va(this, deliveryEndPoint, prevSubscriber.deliveryEndPoint));
-                    se = SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED;
-                }
-                doStopServingSubscriber(prevSubscriber, se);
-            }
-
-            synchronized (this) {
-                lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
-                addDeliveryPtr(this, lastLocalSeqIdDelivered);
-            }
-
-            deliverNextMessage();
-        };
-
-        @Override
-        public String toString() {
-            StringBuilder sb = new StringBuilder();
-            sb.append("Topic: ");
-            sb.append(topic.toStringUtf8());
-            sb.append("Subscriber: ");
-            sb.append(subscriberId.toStringUtf8());
-            sb.append(", DeliveryPtr: ");
-            sb.append(lastLocalSeqIdDelivered);
-            return sb.toString();
-
-        }
-    }
-
-    protected class StopServingSubscriber implements DeliveryManagerRequest {
-        TopicSubscriber ts;
-        SubscriptionEvent event;
-        final Callback<Void> cb;
-        final Object ctx;
-
-        public StopServingSubscriber(ByteString topic, ByteString subscriberId,
-                                     SubscriptionEvent event,
-                                     Callback<Void> callback, Object ctx) {
-            this.ts = new TopicSubscriber(topic, subscriberId);
-            this.event = event;
-            this.cb = callback;
-            this.ctx = ctx;
-        }
-
-        @Override
-        public void performRequest() {
-            ActiveSubscriberState subscriber = subscriberStates.remove(ts);
-            if (null != subscriber) {
-                doStopServingSubscriber(subscriber, event);
-            }
-            cb.operationFinished(ctx, null);
-        }
-
-    }
-
-    /**
-     * Stop serving a subscriber. This method should be called in a
-     * {@link DeliveryManagerRequest}.
-     *
-     * @param subscriber
-     *          Active Subscriber to stop
-     * @param event
-     *          Subscription Event for the stop reason
-     */
-    private void doStopServingSubscriber(ActiveSubscriberState subscriber, SubscriptionEvent event) {
-        // This will automatically stop delivery, and disconnect the channel
-        subscriber.setNotConnected(event);
-
-        // if the subscriber has moved on, a move request for its delivery
-        // pointer must be pending in the request queue. Note that the
-        // subscriber first changes its delivery pointer and then submits a
-        // request to move so this works.
-        removeDeliveryPtr(subscriber, subscriber.getLastSeqIdCommunicatedExternally(), //
-                          // isAbsenceOk=
-                          true,
-                          // pruneTopic=
-                          true);
-    }
-
-    protected class DeliveryPtrMove implements DeliveryManagerRequest {
-
-        ActiveSubscriberState subscriber;
-        Long oldSeqId;
-        Long newSeqId;
-
-        public DeliveryPtrMove(ActiveSubscriberState subscriber, Long oldSeqId, Long newSeqId) {
-            this.subscriber = subscriber;
-            this.oldSeqId = oldSeqId;
-            this.newSeqId = newSeqId;
-        }
-
-        @Override
-        public void performRequest() {
-            ByteString topic = subscriber.getTopic();
-            long prevMinSeqId = getMinimumSeqId(topic);
-
-            if (subscriber.isConnected()) {
-                removeDeliveryPtr(subscriber, oldSeqId, //
-                                  // isAbsenceOk=
-                                  false,
-                                  // pruneTopic=
-                                  false);
-
-                addDeliveryPtr(subscriber, newSeqId);
-            } else {
-                removeDeliveryPtr(subscriber, oldSeqId, //
-                                  // isAbsenceOk=
-                                  true,
-                                  // pruneTopic=
-                                  true);
-            }
-
-            long nowMinSeqId = getMinimumSeqId(topic);
-
-            if (nowMinSeqId > prevMinSeqId) {
-                persistenceMgr.deliveredUntil(topic, nowMinSeqId);
-            }
-        }
-    }
-
-    /**
-     * ====================================================================
-     *
-     * Dumb factories for our map methods
-     */
-    protected static class TreeMapLongToSetSubscriberFactory implements
-        Factory<SortedMap<Long, Set<ActiveSubscriberState>>> {
-        static TreeMapLongToSetSubscriberFactory instance = new TreeMapLongToSetSubscriberFactory();
-
-        @Override
-        public SortedMap<Long, Set<ActiveSubscriberState>> newInstance() {
-            return new TreeMap<Long, Set<ActiveSubscriberState>>();
-        }
-    }
-
-    protected static class HashMapSubscriberFactory implements Factory<Set<ActiveSubscriberState>> {
-        static HashMapSubscriberFactory instance = new HashMapSubscriberFactory();
-
-        @Override
-        public Set<ActiveSubscriberState> newInstance() {
-            return new HashSet<ActiveSubscriberState>();
-        }
-    }
-
-    @Override
-    public void onSubChannelDisconnected(TopicSubscriber topicSubscriber) {
-        stopServingSubscriber(topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(),
-                null, NOP_CALLBACK, null);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java
deleted file mode 100644
index 4189eb6..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.netty.ServerStats;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-public abstract class BaseHandler implements Handler {
-
-    protected TopicManager topicMgr;
-    protected ServerConfiguration cfg;
-
-    protected BaseHandler(TopicManager tm, ServerConfiguration cfg) {
-        this.topicMgr = tm;
-        this.cfg = cfg;
-    }
-
-
-    public void handleRequest(final PubSubRequest request, final Channel channel) {
-        topicMgr.getOwner(request.getTopic(), request.getShouldClaim(),
-        new Callback<HedwigSocketAddress>() {
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
-                ServerStats.getInstance().getOpStats(request.getType()).incrementFailedOps();
-            }
-
-            @Override
-            public void operationFinished(Object ctx, HedwigSocketAddress owner) {
-                if (!owner.equals(cfg.getServerAddr())) {
-                    channel.write(PubSubResponseUtils.getResponseForException(
-                                      new ServerNotResponsibleForTopicException(owner.toString()), request.getTxnId()));
-                    ServerStats.getInstance().incrementRequestsRedirect();
-                    return;
-                }
-                handleRequestAtOwner(request, channel);
-            }
-        }, null);
-    }
-
-    public abstract void handleRequestAtOwner(PubSubRequest request, Channel channel);
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java
deleted file mode 100644
index 458d301..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import org.jboss.netty.channel.Channel;
-
-public interface ChannelDisconnectListener {
-
-    /**
-     * Act on a particular channel being disconnected
-     * @param channel
-     */
-    public void channelDisconnected(Channel channel);
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java
deleted file mode 100644
index a6ccb7e..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.delivery.DeliveryManager;
-import org.apache.hedwig.server.netty.ServerStats;
-import org.apache.hedwig.server.netty.ServerStats.OpStats;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.server.subscriptions.SubscriptionManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-
-public class CloseSubscriptionHandler extends BaseHandler {
-    SubscriptionManager subMgr;
-    DeliveryManager deliveryMgr;
-    SubscriptionChannelManager subChannelMgr;
-    // op stats
-    final OpStats closesubStats;
-
-    public CloseSubscriptionHandler(ServerConfiguration cfg, TopicManager tm,
-                                    SubscriptionManager subMgr,
-                                    DeliveryManager deliveryMgr,
-                                    SubscriptionChannelManager subChannelMgr) {
-        super(tm, cfg);
-        this.subMgr = subMgr;
-        this.deliveryMgr = deliveryMgr;
-        this.subChannelMgr = subChannelMgr;
-        closesubStats = ServerStats.getInstance().getOpStats(OperationType.CLOSESUBSCRIPTION);
-    }
-
-    @Override
-    public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) {
-        if (!request.hasCloseSubscriptionRequest()) {
-            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
-                    "Missing closesubscription request data");
-            closesubStats.incrementFailedOps();
-            return;
-        }
-
-        final CloseSubscriptionRequest closesubRequest =
-                request.getCloseSubscriptionRequest();
-        final ByteString topic = request.getTopic();
-        final ByteString subscriberId = closesubRequest.getSubscriberId();
-
-        final long requestTime = System.currentTimeMillis();
-
-        subMgr.closeSubscription(topic, subscriberId, new Callback<Void>() {
-            @Override
-            public void operationFinished(Object ctx, Void result) {
-                // we should not close the channel in delivery manager
-                // since client waits the response for closeSubscription request
-                // client side would close the channel
-                deliveryMgr.stopServingSubscriber(topic, subscriberId, null,
-                new Callback<Void>() {
-                    @Override
-                    public void operationFailed(Object ctx, PubSubException exception) {
-                        channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
-                        closesubStats.incrementFailedOps();
-                    }
-                    @Override
-                    public void operationFinished(Object ctx, Void resultOfOperation) {
-                        // remove the topic subscription from subscription channels
-                        subChannelMgr.remove(new TopicSubscriber(topic, subscriberId),
-                                             channel);
-                        channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
-                        closesubStats.updateLatency(System.currentTimeMillis() - requestTime);
-                    }
-                }, null);
-            }
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
-                closesubStats.incrementFailedOps();
-            }
-        }, null);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java
deleted file mode 100644
index 5042a37..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.netty.ServerStats;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.server.netty.ServerStats.OpStats;
-import org.apache.hedwig.server.subscriptions.SubscriptionManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-
-public class ConsumeHandler extends BaseHandler {
-
-    SubscriptionManager sm;
-    Callback<Void> noopCallback = new NoopCallback<Void>();
-    final OpStats consumeStats = ServerStats.getInstance().getOpStats(OperationType.CONSUME);
-
-    class NoopCallback<T> implements Callback<T> {
-        @Override
-        public void operationFailed(Object ctx, PubSubException exception) {
-            consumeStats.incrementFailedOps();
-        }
-
-        public void operationFinished(Object ctx, T resultOfOperation) {
-            // we don't collect consume process time
-            consumeStats.updateLatency(0);
-        };
-    }
-
-    @Override
-    public void handleRequestAtOwner(PubSubRequest request, Channel channel) {
-        if (!request.hasConsumeRequest()) {
-            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
-                    "Missing consume request data");
-            consumeStats.incrementFailedOps();
-            return;
-        }
-
-        ConsumeRequest consumeRequest = request.getConsumeRequest();
-
-        sm.setConsumeSeqIdForSubscriber(request.getTopic(), consumeRequest.getSubscriberId(),
-                                        consumeRequest.getMsgId(), noopCallback, null);
-
-    }
-
-    public ConsumeHandler(TopicManager tm, SubscriptionManager sm, ServerConfiguration cfg) {
-        super(tm, cfg);
-        this.sm = sm;
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/Handler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/Handler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/Handler.java
deleted file mode 100644
index c391f5c..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/Handler.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-
-public interface Handler {
-
-    /**
-     * Handle a request synchronously or asynchronously. After handling the
-     * request, the appropriate response should be written on the given channel
-     *
-     * @param request
-     *            The request to handle
-     *
-     * @param channel
-     *            The channel on which to write the response
-     */
-    public void handleRequest(final PubSubRequest request, final Channel channel);
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java
deleted file mode 100644
index e0f1487..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.handlers;
-
-import org.apache.hedwig.server.handlers.SubscriptionChannelManager;
-import org.apache.hedwig.server.jmx.HedwigMBeanInfo;
-
-public class NettyHandlerBean implements NettyHandlerMXBean, HedwigMBeanInfo {
-
-    SubscriptionChannelManager subChannelMgr;
-
-   public NettyHandlerBean(SubscriptionChannelManager subChannelMgr) {
-       this.subChannelMgr = subChannelMgr;
-    }
-
-    @Override
-    public String getName() {
-        return "NettyHandlers";
-    }
-
-    @Override
-    public boolean isHidden() {
-        return false;
-    }
-
-    @Override
-    public int getNumSubscriptionChannels() {
-        return subChannelMgr.getNumSubscriptionChannels();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerMXBean.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerMXBean.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerMXBean.java
deleted file mode 100644
index ab8af29..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerMXBean.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.handlers;
-
-/**
- * Netty Handler MBean
- */
-public interface NettyHandlerMXBean {
-
-    /**
-     * @return number of subscription channels
-     */
-    public int getNumSubscriptionChannels();
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java
deleted file mode 100644
index 587f904..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.jboss.netty.channel.Channel;
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.netty.ServerStats;
-import org.apache.hedwig.server.netty.ServerStats.OpStats;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.server.persistence.PersistRequest;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-
-public class PublishHandler extends BaseHandler {
-
-    private PersistenceManager persistenceMgr;
-    private final OpStats pubStats;
-
-    public PublishHandler(TopicManager topicMgr, PersistenceManager persistenceMgr, ServerConfiguration cfg) {
-        super(topicMgr, cfg);
-        this.persistenceMgr = persistenceMgr;
-        this.pubStats = ServerStats.getInstance().getOpStats(OperationType.PUBLISH);
-    }
-
-    @Override
-    public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) {
-        if (!request.hasPublishRequest()) {
-            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
-                    "Missing publish request data");
-            pubStats.incrementFailedOps();
-            return;
-        }
-
-        Message msgToSerialize = Message.newBuilder(request.getPublishRequest().getMsg()).setSrcRegion(
-                                     cfg.getMyRegionByteString()).build();
-
-        final long requestTime = MathUtils.now();
-        PersistRequest persistRequest = new PersistRequest(request.getTopic(), msgToSerialize,
-        new Callback<PubSubProtocol.MessageSeqId>() {
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
-                pubStats.incrementFailedOps();
-            }
-
-            @Override
-            public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId resultOfOperation) {
-                channel.write(getSuccessResponse(request.getTxnId(), resultOfOperation));
-                pubStats.updateLatency(MathUtils.now() - requestTime);
-            }
-        }, null);
-
-        persistenceMgr.persistMessage(persistRequest);
-    }
-
-    private static PubSubProtocol.PubSubResponse getSuccessResponse(long txnId, PubSubProtocol.MessageSeqId publishedMessageSeqId) {
-        if (null == publishedMessageSeqId) {
-            return PubSubResponseUtils.getSuccessResponse(txnId);
-        }
-        PubSubProtocol.PublishResponse publishResponse = PubSubProtocol.PublishResponse.newBuilder().setPublishedMsgId(publishedMessageSeqId).build();
-        PubSubProtocol.ResponseBody responseBody = PubSubProtocol.ResponseBody.newBuilder().setPublishResponse(publishResponse).build();
-        return PubSubProtocol.PubSubResponse.newBuilder().
-            setProtocolVersion(PubSubResponseUtils.serverVersion).
-            setStatusCode(PubSubProtocol.StatusCode.SUCCESS).setTxnId(txnId).
-            setResponseBody(responseBody).build();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
deleted file mode 100644
index 6df70a3..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
-import org.apache.hedwig.filter.PipelineFilter;
-import org.apache.hedwig.filter.ServerMessageFilter;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.delivery.ChannelEndPoint;
-import org.apache.hedwig.server.delivery.DeliveryManager;
-import org.apache.hedwig.server.netty.ServerStats;
-import org.apache.hedwig.server.netty.ServerStats.OpStats;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.subscriptions.SubscriptionManager;
-import org.apache.hedwig.server.subscriptions.AllToAllTopologyFilter;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-
-public class SubscribeHandler extends BaseHandler { // serves subscribe requests: builds the message filter, registers the channel, then starts delivery
-    private static final Logger logger = LoggerFactory.getLogger(SubscribeHandler.class);
-
-    private final DeliveryManager deliveryMgr;
-    private final PersistenceManager persistenceMgr;
-    private final SubscriptionManager subMgr;
-    private final SubscriptionChannelManager subChannelMgr;
-
-    // op stats for SUBSCRIBE operations (failed-op counter and latency)
-    private final OpStats subStats;
-
-    public SubscribeHandler(ServerConfiguration cfg, TopicManager topicMgr,
-                            DeliveryManager deliveryManager,
-                            PersistenceManager persistenceMgr,
-                            SubscriptionManager subMgr,
-                            SubscriptionChannelManager subChannelMgr) {
-        super(topicMgr, cfg);
-        this.deliveryMgr = deliveryManager;
-        this.persistenceMgr = persistenceMgr;
-        this.subMgr = subMgr;
-        this.subChannelMgr = subChannelMgr;
-        subStats = ServerStats.getInstance().getOpStats(OperationType.SUBSCRIBE);
-    }
-
-    @Override
-    public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) { // every failure path below writes an error response on the channel and bumps the failed-op counter
-
-        if (!request.hasSubscribeRequest()) { // malformed request: no subscribe payload attached
-            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
-                    "Missing subscribe request data");
-            subStats.incrementFailedOps();
-            return;
-        }
-
-        final ByteString topic = request.getTopic();
-
-        MessageSeqId seqId;
-        try {
-            seqId = persistenceMgr.getCurrentSeqIdForTopic(topic);
-        } catch (ServerNotResponsibleForTopicException e) {
-            channel.write(PubSubResponseUtils.getResponseForException(e, request.getTxnId())).addListener(
-                ChannelFutureListener.CLOSE); // close the channel once the error response has been written
-            logger.error("Error getting current seq id for topic " + topic.toStringUtf8()
-                       + " when processing subscribe request (txnid:" + request.getTxnId() + ") :", e);
-            subStats.incrementFailedOps();
-            ServerStats.getInstance().incrementRequestsRedirect();
-            return;
-        }
-
-        final SubscribeRequest subRequest = request.getSubscribeRequest();
-        final ByteString subscriberId = subRequest.getSubscriberId();
-
-        MessageSeqId lastSeqIdPublished = MessageSeqId.newBuilder(seqId).setLocalComponent(seqId.getLocalComponent()).build(); // copy of seqId; the local component is re-set to its current value
-
-        final long requestTime = MathUtils.now(); // start time for the latency stat recorded on success
-        subMgr.serveSubscribeRequest(topic, subRequest, lastSeqIdPublished, new Callback<SubscriptionData>() {
-
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId())).addListener(
-                    ChannelFutureListener.CLOSE);
-                logger.error("Error serving subscribe request (" + request.getTxnId() + ") for (topic: "
-                           + topic.toStringUtf8() + " , subscriber: " + subscriberId.toStringUtf8() + ")", exception);
-                subStats.incrementFailedOps();
-            }
-
-            @Override
-            public void operationFinished(Object ctx, final SubscriptionData subData) {
-
-                TopicSubscriber topicSub = new TopicSubscriber(topic, subscriberId);
-                synchronized (channel) {
-                    if (!channel.isConnected()) {
-                        // channel got disconnected while we were processing the
-                        // subscribe request,
-                        // nothing much we can do in this case
-                        subStats.incrementFailedOps();
-                        return;
-                    }
-                } // NOTE(review): the monitor is released here, so the channel can still disconnect before subChannelMgr.put below
-                // initialize the message filter
-                PipelineFilter filter = new PipelineFilter();
-                try {
-                    // the filter pipeline should be
-                    // 1) AllToAllTopologyFilter to filter cross-region messages
-                    filter.addLast(new AllToAllTopologyFilter());
-                    // 2) User-Customized MessageFilter
-                    if (subData.hasPreferences() &&
-                        subData.getPreferences().hasMessageFilter()) {
-                        String messageFilterName = subData.getPreferences().getMessageFilter();
-                        filter.addLast(ReflectionUtils.newInstance(messageFilterName, ServerMessageFilter.class));
-                    }
-                    // initialize the filter
-                    filter.initialize(cfg.getConf());
-                    filter.setSubscriptionPreferences(topic, subscriberId,
-                                                      subData.getPreferences());
-                } catch (RuntimeException re) {
-                    String errMsg = "RuntimeException caught when instantiating message filter for (topic:"
-                                  + topic.toStringUtf8() + ", subscriber:" + subscriberId.toStringUtf8() + ")."
-                                  + "It might be introduced by programming error in message filter.";
-                    logger.error(errMsg, re);
-                    PubSubException pse = new PubSubException.InvalidMessageFilterException(errMsg, re);
-                    subStats.incrementFailedOps();
-                    // do not close the subscription channel here; only send the error response
-                    // and let the client decide whether to close it.
-                    channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId()));
-                    return;
-                } catch (Throwable t) { // any non-RuntimeException failure: respond with the error and close the channel
-                    String errMsg = "Failed to instantiate message filter for (topic:" + topic.toStringUtf8()
-                                  + ", subscriber:" + subscriberId.toStringUtf8() + ").";
-                    logger.error(errMsg, t);
-                    PubSubException pse = new PubSubException.InvalidMessageFilterException(errMsg, t);
-                    subStats.incrementFailedOps();
-                    channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId()))
-                    .addListener(ChannelFutureListener.CLOSE);
-                    return;
-                }
-                boolean forceAttach = false; // stays false unless the request explicitly carries the flag
-                if (subRequest.hasForceAttach()) {
-                    forceAttach = subRequest.getForceAttach();
-                }
-                // Try to store the subscription channel for the topic subscriber
-                Channel oldChannel = subChannelMgr.put(topicSub, channel, forceAttach);
-                if (null != oldChannel) { // non-null result: another channel still holds this subscription
-                    PubSubException pse = new PubSubException.TopicBusyException(
-                        "Subscriber " + subscriberId.toStringUtf8() + " for topic " + topic.toStringUtf8()
-                        + " is already being served on a different channel " + oldChannel + ".");
-                    subStats.incrementFailedOps();
-                    channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId()))
-                    .addListener(ChannelFutureListener.CLOSE);
-                    return;
-                }
-
-                // want to start 1 ahead of the consume ptr
-                MessageSeqId lastConsumedSeqId = subData.getState().getMsgId();
-                MessageSeqId seqIdToStartFrom = MessageSeqId.newBuilder(lastConsumedSeqId).setLocalComponent(
-                                                    lastConsumedSeqId.getLocalComponent() + 1).build();
-                deliveryMgr.startServingSubscription(topic, subscriberId,
-                        subData.getPreferences(), seqIdToStartFrom, new ChannelEndPoint(channel), filter,
-                        new Callback<Void>() {
-                            @Override
-                            public void operationFinished(Object ctx, Void result) {
-                                // First write success and then tell the delivery manager,
-                                // otherwise the first message might go out before the response
-                                // to the subscribe
-                                SubscribeResponse.Builder subRespBuilder = SubscribeResponse.newBuilder()
-                                    .setPreferences(subData.getPreferences());
-                                ResponseBody respBody = ResponseBody.newBuilder()
-                                    .setSubscribeResponse(subRespBuilder).build();
-                                channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId(), respBody));
-                                logger.info("Subscribe request (" + request.getTxnId() + ") for (topic:"
-                                            + topic.toStringUtf8() + ", subscriber:" + subscriberId.toStringUtf8()
-                                            + ") from channel " + channel.getRemoteAddress()
-                                            + " succeed - its subscription data is "
-                                            + SubscriptionStateUtils.toString(subData));
-                                subStats.updateLatency(MathUtils.now() - requestTime);
-                            }
-                            @Override
-                            public void operationFailed(Object ctx, PubSubException exception) {
-                                // not expected to happen; NOTE(review): a failure here is dropped silently (no response, no log, no stats)
-                            }
-                        }, null);
-            }
-        }, null);
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java
deleted file mode 100644
index 3481d81..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import static org.apache.hedwig.util.VarArgs.va;
-
-public class SubscriptionChannelManager implements ChannelDisconnectListener { // tracks which channel serves each topic subscriber, and which subscribers sit on each channel
-
-    private static final Logger logger = LoggerFactory.getLogger(SubscriptionChannelManager.class);
-
-    static class CloseSubscriptionListener implements ChannelFutureListener { // logs the outcome of the forced-close notification written to an old channel
-
-        final TopicSubscriber ts;
-
-        CloseSubscriptionListener(TopicSubscriber topicSubscriber) {
-            this.ts = topicSubscriber;
-        }
-
-        @Override
-        public void operationComplete(ChannelFuture future) throws Exception {
-            if (!future.isSuccess()) {
-                logger.warn("Failed to write response to close old subscription {}.", ts);
-            } else {
-                logger.debug("Close old subscription {} succeed.", ts);
-            }
-        }
-    };
-
-    final List<SubChannelDisconnectedListener> listeners; // NOTE(review): plain ArrayList, added to without synchronization - confirm listeners are registered before channels are served
-
-    public interface SubChannelDisconnectedListener {
-        /**
-         * Act on a particular topicSubscriber being disconnected.
-         * @param topicSubscriber the topic subscriber whose subscription channel was disconnected
-         */
-        public void onSubChannelDisconnected(TopicSubscriber topicSubscriber);
-    }
-
-    final ConcurrentHashMap<TopicSubscriber, Channel> sub2Channel; // subscriber -> channel currently serving it
-    final ConcurrentHashMap<Channel, Set<TopicSubscriber>> channel2sub; // channel -> subscribers attached to it
-
-    public SubscriptionChannelManager() {
-        sub2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
-        channel2sub = new ConcurrentHashMap<Channel, Set<TopicSubscriber>>();
-        listeners = new ArrayList<SubChannelDisconnectedListener>();
-    }
-
-    public void addSubChannelDisconnectedListener(SubChannelDisconnectedListener listener) {
-        if (null != listener) { // null listeners are ignored
-            listeners.add(listener);
-        }
-    }
-
-    @Override
-    public void channelDisconnected(Channel channel) {
-        // Evils of synchronized programming: there is a race between a channel
-        // getting disconnected, and us adding it to the maps when a subscribe
-        // succeeds
-        Set<TopicSubscriber> topicSubs;
-        synchronized (channel) {
-            topicSubs = channel2sub.remove(channel);
-        }
-        if (topicSubs != null) {
-            for (TopicSubscriber topicSub : topicSubs) {
-                logger.info("Subscription channel {} for {} is disconnected.",
-                            va(channel.getRemoteAddress(), topicSub));
-                // remove the entry only if it is still mapped to this channel.
-                sub2Channel.remove(topicSub, channel);
-                for (SubChannelDisconnectedListener listener : listeners) {
-                    listener.onSubChannelDisconnected(topicSub);
-                }
-            }
-        }
-    }
-
-    public int getNumSubscriptionChannels() {
-        return channel2sub.size();
-    }
-
-    public int getNumSubscriptions() {
-        return sub2Channel.size();
-    }
-
-    /**
-     * Put <code>topicSub</code> on Channel <code>channel</code>.
-     *
-     * @param topicSub
-     *          Topic Subscription
-     * @param channel
-     *          Netty channel
-     * @param forceAttach
-     *          whether to take the subscription over from a different channel that already holds it
-     * @return null on success, otherwise the channel that already holds the subscription.
-     */
-    public Channel put(TopicSubscriber topicSub, Channel channel, boolean forceAttach) {
-        // race with channel getting disconnected while we are adding it
-        // to the 2 maps
-        synchronized (channel) {
-            Channel oldChannel = sub2Channel.putIfAbsent(topicSub, channel);
-            // a subscribe request sent again from the same channel
-            // is treated as a success.
-            if (null != oldChannel && !oldChannel.equals(channel)) {
-                boolean subSuccess = false;
-                if (forceAttach) {
-                    // original note: it is safe to close the old subscription here, since
-                    // the new subscription arrived successfully from another channel.
-                    synchronized (oldChannel) {
-                        Set<TopicSubscriber> oldTopicSubs = channel2sub.get(oldChannel);
-                        if (null != oldTopicSubs) {
-                            if (!oldTopicSubs.remove(topicSub)) {
-                                logger.warn("Failed to remove old subscription ({}) due to it isn't on channel ({}).",
-                                            va(topicSub, oldChannel));
-                            } else if (oldTopicSubs.isEmpty()) {
-                                channel2sub.remove(oldChannel);
-                            }
-                        }
-                    }
-                    PubSubResponse resp = PubSubResponseUtils.getResponseForSubscriptionEvent(
-                        topicSub.getTopic(), topicSub.getSubscriberId(),
-                        SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED
-                    );
-                    oldChannel.write(resp).addListener(new CloseSubscriptionListener(topicSub));
-                    logger.info("Subscribe request for ({}) from channel ({}) closes old subscripiton on channel ({}).",
-                                va(topicSub, channel, oldChannel));
-                    // try to replace the oldChannel;
-                    // if the replace fails, it might be because the channelDisconnected
-                    // callback has already removed the old channel.
-                    if (!sub2Channel.replace(topicSub, oldChannel, channel)) {
-                        // try to add it now.
-                        // if the add fails, another channel has obtained the subscription
-                        oldChannel = sub2Channel.putIfAbsent(topicSub, channel);
-                        if (null == oldChannel) {
-                            subSuccess = true;
-                        }
-                    } else {
-                        subSuccess = true;
-                    }
-                }
-                if (!subSuccess) {
-                    logger.error("Error serving subscribe request for ({}) from ({}) since it already served on ({}).",
-                                 va(topicSub, channel, oldChannel));
-                    return oldChannel;
-                }
-            }
-            // original note: channel2sub is just a cache, so it can be added to without
-            // synchronization; NOTE(review): this code still runs inside synchronized (channel)
-            Set<TopicSubscriber> topicSubs = channel2sub.get(channel);
-            if (null == topicSubs) {
-                topicSubs = new HashSet<TopicSubscriber>();
-                channel2sub.put(channel, topicSubs); 
-            }
-            topicSubs.add(topicSub);
-            return null;
-        }
-    }
-
-    /**
-     * Remove <code>topicSub</code> from Channel <code>channel</code>.
-     *
-     * @param topicSub
-     *          Topic Subscription
-     * @param channel
-     *          Netty channel
-     */
-    public void remove(TopicSubscriber topicSub, Channel channel) {
-        synchronized (channel) {
-            Set<TopicSubscriber> topicSubs = channel2sub.get(channel);
-            if (null != topicSubs) {
-                if (!topicSubs.remove(topicSub)) {
-                    logger.warn("Failed to remove subscription ({}) due to it isn't on channel ({}).",
-                                va(topicSub, channel));
-                } else if (topicSubs.isEmpty()) { // last subscriber gone: drop the channel's entry
-                    channel2sub.remove(channel);
-                }
-            }
-            if (!sub2Channel.remove(topicSub, channel)) { // removes only if topicSub is still mapped to this channel
-                logger.warn("Failed to remove channel ({}) due to it isn't ({})'s channel.",
-                            va(channel, topicSub));
-            }
-        }
-    }
-}