You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/27 01:52:20 UTC

svn commit: r1390777 [3/4] - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/java/org/apache/hedwig/client/data/ hedwig-client/src/main/java/org/apache/hedwig/client/handlers/ hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig...

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,621 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
+import org.apache.hedwig.client.handlers.MessageConsumeCallback;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
+import org.apache.hedwig.client.netty.CleanupChannelMap;
+import org.apache.hedwig.client.netty.HChannel;
+import org.apache.hedwig.client.netty.HChannelManager;
+import org.apache.hedwig.client.netty.NetUtils;
+import org.apache.hedwig.client.netty.SubscriptionEventEmitter;
+import org.apache.hedwig.client.ssl.SslClientContextFactory;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.filter.ClientMessageFilter;
+import org.apache.hedwig.protocol.PubSubProtocol;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageHeader;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
+
+/**
+ * Basic HChannel Manager Implementation
+ */
+public abstract class AbstractHChannelManager implements HChannelManager {
+
+    private static Logger logger = LoggerFactory.getLogger(AbstractHChannelManager.class);
+
+    // Empty Topic List
+    private final static Set<ByteString> EMPTY_TOPIC_SET =
+        new HashSet<ByteString>();
+
+    // Boolean indicating if the channel manager is running or has been closed.
+    // Once we stop the manager, we should sidestep all of the connect, write callback
+    // and channel disconnected logic.
+    protected boolean closed = false;
+    protected final ReentrantReadWriteLock closedLock =
+        new ReentrantReadWriteLock();
+
+    // Global counter used for generating unique transaction ID's for
+    // publish and subscribe requests
+    protected final AtomicLong globalCounter = new AtomicLong();
+
+    // Concurrent Map to store the mapping from the Topic to the Host.
+    // This could change over time since servers can drop mastership of topics
+    // for load balancing or failover. If a server host ever goes down, we'd
+    // also want to remove all topic mappings the host was responsible for.
+    // The second Map is used as the inverted version of the first one.
+    protected final ConcurrentMap<ByteString, InetSocketAddress> topic2Host =
+        new ConcurrentHashMap<ByteString, InetSocketAddress>();
+    // The inverse mapping is used only when clearing all topics. For performance
+    // consideration, we don't guarantee host2Topics to be consistent with
+    // topic2Host. it would be better to not rely on this mapping for anything
+    // significant.
+    protected final ConcurrentMap<InetSocketAddress, Set<ByteString>> host2Topics =
+        new ConcurrentHashMap<InetSocketAddress, Set<ByteString>>();
+
+    // This channels will be used for publish and unsubscribe requests
+    protected final CleanupChannelMap<InetSocketAddress> host2NonSubscriptionChannels =
+        new CleanupChannelMap<InetSocketAddress>();
+
+    private final ClientConfiguration cfg;
+    // The Netty socket factory for making connections to the server.
+    protected final ChannelFactory socketFactory;
+    // PipelineFactory to create non-subscription netty channels to the appropriate server
+    private final ClientChannelPipelineFactory nonSubscriptionChannelPipelineFactory;
+    // ssl context factory
+    private SslClientContextFactory sslFactory = null;
+
+    // default server channel
+    private final HChannel defaultServerChannel;
+
+    // Each client instantiation will have a Timer for running recurring
+    // threads. One such timer task thread to is to timeout long running
+    // PubSubRequests that are waiting for an ack response from the server.
+    private final Timer clientTimer = new Timer(true);
+    // a common consume callback for all consume requests.
+    private final MessageConsumeCallback consumeCb;
+    // A event emitter to emit subscription events
+    private final SubscriptionEventEmitter eventEmitter;
+
+    protected AbstractHChannelManager(ClientConfiguration cfg,
+                                      ChannelFactory socketFactory) {
+        this.cfg = cfg;
+        this.socketFactory = socketFactory;
+        this.nonSubscriptionChannelPipelineFactory =
+            new NonSubscriptionChannelPipelineFactory(cfg, this);
+
+        // create a default server channel
+        defaultServerChannel =
+            new DefaultServerChannel(cfg.getDefaultServerHost(), this);
+
+        if (cfg.isSSLEnabled()) {
+            sslFactory = new SslClientContextFactory(cfg);
+        }
+
+        consumeCb = new MessageConsumeCallback(cfg, this);
+        eventEmitter = new SubscriptionEventEmitter();
+
+        // Schedule Request Timeout task.
+        clientTimer.schedule(new PubSubRequestTimeoutTask(), 0,
+                             cfg.getTimeoutThreadRunInterval());
+    }
+
+    @Override
+    public SubscriptionEventEmitter getSubscriptionEventEmitter() {
+        return eventEmitter;
+    }
+
+    public MessageConsumeCallback getConsumeCallback() {
+        return consumeCb;
+    }
+
+    public SslClientContextFactory getSslFactory() {
+        return sslFactory;
+    }
+
+    protected ChannelFactory getChannelFactory() {
+        return socketFactory;
+    }
+
+    protected ClientChannelPipelineFactory getNonSubscriptionChannelPipelineFactory() {
+        return this.nonSubscriptionChannelPipelineFactory;
+    }
+
+    protected abstract ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory();
+
+    @Override
+    public void schedule(final TimerTask task, final long delay) {
+        this.closedLock.readLock().lock();
+        try {
+            if (closed) {
+                logger.warn("Task {} is not scheduled due to the channel manager is closed.",
+                            task);
+                return;
+            }
+            clientTimer.schedule(task, delay);
+        } finally {
+            this.closedLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void submitOpAfterDelay(final PubSubData pubSubData, final long delay) {
+        this.closedLock.readLock().lock();
+        try {
+            if (closed) {
+                pubSubData.getCallback().operationFailed(pubSubData.context,
+                    new ServiceDownException("Client has been closed."));
+                return;
+            }
+            clientTimer.schedule(new TimerTask() {
+                @Override
+                public void run() {
+                    logger.debug("Submit request {} in {} ms later.",
+                                 va(pubSubData, delay));
+                    submitOp(pubSubData);
+                }
+            }, delay);
+        } finally {
+            closedLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void submitOp(PubSubData pubSubData) {
+        HChannel hChannel;
+        if (OperationType.PUBLISH.equals(pubSubData.operationType) ||
+            OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) {
+            hChannel = getNonSubscriptionChannelByTopic(pubSubData.topic);
+        } else {
+            TopicSubscriber ts = new TopicSubscriber(pubSubData.topic,
+                                                     pubSubData.subscriberId);
+            hChannel = getSubscriptionChannelByTopicSubscriber(ts);
+        }
+        // no channel found to submit pubsub data
+        // choose the default server
+        if (null == hChannel) {
+            hChannel = defaultServerChannel;
+        }
+        hChannel.submitOp(pubSubData);
+    }
+
+    @Override
+    public void redirectToHost(PubSubData pubSubData, InetSocketAddress host) {
+        logger.debug("Submit operation {} to host {}.",
+                     va(pubSubData, host));
+        HChannel hChannel;
+        if (OperationType.PUBLISH.equals(pubSubData.operationType) ||
+            OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) {
+            hChannel = getNonSubscriptionChannel(host);
+            if (null == hChannel) {
+                // create a channel to connect to specified host
+                hChannel = createAndStoreNonSubscriptionChannel(host);
+            }
+        } else {
+            hChannel = getSubscriptionChannel(host);
+            if (null == hChannel) {
+                // create a subscription channel to specified host
+                hChannel = createAndStoreSubscriptionChannel(host);
+            }
+        }
+        // no channel found to submit pubsub data
+        // choose the default server
+        if (null == hChannel) {
+            hChannel = defaultServerChannel;
+        }
+        hChannel.submitOp(pubSubData);
+    }
+
+    void submitOpThruChannel(PubSubData pubSubData, Channel channel) {
+        logger.debug("Submit operation {} to thru channel {}.",
+                     va(pubSubData, channel));
+        HChannel hChannel;
+        if (OperationType.PUBLISH.equals(pubSubData.operationType) ||
+            OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) {
+            hChannel = createAndStoreNonSubscriptionChannel(channel);
+        } else {
+            hChannel = createAndStoreSubscriptionChannel(channel);
+        }
+        hChannel.submitOp(pubSubData);
+    }
+
+    @Override
+    public void submitOpToDefaultServer(PubSubData pubSubData) {
+        logger.debug("Submit operation {} to default server {}.",
+                     va(pubSubData, defaultServerChannel));
+        defaultServerChannel.submitOp(pubSubData);
+    }
+
+    // Synchronized method to store the host2Channel mapping (if it doesn't
+    // exist yet). Retrieve the hostname info from the Channel created via the
+    // RemoteAddress tied to it.
+    private HChannel createAndStoreNonSubscriptionChannel(Channel channel) {
+        InetSocketAddress host = NetUtils.getHostFromChannel(channel);
+        HChannel newHChannel = new HChannelImpl(host, channel, this,
+                                                getNonSubscriptionChannelPipelineFactory());
+        return storeNonSubscriptionChannel(host, newHChannel);
+    }
+
+    private HChannel createAndStoreNonSubscriptionChannel(InetSocketAddress host) {
+        HChannel newHChannel = new HChannelImpl(host, this,
+                                                getNonSubscriptionChannelPipelineFactory());
+        return storeNonSubscriptionChannel(host, newHChannel);
+    }
+
+    private HChannel storeNonSubscriptionChannel(InetSocketAddress host,
+                                                 HChannel newHChannel) {
+        return host2NonSubscriptionChannels.addChannel(host, newHChannel);
+    }
+
+    /**
+     * Is there a {@link HChannel} existed for a given host.
+     *
+     * @param host
+     *          Target host address.
+     */
+    private HChannel getNonSubscriptionChannel(InetSocketAddress host) {
+        return host2NonSubscriptionChannels.getChannel(host);
+    }
+
+    /**
+     * Get a non-subscription channel for a given <code>topic</code>.
+     *
+     * @param topic
+     *          Topic Name
+     * @return if <code>topic</code>'s owner is unknown, return null.
+     *         if <code>topic</code>'s owner is know and there is a channel
+     *         existed before, return the existed channel, otherwise created
+     *         a new one.
+     */
+    private HChannel getNonSubscriptionChannelByTopic(ByteString topic) {
+        InetSocketAddress host = topic2Host.get(topic);
+        if (null == host) {
+            // we don't know where is the topic
+            return null;
+        } else {
+            // we had know which server owned the topic
+            HChannel channel = getNonSubscriptionChannel(host);
+            if (null == channel) {
+                // create a channel to connect to specified host
+                channel = createAndStoreNonSubscriptionChannel(host);
+            }
+            return channel;
+        }
+    }
+
+    /**
+     * Handle the disconnected event from a non-subscription {@link HChannel}.
+     *
+     * @param host
+     *          Which host is disconnected.
+     * @param channel
+     *          The underlying established channel.
+     */
+    protected void onNonSubscriptionChannelDisconnected(InetSocketAddress host,
+                                                        Channel channel) {
+        // Only remove the Channel from the mapping if this current
+        // disconnected channel is the same as the cached entry.
+        // Due to race concurrency situations, it is possible to
+        // create multiple channels to the same host for publish
+        // and unsubscribe requests.
+        HChannel hChannel = host2NonSubscriptionChannels.getChannel(host);
+        if (null == hChannel) {
+            return;
+        }
+        Channel underlyingChannel = hChannel.getChannel();
+        if (null == underlyingChannel ||
+            !underlyingChannel.equals(channel)) {
+            return;
+        }
+        logger.info("NonSubscription Channel {} to {} disconnected.",
+                    va(channel, host));
+        // remove existed channel
+        if (host2NonSubscriptionChannels.removeChannel(host, hChannel)) {
+            clearAllTopicsForHost(host);
+        }
+    }
+
+    /**
+     * Create and store a subscription {@link HChannel} thru the underlying established
+     * <code>channel</code>
+     *
+     * @param channel
+     *          The underlying established subscription channel.
+     */
+    protected abstract HChannel createAndStoreSubscriptionChannel(Channel channel);
+
+    /**
+     * Create and store a subscription {@link HChannel} to target host.
+     *
+     * @param host
+     *          Target host address.
+     */
+    protected abstract HChannel createAndStoreSubscriptionChannel(InetSocketAddress host);
+
+    /**
+     * Is there a subscription {@link HChannel} existed for a given host.
+     *
+     * @param host
+     *          Target host address.
+     */
+    protected abstract HChannel getSubscriptionChannel(InetSocketAddress host);
+
+    /**
+     * Get a subscription channel for a given <code>topicSubscriber</code>.
+     *
+     * @param topicSubscriber
+     *          Topic Subscriber
+     * @return if <code>topic</code>'s owner is unknown, return null.
+     *         if <code>topic</code>'s owner is know and there is a channel
+     *         existed before, return the existed channel, otherwise created
+     *         a new one for the <code>topicSubscriber</code>.
+     */
+    protected abstract HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber topicSubscriber);
+
+    /**
+     * Handle the disconnected event from a subscription {@link HChannel}.
+     *
+     * @param host
+     *          Which host is disconnected.
+     * @param channel
+     *          The underlying established channel.
+     */
+    protected abstract void onSubscriptionChannelDisconnected(InetSocketAddress host,
+                                                              Channel channel);
+
+    private void sendConsumeRequest(final TopicSubscriber topicSubscriber,
+                                    final MessageSeqId messageSeqId,
+                                    final Channel channel) {
+        PubSubRequest.Builder pubsubRequestBuilder =
+            NetUtils.buildConsumeRequest(nextTxnId(), topicSubscriber, messageSeqId);  
+
+        // For Consume requests, we will send them from the client in a fire and
+        // forget manner. We are not expecting the server to send back an ack
+        // response so no need to register this in the ResponseHandler. There
+        // are no callbacks to invoke since this isn't a client initiated
+        // action. Instead, just have a future listener that will log an error
+        // message if there was a problem writing the consume request.
+        logger.debug("Writing a Consume request to host: {} with messageSeqId: {} for {}",
+                     va(NetUtils.getHostFromChannel(channel), messageSeqId, topicSubscriber));
+        ChannelFuture future = channel.write(pubsubRequestBuilder.build());
+        future.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                if (!future.isSuccess()) {
+                    logger.error("Error writing a Consume request to host: {} with messageSeqId: {} for {}",
+                                 va(NetUtils.getHostFromChannel(channel),
+                                    messageSeqId, topicSubscriber));
+                }
+            }
+        });
+    }
+
+    /**
+     * Helper method to store the topic2Host mapping in the channel manager cache
+     * map. This method is assumed to be called when we've done a successful
+     * connection to the correct server topic master.
+     *
+     * @param topic
+     *            Topic Name
+     * @param host
+     *            Host Address
+     */
+    protected void storeTopic2HostMapping(ByteString topic, InetSocketAddress host) {
+        InetSocketAddress oldHost = topic2Host.putIfAbsent(topic, host);
+        if (null != oldHost && oldHost.equals(host)) {
+            // Entry in map exists for the topic but it is the same as the
+            // current host. In this case there is nothing to do.
+            return;
+        }
+
+        if (null != oldHost) {
+            if (topic2Host.replace(topic, oldHost, host)) {
+                // Store the relevant mappings for this topic and host combination.
+                logger.debug("Storing info for topic: {}, old host: {}, new host: {}.",
+                             va(topic.toStringUtf8(), oldHost, host));
+                clearHostForTopic(topic, oldHost);
+            } else {
+                logger.warn("Ownership of topic: {} has been changed from {} to {} when storeing host: {}",
+                            va(topic.toStringUtf8(), oldHost, topic2Host.get(topic), host));
+                return;
+            }
+        } else {
+            logger.debug("Storing info for topic: {}, host: {}.",
+                         va(topic.toStringUtf8(), host));
+        }
+        Set<ByteString> topicsForHost = host2Topics.get(host);
+        if (null == topicsForHost) {
+            Set<ByteString> newTopicsSet = new HashSet<ByteString>();
+            topicsForHost = host2Topics.putIfAbsent(host, newTopicsSet);
+            if (null == topicsForHost) {
+              topicsForHost = newTopicsSet;
+            }
+        }
+        synchronized (topicsForHost) {
+            // check whether the ownership changed, since it might happened
+            // after replace succeed
+            if (host.equals(topic2Host.get(topic))) {
+                topicsForHost.add(topic);
+            }
+        }
+    }
+
+    // If a server host goes down or the channel to it gets disconnected,
+    // we want to clear out all relevant cached information. We'll
+    // need to remove all of the topic mappings that the host was
+    // responsible for.
+    protected void clearAllTopicsForHost(InetSocketAddress host) {
+        logger.debug("Clearing all topics for host: {}", host);
+        // For each of the topics that the host was responsible for,
+        // remove it from the topic2Host mapping.
+        Set<ByteString> topicsForHost = host2Topics.get(host);
+        if (null != topicsForHost) {
+            synchronized (topicsForHost) {
+                for (ByteString topic : topicsForHost) {
+                    logger.debug("Removing mapping for topic: {} from host: {}.",
+                                 va(topic.toStringUtf8(), host));
+                    topic2Host.remove(topic, host);
+                }
+            }
+            // Now it is safe to remove the host2Topics mapping entry.
+            host2Topics.remove(host, topicsForHost);
+        }
+    }
+
+    // If a subscribe channel goes down, the topic might have moved.
+    // We only clear out that topic for the host and not all cached information.
+    public void clearHostForTopic(ByteString topic, InetSocketAddress host) {
+        logger.debug("Clearing topic: {} from host: {}.",
+                     va(topic.toStringUtf8(), host));
+        if (topic2Host.remove(topic, host)) {
+            logger.debug("Removed topic to host mapping for topic: {} and host: {}.",
+                         va(topic.toStringUtf8(), host));
+        }
+        Set<ByteString> topicsForHost = host2Topics.get(host);
+        if (null != topicsForHost) {
+            boolean removed;
+            synchronized (topicsForHost) {
+                removed = topicsForHost.remove(topic);
+            }
+            if (removed) {
+                logger.debug("Removed topic: {} from host: {}.",
+                             topic.toStringUtf8(), host);
+                if (topicsForHost.isEmpty()) {
+                    // remove only topic list is empty
+                    host2Topics.remove(host, EMPTY_TOPIC_SET);
+                }
+            }
+        }
+    }
+
+    @Override
+    public long nextTxnId() {
+        return globalCounter.incrementAndGet();
+    }
+
+    // We need to deal with the possible problem of a PubSub request being
+    // written to successfully to the server host but for some reason, the
+    // ack message back never comes. What could happen is that the VoidCallback
+    // stored in the ResponseHandler.txn2PublishData map will never be called.
+    // We should have a configured timeout so if that passes from the time a
+    // write was successfully done to the server, we can fail this async PubSub
+    // transaction. The caller could possibly redo the transaction if needed at
+    // a later time. Creating a timeout cleaner TimerTask to do this here.
+    class PubSubRequestTimeoutTask extends TimerTask {
+        /**
+         * Implement the TimerTask's abstract run method.
+         */
+        @Override
+        public void run() {
+            if (isClosed()) {
+                return;
+            }
+            logger.debug("Running the PubSubRequest Timeout Task");
+            // First check those non-subscription channels
+            for (HChannel channel : host2NonSubscriptionChannels.getChannels()) {
+                try {
+                    HChannelHandler channelHandler =
+                        HChannelImpl.getHChannelHandlerFromChannel(channel.getChannel());
+                    channelHandler.checkTimeoutRequests();
+                } catch (NoResponseHandlerException nrhe) {
+                    continue;
+                }
+            }
+            // Then check those subscription channels
+            checkTimeoutRequestsOnSubscriptionChannels();
+        }
+    }
+
+    /**
+     * Chekout the pub/sub requests on subscription channels.
+     */
+    protected abstract void checkTimeoutRequestsOnSubscriptionChannels();
+
+    @Override
+    public boolean isClosed() {
+        closedLock.readLock().lock();
+        try {
+            return closed; 
+        } finally {
+            closedLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Close all subscription channels when close channel manager.
+     */
+    protected abstract void closeSubscriptionChannels();
+
+    @Override
+    public void close() {
+        logger.info("Shutting down the channels manager.");
+        closedLock.writeLock().lock();
+        try {
+            // Not first time to close
+            if (closed) {
+                return;
+            }
+            closed = true;
+        } finally {
+            closedLock.writeLock().unlock();
+        }
+        clientTimer.cancel();
+        // Clear all existed channels
+        host2NonSubscriptionChannels.close();
+
+        // clear all subscription channels
+        closeSubscriptionChannels();
+
+        // Clear out all Maps
+        topic2Host.clear();
+        host2Topics.clear();
+    }
+
+}

Copied: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java (from r1390401, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java?p2=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java&p1=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java&r1=1390401&r2=1390777&rev=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java Wed Sep 26 23:52:18 2012
@@ -15,7 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hedwig.client.netty;
+package org.apache.hedwig.client.netty.impl;
+
+import java.util.Map;
 
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -26,14 +28,26 @@ import org.jboss.netty.handler.codec.pro
 import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
 import org.jboss.netty.handler.ssl.SslHandler;
 
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.handlers.AbstractResponseHandler;
 import org.apache.hedwig.protocol.PubSubProtocol;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+
+public abstract class ClientChannelPipelineFactory implements ChannelPipelineFactory {
 
-public class ClientChannelPipelineFactory implements ChannelPipelineFactory {
+    protected ClientConfiguration cfg;
+    protected AbstractHChannelManager channelManager;
+
+    public ClientChannelPipelineFactory(ClientConfiguration cfg,
+                                        AbstractHChannelManager channelManager) {
+        this.cfg = cfg;
+        this.channelManager = channelManager;
+    }
 
-    private HedwigClientImpl client;
+    protected abstract Map<OperationType, AbstractResponseHandler> createResponseHandlers();
 
-    public ClientChannelPipelineFactory(HedwigClientImpl client) {
-        this.client = client;
+    private HChannelHandler createHChannelHandler() {
+        return new HChannelHandler(cfg, channelManager, createResponseHandlers());
     }
 
     // Retrieve a ChannelPipeline from the factory.
@@ -41,17 +55,17 @@ public class ClientChannelPipelineFactor
         // Create a new ChannelPipline using the factory method from the
         // Channels helper class.
         ChannelPipeline pipeline = Channels.pipeline();
-        if (client.getSslFactory() != null) {
-            pipeline.addLast("ssl", new SslHandler(client.getSslFactory().getEngine()));
+        if (channelManager.getSslFactory() != null) {
+            pipeline.addLast("ssl", new SslHandler(channelManager.getSslFactory().getEngine()));
         }
-        pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(client.getConfiguration()
-                         .getMaximumMessageSize(), 0, 4, 0, 4));
+        pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(
+                         cfg.getMaximumMessageSize(), 0, 4, 0, 4));
         pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
 
         pipeline.addLast("protobufdecoder", new ProtobufDecoder(PubSubProtocol.PubSubResponse.getDefaultInstance()));
         pipeline.addLast("protobufencoder", new ProtobufEncoder());
 
-        pipeline.addLast("responsehandler", new ResponseHandler(client));
+        pipeline.addLast("responsehandler", createHChannelHandler());
         return pipeline;
     }
 

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl;
+
+import java.net.InetSocketAddress;
+
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import static org.apache.hedwig.util.VarArgs.va;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handle requests sent to default hub server. <b>DefaultServerChannel</b> would not
+ * be used as a channel to send requests directly. It just takes the responsibility to
+ * connect to the default server. After the underlying netty channel is established,
+ * it would call {@link HChannelManager#submitOpThruChannel()} to send requests thru
+ * the underlying netty channel.
+ */
+class DefaultServerChannel extends HChannelImpl {
+
+    private static Logger logger = LoggerFactory.getLogger(DefaultServerChannel.class);
+
+    DefaultServerChannel(InetSocketAddress host, AbstractHChannelManager channelManager) {
+        super(host, channelManager);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[DefaultServer: ").append(host).append("]");
+        return sb.toString();
+    }
+
+    @Override
+    public void submitOp(final PubSubData pubSubData) {
+        // for each pub/sub request sent to default hub server
+        // we would establish a fresh connection for it
+        ClientChannelPipelineFactory pipelineFactory;
+        if (OperationType.PUBLISH.equals(pubSubData.operationType) ||
+            OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) {
+            pipelineFactory = channelManager.getNonSubscriptionChannelPipelineFactory();
+        } else {
+            pipelineFactory = channelManager.getSubscriptionChannelPipelineFactory();
+        }
+        ChannelFuture future = connect(host, pipelineFactory);
+        future.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                // If the channel has been closed, there is no need to proceed with any callback
+                // logic here.
+                if (closed) {
+                    future.getChannel().close();
+                    return;
+                }
+
+                // Check if the connection to the server was done successfully.
+                if (!future.isSuccess()) {
+                    logger.error("Error connecting to host {}.", host);
+                    future.getChannel().close();
+
+                    retryOrFailOp(pubSubData);
+                    // Finished with failure logic so just return.
+                    return;
+                }
+                logger.debug("Connected to host {} for pubSubData: {}",
+                             va(host, pubSubData));
+                channelManager.submitOpThruChannel(pubSubData, future.getChannel());
+            }
+        });
+    }
+
+}

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
+import org.apache.hedwig.client.netty.NetUtils;
+import org.apache.hedwig.client.handlers.AbstractResponseHandler;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
+import org.apache.hedwig.exceptions.PubSubException.UncertainStateException;
+import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import static org.apache.hedwig.util.VarArgs.va;
+
+@ChannelPipelineCoverage("all")
+public class HChannelHandler extends SimpleChannelHandler {
+
+    private static Logger logger = LoggerFactory.getLogger(HChannelHandler.class);
+
+    // Concurrent Map to store for each async PubSub request, the txn ID
+    // and the corresponding PubSub call's data which stores the VoidCallback to
+    // invoke when we receive a PubSub ack response from the server.
+    // This is specific to this instance of the HChannelHandler which is
+    // tied to a specific netty Channel Pipeline.
+    private final ConcurrentMap<Long, PubSubData> txn2PubSubData =
+        new ConcurrentHashMap<Long, PubSubData>();
+
+    // Boolean indicating if we closed the channel this HChannelHandler is
+    // attached to explicitly or not. If so, we do not need to do the
+    // channel disconnected logic here.
+    private volatile boolean channelClosedExplicitly = false;
+
+    private final AbstractHChannelManager channelManager;
+    private final ClientConfiguration cfg;
+
+    private final Map<OperationType, AbstractResponseHandler> handlers;
+    private final SubscribeResponseHandler subHandler;
+
+    public HChannelHandler(ClientConfiguration cfg,
+                           AbstractHChannelManager channelManager,
+                           Map<OperationType, AbstractResponseHandler> handlers) {
+        this.cfg = cfg;
+        this.channelManager = channelManager;
+        this.handlers = handlers;
+        subHandler = (SubscribeResponseHandler) handlers.get(OperationType.SUBSCRIBE);
+    }
+
+    public SubscribeResponseHandler getSubscribeResponseHandler() {
+        return subHandler;
+    }
+
+    public void removeTxn(long txnId) {
+        txn2PubSubData.remove(txnId);
+    }
+
+    public void addTxn(long txnId, PubSubData pubSubData) {
+        txn2PubSubData.put(txnId, pubSubData);
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+        // If the Message is not a PubSubResponse, just send it upstream and let
+        // something else handle it.
+        if (!(e.getMessage() instanceof PubSubResponse)) {
+            ctx.sendUpstream(e);
+            return;
+        }
+        // Retrieve the PubSubResponse from the Message that was sent by the
+        // server.
+        PubSubResponse response = (PubSubResponse) e.getMessage();
+        logger.debug("Response received from host: {}, response: {}.",
+                     va(NetUtils.getHostFromChannel(ctx.getChannel()), response));
+
+        // Determine if this PubSubResponse is an ack response for a PubSub
+        // Request or if it is a message being pushed to the client subscriber.
+        if (response.hasMessage()) {
+            // Subscribed messages being pushed to the client so handle/consume
+            // it and return.
+            if (null == subHandler) {
+                logger.error("Received message from a non-subscription channel : {}",
+                             response);
+            } else {
+                subHandler.handleSubscribeMessage(response);
+            }
+            return;
+        }
+
+        // Response is an ack to a prior PubSubRequest so first retrieve the
+        // PubSub data for this txn.
+        PubSubData pubSubData = txn2PubSubData.remove(response.getTxnId());
+
+        // Validate that the PubSub data for this txn is stored. If not, just
+        // log an error message and return since we don't know how to handle
+        // this.
+        if (pubSubData == null) {
+            logger.error("PubSub Data was not found for PubSubResponse: {}", response);
+            return;
+        }
+
+        // Store the topic2Host mapping if this wasn't a server redirect. We'll
+        // assume that if the server was able to have an open Channel connection
+        // to the client, and responded with an ack message other than the
+        // NOT_RESPONSIBLE_FOR_TOPIC one, it is the correct topic master.
+        if (!response.getStatusCode().equals(StatusCode.NOT_RESPONSIBLE_FOR_TOPIC)) {
+            // Retrieve the server host that we've connected to and store the
+            // mapping from the topic to this host. For all other non-redirected
+            // server statuses, we consider that as a successful connection to the
+            // correct topic master.
+            InetSocketAddress host = NetUtils.getHostFromChannel(ctx.getChannel());
+            channelManager.storeTopic2HostMapping(pubSubData.topic, host);
+        }
+
+        // Depending on the operation type, call the appropriate handler.
+        logger.debug("Handling a {} response: {}, pubSubData: {}, host: {}.",
+                     va(pubSubData.operationType, response, pubSubData, ctx.getChannel()));
+        AbstractResponseHandler respHandler = handlers.get(pubSubData.operationType);
+        if (null == respHandler) {
+            // The above are the only expected PubSubResponse messages received
+            // from the server for the various client side requests made.
+            logger.error("Response received from server is for an unhandled operation {}, txnId: {}.",
+                         va(pubSubData.operationType, response.getTxnId()));
+            pubSubData.getCallback().operationFailed(pubSubData.context,
+                new UnexpectedConditionException("Can't find response handler for operation "
+                                                 + pubSubData.operationType));
+            return;
+        }
+        respHandler.handleResponse(response, pubSubData, ctx.getChannel());
+    }
+
+    public void checkTimeoutRequests() {
+        long curTime = System.currentTimeMillis();
+        long timeoutInterval = cfg.getServerAckResponseTimeout();
+        for (PubSubData pubSubData : txn2PubSubData.values()) {
+            checkTimeoutRequest(pubSubData, curTime, timeoutInterval);
+        }
+    }
+
+    private void checkTimeoutRequest(PubSubData pubSubData,
+                                     long curTime, long timeoutInterval) {
+        if (curTime > pubSubData.requestWriteTime + timeoutInterval) {
+            // Current PubSubRequest has timed out so remove it from the
+            // ResponseHandler's map and invoke the VoidCallback's
+            // operationFailed method.
+            logger.error("Current PubSubRequest has timed out for pubSubData: " + pubSubData);
+            txn2PubSubData.remove(pubSubData.txnId);
+            pubSubData.getCallback().operationFailed(pubSubData.context,
+                new UncertainStateException("Server ack response never received so PubSubRequest has timed out!"));
+        }
+    }
+
+    // Logic to deal with what happens when a Channel to a server host is
+    // disconnected.
+    @Override
+    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        // If this channel was closed explicitly by the client code,
+        // we do not need to do any of this logic. This could happen
+        // for redundant Publish channels created or redirected subscribe
+        // channels that are not used anymore or when we shutdown the
+        // client and manually close all of the open channels.
+        // Also don't do any of the disconnect logic if the client has stopped.
+        if (channelClosedExplicitly || channelManager.isClosed()) {
+            return;
+        }
+
+        // Make sure the host retrieved is not null as there could be some weird
+        // channel disconnect events happening during a client shutdown.
+        // If it is, just return as there shouldn't be anything we need to do.
+        InetSocketAddress host = NetUtils.getHostFromChannel(ctx.getChannel());
+        if (host == null) {
+            return;
+        }
+
+        logger.info("Channel {} was disconnected to host {}.",
+                    va(ctx.getChannel(), host));
+
+        // If this Channel was used for Publish and Unsubscribe flows, just
+        // remove it from the HewdigPublisher's host2Channel map. We will
+        // re-establish a Channel connection to that server when the next
+        // publish/unsubscribe request to a topic that the server owns occurs.
+
+        // Now determine what type of operation this channel was used for.
+        if (null == subHandler) {
+            channelManager.onNonSubscriptionChannelDisconnected(host, ctx.getChannel());
+        } else {
+            channelManager.onSubscriptionChannelDisconnected(host, ctx.getChannel());
+        }
+
+        // Finally, all of the PubSubRequests that are still waiting for an ack
+        // response from the server need to be removed and timed out. Invoke the
+        // operationFailed callbacks on all of them. Use the
+        // UncertainStateException since the server did receive the request but
+        // we're not sure of the state of the request since the ack response was
+        // never received.
+        for (PubSubData pubSubData : txn2PubSubData.values()) {
+            logger.debug("Channel disconnected so invoking the operationFailed callback for pubSubData: {}",
+                         pubSubData);
+            pubSubData.getCallback().operationFailed(pubSubData.context, new UncertainStateException(
+                                                     "Server ack response never received before server connection disconnected!"));
+        }
+        txn2PubSubData.clear();
+    }
+
+    // Logic to deal with what happens when a Channel to a server host is
+    // connected. This is needed if the client is using an SSL port to
+    // communicate with the server. If so, we need to do the SSL handshake here
+    // when the channel is first connected.
+    @Override
+    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        // No need to initiate the SSL handshake if we are closing this channel
+        // explicitly or the client has been stopped.
+        if (cfg.isSSLEnabled() && !channelClosedExplicitly && !channelManager.isClosed()) {
+            logger.debug("Initiating the SSL handshake");
+            ctx.getPipeline().get(SslHandler.class).handshake(e.getChannel());
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+        logger.error("Exception caught on client channel", e.getCause());
+        e.getChannel().close();
+    }
+
+    public void closeExplicitly() {
+        // TODO: BOOKKEEPER-350 : Handle consume buffering, etc here - in a different patch
+        channelClosedExplicitly = true;
+    }
+}

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayDeque;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import com.google.protobuf.ByteString;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
+import org.apache.hedwig.client.netty.HChannel;
+import org.apache.hedwig.client.netty.NetUtils;
+import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.util.HedwigSocketAddress;
+import static org.apache.hedwig.util.VarArgs.va;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provide a wrapper over netty channel for Hedwig operations.
+ */
+public class HChannelImpl implements HChannel {
+
+    private static Logger logger = LoggerFactory.getLogger(HChannelImpl.class);
+
+    enum State {
+        DISCONNECTED,
+        CONNECTING,
+        CONNECTED,
+    };
+
+    InetSocketAddress host;
+    final AbstractHChannelManager channelManager;
+    final ClientChannelPipelineFactory pipelineFactory;
+    volatile Channel channel;
+    volatile State state;
+
+    // Indicates whether the channel is closed or not.
+    volatile boolean closed = false;
+    // Queue the pubsub requests when the channel is not connected.
+    Queue<PubSubData> pendingOps = new ArrayDeque<PubSubData>();
+
+    /**
+     * Create a un-established channel with provided target <code>host</code>.
+     *
+     * @param host
+     *          Target host address.
+     * @param channelManager
+     *          Channel manager manages the channels.
+     */
+    protected HChannelImpl(InetSocketAddress host, AbstractHChannelManager channelManager) {
+        this(host, channelManager, null);
+    }
+
+    public HChannelImpl(InetSocketAddress host, AbstractHChannelManager channelManager,
+                        ClientChannelPipelineFactory pipelineFactory) {
+        this(host, null, channelManager, pipelineFactory);
+        state = State.DISCONNECTED;
+    }
+
+    /**
+     * Create a <code>HChannel</code> with an established netty channel.
+     *
+     * @param host
+     *          Target host address.
+     * @param channel
+     *          Established Netty channel.
+     * @param channelManager
+     *          Channel manager manages the channels.
+     */
+    public HChannelImpl(InetSocketAddress host, Channel channel,
+                        AbstractHChannelManager channelManager,
+                        ClientChannelPipelineFactory pipelineFactory) {
+        this.host = host;
+        this.channel = channel;
+        this.channelManager = channelManager;
+        this.pipelineFactory = pipelineFactory;
+        state = State.CONNECTED;
+    }
+
+    @Override
+    public void submitOp(PubSubData pubSubData) {
+        boolean doOpNow = false;
+
+        // common case without lock first
+        if (null != channel && State.CONNECTED == state) {
+            doOpNow = true;
+        } else {
+            synchronized (this) {
+                // check channel & state again under lock
+                if (null != channel && State.CONNECTED == state) {
+                    doOpNow = true;
+                } else {
+                    // if reached here, channel is either null (first connection attempt),
+                    // or the channel is disconnected. Connection attempt is still in progress,
+                    // queue up this op. Op will be executed when connection attempt either
+                    // fails or succeeds
+                    pendingOps.add(pubSubData);
+                }
+            }
+            if (!doOpNow) {
+                // start connection attempt to server
+                connect();
+            }
+        }
+        if (doOpNow) {
+            executeOpAfterConnected(pubSubData); 
+        }
+    }
+
+    /**
+     * Execute pub/sub operation after the underlying channel is connected.
+     *
+     * @param pubSubData
+     *          Pub/Sub Operation
+     */
+    private void executeOpAfterConnected(PubSubData pubSubData) {
+        PubSubRequest.Builder reqBuilder =
+            NetUtils.buildPubSubRequest(channelManager.nextTxnId(), pubSubData);
+        writePubSubRequest(pubSubData, reqBuilder.build());
+    }
+
+    @Override
+    public Channel getChannel() {
+        return channel;
+    }
+
+    private void writePubSubRequest(PubSubData pubSubData, PubSubRequest pubSubRequest) {
+        if (closed || null == channel || State.CONNECTED != state) {
+            retryOrFailOp(pubSubData);
+            return;
+        }
+
+        // Before we do the write, store this information into the
+        // ResponseHandler so when the server responds, we know what
+        // appropriate Callback Data to invoke for the given txn ID.
+        try {
+            getHChannelHandlerFromChannel(channel)
+                .addTxn(pubSubData.txnId, pubSubData);
+        } catch (NoResponseHandlerException nrhe) {
+            logger.warn("No Channel Handler found for channel {} when writing request."
+                        + " It might already disconnect.", channel);
+            return;
+        }
+
+        // Finally, write the pub/sub request through the Channel.
+        logger.debug("Writing a {} request to host: {} for pubSubData: {}.",
+                     va(pubSubData.operationType, host, pubSubData));
+        ChannelFuture future = channel.write(pubSubRequest);
+        future.addListener(new WriteCallback(pubSubData, channelManager));
+    }
+
+    /**
+     * Re-submit operation to default server or fail it.
+     *
+     * @param pubSubData
+     *          Pub/Sub Operation
+     */
+    protected void retryOrFailOp(PubSubData pubSubData) {
+        // if we were not able to connect to the host, it could be down
+        ByteString hostString = ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(host));
+        if (pubSubData.connectFailedServers != null &&
+            pubSubData.connectFailedServers.contains(hostString)) {
+            // We've already tried to connect to this host before so just
+            // invoke the operationFailed callback.
+            logger.error("Error connecting to host {} more than once so fail the request: {}",
+                         va(host, pubSubData));
+            pubSubData.getCallback().operationFailed(pubSubData.context,
+                new CouldNotConnectException("Could not connect to host: " + host));
+        } else {
+            logger.error("Retry to connect to default hub server again for pubSubData: {}",
+                         pubSubData);
+            // Keep track of this current server that we failed to connect
+            // to but retry the request on the default server host/VIP.
+            if (pubSubData.connectFailedServers == null) {
+                pubSubData.connectFailedServers = new LinkedList<ByteString>();
+            }
+            pubSubData.connectFailedServers.add(hostString);
+            channelManager.submitOpToDefaultServer(pubSubData);
+        }
+    }
+
+    private void onChannelConnected(ChannelFuture future) {
+        Queue<PubSubData> oldPendingOps;
+        synchronized (this) {
+            // if the channel is closed by client, do nothing
+            if (closed) {
+                future.getChannel().close();
+                return;
+            }
+            state = State.CONNECTED;
+            channel = future.getChannel();
+            host = NetUtils.getHostFromChannel(channel);
+            oldPendingOps = pendingOps;
+            pendingOps = new ArrayDeque<PubSubData>();
+        }
+        for (PubSubData op : oldPendingOps) {
+            executeOpAfterConnected(op);
+        }
+    }
+
+    private void onChannelConnectFailure() {
+        Queue<PubSubData> oldPendingOps;
+        synchronized (this) {
+            state = State.DISCONNECTED;
+            channel = null;
+            oldPendingOps = pendingOps;
+            pendingOps = new ArrayDeque<PubSubData>();
+        }
+        for (PubSubData op : oldPendingOps) {
+            retryOrFailOp(op);
+        }
+    }
+
+    private void connect() {
+        synchronized (this) {
+            if (State.CONNECTING == state ||
+                State.CONNECTED == state) {
+                return;
+            }
+            state = State.CONNECTING;
+        }
+        // Start the connection attempt to the input server host.
+        ChannelFuture future = connect(host, pipelineFactory);
+        future.addListener(new ChannelFutureListener() {
+
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                // If the channel has been closed, there is no need to proceed with any
+                // callback logic here.
+                if (closed) {
+                    future.getChannel().close();
+                    return;
+                }
+
+                if (!future.isSuccess()) {
+                    logger.error("Error connecting to host {}.", host);
+                    future.getChannel().close();
+
+                    // if we were not able to connect to the host, it could be down.
+                    onChannelConnectFailure();
+                    return;
+                }
+                logger.debug("Connected to server {}.", host);
+                // Now that we have connected successfully to the server, execute all queueing
+                // requests.
+                onChannelConnected(future);
+            }
+
+        });
+    }
+
+    /**
+     * This is a helper method to do the connect attempt to the server given the
+     * inputted host/port. This can be used to connect to the default server
+     * host/port which is the VIP. That will pick a server in the cluster at
+     * random to connect to for the initial PubSub attempt (with redirect logic
+     * being done at the server side). Additionally, this could be called after
+     * the client makes an initial PubSub attempt at a server, and is redirected
+     * to the one that is responsible for the topic. Once the connect to the
+     * server is done, we will perform the corresponding PubSub write on that
+     * channel.
+     *
+     * @param serverHost
+     *            Input server host to connect to of type InetSocketAddress
+     * @param pipelineFactory
+     *            PipelineFactory to create response handler to handle responses from
+     *            underlying channel.
+     */
+    protected ChannelFuture connect(InetSocketAddress serverHost,
+                                    ClientChannelPipelineFactory pipelineFactory) {
+        logger.debug("Connecting to host {} ...", serverHost);
+        // Set up the ClientBootStrap so we can create a new Channel connection
+        // to the server.
+        ClientBootstrap bootstrap = new ClientBootstrap(channelManager.getChannelFactory());
+        bootstrap.setPipelineFactory(pipelineFactory);
+        bootstrap.setOption("tcpNoDelay", true);
+        bootstrap.setOption("keepAlive", true);
+
+        // Start the connection attempt to the input server host.
+        return bootstrap.connect(serverHost);
+    }
+
+    @Override
+    public void close(boolean wait) {
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+        }
+        if (null == channel) {
+            return;
+        }
+        try {
+            getHChannelHandlerFromChannel(channel).closeExplicitly();
+        } catch (NoResponseHandlerException nrhe) {
+            logger.warn("No channel handler found for channel {} when closing it.",
+                        channel);
+        }
+        if (wait) {
+            channel.close().awaitUninterruptibly();
+        } else {
+            channel.close();
+        }
+        channel = null;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[HChannel: host - ").append(host)
+          .append(", channel - ").append(channel)
+          .append(", pending reqs - ").append(pendingOps.size())
+          .append(", closed - ").append(closed).append("]");
+        return sb.toString();
+    }
+
+    @Override
+    public void close() {
+        close(false);
+    }
+
+    /**
+     * Helper static method to get the ResponseHandler instance from a Channel
+     * via the ChannelPipeline it is associated with. The assumption is that the
+     * last ChannelHandler tied to the ChannelPipeline is the ResponseHandler.
+     *
+     * @param channel
+     *            Channel we are retrieving the ResponseHandler instance for
+     * @return ResponseHandler Instance tied to the Channel's Pipeline
+     */
+    public static HChannelHandler getHChannelHandlerFromChannel(Channel channel)
+    throws NoResponseHandlerException {
+        if (null == channel) {
+            throw new NoResponseHandlerException("Received a null value for the channel. Cannot retrieve the response handler");
+        }
+
+        HChannelHandler handler = (HChannelHandler) channel.getPipeline().getLast();
+        if (null == handler) {
+            throw new NoResponseHandlerException("Could not retrieve the response handler from the channel's pipeline.");
+        }
+        return handler;
+    }
+
+}

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.handlers.AbstractResponseHandler;
+import org.apache.hedwig.client.handlers.PublishResponseHandler;
+import org.apache.hedwig.client.handlers.UnsubscribeResponseHandler;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+
+public class NonSubscriptionChannelPipelineFactory extends ClientChannelPipelineFactory {
+
+    public NonSubscriptionChannelPipelineFactory(ClientConfiguration cfg,
+                                                 AbstractHChannelManager channelManager) {
+        super(cfg, channelManager);
+    }
+
+    @Override
+    protected Map<OperationType, AbstractResponseHandler> createResponseHandlers() {
+        Map<OperationType, AbstractResponseHandler> handlers =
+            new HashMap<OperationType, AbstractResponseHandler>();
+        handlers.put(OperationType.PUBLISH,
+                     new PublishResponseHandler(cfg, channelManager));
+        handlers.put(OperationType.UNSUBSCRIBE,
+                     new UnsubscribeResponseHandler(cfg, channelManager));
+        return handlers;
+    }
+
+}

Copied: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java (from r1390401, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java?p2=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java&p1=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java&r1=1390401&r2=1390777&rev=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java Wed Sep 26 23:52:18 2012
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hedwig.client.netty;
+package org.apache.hedwig.client.netty.impl;
 
 import java.net.InetSocketAddress;
 import java.util.LinkedList;
@@ -27,8 +27,9 @@ import org.jboss.netty.channel.ChannelFu
 import org.jboss.netty.channel.ChannelFutureListener;
 
 import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.netty.HChannelManager;
+import org.apache.hedwig.client.netty.NetUtils;
 import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
 import org.apache.hedwig.util.HedwigSocketAddress;
 
@@ -38,38 +39,41 @@ public class WriteCallback implements Ch
 
     // Private member variables
     private PubSubData pubSubData;
-    private final HedwigClientImpl client;
-    private final ClientConfiguration cfg;
+    private final HChannelManager channelManager;
 
     // Constructor
-    public WriteCallback(PubSubData pubSubData, HedwigClientImpl client) {
+    public WriteCallback(PubSubData pubSubData,
+                         HChannelManager channelManager) {
         super();
         this.pubSubData = pubSubData;
-        this.client = client;
-        this.cfg = client.getConfiguration();
+        this.channelManager = channelManager;
     }
 
     public void operationComplete(ChannelFuture future) throws Exception {
         // If the client has stopped, there is no need to proceed
         // with any callback logic here.
-        if (client.hasStopped()) {
+        if (channelManager.isClosed()) {
             future.getChannel().close();
             return;
         }
 
         // When the write operation to the server is done, we just need to check
         // if it was successful or not.
-        InetSocketAddress host = HedwigClientImpl.getHostFromChannel(future.getChannel());
+        InetSocketAddress host = NetUtils.getHostFromChannel(future.getChannel());
         if (!future.isSuccess()) {
-            logger.error("Error writing on channel to host: " + host);
+            logger.error("Error writing on channel to host: {}", host);
             // On a write failure for a PubSubRequest, we also want to remove
             // the saved txnId to PubSubData in the ResponseHandler. These
             // requests will not receive an ack response from the server
             // so there is no point storing that information there anymore.
             try {
-                HedwigClientImpl.getResponseHandlerFromChannel(future.getChannel()).txn2PubSubData.remove(pubSubData.txnId);
+                HChannelHandler channelHandler = 
+                    HChannelImpl.getHChannelHandlerFromChannel(future.getChannel());
+                channelHandler.removeTxn(pubSubData.txnId);
+                channelHandler.closeExplicitly();
             } catch (NoResponseHandlerException e) {
-                // We just couldn't remove the transaction ID's mapping. The handler was null, so this has been reset anyway.
+                // We just couldn't remove the transaction ID's mapping.
+                // The handler was null, so this has been reset anyway.
                 logger.warn("Could not find response handler to remove txnId mapping to pubsub data. Ignoring.");
             }
 
@@ -93,7 +97,7 @@ public class WriteCallback implements Ch
                 if (pubSubData.writeFailedServers == null)
                     pubSubData.writeFailedServers = new LinkedList<ByteString>();
                 pubSubData.writeFailedServers.add(hostString);
-                client.doConnect(pubSubData, cfg.getDefaultServerHost());
+                channelManager.submitOpToDefaultServer(pubSubData);
             }
         } else {
             // Now that the write to the server is done, we have to wait for it

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl.simple;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
+
+import org.apache.hedwig.client.netty.CleanupChannelMap;
+import org.apache.hedwig.client.netty.HChannel;
+import org.apache.hedwig.client.netty.NetUtils;
+import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
+import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
+import org.apache.hedwig.client.netty.impl.HChannelHandler;
+import org.apache.hedwig.client.netty.impl.HChannelImpl;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.filter.ClientMessageFilter;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
+
+/**
+ * Simple HChannel Manager which establish a connection for each subscription.
+ */
+public class SimpleHChannelManager extends AbstractHChannelManager {
+
+    private static Logger logger = LoggerFactory.getLogger(SimpleHChannelManager.class);
+
+    // Concurrent Map to store the cached Channel connections on the client side
+    // to a server host for a given Topic + SubscriberId combination. For each
+    // TopicSubscriber, we want a unique Channel connection to the server for
+    // it. We can also get the ResponseHandler tied to the Channel via the
+    // Channel Pipeline.
+    protected final CleanupChannelMap<TopicSubscriber> topicSubscriber2Channel;
+
+    // Concurrent Map to store Message handler for each topic + sub id combination.
+    // Store it here instead of in SubscriberResponseHandler as we don't want to lose the handler
+    // user set when connection is recovered
+    protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler
+        = new ConcurrentHashMap<TopicSubscriber, MessageHandler>();
+
+    // PipelineFactory to create subscription netty channels to the appropriate server
+    private final ClientChannelPipelineFactory subscriptionChannelPipelineFactory;
+
+    public SimpleHChannelManager(ClientConfiguration cfg,
+                                 ChannelFactory socketFactory) {
+        super(cfg, socketFactory);
+        topicSubscriber2Channel = new CleanupChannelMap<TopicSubscriber>();
+        this.subscriptionChannelPipelineFactory =
+            new SimpleSubscriptionChannelPipelineFactory(cfg, this);
+    }
+
+    @Override
+    protected ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory() {
+        return subscriptionChannelPipelineFactory;
+    }
+
+    @Override
+    protected HChannel createAndStoreSubscriptionChannel(Channel channel) {
+        // for simple channel, we don't store subscription channel now
+        // we store it until we received success response
+        InetSocketAddress host = NetUtils.getHostFromChannel(channel);
+        return new HChannelImpl(host, channel, this,
+                                getSubscriptionChannelPipelineFactory());
+    }
+
+    @Override
+    protected HChannel createAndStoreSubscriptionChannel(InetSocketAddress host) {
+        // for simple channel, we don't store subscription channel now
+        // we store it until we received success response
+        return new HChannelImpl(host, this,
+                                getSubscriptionChannelPipelineFactory());
+    }
+
+    protected HChannel storeSubscriptionChannel(TopicSubscriber topicSubscriber,
+                                                Channel channel) {
+        InetSocketAddress host = NetUtils.getHostFromChannel(channel);
+        HChannel newHChannel = new HChannelImpl(host, channel, this,
+                                                getSubscriptionChannelPipelineFactory());
+        return topicSubscriber2Channel.addChannel(topicSubscriber, newHChannel);
+    }
+
+    @Override
+    protected HChannel getSubscriptionChannel(InetSocketAddress host) {
+        return null;
+    }
+
+    @Override
+    protected HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber subscriber) {
+        HChannel channel = topicSubscriber2Channel.getChannel(subscriber);
+        if (null != channel) {
+            // there is no channel established for this subscription
+            return channel;
+        } else {
+            InetSocketAddress host = topic2Host.get(subscriber.getTopic());
+            if (null == host) {
+                return null;
+            } else {
+                channel = getSubscriptionChannel(host);
+                if (null == channel) {
+                    channel = createAndStoreSubscriptionChannel(host);
+                }
+                return channel;
+            }
+        }
+    }
+
+    @Override
+    protected void onSubscriptionChannelDisconnected(InetSocketAddress host,
+                                                     Channel channel) {
+        logger.info("Subscription Channel {} disconnected from {}.",
+                    va(channel, host));
+        try {
+            // get hchannel handler
+            HChannelHandler channelHandler =
+                HChannelImpl.getHChannelHandlerFromChannel(channel);
+            channelHandler.getSubscribeResponseHandler()
+                          .onChannelDisconnected(host, channel);
+        } catch (NoResponseHandlerException nrhe) {
+            logger.warn("No Channel Handler found for channel {} when it disconnected.",
+                        channel);
+        }
+    }
+
+    @Override
+    public SubscribeResponseHandler getSubscribeResponseHandler(TopicSubscriber topicSubscriber) {
+        HChannel hChannel = topicSubscriber2Channel.getChannel(topicSubscriber);
+        if (null == hChannel) {
+            return null;
+        }
+        Channel channel = hChannel.getChannel();
+        if (null == channel) {
+            return null;
+        }
+        try {
+            HChannelHandler channelHandler =
+                HChannelImpl.getHChannelHandlerFromChannel(channel);
+            return channelHandler.getSubscribeResponseHandler();
+        } catch (NoResponseHandlerException nrhe) {
+            logger.warn("No Channel Handler found for channel {}, topic subscriber {}.",
+                        channel, topicSubscriber);
+            return null;
+        }
+
+    }
+
+    @Override
+    public void startDelivery(TopicSubscriber topicSubscriber,
+                              MessageHandler messageHandler)
+        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+        startDelivery(topicSubscriber, messageHandler, false);
+    }
+
+    protected void restartDelivery(TopicSubscriber topicSubscriber)
+        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+        startDelivery(topicSubscriber, null, true);
+    }
+
+    private void startDelivery(TopicSubscriber topicSubscriber,
+                               MessageHandler messageHandler, boolean restart)
+        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+        // Make sure we know about this topic subscription on the client side
+        // exists. The assumption is that the client should have in memory the
+        // Channel created for the TopicSubscriber once the server has sent
+        // an ack response to the initial subscribe request.
+        SubscribeResponseHandler subscribeResponseHandler =
+            getSubscribeResponseHandler(topicSubscriber);
+        if (null == subscribeResponseHandler ||
+            !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
+            logger.error("Client is not yet subscribed to {}.", topicSubscriber);
+            throw new ClientNotSubscribedException("Client is not yet subscribed to "
+                                                   + topicSubscriber);
+        }
+
+        MessageHandler existedMsgHandler = topicSubscriber2MessageHandler.get(topicSubscriber);
+        if (restart) {
+            // restart using existing msg handler 
+            messageHandler = existedMsgHandler;
+        } else {
+            // some has started delivery but not stop it
+            if (null != existedMsgHandler) {
+                throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
+            }
+            if (messageHandler != null) {
+                if (null != topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler)) {
+                    throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
+                }
+            }
+        }
+
+        // tell subscribe response handler to start delivering messages for topicSubscriber
+        subscribeResponseHandler.startDelivery(topicSubscriber, messageHandler);
+    }
+
+    public void stopDelivery(TopicSubscriber topicSubscriber)
+    throws ClientNotSubscribedException {
+        // Make sure we know that this topic subscription on the client side
+        // exists. The assumption is that the client should have in memory the
+        // Channel created for the TopicSubscriber once the server has sent
+        // an ack response to the initial subscribe request.
+        SubscribeResponseHandler subscribeResponseHandler =
+            getSubscribeResponseHandler(topicSubscriber);
+        if (null == subscribeResponseHandler ||
+            !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
+            logger.error("Client is not yet subscribed to {}.", topicSubscriber);
+            throw new ClientNotSubscribedException("Client is not yet subscribed to "
+                                                   + topicSubscriber);
+        }
+
+        // tell subscribe response handler to stop delivering messages for a given topic subscriber
+        topicSubscriber2MessageHandler.remove(topicSubscriber);
+        subscribeResponseHandler.stopDelivery(topicSubscriber);
+    }
+                            
+
+    @Override
+    public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
+                                       final Callback<ResponseBody> callback,
+                                       final Object context) {
+        HChannel hChannel = topicSubscriber2Channel.removeChannel(topicSubscriber);
+        if (null == hChannel) {
+            logger.warn("Trying to close a subscription when we don't have a subscribe channel cached for {}",
+                        topicSubscriber);
+            callback.operationFinished(context, (ResponseBody)null);
+            return;
+        }
+
+        Channel channel = hChannel.getChannel();
+        if (null == channel) {
+            callback.operationFinished(context, (ResponseBody)null);
+            return;
+        }
+
+        try {
+            HChannelImpl.getHChannelHandlerFromChannel(channel).closeExplicitly();
+        } catch (NoResponseHandlerException nrhe) {
+            logger.warn("No Channel Handler found when closing {}'s channel {}.",
+                        channel, topicSubscriber);
+        }
+        ChannelFuture future = channel.close();
+        future.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                if (!future.isSuccess()) {
+                    logger.error("Failed to close the subscription channel for {}",
+                                 topicSubscriber);
+                    callback.operationFailed(context, new ServiceDownException(
+                        "Failed to close the subscription channel for " + topicSubscriber));
+                } else {
+                    callback.operationFinished(context, (ResponseBody)null);
+                }
+            }
+        });
+    }
+
+    @Override
+    protected void checkTimeoutRequestsOnSubscriptionChannels() {
+        // timeout task may be started before constructing topicSubscriber2Channel
+        if (null == topicSubscriber2Channel) {
+            return;
+        }
+        for (HChannel channel : topicSubscriber2Channel.getChannels()) {
+            try {
+                HChannelHandler channelHandler =
+                    HChannelImpl.getHChannelHandlerFromChannel(channel.getChannel());
+                channelHandler.checkTimeoutRequests();
+            } catch (NoResponseHandlerException nrhe) {
+                continue;
+            }
+        }
+    }
+
+    @Override
+    protected void closeSubscriptionChannels() {
+        topicSubscriber2Channel.close();
+    }
+}