You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:33 UTC

[23/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java
deleted file mode 100644
index 614efa1..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java
+++ /dev/null
@@ -1,622 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
-import org.apache.hedwig.client.handlers.MessageConsumeCallback;
-import org.apache.hedwig.client.netty.CleanupChannelMap;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.client.netty.SubscriptionEventEmitter;
-import org.apache.hedwig.client.ssl.SslClientContextFactory;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.filter.ClientMessageFilter;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageHeader;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.util.Callback;
-import static org.apache.hedwig.util.VarArgs.va;
-
-/**
- * Basic HChannel Manager Implementation
- */
-public abstract class AbstractHChannelManager implements HChannelManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(AbstractHChannelManager.class);
-
-    // Empty Topic List
-    private final static Set<ByteString> EMPTY_TOPIC_SET =
-        new HashSet<ByteString>();
-
-    // Boolean indicating if the channel manager is running or has been closed.
-    // Once we stop the manager, we should sidestep all of the connect, write callback
-    // and channel disconnected logic.
-    protected boolean closed = false;
-    protected final ReentrantReadWriteLock closedLock =
-        new ReentrantReadWriteLock();
-
-    // Global counter used for generating unique transaction ID's for
-    // publish and subscribe requests
-    protected final AtomicLong globalCounter = new AtomicLong();
-
-    // Concurrent Map to store the mapping from the Topic to the Host.
-    // This could change over time since servers can drop mastership of topics
-    // for load balancing or failover. If a server host ever goes down, we'd
-    // also want to remove all topic mappings the host was responsible for.
-    // The second Map is used as the inverted version of the first one.
-    protected final ConcurrentMap<ByteString, InetSocketAddress> topic2Host =
-        new ConcurrentHashMap<ByteString, InetSocketAddress>();
-    // The inverse mapping is used only when clearing all topics. For performance
-    // consideration, we don't guarantee host2Topics to be consistent with
-    // topic2Host. it would be better to not rely on this mapping for anything
-    // significant.
-    protected final ConcurrentMap<InetSocketAddress, Set<ByteString>> host2Topics =
-        new ConcurrentHashMap<InetSocketAddress, Set<ByteString>>();
-
-    // This channels will be used for publish and unsubscribe requests
-    protected final CleanupChannelMap<InetSocketAddress> host2NonSubscriptionChannels =
-        new CleanupChannelMap<InetSocketAddress>();
-
-    private final ClientConfiguration cfg;
-    // The Netty socket factory for making connections to the server.
-    protected final ChannelFactory socketFactory;
-    // PipelineFactory to create non-subscription netty channels to the appropriate server
-    private final ClientChannelPipelineFactory nonSubscriptionChannelPipelineFactory;
-    // ssl context factory
-    private SslClientContextFactory sslFactory = null;
-
-    // default server channel
-    private final HChannel defaultServerChannel;
-
-    // Each client instantiation will have a Timer for running recurring
-    // threads. One such timer task thread to is to timeout long running
-    // PubSubRequests that are waiting for an ack response from the server.
-    private final Timer clientTimer = new Timer(true);
-    // a common consume callback for all consume requests.
-    private final MessageConsumeCallback consumeCb;
-    // A event emitter to emit subscription events
-    private final SubscriptionEventEmitter eventEmitter;
-
-    protected AbstractHChannelManager(ClientConfiguration cfg,
-                                      ChannelFactory socketFactory) {
-        this.cfg = cfg;
-        this.socketFactory = socketFactory;
-        this.nonSubscriptionChannelPipelineFactory =
-            new NonSubscriptionChannelPipelineFactory(cfg, this);
-
-        // create a default server channel
-        defaultServerChannel =
-            new DefaultServerChannel(cfg.getDefaultServerHost(), this);
-
-        if (cfg.isSSLEnabled()) {
-            sslFactory = new SslClientContextFactory(cfg);
-        }
-
-        consumeCb = new MessageConsumeCallback(cfg, this);
-        eventEmitter = new SubscriptionEventEmitter();
-
-        // Schedule Request Timeout task.
-        clientTimer.schedule(new PubSubRequestTimeoutTask(), 0,
-                             cfg.getTimeoutThreadRunInterval());
-    }
-
-    @Override
-    public SubscriptionEventEmitter getSubscriptionEventEmitter() {
-        return eventEmitter;
-    }
-
-    public MessageConsumeCallback getConsumeCallback() {
-        return consumeCb;
-    }
-
-    public SslClientContextFactory getSslFactory() {
-        return sslFactory;
-    }
-
-    protected ChannelFactory getChannelFactory() {
-        return socketFactory;
-    }
-
-    protected ClientChannelPipelineFactory getNonSubscriptionChannelPipelineFactory() {
-        return this.nonSubscriptionChannelPipelineFactory;
-    }
-
-    protected abstract ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory();
-
-    @Override
-    public void schedule(final TimerTask task, final long delay) {
-        this.closedLock.readLock().lock();
-        try {
-            if (closed) {
-                logger.warn("Task {} is not scheduled due to the channel manager is closed.",
-                            task);
-                return;
-            }
-            clientTimer.schedule(task, delay);
-        } finally {
-            this.closedLock.readLock().unlock();
-        }
-    }
-
-    @Override
-    public void submitOpAfterDelay(final PubSubData pubSubData, final long delay) {
-        this.closedLock.readLock().lock();
-        try {
-            if (closed) {
-                pubSubData.getCallback().operationFailed(pubSubData.context,
-                    new ServiceDownException("Client has been closed."));
-                return;
-            }
-            clientTimer.schedule(new TimerTask() {
-                @Override
-                public void run() {
-                    logger.debug("Submit request {} in {} ms later.",
-                                 va(pubSubData, delay));
-                    submitOp(pubSubData);
-                }
-            }, delay);
-        } finally {
-            closedLock.readLock().unlock();
-        }
-    }
-
-    @Override
-    public void submitOp(PubSubData pubSubData) {
-        HChannel hChannel;
-        if (OperationType.PUBLISH.equals(pubSubData.operationType) ||
-            OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) {
-            hChannel = getNonSubscriptionChannelByTopic(pubSubData.topic);
-        } else {
-            TopicSubscriber ts = new TopicSubscriber(pubSubData.topic,
-                                                     pubSubData.subscriberId);
-            hChannel = getSubscriptionChannelByTopicSubscriber(ts);
-        }
-        // no channel found to submit pubsub data
-        // choose the default server
-        if (null == hChannel) {
-            hChannel = defaultServerChannel;
-        }
-        hChannel.submitOp(pubSubData);
-    }
-
-    @Override
-    public void redirectToHost(PubSubData pubSubData, InetSocketAddress host) {
-        logger.debug("Submit operation {} to host {}.",
-                     va(pubSubData, host));
-        HChannel hChannel;
-        if (OperationType.PUBLISH.equals(pubSubData.operationType) ||
-            OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) {
-            hChannel = getNonSubscriptionChannel(host);
-            if (null == hChannel) {
-                // create a channel to connect to specified host
-                hChannel = createAndStoreNonSubscriptionChannel(host);
-            }
-        } else {
-            hChannel = getSubscriptionChannel(host);
-            if (null == hChannel) {
-                // create a subscription channel to specified host
-                hChannel = createAndStoreSubscriptionChannel(host);
-            }
-        }
-        // no channel found to submit pubsub data
-        // choose the default server
-        if (null == hChannel) {
-            hChannel = defaultServerChannel;
-        }
-        hChannel.submitOp(pubSubData);
-    }
-
-    void submitOpThruChannel(PubSubData pubSubData, Channel channel) {
-        logger.debug("Submit operation {} to thru channel {}.",
-                     va(pubSubData, channel));
-        HChannel hChannel;
-        if (OperationType.PUBLISH.equals(pubSubData.operationType) ||
-            OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) {
-            hChannel = createAndStoreNonSubscriptionChannel(channel);
-        } else {
-            hChannel = createAndStoreSubscriptionChannel(channel);
-        }
-        hChannel.submitOp(pubSubData);
-    }
-
-    @Override
-    public void submitOpToDefaultServer(PubSubData pubSubData) {
-        logger.debug("Submit operation {} to default server {}.",
-                     va(pubSubData, defaultServerChannel));
-        defaultServerChannel.submitOp(pubSubData);
-    }
-
-    // Synchronized method to store the host2Channel mapping (if it doesn't
-    // exist yet). Retrieve the hostname info from the Channel created via the
-    // RemoteAddress tied to it.
-    private HChannel createAndStoreNonSubscriptionChannel(Channel channel) {
-        InetSocketAddress host = NetUtils.getHostFromChannel(channel);
-        HChannel newHChannel = new HChannelImpl(host, channel, this,
-                                                getNonSubscriptionChannelPipelineFactory());
-        return storeNonSubscriptionChannel(host, newHChannel);
-    }
-
-    private HChannel createAndStoreNonSubscriptionChannel(InetSocketAddress host) {
-        HChannel newHChannel = new HChannelImpl(host, this,
-                                                getNonSubscriptionChannelPipelineFactory());
-        return storeNonSubscriptionChannel(host, newHChannel);
-    }
-
-    private HChannel storeNonSubscriptionChannel(InetSocketAddress host,
-                                                 HChannel newHChannel) {
-        return host2NonSubscriptionChannels.addChannel(host, newHChannel);
-    }
-
-    /**
-     * Is there a {@link HChannel} existed for a given host.
-     *
-     * @param host
-     *          Target host address.
-     */
-    private HChannel getNonSubscriptionChannel(InetSocketAddress host) {
-        return host2NonSubscriptionChannels.getChannel(host);
-    }
-
-    /**
-     * Get a non-subscription channel for a given <code>topic</code>.
-     *
-     * @param topic
-     *          Topic Name
-     * @return if <code>topic</code>'s owner is unknown, return null.
-     *         if <code>topic</code>'s owner is know and there is a channel
-     *         existed before, return the existed channel, otherwise created
-     *         a new one.
-     */
-    private HChannel getNonSubscriptionChannelByTopic(ByteString topic) {
-        InetSocketAddress host = topic2Host.get(topic);
-        if (null == host) {
-            // we don't know where is the topic
-            return null;
-        } else {
-            // we had know which server owned the topic
-            HChannel channel = getNonSubscriptionChannel(host);
-            if (null == channel) {
-                // create a channel to connect to specified host
-                channel = createAndStoreNonSubscriptionChannel(host);
-            }
-            return channel;
-        }
-    }
-
-    /**
-     * Handle the disconnected event from a non-subscription {@link HChannel}.
-     *
-     * @param host
-     *          Which host is disconnected.
-     * @param channel
-     *          The underlying established channel.
-     */
-    protected void onNonSubscriptionChannelDisconnected(InetSocketAddress host,
-                                                        Channel channel) {
-        // Only remove the Channel from the mapping if this current
-        // disconnected channel is the same as the cached entry.
-        // Due to race concurrency situations, it is possible to
-        // create multiple channels to the same host for publish
-        // and unsubscribe requests.
-        HChannel hChannel = host2NonSubscriptionChannels.getChannel(host);
-        if (null == hChannel) {
-            return;
-        }
-        Channel underlyingChannel = hChannel.getChannel();
-        if (null == underlyingChannel ||
-            !underlyingChannel.equals(channel)) {
-            return;
-        }
-        logger.info("NonSubscription Channel {} to {} disconnected.",
-                    va(channel, host));
-        // remove existed channel
-        if (host2NonSubscriptionChannels.removeChannel(host, hChannel)) {
-            clearAllTopicsForHost(host);
-        }
-    }
-
-    /**
-     * Create and store a subscription {@link HChannel} thru the underlying established
-     * <code>channel</code>
-     *
-     * @param channel
-     *          The underlying established subscription channel.
-     */
-    protected abstract HChannel createAndStoreSubscriptionChannel(Channel channel);
-
-    /**
-     * Create and store a subscription {@link HChannel} to target host.
-     *
-     * @param host
-     *          Target host address.
-     */
-    protected abstract HChannel createAndStoreSubscriptionChannel(InetSocketAddress host);
-
-    /**
-     * Is there a subscription {@link HChannel} existed for a given host.
-     *
-     * @param host
-     *          Target host address.
-     */
-    protected abstract HChannel getSubscriptionChannel(InetSocketAddress host);
-
-    /**
-     * Get a subscription channel for a given <code>topicSubscriber</code>.
-     *
-     * @param topicSubscriber
-     *          Topic Subscriber
-     * @return if <code>topic</code>'s owner is unknown, return null.
-     *         if <code>topic</code>'s owner is know and there is a channel
-     *         existed before, return the existed channel, otherwise created
-     *         a new one for the <code>topicSubscriber</code>.
-     */
-    protected abstract HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber topicSubscriber);
-
-    /**
-     * Handle the disconnected event from a subscription {@link HChannel}.
-     *
-     * @param host
-     *          Which host is disconnected.
-     * @param channel
-     *          The underlying established channel.
-     */
-    protected abstract void onSubscriptionChannelDisconnected(InetSocketAddress host,
-                                                              Channel channel);
-
-    private void sendConsumeRequest(final TopicSubscriber topicSubscriber,
-                                    final MessageSeqId messageSeqId,
-                                    final Channel channel) {
-        PubSubRequest.Builder pubsubRequestBuilder =
-            NetUtils.buildConsumeRequest(nextTxnId(), topicSubscriber, messageSeqId);  
-
-        // For Consume requests, we will send them from the client in a fire and
-        // forget manner. We are not expecting the server to send back an ack
-        // response so no need to register this in the ResponseHandler. There
-        // are no callbacks to invoke since this isn't a client initiated
-        // action. Instead, just have a future listener that will log an error
-        // message if there was a problem writing the consume request.
-        logger.debug("Writing a Consume request to host: {} with messageSeqId: {} for {}",
-                     va(NetUtils.getHostFromChannel(channel), messageSeqId, topicSubscriber));
-        ChannelFuture future = channel.write(pubsubRequestBuilder.build());
-        future.addListener(new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-                if (!future.isSuccess()) {
-                    logger.error("Error writing a Consume request to host: {} with messageSeqId: {} for {}",
-                                 va(NetUtils.getHostFromChannel(channel),
-                                    messageSeqId, topicSubscriber));
-                }
-            }
-        });
-    }
-
-    /**
-     * Helper method to store the topic2Host mapping in the channel manager cache
-     * map. This method is assumed to be called when we've done a successful
-     * connection to the correct server topic master.
-     *
-     * @param topic
-     *            Topic Name
-     * @param host
-     *            Host Address
-     */
-    protected void storeTopic2HostMapping(ByteString topic, InetSocketAddress host) {
-        InetSocketAddress oldHost = topic2Host.putIfAbsent(topic, host);
-        if (null != oldHost && oldHost.equals(host)) {
-            // Entry in map exists for the topic but it is the same as the
-            // current host. In this case there is nothing to do.
-            return;
-        }
-
-        if (null != oldHost) {
-            if (topic2Host.replace(topic, oldHost, host)) {
-                // Store the relevant mappings for this topic and host combination.
-                logger.debug("Storing info for topic: {}, old host: {}, new host: {}.",
-                             va(topic.toStringUtf8(), oldHost, host));
-                clearHostForTopic(topic, oldHost);
-            } else {
-                logger.warn("Ownership of topic: {} has been changed from {} to {} when storeing host: {}",
-                            va(topic.toStringUtf8(), oldHost, topic2Host.get(topic), host));
-                return;
-            }
-        } else {
-            logger.debug("Storing info for topic: {}, host: {}.",
-                         va(topic.toStringUtf8(), host));
-        }
-        Set<ByteString> topicsForHost = host2Topics.get(host);
-        if (null == topicsForHost) {
-            Set<ByteString> newTopicsSet = new HashSet<ByteString>();
-            topicsForHost = host2Topics.putIfAbsent(host, newTopicsSet);
-            if (null == topicsForHost) {
-              topicsForHost = newTopicsSet;
-            }
-        }
-        synchronized (topicsForHost) {
-            // check whether the ownership changed, since it might happened
-            // after replace succeed
-            if (host.equals(topic2Host.get(topic))) {
-                topicsForHost.add(topic);
-            }
-        }
-    }
-
-    // If a server host goes down or the channel to it gets disconnected,
-    // we want to clear out all relevant cached information. We'll
-    // need to remove all of the topic mappings that the host was
-    // responsible for.
-    protected void clearAllTopicsForHost(InetSocketAddress host) {
-        logger.debug("Clearing all topics for host: {}", host);
-        // For each of the topics that the host was responsible for,
-        // remove it from the topic2Host mapping.
-        Set<ByteString> topicsForHost = host2Topics.get(host);
-        if (null != topicsForHost) {
-            synchronized (topicsForHost) {
-                for (ByteString topic : topicsForHost) {
-                    logger.debug("Removing mapping for topic: {} from host: {}.",
-                                 va(topic.toStringUtf8(), host));
-                    topic2Host.remove(topic, host);
-                }
-            }
-            // Now it is safe to remove the host2Topics mapping entry.
-            host2Topics.remove(host, topicsForHost);
-        }
-    }
-
-    // If a subscribe channel goes down, the topic might have moved.
-    // We only clear out that topic for the host and not all cached information.
-    public void clearHostForTopic(ByteString topic, InetSocketAddress host) {
-        logger.debug("Clearing topic: {} from host: {}.",
-                     va(topic.toStringUtf8(), host));
-        if (topic2Host.remove(topic, host)) {
-            logger.debug("Removed topic to host mapping for topic: {} and host: {}.",
-                         va(topic.toStringUtf8(), host));
-        }
-        Set<ByteString> topicsForHost = host2Topics.get(host);
-        if (null != topicsForHost) {
-            boolean removed;
-            synchronized (topicsForHost) {
-                removed = topicsForHost.remove(topic);
-            }
-            if (removed) {
-                logger.debug("Removed topic: {} from host: {}.",
-                             topic.toStringUtf8(), host);
-                if (topicsForHost.isEmpty()) {
-                    // remove only topic list is empty
-                    host2Topics.remove(host, EMPTY_TOPIC_SET);
-                }
-            }
-        }
-    }
-
-    @Override
-    public long nextTxnId() {
-        return globalCounter.incrementAndGet();
-    }
-
-    // We need to deal with the possible problem of a PubSub request being
-    // written to successfully to the server host but for some reason, the
-    // ack message back never comes. What could happen is that the VoidCallback
-    // stored in the ResponseHandler.txn2PublishData map will never be called.
-    // We should have a configured timeout so if that passes from the time a
-    // write was successfully done to the server, we can fail this async PubSub
-    // transaction. The caller could possibly redo the transaction if needed at
-    // a later time. Creating a timeout cleaner TimerTask to do this here.
-    class PubSubRequestTimeoutTask extends TimerTask {
-        /**
-         * Implement the TimerTask's abstract run method.
-         */
-        @Override
-        public void run() {
-            if (isClosed()) {
-                return;
-            }
-            logger.debug("Running the PubSubRequest Timeout Task");
-            // First check those non-subscription channels
-            for (HChannel channel : host2NonSubscriptionChannels.getChannels()) {
-                try {
-                    HChannelHandler channelHandler =
-                        HChannelImpl.getHChannelHandlerFromChannel(channel.getChannel());
-                    channelHandler.checkTimeoutRequests();
-                } catch (NoResponseHandlerException nrhe) {
-                    continue;
-                }
-            }
-            // Then check those subscription channels
-            checkTimeoutRequestsOnSubscriptionChannels();
-        }
-    }
-
-    protected abstract void restartDelivery(TopicSubscriber topicSubscriber)
-        throws ClientNotSubscribedException, AlreadyStartDeliveryException;
-
-    /**
-     * Chekout the pub/sub requests on subscription channels.
-     */
-    protected abstract void checkTimeoutRequestsOnSubscriptionChannels();
-
-    @Override
-    public boolean isClosed() {
-        closedLock.readLock().lock();
-        try {
-            return closed; 
-        } finally {
-            closedLock.readLock().unlock();
-        }
-    }
-
-    /**
-     * Close all subscription channels when close channel manager.
-     */
-    protected abstract void closeSubscriptionChannels();
-
-    @Override
-    public void close() {
-        logger.info("Shutting down the channels manager.");
-        closedLock.writeLock().lock();
-        try {
-            // Not first time to close
-            if (closed) {
-                return;
-            }
-            closed = true;
-        } finally {
-            closedLock.writeLock().unlock();
-        }
-        clientTimer.cancel();
-        // Clear all existed channels
-        host2NonSubscriptionChannels.close();
-
-        // clear all subscription channels
-        closeSubscriptionChannels();
-
-        // Clear out all Maps
-        topic2Host.clear();
-        host2Topics.clear();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java
deleted file mode 100644
index 7fcfc44..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.google.protobuf.ByteString;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
-import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protoextensions.MessageIdUtils;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.util.Either;
-import static org.apache.hedwig.util.VarArgs.va;
-
-public abstract class AbstractSubscribeResponseHandler extends SubscribeResponseHandler {
-
-    private static final Logger logger =
-        LoggerFactory.getLogger(AbstractSubscribeResponseHandler.class);
-
-    protected final ReentrantReadWriteLock disconnectLock =
-        new ReentrantReadWriteLock();
-
-    protected final ConcurrentMap<TopicSubscriber, ActiveSubscriber> subscriptions
-        = new ConcurrentHashMap<TopicSubscriber, ActiveSubscriber>();
-    protected final AbstractHChannelManager aChannelManager;
-
-    protected AbstractSubscribeResponseHandler(ClientConfiguration cfg,
-                                               HChannelManager channelManager) {
-        super(cfg, channelManager);
-        this.aChannelManager = (AbstractHChannelManager) channelManager;
-    }
-
-    protected HChannelManager getHChannelManager() {
-        return this.channelManager;
-    }
-
-    protected ClientConfiguration getConfiguration() {
-        return cfg;
-    }
-
-    protected ActiveSubscriber getActiveSubscriber(TopicSubscriber ts) {
-        return subscriptions.get(ts);
-    }
-
-    protected ActiveSubscriber createActiveSubscriber(
-        ClientConfiguration cfg, AbstractHChannelManager channelManager,
-        TopicSubscriber ts, PubSubData op, SubscriptionPreferences preferences,
-        Channel channel, HChannel hChannel) {
-        return new ActiveSubscriber(cfg, channelManager, ts, op, preferences, channel, hChannel);
-    }
-
-    @Override
-    public void handleResponse(PubSubResponse response, PubSubData pubSubData,
-                               Channel channel) throws Exception {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Handling a Subscribe response: {}, pubSubData: {}, host: {}.",
-                         va(response, pubSubData, NetUtils.getHostFromChannel(channel)));
-        }
-        switch (response.getStatusCode()) {
-        case SUCCESS:
-            TopicSubscriber ts = new TopicSubscriber(pubSubData.topic,
-                                                     pubSubData.subscriberId);
-            SubscriptionPreferences preferences = null;
-            if (response.hasResponseBody()) {
-                ResponseBody respBody = response.getResponseBody();
-                if (respBody.hasSubscribeResponse()) {
-                    SubscribeResponse resp = respBody.getSubscribeResponse();
-                    if (resp.hasPreferences()) {
-                        preferences = resp.getPreferences();
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Receive subscription preferences for {} : {}",
-                                         va(ts,
-                                            SubscriptionStateUtils.toString(preferences)));
-                        }
-                    }
-                }
-            }
-
-            Either<StatusCode, HChannel> result;
-            StatusCode statusCode;
-            ActiveSubscriber ss = null;
-            // Store the Subscribe state
-            disconnectLock.readLock().lock();
-            try {
-                result = handleSuccessResponse(ts, pubSubData, channel);
-                statusCode = result.left();
-                if (StatusCode.SUCCESS == statusCode) {
-                    ss = createActiveSubscriber(
-                        cfg, aChannelManager, ts, pubSubData, preferences, channel, result.right());
-                    statusCode = addSubscription(ts, ss);
-                }
-            } finally {
-                disconnectLock.readLock().unlock();
-            }
-            if (StatusCode.SUCCESS == statusCode) {
-                postHandleSuccessResponse(ts, ss);
-                // Response was success so invoke the callback's operationFinished
-                // method.
-                pubSubData.getCallback().operationFinished(pubSubData.context, null);
-            } else {
-                PubSubException exception = PubSubException.create(statusCode,
-                    "Client is already subscribed for " + ts);
-                pubSubData.getCallback().operationFailed(pubSubData.context, exception);
-            }
-            break;
-        case CLIENT_ALREADY_SUBSCRIBED:
-            // For Subscribe requests, the server says that the client is
-            // already subscribed to it.
-            pubSubData.getCallback().operationFailed(pubSubData.context,
-                    new ClientAlreadySubscribedException("Client is already subscribed for topic: "
-                                                         + pubSubData.topic.toStringUtf8() + ", subscriberId: "
-                                                         + pubSubData.subscriberId.toStringUtf8()));
-            break;
-        case SERVICE_DOWN:
-            // Response was service down failure so just invoke the callback's
-            // operationFailed method.
-            pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
-                                                     "Server responded with a SERVICE_DOWN status"));
-            break;
-        case NOT_RESPONSIBLE_FOR_TOPIC:
-            // Redirect response so we'll need to repost the original Subscribe
-            // Request
-            handleRedirectResponse(response, pubSubData, channel);
-            break;
-        default:
-            // Consider all other status codes as errors, operation failed
-            // cases.
-            logger.error("Unexpected error response from server for PubSubResponse: " + response);
-            pubSubData.getCallback().operationFailed(pubSubData.context,
-                    new ServiceDownException("Server responded with a status code of: "
-                            + response.getStatusCode(),
-                            PubSubException.create(response.getStatusCode(),
-                                                   "Original Exception")));
-            break;
-        }
-    }
-
-    /**
-     * Handle success response for a specific TopicSubscriber <code>ts</code>. The method
-     * is triggered after subscribed successfully.
-     *
-     * @param ts
-     *          Topic Subscriber.
-     * @param pubSubData
-     *          Pub/Sub Request data for this subscribe request.
-     * @param channel
-     *          Subscription Channel.
-     * @return status code to indicate what happened
-     */
-    protected abstract Either<StatusCode, HChannel> handleSuccessResponse(
-        TopicSubscriber ts, PubSubData pubSubData, Channel channel);
-
-    protected void postHandleSuccessResponse(TopicSubscriber ts, ActiveSubscriber ss) {
-        // do nothing now
-    }
-
-    private StatusCode addSubscription(TopicSubscriber ts, ActiveSubscriber ss) {
-        ActiveSubscriber oldSS = subscriptions.putIfAbsent(ts, ss);
-        if (null != oldSS) {
-            return StatusCode.CLIENT_ALREADY_SUBSCRIBED;
-        } else {
-            return StatusCode.SUCCESS;
-        }
-    }
-
-    @Override
-    public void handleSubscribeMessage(PubSubResponse response) {
-        Message message = response.getMessage();
-        TopicSubscriber ts = new TopicSubscriber(response.getTopic(),
-                                                 response.getSubscriberId());
-        if (logger.isDebugEnabled()) {
-            logger.debug("Handling a Subscribe message in response: {}, {}",
-                         va(response, ts));
-        }
-        ActiveSubscriber ss = getActiveSubscriber(ts);
-        if (null == ss) {
-            logger.error("Subscriber {} is not found receiving its message {}.",
-                         va(ts, MessageIdUtils.msgIdToReadableString(message.getMsgId())));
-            return;
-        }
-        ss.handleMessage(message);
-    }
-
-    @Override
-    protected void asyncMessageDeliver(TopicSubscriber topicSubscriber,
-                                       Message message) {
-        ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
-        if (null == ss) {
-            logger.error("Subscriber {} is not found delivering its message {}.",
-                         va(topicSubscriber,
-                            MessageIdUtils.msgIdToReadableString(message.getMsgId())));
-            return;
-        }
-        ss.asyncMessageDeliver(message);
-    }
-
-    @Override
-    protected void messageConsumed(TopicSubscriber topicSubscriber,
-                                   Message message) {
-        ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
-        if (null == ss) {
-            logger.warn("Subscriber {} is not found consumed its message {}.",
-                        va(topicSubscriber,
-                           MessageIdUtils.msgIdToReadableString(message.getMsgId())));
-            return;
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug("Message has been successfully consumed by the client app : {}, {}",
-                         va(message, topicSubscriber));
-        }
-        ss.messageConsumed(message);
-    }
-
-    @Override
-    public void handleSubscriptionEvent(ByteString topic, ByteString subscriberId,
-                                        SubscriptionEvent event) {
-        TopicSubscriber ts = new TopicSubscriber(topic, subscriberId);
-        ActiveSubscriber ss = getActiveSubscriber(ts);
-        if (null == ss) {
-            logger.warn("No subscription {} found receiving subscription event {}.",
-                        va(ts, event));
-            return;
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug("Received subscription event {} for ({}).",
-                         va(event, ts));
-        }
-        processSubscriptionEvent(ss, event);
-    }
-
-    protected void processSubscriptionEvent(ActiveSubscriber as, SubscriptionEvent event) {
-        switch (event) {
-        // for all cases we need to resubscribe for the subscription
-        case TOPIC_MOVED:
-        case SUBSCRIPTION_FORCED_CLOSED:
-            resubscribeIfNecessary(as, event);
-            break;
-        default:
-            logger.error("Receive unknown subscription event {} for {}.",
-                         va(event, as.getTopicSubscriber()));
-        }
-    }
-
-    @Override
-    public void startDelivery(final TopicSubscriber topicSubscriber,
-                              MessageHandler messageHandler)
-    throws ClientNotSubscribedException, AlreadyStartDeliveryException {
-        ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
-        if (null == ss) {
-            throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug("Start delivering message for {} using message handler {}",
-                         va(topicSubscriber, messageHandler));
-        }
-        ss.startDelivery(messageHandler);
-    }
-
-    @Override
-    public void stopDelivery(final TopicSubscriber topicSubscriber)
-    throws ClientNotSubscribedException {
-        ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
-        if (null == ss) {
-            throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug("Stop delivering messages for {}", topicSubscriber);
-        }
-        ss.stopDelivery();
-    }
-
-    @Override
-    public boolean hasSubscription(TopicSubscriber topicSubscriber) {
-        return subscriptions.containsKey(topicSubscriber);
-    }
-
-    @Override
-    public void consume(final TopicSubscriber topicSubscriber,
-                        final MessageSeqId messageSeqId) {
-        ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
-        if (null == ss) {
-            logger.warn("Subscriber {} is not found consuming message {}.",
-                        va(topicSubscriber,
-                           MessageIdUtils.msgIdToReadableString(messageSeqId)));
-            return;
-        }
-        ss.consume(messageSeqId);
-    }
-
-    @Override
-    public void onChannelDisconnected(InetSocketAddress host, Channel channel) {
-        disconnectLock.writeLock().lock();
-        try {
-            onDisconnect(host);
-        } finally {
-            disconnectLock.writeLock().unlock();
-        }
-    }
-
-    private void onDisconnect(InetSocketAddress host) {
-        for (ActiveSubscriber ss : subscriptions.values()) {
-            onDisconnect(ss, host);
-        }
-    }
-
-    private void onDisconnect(ActiveSubscriber ss, InetSocketAddress host) {
-        logger.info("Subscription channel for ({}) is disconnected.", ss);
-        resubscribeIfNecessary(ss, SubscriptionEvent.TOPIC_MOVED);
-    }
-
-    protected boolean removeSubscription(TopicSubscriber ts, ActiveSubscriber ss) {
-        return subscriptions.remove(ts, ss);
-    }
-
-    protected void resubscribeIfNecessary(ActiveSubscriber ss, SubscriptionEvent event) {
-        // if subscriber has been changed, we don't need to resubscribe
-        if (!removeSubscription(ss.getTopicSubscriber(), ss)) {
-            return;
-        }
-        ss.resubscribeIfNecessary(event);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java
deleted file mode 100644
index 10506d8..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import static org.apache.hedwig.util.VarArgs.va;
-
-import java.util.LinkedList;
-import java.util.Queue;
-
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.MessageConsumeData;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.netty.FilterableMessageHandler;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.filter.ClientMessageFilter;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protoextensions.MessageIdUtils;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * an active subscriber handles subscription actions in a channel.
- */
-public class ActiveSubscriber {
-
-    private static final Logger logger = LoggerFactory.getLogger(ActiveSubscriber.class);
-
-    protected final ClientConfiguration cfg;
-    protected final AbstractHChannelManager channelManager;
-
-    // Subscriber related variables
-    protected final TopicSubscriber topicSubscriber;
-    protected final PubSubData op;
-    protected final SubscriptionPreferences preferences;
-
-    // the underlying netty channel to send request
-    protected final Channel channel;
-    protected final HChannel hChannel;
-
-    // Counter for the number of consumed messages so far to buffer up before we
-    // send the Consume message back to the server along with the last/largest
-    // message seq ID seen so far in that batch.
-    private int numConsumedMessagesInBuffer = 0;
-    private MessageSeqId lastMessageSeqId = null;
-
-    // Message Handler
-    private MessageHandler msgHandler = null;
-
-    // Queue used for subscribes when the MessageHandler hasn't been registered
-    // yet but we've already received subscription messages from the server.
-    // This will be lazily created as needed.
-    private final Queue<Message> msgQueue = new LinkedList<Message>();
-
-    /**
-     * Construct an active subscriber instance.
-     *
-     * @param cfg
-     *          Client configuration object.
-     * @param channelManager
-     *          Channel manager instance.
-     * @param ts
-     *          Topic subscriber.
-     * @param op
-     *          Pub/Sub request.
-     * @param preferences
-     *          Subscription preferences for the subscriber.
-     * @param channel
-     *          Netty channel the subscriber lived.
-     */
-    public ActiveSubscriber(ClientConfiguration cfg,
-                            AbstractHChannelManager channelManager,
-                            TopicSubscriber ts, PubSubData op,
-                            SubscriptionPreferences preferences,
-                            Channel channel,
-                            HChannel hChannel) {
-        this.cfg = cfg;
-        this.channelManager = channelManager;
-        this.topicSubscriber = ts;
-        this.op = op;
-        this.preferences = preferences;
-        this.channel = channel;
-        this.hChannel = hChannel;
-    }
-
-    /**
-     * @return pub/sub request for the subscription.
-     */
-    public PubSubData getPubSubData() {
-        return this.op;
-    }
-
-    /**
-     * @return topic subscriber id for the active subscriber.
-     */
-    public TopicSubscriber getTopicSubscriber() {
-        return this.topicSubscriber;
-    }
-
-    /**
-     * Start delivering messages using given message handler.
-     *
-     * @param messageHandler
-     *          Message handler to deliver messages
-     * @throws AlreadyStartDeliveryException if someone already started delivery.
-     * @throws ClientNotSubscribedException when start delivery before subscribe.
-     */
-    public synchronized void startDelivery(MessageHandler messageHandler)
-    throws AlreadyStartDeliveryException, ClientNotSubscribedException {
-        if (null != this.msgHandler) {
-            throw new AlreadyStartDeliveryException("A message handler " + msgHandler
-                + " has been started for " + topicSubscriber);
-        }
-        if (null != messageHandler && messageHandler instanceof FilterableMessageHandler) {
-            FilterableMessageHandler filterMsgHandler =
-                (FilterableMessageHandler) messageHandler;
-            if (filterMsgHandler.hasMessageFilter()) {
-                if (null == preferences) {
-                    // no preferences means talking to an old version hub server
-                    logger.warn("Start delivering messages with filter but no subscription "
-                              + "preferences found. It might due to talking to an old version"
-                              + " hub server.");
-                    // use the original message handler.
-                    messageHandler = filterMsgHandler.getMessageHandler();
-                } else {
-                    // pass subscription preferences to message filter
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Start delivering messages with filter on {}, preferences: {}",
-                                     va(topicSubscriber,
-                                        SubscriptionStateUtils.toString(preferences)));
-                    }
-                    ClientMessageFilter msgFilter = filterMsgHandler.getMessageFilter();
-                    msgFilter.setSubscriptionPreferences(topicSubscriber.getTopic(),
-                                                         topicSubscriber.getSubscriberId(),
-                                                         preferences);
-                }
-            }
-        }
-
-        this.msgHandler = messageHandler;
-        // Once the MessageHandler is registered, see if we have any queued up
-        // subscription messages sent to us already from the server. If so,
-        // consume those first. Do this only if the MessageHandler registered is
-        // not null (since that would be the HedwigSubscriber.stopDelivery
-        // call).
-        if (null == msgHandler) {
-            return;
-        }
-        if (msgQueue.size() > 0) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Consuming {} queued up messages for {}",
-                             va(msgQueue.size(), topicSubscriber));
-            }
-            for (Message message : msgQueue) {
-                asyncMessageDeliver(message);
-            }
-            // Now we can remove the queued up messages since they are all
-            // consumed.
-            msgQueue.clear();
-        }
-    }
-
-    /**
-     * Stop delivering messages to the subscriber.
-     */
-    public synchronized void stopDelivery() {
-        this.msgHandler = null;
-    }
-
-    /**
-     * Handle received message.
-     *
-     * @param message
-     *          Received message.
-     */
-    public synchronized void handleMessage(Message message) {
-        if (null != msgHandler) {
-            asyncMessageDeliver(message);
-        } else {
-            // MessageHandler has not yet been registered so queue up these
-            // messages for the Topic Subscription. Make the initial lazy
-            // creation of the message queue thread safe just so we don't
-            // run into a race condition where two simultaneous threads process
-            // a received message and both try to create a new instance of
-            // the message queue. Performance overhead should be okay
-            // because the delivery of the topic has not even started yet
-            // so these messages are not consumed and just buffered up here.
-            if (logger.isDebugEnabled()) {
-                logger.debug("Message {} has arrived but no MessageHandler provided for {}"
-                             + " yet so queueing up the message.",
-                             va(MessageIdUtils.msgIdToReadableString(message.getMsgId()),
-                                topicSubscriber));
-            }
-            msgQueue.add(message);
-        }
-    }
-
-    /**
-     * Deliver message to the client.
-     *
-     * @param message
-     *          Message to deliver.
-     */
-    public synchronized void asyncMessageDeliver(Message message) {
-        if (null == msgHandler) {
-            logger.error("No message handler found to deliver message {} to {}.",
-                         va(MessageIdUtils.msgIdToReadableString(message.getMsgId()),
-                            topicSubscriber));
-            return;
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug("Call the client app's MessageHandler asynchronously to deliver the message {} to {}",
-                         va(message, topicSubscriber));
-        }
-        unsafeDeliverMessage(message);
-    }
-
-    /**
-     * Unsafe version to deliver message to a message handler.
-     * Caller need to handle synchronization issue.
-     *
-     * @param message
-     *          Message to deliver.
-     */
-    protected void unsafeDeliverMessage(Message message) {
-        MessageConsumeData messageConsumeData =
-            new MessageConsumeData(topicSubscriber, message);
-        msgHandler.deliver(topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(),
-                           message, channelManager.getConsumeCallback(),
-                           messageConsumeData);
-    }
-
-    private synchronized boolean updateLastMessageSeqId(MessageSeqId seqId) {
-        if (null != lastMessageSeqId &&
-            seqId.getLocalComponent() <= lastMessageSeqId.getLocalComponent()) {
-            return false;
-        }
-        ++numConsumedMessagesInBuffer;
-        lastMessageSeqId = seqId;
-        if (numConsumedMessagesInBuffer >= cfg.getConsumedMessagesBufferSize()) {
-            numConsumedMessagesInBuffer = 0;
-            lastMessageSeqId = null;
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * Consume a specific message.
-     *
-     * @param messageSeqId
-     *          Message seq id.
-     */
-    public void consume(final MessageSeqId messageSeqId) {
-        PubSubRequest.Builder pubsubRequestBuilder =
-            NetUtils.buildConsumeRequest(channelManager.nextTxnId(),
-                                         topicSubscriber, messageSeqId);
-
-        // For Consume requests, we will send them from the client in a fire and
-        // forget manner. We are not expecting the server to send back an ack
-        // response so no need to register this in the ResponseHandler. There
-        // are no callbacks to invoke since this isn't a client initiated
-        // action. Instead, just have a future listener that will log an error
-        // message if there was a problem writing the consume request.
-        if (logger.isDebugEnabled()) {
-            logger.debug("Writing a Consume request to channel: {} with messageSeqId: {} for {}",
-                         va(channel, messageSeqId, topicSubscriber));
-        }
-        ChannelFuture future = channel.write(pubsubRequestBuilder.build());
-        future.addListener(new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-                if (!future.isSuccess()) {
-                    logger.error("Error writing a Consume request to channel: {} with messageSeqId: {} for {}",
-                                 va(channel, messageSeqId, topicSubscriber));
-                }
-            }
-        });
-    }
-
-    /**
-     * Application acked to consume message.
-     *
-     * @param message
-     *          Message consumed by application.
-     */
-    public void messageConsumed(Message message) {
-        // For consume response to server, there is a config param on how many
-        // messages to consume and buffer up before sending the consume request.
-        // We just need to keep a count of the number of messages consumed
-        // and the largest/latest msg ID seen so far in this batch. Messages
-        // should be delivered in order and without gaps. Do this only if
-        // auto-sending of consume messages is enabled.
-        if (cfg.isAutoSendConsumeMessageEnabled()) {
-            // Update these variables only if we are auto-sending consume
-            // messages to the server. Otherwise the onus is on the client app
-            // to call the Subscriber consume API to let the server know which
-            // messages it has successfully consumed.
-            if (updateLastMessageSeqId(message.getMsgId())) {
-                // Send the consume request and reset the consumed messages buffer
-                // variables. We will use the same Channel created from the
-                // subscribe request for the TopicSubscriber.
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Consume message {} when reaching consumed message buffer limit.",
-                                 message.getMsgId());
-                }
-                consume(message.getMsgId());
-            }
-        }
-    }
-
-    /**
-     * Resubscribe a subscriber if necessary.
-     *
-     * @param event
-     *          Subscription Event.
-     */
-    public void resubscribeIfNecessary(SubscriptionEvent event) {
-        // clear topic ownership
-        if (SubscriptionEvent.TOPIC_MOVED == event) {
-            channelManager.clearHostForTopic(topicSubscriber.getTopic(),
-                                             NetUtils.getHostFromChannel(channel));
-        }
-        if (!op.options.getEnableResubscribe()) {
-            channelManager.getSubscriptionEventEmitter().emitSubscriptionEvent(
-                topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(), event);
-            return;
-        }
-        // Since the connection to the server host that was responsible
-        // for the topic died, we are not sure about the state of that
-        // server. Resend the original subscribe request data to the default
-        // server host/VIP. Also clear out all of the servers we've
-        // contacted or attempted to from this request as we are starting a
-        // "fresh" subscribe request.
-        op.clearServersList();
-        // Set a new type of VoidCallback for this async call. We need this
-        // hook so after the resubscribe has completed, delivery for
-        // that topic subscriber should also be restarted (if it was that
-        // case before the channel disconnect).
-        final long retryWaitTime = cfg.getSubscribeReconnectRetryWaitTime();
-        ResubscribeCallback resubscribeCb =
-            new ResubscribeCallback(topicSubscriber, op,
-                                    channelManager, retryWaitTime);
-        op.setCallback(resubscribeCb);
-        op.shouldClaim = false;
-        op.context = null;
-        op.setOriginalChannelForResubscribe(hChannel);
-        if (logger.isDebugEnabled()) {
-            logger.debug("Resubscribe {} with origSubData {}",
-                         va(topicSubscriber, op));
-        }
-        // resubmit the request
-        channelManager.submitOp(op);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java
deleted file mode 100644
index ab86f23..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.util.Map;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
-import org.jboss.netty.handler.ssl.SslHandler;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.handlers.AbstractResponseHandler;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-
-public abstract class ClientChannelPipelineFactory implements ChannelPipelineFactory {
-
-    protected ClientConfiguration cfg;
-    protected AbstractHChannelManager channelManager;
-
-    public ClientChannelPipelineFactory(ClientConfiguration cfg,
-                                        AbstractHChannelManager channelManager) {
-        this.cfg = cfg;
-        this.channelManager = channelManager;
-    }
-
-    protected abstract Map<OperationType, AbstractResponseHandler> createResponseHandlers();
-
-    private HChannelHandler createHChannelHandler() {
-        return new HChannelHandler(cfg, channelManager, createResponseHandlers());
-    }
-
-    // Retrieve a ChannelPipeline from the factory.
-    public ChannelPipeline getPipeline() throws Exception {
-        // Create a new ChannelPipline using the factory method from the
-        // Channels helper class.
-        ChannelPipeline pipeline = Channels.pipeline();
-        if (channelManager.getSslFactory() != null) {
-            pipeline.addLast("ssl", new SslHandler(channelManager.getSslFactory().getEngine()));
-        }
-        pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(
-                         cfg.getMaximumMessageSize(), 0, 4, 0, 4));
-        pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
-
-        pipeline.addLast("protobufdecoder", new ProtobufDecoder(PubSubProtocol.PubSubResponse.getDefaultInstance()));
-        pipeline.addLast("protobufencoder", new ProtobufEncoder());
-
-        pipeline.addLast("responsehandler", createHChannelHandler());
-        return pipeline;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java
deleted file mode 100644
index 065b2f7..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.net.InetSocketAddress;
-
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import static org.apache.hedwig.util.VarArgs.va;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handle requests sent to default hub server. <b>DefaultServerChannel</b> would not
- * be used as a channel to send requests directly. It just takes the responsibility to
- * connect to the default server. After the underlying netty channel is established,
- * it would call {@link HChannelManager#submitOpThruChannel()} to send requests thru
- * the underlying netty channel.
- */
-class DefaultServerChannel extends HChannelImpl {
-
-    private static final Logger logger = LoggerFactory.getLogger(DefaultServerChannel.class);
-
-    DefaultServerChannel(InetSocketAddress host, AbstractHChannelManager channelManager) {
-        super(host, channelManager);
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("[DefaultServer: ").append(host).append("]");
-        return sb.toString();
-    }
-
-    @Override
-    public void submitOp(final PubSubData pubSubData) {
-        // for each pub/sub request sent to default hub server
-        // we would establish a fresh connection for it
-        ClientChannelPipelineFactory pipelineFactory;
-        if (OperationType.PUBLISH.equals(pubSubData.operationType) ||
-            OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) {
-            pipelineFactory = channelManager.getNonSubscriptionChannelPipelineFactory();
-        } else {
-            pipelineFactory = channelManager.getSubscriptionChannelPipelineFactory();
-        }
-        ChannelFuture future = connect(host, pipelineFactory);
-        future.addListener(new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-                // If the channel has been closed, there is no need to proceed with any callback
-                // logic here.
-                if (closed) {
-                    future.getChannel().close();
-                    return;
-                }
-
-                // Check if the connection to the server was done successfully.
-                if (!future.isSuccess()) {
-                    logger.error("Error connecting to host {}.", host);
-                    future.getChannel().close();
-
-                    retryOrFailOp(pubSubData);
-                    // Finished with failure logic so just return.
-                    return;
-                }
-                logger.debug("Connected to host {} for pubSubData: {}",
-                             va(host, pubSubData));
-                channelManager.submitOpThruChannel(pubSubData, future.getChannel());
-            }
-        });
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java
deleted file mode 100644
index c41a329..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.handler.ssl.SslHandler;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.client.handlers.AbstractResponseHandler;
-import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
-import org.apache.hedwig.exceptions.PubSubException.UncertainStateException;
-import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEventResponse;
-import static org.apache.hedwig.util.VarArgs.va;
-
-public class HChannelHandler extends SimpleChannelHandler {
-
-    private static final Logger logger = LoggerFactory.getLogger(HChannelHandler.class);
-
-    // Concurrent Map to store for each async PubSub request, the txn ID
-    // and the corresponding PubSub call's data which stores the VoidCallback to
-    // invoke when we receive a PubSub ack response from the server.
-    // This is specific to this instance of the HChannelHandler which is
-    // tied to a specific netty Channel Pipeline.
-    private final ConcurrentMap<Long, PubSubData> txn2PubSubData =
-        new ConcurrentHashMap<Long, PubSubData>();
-
-    // Boolean indicating if we closed the channel this HChannelHandler is
-    // attached to explicitly or not. If so, we do not need to do the
-    // channel disconnected logic here.
-    private volatile boolean channelClosedExplicitly = false;
-
-    private final AbstractHChannelManager channelManager;
-    private final ClientConfiguration cfg;
-
-    private final Map<OperationType, AbstractResponseHandler> handlers;
-    private final SubscribeResponseHandler subHandler;
-
-    public HChannelHandler(ClientConfiguration cfg,
-                           AbstractHChannelManager channelManager,
-                           Map<OperationType, AbstractResponseHandler> handlers) {
-        this.cfg = cfg;
-        this.channelManager = channelManager;
-        this.handlers = handlers;
-        subHandler = (SubscribeResponseHandler) handlers.get(OperationType.SUBSCRIBE);
-    }
-
-    public SubscribeResponseHandler getSubscribeResponseHandler() {
-        return subHandler;
-    }
-
-    public void removeTxn(long txnId) {
-        txn2PubSubData.remove(txnId);
-    }
-
-    public void addTxn(long txnId, PubSubData pubSubData) {
-        txn2PubSubData.put(txnId, pubSubData);
-    }
-
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-        // If the Message is not a PubSubResponse, just send it upstream and let
-        // something else handle it.
-        if (!(e.getMessage() instanceof PubSubResponse)) {
-            ctx.sendUpstream(e);
-            return;
-        }
-        // Retrieve the PubSubResponse from the Message that was sent by the
-        // server.
-        PubSubResponse response = (PubSubResponse) e.getMessage();
-        logger.debug("Response received from host: {}, response: {}.",
-                     va(NetUtils.getHostFromChannel(ctx.getChannel()), response));
-
-        // Determine if this PubSubResponse is an ack response for a PubSub
-        // Request or if it is a message being pushed to the client subscriber.
-        if (response.hasMessage()) {
-            // Subscribed messages being pushed to the client so handle/consume
-            // it and return.
-            if (null == subHandler) {
-                logger.error("Received message from a non-subscription channel : {}",
-                             response);
-            } else {
-                subHandler.handleSubscribeMessage(response);
-            }
-            return;
-        }
-
-        // Process Subscription Events
-        if (response.hasResponseBody()) {
-            ResponseBody resp = response.getResponseBody();
-            // A special subscription event indicates the state of a subscriber
-            if (resp.hasSubscriptionEvent()) {
-                if (null == subHandler) {
-                    logger.error("Received subscription event from a non-subscription channel : {}",
-                                 response); 
-                } else {
-                    SubscriptionEventResponse eventResp = resp.getSubscriptionEvent();
-                    logger.debug("Received subscription event {} for (topic:{}, subscriber:{}).",
-                                 va(eventResp.getEvent(), response.getTopic(),
-                                    response.getSubscriberId()));
-                    subHandler.handleSubscriptionEvent(response.getTopic(),
-                                                       response.getSubscriberId(),
-                                                       eventResp.getEvent());
-                }
-                return;
-            }
-        }
-
-        // Response is an ack to a prior PubSubRequest so first retrieve the
-        // PubSub data for this txn.
-        PubSubData pubSubData = txn2PubSubData.remove(response.getTxnId());
-
-        // Validate that the PubSub data for this txn is stored. If not, just
-        // log an error message and return since we don't know how to handle
-        // this.
-        if (pubSubData == null) {
-            logger.error("PubSub Data was not found for PubSubResponse: {}", response);
-            return;
-        }
-
-        // Store the topic2Host mapping if this wasn't a server redirect. We'll
-        // assume that if the server was able to have an open Channel connection
-        // to the client, and responded with an ack message other than the
-        // NOT_RESPONSIBLE_FOR_TOPIC one, it is the correct topic master.
-        if (!response.getStatusCode().equals(StatusCode.NOT_RESPONSIBLE_FOR_TOPIC)) {
-            // Retrieve the server host that we've connected to and store the
-            // mapping from the topic to this host. For all other non-redirected
-            // server statuses, we consider that as a successful connection to the
-            // correct topic master.
-            InetSocketAddress host = NetUtils.getHostFromChannel(ctx.getChannel());
-            channelManager.storeTopic2HostMapping(pubSubData.topic, host);
-        }
-
-        // Depending on the operation type, call the appropriate handler.
-        logger.debug("Handling a {} response: {}, pubSubData: {}, host: {}.",
-                     va(pubSubData.operationType, response, pubSubData, ctx.getChannel()));
-        AbstractResponseHandler respHandler = handlers.get(pubSubData.operationType);
-        if (null == respHandler) {
-            // The above are the only expected PubSubResponse messages received
-            // from the server for the various client side requests made.
-            logger.error("Response received from server is for an unhandled operation {}, txnId: {}.",
-                         va(pubSubData.operationType, response.getTxnId()));
-            pubSubData.getCallback().operationFailed(pubSubData.context,
-                new UnexpectedConditionException("Can't find response handler for operation "
-                                                 + pubSubData.operationType));
-            return;
-        }
-        respHandler.handleResponse(response, pubSubData, ctx.getChannel());
-    }
-
-    public void checkTimeoutRequests() {
-        long curTime = System.currentTimeMillis();
-        long timeoutInterval = cfg.getServerAckResponseTimeout();
-        for (PubSubData pubSubData : txn2PubSubData.values()) {
-            checkTimeoutRequest(pubSubData, curTime, timeoutInterval);
-        }
-    }
-
-    private void checkTimeoutRequest(PubSubData pubSubData,
-                                     long curTime, long timeoutInterval) {
-        if (curTime > pubSubData.requestWriteTime + timeoutInterval) {
-            // Current PubSubRequest has timed out so remove it from the
-            // ResponseHandler's map and invoke the VoidCallback's
-            // operationFailed method.
-            logger.error("Current PubSubRequest has timed out for pubSubData: " + pubSubData);
-            txn2PubSubData.remove(pubSubData.txnId);
-            pubSubData.getCallback().operationFailed(pubSubData.context,
-                new UncertainStateException("Server ack response never received so PubSubRequest has timed out!"));
-        }
-    }
-
-    // Logic to deal with what happens when a Channel to a server host is
-    // disconnected.
-    @Override
-    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        // If this channel was closed explicitly by the client code,
-        // we do not need to do any of this logic. This could happen
-        // for redundant Publish channels created or redirected subscribe
-        // channels that are not used anymore or when we shutdown the
-        // client and manually close all of the open channels.
-        // Also don't do any of the disconnect logic if the client has stopped.
-        if (channelClosedExplicitly || channelManager.isClosed()) {
-            return;
-        }
-
-        // Make sure the host retrieved is not null as there could be some weird
-        // channel disconnect events happening during a client shutdown.
-        // If it is, just return as there shouldn't be anything we need to do.
-        InetSocketAddress host = NetUtils.getHostFromChannel(ctx.getChannel());
-        if (host == null) {
-            return;
-        }
-
-        logger.info("Channel {} was disconnected to host {}.",
-                    va(ctx.getChannel(), host));
-
-        // If this Channel was used for Publish and Unsubscribe flows, just
-        // remove it from the HewdigPublisher's host2Channel map. We will
-        // re-establish a Channel connection to that server when the next
-        // publish/unsubscribe request to a topic that the server owns occurs.
-
-        // Now determine what type of operation this channel was used for.
-        if (null == subHandler) {
-            channelManager.onNonSubscriptionChannelDisconnected(host, ctx.getChannel());
-        } else {
-            channelManager.onSubscriptionChannelDisconnected(host, ctx.getChannel());
-        }
-
-        // Finally, all of the PubSubRequests that are still waiting for an ack
-        // response from the server need to be removed and timed out. Invoke the
-        // operationFailed callbacks on all of them. Use the
-        // UncertainStateException since the server did receive the request but
-        // we're not sure of the state of the request since the ack response was
-        // never received.
-        for (PubSubData pubSubData : txn2PubSubData.values()) {
-            logger.debug("Channel disconnected so invoking the operationFailed callback for pubSubData: {}",
-                         pubSubData);
-            pubSubData.getCallback().operationFailed(pubSubData.context, new UncertainStateException(
-                                                     "Server ack response never received before server connection disconnected!"));
-        }
-        txn2PubSubData.clear();
-    }
-
-    // Logic to deal with what happens when a Channel to a server host is
-    // connected. This is needed if the client is using an SSL port to
-    // communicate with the server. If so, we need to do the SSL handshake here
-    // when the channel is first connected.
-    @Override
-    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        // No need to initiate the SSL handshake if we are closing this channel
-        // explicitly or the client has been stopped.
-        if (cfg.isSSLEnabled() && !channelClosedExplicitly && !channelManager.isClosed()) {
-            logger.debug("Initiating the SSL handshake");
-            ctx.getPipeline().get(SslHandler.class).handshake();
-        }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-        logger.error("Exception caught on client channel", e.getCause());
-        e.getChannel().close();
-    }
-
-    public void closeExplicitly() {
-        // TODO: BOOKKEEPER-350 : Handle consume buffering, etc here - in a different patch
-        channelClosedExplicitly = true;
-    }
-}