You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/27 01:52:20 UTC

svn commit: r1390777 [1/4] - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/java/org/apache/hedwig/client/data/ hedwig-client/src/main/java/org/apache/hedwig/client/handlers/ hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig...

Author: ivank
Date: Wed Sep 26 23:52:18 2012
New Revision: 1390777

URL: http://svn.apache.org/viewvc?rev=1390777&view=rev
Log:
BOOKKEEPER-364: re-factor hedwig java client to support both one-subscription-per-channel and multiplex-subscriptions-per-channel. (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/AbstractResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java
      - copied, changed from r1390401, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java
      - copied, changed from r1390401, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SubscribeReconnectCallback.java
      - copied, changed from r1390401, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/package-info.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/VarArgs.java
Removed:
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Sep 26 23:52:18 2012
@@ -174,6 +174,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-335: client-side message filter for cpp client. (sijie via ivank)
 
+        BOOKKEEPER-364: re-factor hedwig java client to support both one-subscription-per-channel and multiplex-subscriptions-per-channel. (sijie via ivank)
+
 Release 4.1.0 - 2012-06-07
 
   Non-backward compatible changes:

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java Wed Sep 26 23:52:18 2012
@@ -17,7 +17,7 @@
  */
 package org.apache.hedwig.client.data;
 
-import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.data.TopicSubscriber;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 
 /**
@@ -31,28 +31,26 @@ import org.apache.hedwig.protocol.PubSub
 public class MessageConsumeData {
 
     // Member variables
-    public final ByteString topic;
-    public final ByteString subscriberId;
+    public final TopicSubscriber topicSubscriber;
     // This is the Message sent from the server for Subscribes for consumption
     // by the client.
     public final Message msg;
 
     // Constructor
-    public MessageConsumeData(final ByteString topic, final ByteString subscriberId, final Message msg) {
-        this.topic = topic;
-        this.subscriberId = subscriberId;
+    public MessageConsumeData(final TopicSubscriber topicSubscriber, final Message msg) {
+        this.topicSubscriber = topicSubscriber;
         this.msg = msg;
     }
 
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
-        if (topic != null)
-            sb.append("Topic: " + topic.toStringUtf8());
-        if (subscriberId != null)
-            sb.append(PubSubData.COMMA).append("SubscriberId: " + subscriberId.toStringUtf8());
-        if (msg != null)
-            sb.append(PubSubData.COMMA).append("Message: " + msg);
+        if (topicSubscriber != null) {
+            sb.append("Subscription: ").append(topicSubscriber);
+        }
+        if (msg != null) {
+            sb.append(PubSubData.COMMA).append("Message: ").append(msg);
+        }
         return sb.toString();
     }
 }

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/AbstractResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/AbstractResponseHandler.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/AbstractResponseHandler.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/AbstractResponseHandler.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.handlers;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+
+import com.google.protobuf.ByteString;
+
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.netty.HChannelManager;
+import org.apache.hedwig.client.exceptions.ServerRedirectLoopException;
+import org.apache.hedwig.client.exceptions.TooManyServerRedirectsException;
+import org.apache.hedwig.client.netty.NetUtils;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.util.HedwigSocketAddress;
+import static org.apache.hedwig.util.VarArgs.va;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractResponseHandler {
+
+    private static Logger logger = LoggerFactory.getLogger(AbstractResponseHandler.class);
+
+    protected final ClientConfiguration cfg;
+    protected final HChannelManager channelManager;
+
+    protected AbstractResponseHandler(ClientConfiguration cfg,
+                                      HChannelManager channelManager) {
+        this.cfg = cfg;
+        this.channelManager = channelManager;
+    }
+
+    /**
+     * Logic to handle received response.
+     *
+     * @param response
+     *            PubSubResponse received from hub server.
+     * @param pubSubData
+     *            PubSubData for the pub/sub request.
+     * @param channel
+     *            Channel we used to make the request.
+     */
+    public abstract void handleResponse(PubSubResponse response, PubSubData pubSubData,
+                                        Channel channel) throws Exception;
+
+    /**
+     * Logic to repost a PubSubRequest when the server responds with a redirect
+     * indicating that it is not the topic master.
+     *
+     * @param response
+     *            PubSubResponse from the server for the redirect
+     * @param pubSubData
+     *            PubSubData for the original PubSubRequest made
+     * @param channel
+     *            Channel we used to make the original PubSubRequest
+     * @throws Exception
+     *             Throws an exception if there was an error in doing the
+     *             redirect repost of the PubSubRequest
+     */
+    protected void handleRedirectResponse(PubSubResponse response, PubSubData pubSubData,
+                                          Channel channel)
+            throws Exception {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Handling a redirect from host: {}, response: {}, pubSubData: {}",
+                         va(NetUtils.getHostFromChannel(channel), response, pubSubData));
+        }
+        // In this case, the PubSub request was done to a server that is not
+        // responsible for the topic. First make sure that we haven't
+        // exceeded the maximum number of server redirects.
+        int curNumServerRedirects = (pubSubData.triedServers == null) ? 0 : pubSubData.triedServers.size();
+        if (curNumServerRedirects >= cfg.getMaximumServerRedirects()) {
+            // We've already exceeded the maximum number of server redirects
+            // so consider this as an error condition for the client.
+            // Invoke the operationFailed callback and just return.
+            logger.debug("Exceeded the number of server redirects ({}) so error out.",
+                         curNumServerRedirects);
+            PubSubException exception = new ServiceDownException(
+                new TooManyServerRedirectsException("Already reached max number of redirects: "
+                                                    + curNumServerRedirects));
+            pubSubData.getCallback().operationFailed(pubSubData.context, exception);
+            return;
+        }
+
+        // We will redirect and try to connect to the correct server
+        // stored in the StatusMsg of the response. First store the
+        // server that we sent the PubSub request to for the topic.
+        ByteString triedServer = ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(
+                                                         NetUtils.getHostFromChannel(channel)));
+        if (pubSubData.triedServers == null) {
+            pubSubData.triedServers = new LinkedList<ByteString>();
+        }
+        pubSubData.shouldClaim = true;
+        pubSubData.triedServers.add(triedServer);
+
+        // Now get the redirected server host (expected format is
+        // Hostname:Port:SSLPort) from the server's response message. If one is
+        // not given for some reason, then redirect to the default server
+        // host/VIP to repost the request.
+        String statusMsg = response.getStatusMsg();
+        InetSocketAddress redirectedHost;
+        boolean redirectToDefaultServer;
+        if (statusMsg != null && statusMsg.length() > 0) {
+            if (cfg.isSSLEnabled()) {
+                redirectedHost = new HedwigSocketAddress(statusMsg).getSSLSocketAddress();
+            } else {
+                redirectedHost = new HedwigSocketAddress(statusMsg).getSocketAddress();
+            }
+            redirectToDefaultServer = false;
+        } else {
+            redirectedHost = cfg.getDefaultServerHost();
+            redirectToDefaultServer = true;
+        }
+
+        // Make sure the redirected server is not one we've already tried
+        // earlier for this PubSub request.
+        if (pubSubData.triedServers.contains(ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(redirectedHost)))) {
+            logger.error("We've already sent this PubSubRequest before to redirectedHost: {}, pubSubData: {}",
+                         va(redirectedHost, pubSubData));
+            PubSubException exception = new ServiceDownException(
+                new ServerRedirectLoopException("Already made the request before to redirected host: "
+                                                + redirectedHost));
+            pubSubData.getCallback().operationFailed(pubSubData.context, exception);
+            return;
+        }
+
+        // submit the pub/sub request to redirected host
+        if (redirectToDefaultServer) {
+            channelManager.submitOpToDefaultServer(pubSubData);
+        } else {
+            channelManager.redirectToHost(pubSubData, redirectedHost);
+        }
+    }
+
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java Wed Sep 26 23:52:18 2012
@@ -19,17 +19,16 @@ package org.apache.hedwig.client.handler
 
 import java.util.TimerTask;
 
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
-import org.apache.hedwig.client.netty.ResponseHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
 
+import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.data.MessageConsumeData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.netty.HedwigClientImpl;
+import org.apache.hedwig.client.netty.HChannelManager;
 import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protoextensions.MessageIdUtils;
 import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
 
 /**
  * This is the Callback used by the MessageHandlers on the client app when
@@ -37,83 +36,82 @@ import org.apache.hedwig.util.Callback;
  * asynchronously. This callback back to the client libs will be stateless so we
  * can use a singleton for the class. The object context used should be the
  * MessageConsumeData type. That will contain all of the information needed to
- * call the message consume logic in the client lib ResponseHandler.
+ * call the message consume logic in the client lib HChannelHandler.
  *
  */
 public class MessageConsumeCallback implements Callback<Void> {
 
     private static Logger logger = LoggerFactory.getLogger(MessageConsumeCallback.class);
 
-    private final HedwigClientImpl client;
+    private final HChannelManager channelManager;
+    private final long consumeRetryWaitTime;
 
-    public MessageConsumeCallback(HedwigClientImpl client) {
-        this.client = client;
+    public MessageConsumeCallback(ClientConfiguration cfg,
+                                  HChannelManager channelManager) {
+        this.channelManager = channelManager;
+        this.consumeRetryWaitTime =
+            cfg.getMessageConsumeRetryWaitTime();
     }
 
     class MessageConsumeRetryTask extends TimerTask {
         private final MessageConsumeData messageConsumeData;
-        private final TopicSubscriber topicSubscriber;
 
-        public MessageConsumeRetryTask(MessageConsumeData messageConsumeData, TopicSubscriber topicSubscriber) {
+        public MessageConsumeRetryTask(MessageConsumeData messageConsumeData) {
             this.messageConsumeData = messageConsumeData;
-            this.topicSubscriber = topicSubscriber;
         }
 
         @Override
         public void run() {
             // Try to consume the message again
-            Channel topicSubscriberChannel = client.getSubscriber().getChannelForTopic(topicSubscriber);
-            ResponseHandler handler = null;
-            try {
-                handler = HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel);
-            } catch (NoResponseHandlerException e) {
-                logger.debug("No response handler found while invoking asyncMessageConsumed in the Message"
-                    + " consume retry task.", e);
-                // Explicitly close the channel
-                if (null != topicSubscriberChannel) {
-                    topicSubscriberChannel.close();
-                }
+            SubscribeResponseHandler subscribeHChannelHandler =
+                channelManager.getSubscribeResponseHandler(messageConsumeData.topicSubscriber);
+            if (null == subscribeHChannelHandler ||
+                !subscribeHChannelHandler.hasSubscription(messageConsumeData.topicSubscriber)) {
+                logger.warn("No subscription {} found to retry delivering message {}.",
+                            va(messageConsumeData.topicSubscriber,
+                               MessageIdUtils.msgIdToReadableString(messageConsumeData.msg.getMsgId())));
                 return;
             }
-            handler.getSubscribeResponseHandler().asyncMessageConsume(messageConsumeData.msg);
+
+            subscribeHChannelHandler.asyncMessageDeliver(messageConsumeData.topicSubscriber,
+                                                         messageConsumeData.msg);
         }
     }
 
     public void operationFinished(Object ctx, Void resultOfOperation) {
         MessageConsumeData messageConsumeData = (MessageConsumeData) ctx;
-        TopicSubscriber topicSubscriber = new TopicSubscriber(messageConsumeData.topic, messageConsumeData.subscriberId);
-        // Message has been successfully consumed by the client app so callback
-        // to the ResponseHandler indicating that the message is consumed.
-        Channel topicSubscriberChannel = client.getSubscriber().getChannelForTopic(topicSubscriber);
-        ResponseHandler handler = null;
-        try {
-            handler = HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel);
-        } catch (NoResponseHandlerException e) {
-            logger.debug("No response handler found while invoking messageConsumed.", e);
-            // Explicitly close the channel
-            if (null != topicSubscriberChannel) {
-                topicSubscriberChannel.close();
-            }
+
+        SubscribeResponseHandler subscribeHChannelHandler =
+            channelManager.getSubscribeResponseHandler(messageConsumeData.topicSubscriber);
+        if (null == subscribeHChannelHandler ||
+            !subscribeHChannelHandler.hasSubscription(messageConsumeData.topicSubscriber)) {
+            logger.warn("No subscription {} found to consume message {}.",
+                        va(messageConsumeData.topicSubscriber,
+                           MessageIdUtils.msgIdToReadableString(messageConsumeData.msg.getMsgId())));
             return;
         }
-        handler.getSubscribeResponseHandler().messageConsumed(messageConsumeData.msg);
+
+        // Message has been successfully consumed by the client app so callback
+        // to the HChannelHandler indicating that the message is consumed.
+        subscribeHChannelHandler.messageConsumed(messageConsumeData.topicSubscriber,
+                                                 messageConsumeData.msg);
     }
 
     public void operationFailed(Object ctx, PubSubException exception) {
         // Message has NOT been successfully consumed by the client app so
-        // callback to the ResponseHandler to try the async MessageHandler
+        // callback to the HChannelHandler to try the async MessageHandler
         // Consume logic again.
         MessageConsumeData messageConsumeData = (MessageConsumeData) ctx;
-        TopicSubscriber topicSubscriber = new TopicSubscriber(messageConsumeData.topic, messageConsumeData.subscriberId);
-        logger.error("Message was not consumed successfully by client MessageHandler: " + messageConsumeData);
+        logger.error("Message was not consumed successfully by client MessageHandler: {}",
+                     messageConsumeData);
 
         // Sleep a pre-configured amount of time (in milliseconds) before we
         // do the retry. In the future, we can have more dynamic logic on
         // what duration to sleep based on how many times we've retried, or
         // perhaps what the last amount of time we slept was. We could stick
         // some of this meta-data into the MessageConsumeData when we retry.
-        client.getClientTimer().schedule(new MessageConsumeRetryTask(messageConsumeData, topicSubscriber),
-                                         client.getConfiguration().getMessageConsumeRetryWaitTime());
+        channelManager.schedule(new MessageConsumeRetryTask(messageConsumeData),
+                                consumeRetryWaitTime);
     }
 
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java Wed Sep 26 23:52:18 2012
@@ -17,32 +17,29 @@
  */
 package org.apache.hedwig.client.handlers;
 
-import org.apache.hedwig.protocol.PubSubProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.jboss.netty.channel.Channel;
 
+import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.netty.HedwigClientImpl;
-import org.apache.hedwig.client.netty.ResponseHandler;
+import org.apache.hedwig.client.netty.HChannelManager;
 import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
 
-public class PublishResponseHandler {
+public class PublishResponseHandler extends AbstractResponseHandler {
 
     private static Logger logger = LoggerFactory.getLogger(PublishResponseHandler.class);
 
-    private final ResponseHandler responseHandler;
-
-    public PublishResponseHandler(ResponseHandler responseHandler) {
-        this.responseHandler = responseHandler;
+    public PublishResponseHandler(ClientConfiguration cfg,
+                                  HChannelManager channelManager) {
+        super(cfg, channelManager);
     }
 
-    // Main method to handle Publish Response messages from the server.
-    public void handlePublishResponse(PubSubResponse response, PubSubData pubSubData, Channel channel) throws Exception {
-        if (logger.isDebugEnabled())
-            logger.debug("Handling a Publish response: " + response + ", pubSubData: " + pubSubData + ", host: "
-                         + HedwigClientImpl.getHostFromChannel(channel));
+    @Override
+    public void handleResponse(PubSubResponse response, PubSubData pubSubData,
+                               Channel channel) throws Exception {
         switch (response.getStatusCode()) {
         case SUCCESS:
             // Response was success so invoke the callback's operationFinished
@@ -59,7 +56,7 @@ public class PublishResponseHandler {
         case NOT_RESPONSIBLE_FOR_TOPIC:
             // Redirect response so we'll need to repost the original Publish
             // Request
-            responseHandler.handleRedirectResponse(response, pubSubData, channel);
+            handleRedirectResponse(response, pubSubData, channel);
             break;
         default:
             // Consider all other status codes as errors, operation failed

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Wed Sep 26 23:52:18 2012
@@ -17,244 +17,59 @@
  */
 package org.apache.hedwig.client.handlers;
 
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
+import java.net.InetSocketAddress;
 
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.jboss.netty.channel.Channel;
 
 import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.data.MessageConsumeData;
-import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.netty.HedwigClientImpl;
-import org.apache.hedwig.client.netty.ResponseHandler;
-import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
+import org.apache.hedwig.client.netty.HChannelManager;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
 import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
 import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.util.Callback;
 
-public class SubscribeResponseHandler {
-
-    private static Logger logger = LoggerFactory.getLogger(SubscribeResponseHandler.class);
-
-    private final ResponseHandler responseHandler;
-
-    // Member variables used when this ResponseHandler is for a Subscribe
-    // channel. We need to be able to consume messages sent back to us from
-    // the server, and to also recreate the Channel connection if it ever goes
-    // down. For that, we need to store the original PubSubData for the
-    // subscribe request, and also the MessageHandler that was registered when
-    // delivery of messages started for the subscription.
-    private PubSubData origSubData;
-    private Channel subscribeChannel;
-    private MessageHandler messageHandler;
-    // Counter for the number of consumed messages so far to buffer up before we
-    // send the Consume message back to the server along with the last/largest
-    // message seq ID seen so far in that batch.
-    private int numConsumedMessagesInBuffer = 0;
-    private MessageSeqId lastMessageSeqId;
-    // Queue used for subscribes when the MessageHandler hasn't been registered
-    // yet but we've already received subscription messages from the server.
-    // This will be lazily created as needed.
-    private Queue<Message> subscribeMsgQueue;
-    // Set to store all of the outstanding subscribed messages that are pending
-    // to be consumed by the client app's MessageHandler. If this ever grows too
-    // big (e.g. problem at the client end for message consumption), we can
-    // throttle things by temporarily setting the Subscribe Netty Channel
-    // to not be readable. When the Set has shrunk sufficiently, we can turn the
-    // channel back on to read new messages.
-    private Set<Message> outstandingMsgSet;
-
-    public SubscribeResponseHandler(ResponseHandler responseHandler) {
-        this.responseHandler = responseHandler;
-    }
-
-    // Public getter to retrieve the original PubSubData used for the Subscribe
-    // request.
-    synchronized public PubSubData getOrigSubData() {
-        return origSubData;
-    }
+/**
+ * An interface provided to manage all subscriptions on a channel.
+ *
+ * Its responsibility is to handle all subscribe responses received on that channel,
+ * clear up subscriptions and retry reconnecting subscriptions when the channel is disconnected,
+ * and handle delivering messages to {@link MessageHandler} and send consume messages
+ * back to hub servers.
+ */
+public abstract class SubscribeResponseHandler extends AbstractResponseHandler {
 
-    // Main method to handle Subscribe responses from the server that we sent
-    // a Subscribe Request to.
-    public void handleSubscribeResponse(PubSubResponse response, PubSubData pubSubData, Channel channel)
-            throws Exception {
-        // If this was not a successful response to the Subscribe request, we
-        // won't be using the Netty Channel created so just close it.
-        if (!response.getStatusCode().equals(StatusCode.SUCCESS)) {
-            try {
-                HedwigClientImpl.getResponseHandlerFromChannel(channel).handleChannelClosedExplicitly();
-            } catch (NoResponseHandlerException e) {
-                // Log an error. But should we also return and not process anything further?
-                logger.error("No response handler found while trying to close channel explicitly while handling a " +
-                        "failed subscription response.", e);
-                // Continue closing the channel because this is an unexpected event and state should be reset.
-            }
-            channel.close();
-        }
-
-        if (logger.isDebugEnabled())
-            logger.debug("Handling a Subscribe response: " + response + ", pubSubData: " + pubSubData + ", host: "
-                         + HedwigClientImpl.getHostFromChannel(channel));
-        switch (response.getStatusCode()) {
-        case SUCCESS:
-            synchronized(this) {
-                // For successful Subscribe requests, store this Channel locally
-                // and set it to not be readable initially.
-                // This way we won't be delivering messages for this topic
-                // subscription until the client explicitly says so.
-                subscribeChannel = channel;
-                subscribeChannel.setReadable(false);
-                // Store the original PubSubData used to create this successful
-                // Subscribe request.
-                origSubData = pubSubData;
-
-                SubscriptionPreferences preferences = null;
-                if (response.hasResponseBody()) {
-                    ResponseBody respBody = response.getResponseBody();
-                    if (respBody.hasSubscribeResponse()) {
-                        SubscribeResponse resp = respBody.getSubscribeResponse();
-                        if (resp.hasPreferences()) {
-                            preferences = resp.getPreferences();
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("Receive subscription preferences for (topic:" + pubSubData.topic.toStringUtf8()
-                                           + ", subscriber:" + pubSubData.subscriberId.toStringUtf8() + ") :"
-                                           + SubscriptionStateUtils.toString(preferences));
-                            }
-                        }
-                    }
-                }
-
-                // Store the mapping for the TopicSubscriber to the Channel.
-                // This is so we can control the starting and stopping of
-                // message deliveries from the server on that Channel. Store
-                // this only on a successful ack response from the server.
-                TopicSubscriber topicSubscriber = new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId);
-                responseHandler.getSubscriber().setChannelAndPreferencesForTopic(topicSubscriber, channel, preferences);
-                // Lazily create the Set (from a concurrent hashmap) to keep track
-                // of outstanding Messages to be consumed by the client app. At this
-                // stage, delivery for that topic hasn't started yet so creation of
-                // this Set should be thread safe. We'll create the Set with an initial
-                // capacity equal to the configured parameter for the maximum number of
-                // outstanding messages to allow. The load factor will be set to
-                // 1.0f which means we'll only rehash and allocate more space if
-                // we ever exceed the initial capacity. That should be okay
-                // because when that happens, things are slow already and piling
-                // up on the client app side to consume messages.
-                outstandingMsgSet = Collections.newSetFromMap(
-                        new ConcurrentHashMap<Message,Boolean>(
-                                responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f));
-            }
-            // Response was success so invoke the callback's operationFinished
-            // method.
-            pubSubData.getCallback().operationFinished(pubSubData.context, null);
-            break;
-        case CLIENT_ALREADY_SUBSCRIBED:
-            // For Subscribe requests, the server says that the client is
-            // already subscribed to it.
-            pubSubData.getCallback().operationFailed(pubSubData.context, new ClientAlreadySubscribedException(
-                                                    "Client is already subscribed for topic: " +
-                                                        pubSubData.topic.toStringUtf8() + ", subscriberId: " +
-                                                        pubSubData.subscriberId.toStringUtf8()));
-            break;
-        case SERVICE_DOWN:
-            // Response was service down failure so just invoke the callback's
-            // operationFailed method.
-            pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
-                                                    "Server responded with a SERVICE_DOWN status"));
-            break;
-        case NOT_RESPONSIBLE_FOR_TOPIC:
-            // Redirect response so we'll need to repost the original Subscribe
-            // Request
-            responseHandler.handleRedirectResponse(response, pubSubData, channel);
-            break;
-        default:
-            // Consider all other status codes as errors, operation failed
-            // cases.
-            logger.error("Unexpected error response from server for PubSubResponse: " + response);
-            pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
-                                                    "Server responded with a status code of: " +
-                                                        response.getStatusCode()));
-            break;
-        }
+    protected SubscribeResponseHandler(ClientConfiguration cfg,
+                                       HChannelManager channelManager) {
+        super(cfg, channelManager);
     }
 
-    // Main method to handle consuming a message for a topic that the client is
-    // subscribed to.
-    public void handleSubscribeMessage(PubSubResponse response) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Handling a Subscribe message in response: {}, topic: {}, subscriberId: {}",
-                    new Object[] { response, getOrigSubData().topic.toStringUtf8(),
-                                   getOrigSubData().subscriberId.toStringUtf8() });
-        }
-        Message message = response.getMessage();
-
-        synchronized (this) {
-            // Consume the message asynchronously that the client is subscribed
-            // to. Do this only if delivery for the subscription has started and
-            // a MessageHandler has been registered for the TopicSubscriber.
-            if (messageHandler != null) {
-                asyncMessageConsume(message);
-            } else {
-                // MessageHandler has not yet been registered so queue up these
-                // messages for the Topic Subscription. Make the initial lazy
-                // creation of the message queue thread safe just so we don't
-                // run into a race condition where two simultaneous threads process
-                // a received message and both try to create a new instance of
-                // the message queue. Performance overhead should be okay
-                // because the delivery of the topic has not even started yet
-                // so these messages are not consumed and just buffered up here.
-                if (subscribeMsgQueue == null)
-                    subscribeMsgQueue = new LinkedList<Message>();
-                logger.debug("Message has arrived but Subscribe channel does not have a registered "
-                    + "MessageHandler yet so queueing up the message: {}", message);
-                subscribeMsgQueue.add(message);
-            }
-        }
-    }
+    /**
+     * Handle Message delivered by the server.
+     *
+     * @param response
+     *          Message received from the server.
+     */
+    public abstract void handleSubscribeMessage(PubSubResponse response);
 
     /**
      * Method called when a message arrives for a subscribe Channel and we want
-     * to consume it asynchronously via the registered MessageHandler (should
+     * to deliver it asynchronously via the registered MessageHandler (should
      * not be null when called here).
      *
      * @param message
      *            Message from Subscribe Channel we want to consume.
      */
-    protected void asyncMessageConsume(Message message) {
-        if (logger.isDebugEnabled())
-            logger.debug("Call the client app's MessageHandler asynchronously to consume the message: " + message
-                         + ", topic: " + origSubData.topic.toStringUtf8() + ", subscriberId: "
-                         + origSubData.subscriberId.toStringUtf8());
-        // Add this "pending to be consumed" message to the outstandingMsgSet.
-        outstandingMsgSet.add(message);
-        // Check if we've exceeded the max size for the outstanding message set.
-        if (outstandingMsgSet.size() >= responseHandler.getConfiguration().getMaximumOutstandingMessages()
-                && subscribeChannel.isReadable()) {
-            // Too many outstanding messages so throttle it by setting the Netty
-            // Channel to not be readable.
-            if (logger.isDebugEnabled())
-                logger.debug("Too many outstanding messages (" + outstandingMsgSet.size()
-                             + ") so throttling the subscribe netty Channel");
-            subscribeChannel.setReadable(false);
-        }
-        MessageConsumeData messageConsumeData = new MessageConsumeData(origSubData.topic, origSubData.subscriberId,
-                message);
-        messageHandler.deliver(origSubData.topic, origSubData.subscriberId, message, responseHandler.getClient()
-                .getConsumeCallback(), messageConsumeData);
-    }
+    protected abstract void asyncMessageDeliver(TopicSubscriber topicSubscriber,
+                                                Message message);
 
     /**
      * Method called when the client app's MessageHandler has asynchronously
@@ -267,100 +82,85 @@ public class SubscribeResponseHandler {
      * could be consumed by the client app and then called back to here, make
      * this method synchronized.
      *
+     * @param topicSubscriber
+     *            Topic Subscriber
      * @param message
      *            Message sent from server for topic subscription that has been
      *            consumed by the client.
      */
-    protected synchronized void messageConsumed(Message message) {
-        if (logger.isDebugEnabled())
-            logger.debug("Message has been successfully consumed by the client app for message: " + message
-                         + ", topic: " + origSubData.topic.toStringUtf8() + ", subscriberId: "
-                         + origSubData.subscriberId.toStringUtf8());
-        // Update the consumed messages buffer variables
-        if (responseHandler.getConfiguration().isAutoSendConsumeMessageEnabled()) {
-            // Update these variables only if we are auto-sending consume
-            // messages to the server. Otherwise the onus is on the client app
-            // to call the Subscriber consume API to let the server know which
-            // messages it has successfully consumed.
-            numConsumedMessagesInBuffer++;
-            lastMessageSeqId = message.getMsgId();
-        }
-        // Remove this consumed message from the outstanding Message Set.
-        outstandingMsgSet.remove(message);
-
-        // For consume response to server, there is a config param on how many
-        // messages to consume and buffer up before sending the consume request.
-        // We just need to keep a count of the number of messages consumed
-        // and the largest/latest msg ID seen so far in this batch. Messages
-        // should be delivered in order and without gaps. Do this only if
-        // auto-sending of consume messages is enabled.
-        if (responseHandler.getConfiguration().isAutoSendConsumeMessageEnabled()
-                && numConsumedMessagesInBuffer >= responseHandler.getConfiguration().getConsumedMessagesBufferSize()) {
-            // Send the consume request and reset the consumed messages buffer
-            // variables. We will use the same Channel created from the
-            // subscribe request for the TopicSubscriber.
-            logger.debug("Consumed message buffer limit reached so send the Consume Request to the "
-                + "server with lastMessageSeqId: {}", lastMessageSeqId);
-            responseHandler.getSubscriber().doConsume(origSubData, subscribeChannel, lastMessageSeqId);
-            numConsumedMessagesInBuffer = 0;
-            lastMessageSeqId = null;
-        }
-
-        // Check if we throttled message consumption previously when the
-        // outstanding message limit was reached. For now, only turn the
-        // delivery back on if there are no more outstanding messages to
-        // consume. We could make this a configurable parameter if needed.
-        if (!subscribeChannel.isReadable() && outstandingMsgSet.isEmpty()) {
-            if (logger.isDebugEnabled())
-                logger.debug("Message consumption has caught up so okay to turn off throttling of " +
-                    "messages on the subscribe channel for topic: " + origSubData.topic.toStringUtf8()
-                       + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
-            subscribeChannel.setReadable(true);
-        }
-    }
+    protected abstract void messageConsumed(TopicSubscriber topicSubscriber,
+                                            Message message);
 
     /**
-     * Setter used for Subscribe flows when delivery for the subscription is
-     * started. This is used to register the MessageHandler needed to consumer
-     * the subscribed messages for the topic.
+     * Start delivering messages for a given topic subscriber.
      *
+     * @param topicSubscriber
+     *            Topic Subscriber
      * @param messageHandler
      *            MessageHandler to register for this ResponseHandler instance.
+     * @throws ClientNotSubscribedException
+     *            If the client is not currently subscribed to the topic
+     * @throws AlreadyStartDeliveryException
+     *            If delivery is started with a new message handler before the existing one is stopped.
      */
-    public void setMessageHandler(MessageHandler messageHandler) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Setting the messageHandler for topic: {}, subscriberId: {}",
-                         getOrigSubData().topic.toStringUtf8(),
-                         getOrigSubData().subscriberId.toStringUtf8());
-        }
-        synchronized (this) {
-            this.messageHandler = messageHandler;
-            // Once the MessageHandler is registered, see if we have any queued up
-            // subscription messages sent to us already from the server. If so,
-            // consume those first. Do this only if the MessageHandler registered is
-            // not null (since that would be the HedwigSubscriber.stopDelivery
-            // call).
-            if (messageHandler != null && subscribeMsgQueue != null && subscribeMsgQueue.size() > 0) {
-                if (logger.isDebugEnabled())
-                    logger.debug("Consuming " + subscribeMsgQueue.size() + " queued up messages for topic: "
-                                 + origSubData.topic.toStringUtf8() + ", subscriberId: "
-                                 + origSubData.subscriberId.toStringUtf8());
-                for (Message message : subscribeMsgQueue) {
-                    asyncMessageConsume(message);
-                }
-                // Now we can remove the queued up messages since they are all
-                // consumed.
-                subscribeMsgQueue.clear();
-            }
-        }
-    }
+    public abstract void startDelivery(TopicSubscriber topicSubscriber,
+                                       MessageHandler messageHandler)
+    throws ClientNotSubscribedException, AlreadyStartDeliveryException;
 
     /**
-     * Getter for the MessageHandler that is set for this subscribe channel.
+     * Stop delivering messages for a given topic subscriber.
      *
-     * @return The MessageHandler for consuming messages
+     * @param topicSubscriber
+     *            Topic Subscriber
+     * @throws ClientNotSubscribedException
+     *             If the client is not currently subscribed to the topic
      */
-    public MessageHandler getMessageHandler() {
-        return messageHandler;
-    }
+    public abstract void stopDelivery(TopicSubscriber topicSubscriber)
+    throws ClientNotSubscribedException;
+
+    /**
+     * Whether the given topic subscriber subscribed thru this handler.
+     *
+     * @param topicSubscriber
+     *            Topic Subscriber
+     * @return whether the given topic subscriber subscribed thru this handler.
+     */
+    public abstract boolean hasSubscription(TopicSubscriber topicSubscriber);
+
+    /**
+     * Close subscription from this handler.
+     *
+     * @param topicSubscriber
+     *            Topic Subscriber
+     * @param callback
+     *            Callback when the subscription is closed. 
+     * @param context
+     *            Callback context.
+     */
+    public abstract void asyncCloseSubscription(TopicSubscriber topicSubscriber,
+                                                Callback<ResponseBody> callback,
+                                                Object context);
+
+    /**
+     * Consume a given message for given topic subscriber thru this handler.
+     *
+     * @param topicSubscriber
+     *            Topic Subscriber
+     */
+    public abstract void consume(TopicSubscriber topicSubscriber,
+                                 MessageSeqId messageSeqId);
+
+    /**
+     * This method is called when the underlying channel is disconnected due to server failure.
+     *
+     * The implementation should take the responsibility to clear subscriptions and retry
+     * reconnecting subscriptions to new hub servers.
+     *
+     * @param host
+     *          Host that the disconnected channel was connected to.
+     * @param channel
+     *          Channel connected to.
+     */
+    public abstract void onChannelDisconnected(InetSocketAddress host,
+                                               Channel channel);
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java Wed Sep 26 23:52:18 2012
@@ -21,36 +21,35 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.jboss.netty.channel.Channel;
 
+import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.netty.HedwigClientImpl;
-import org.apache.hedwig.client.netty.ResponseHandler;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.netty.HChannelManager;
+import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
 import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
 
-public class UnsubscribeResponseHandler {
+public class UnsubscribeResponseHandler extends AbstractResponseHandler {
 
     private static Logger logger = LoggerFactory.getLogger(UnsubscribeResponseHandler.class);
 
-    private final ResponseHandler responseHandler;
-
-    public UnsubscribeResponseHandler(ResponseHandler responseHandler) {
-        this.responseHandler = responseHandler;
+    public UnsubscribeResponseHandler(ClientConfiguration cfg,
+                                      HChannelManager channelManager) {
+        super(cfg, channelManager);
     }
 
-    // Main method to handle Unsubscribe Response messages from the server.
-    public void handleUnsubscribeResponse(PubSubResponse response, PubSubData pubSubData, Channel channel)
+    @Override
+    public void handleResponse(final PubSubResponse response, final PubSubData pubSubData,
+                               final Channel channel)
             throws Exception {
-        if (logger.isDebugEnabled())
-            logger.debug("Handling an Unsubscribe response: " + response + ", pubSubData: " + pubSubData + ", host: "
-                         + HedwigClientImpl.getHostFromChannel(channel));
         switch (response.getStatusCode()) {
         case SUCCESS:
-            // For successful Unsubscribe requests, we can now safely close the
-            // Subscribe Channel and any cached data for that TopicSubscriber.
-            responseHandler.getSubscriber().closeSubscription(pubSubData.topic, pubSubData.subscriberId);
-            // Response was success so invoke the callback's operationFinished
-            // method.
+            // For an unsubscribe request the subscription is closed first,
+            // so there is nothing more to do here.
             pubSubData.getCallback().operationFinished(pubSubData.context, null);
             break;
         case CLIENT_NOT_SUBSCRIBED:
@@ -70,7 +69,7 @@ public class UnsubscribeResponseHandler 
         case NOT_RESPONSIBLE_FOR_TOPIC:
             // Redirect response so we'll need to repost the original
             // Unsubscribe Request
-            responseHandler.handleRedirectResponse(response, pubSubData, channel);
+            handleRedirectResponse(response, pubSubData, channel);
             break;
         default:
             // Consider all other status codes as errors, operation failed

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CleanupChannelMap<T> {
+
+    private static Logger logger = LoggerFactory.getLogger(CleanupChannelMap.class);
+    
+    private final ConcurrentHashMap<T, HChannel> channels;
+
+    // Boolean indicating if the channel map is closed or not.
+    protected boolean closed = false;
+    protected final ReentrantReadWriteLock closedLock =
+        new ReentrantReadWriteLock();
+
+    public CleanupChannelMap() {
+        channels = new ConcurrentHashMap<T, HChannel>();
+    }
+
+    /**
+     * Add channel to the map. If an old channel has been bound
+     * to <code>key</code>, the <code>channel</code> would be
+     * closed immediately and the old channel is returned. Otherwise,
+     * the <code>channel</code> is put in the map for future usage.
+     *
+     * If the channel map has been closed, the channel would be closed
+     * immediately.
+     *
+     * @param key
+     *            Key
+     * @param channel
+     *            Channel
+     * @return the channel instance to use.
+     */
+    public HChannel addChannel(T key, HChannel channel) {
+        this.closedLock.readLock().lock();
+        try {
+            if (closed) {
+                channel.close();
+                return channel;
+            }
+            HChannel oldChannel = channels.putIfAbsent(key, channel);
+            if (null != oldChannel) {
+                logger.info("Channel for {} already exists, so no need to store it.", key);
+                channel.close();
+                return oldChannel;
+            } else {
+                logger.debug("Storing a new channel for {}.", key);
+                return channel;
+            }
+        } finally {
+            this.closedLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Returns the channel bound with <code>key</code>.
+     *
+     * @param key Key
+     * @return the channel bound with <code>key</code>.
+     */
+    public HChannel getChannel(T key) {
+        return channels.get(key);
+    }
+
+    /**
+     * Remove the channel bound with <code>key</code>.
+     *
+     * @param key Key
+     * @return the channel bound with <code>key</code>, null if no channel
+     *         is bound with <code>key</code>.
+     */
+    public HChannel removeChannel(T key) {
+        return channels.remove(key);
+    }
+
+    /**
+     * Remove the channel bound with <code>key</code>.
+     *
+     * @param key Key
+     * @param channel The channel expected to be bound with <code>key</code>.
+     * @return true if the channel is removed, false otherwise.
+     */
+    public boolean removeChannel(T key, HChannel channel) {
+        return channels.remove(key, channel);
+    }
+
+    /**
+     * Return the channels in the map.
+     *
+     * @return the collection of channels.
+     */
+    public Collection<HChannel> getChannels() {
+        return channels.values();
+    }
+
+    /**
+     * Close the channels map.
+     */
+    public void close() {
+        closedLock.writeLock().lock();
+        try {
+            if (closed) {
+                return;
+            }
+            closed = true;
+        } finally {
+            closedLock.writeLock().unlock();
+        }
+        logger.debug("Closing channels map.");
+        for (HChannel channel : channels.values()) {
+            channel.close(true);
+        }
+        channels.clear();
+        logger.debug("Closed channels map.");
+    }
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java Wed Sep 26 23:52:18 2012
@@ -27,7 +27,7 @@ import org.apache.hedwig.util.Callback;
 /**
  * Handlers used by a subscription.
  */
-class FilterableMessageHandler implements MessageHandler {
+public class FilterableMessageHandler implements MessageHandler {
 
     MessageHandler msgHandler;
     ClientMessageFilter  msgFilter;

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.apache.hedwig.client.data.PubSubData;
+
+/**
+ * A wrapper interface over a netty {@link Channel}, used to submit hedwig's
+ * {@link PubSubData} requests.
+ */
+public interface HChannel {
+
+    /**
+     * Submit a pub/sub request.
+     *
+     * @param op
+     *          The pub/sub request to submit.
+     */
+    public void submitOp(PubSubData op);
+
+    /**
+     * @return the underlying netty {@link Channel}
+     */
+    public Channel getChannel();
+
+    /**
+     * Close the channel without waiting for it to be closed.
+     */
+    public void close();
+
+    /**
+     * Close the channel.
+     *
+     * @param wait
+     *          Whether to wait until the channel is closed.
+     */
+    public void close(boolean wait);
+}

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.net.InetSocketAddress;
+import java.util.TimerTask;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * A manager that manages 1) all channels established to hub servers, and
+ * 2) the actions taken by the topic subscribers.
+ */
+public interface HChannelManager {
+
+    /**
+     * Submit a pub/sub request after a given <code>delay</code>.
+     *
+     * @param op
+     *          Pub/Sub request.
+     * @param delay
+     *          Delay time in ms.
+     */
+    public void submitOpAfterDelay(PubSubData op, long delay);
+
+    /**
+     * Submit a pub/sub request.
+     *
+     * @param pubSubData
+     *          Pub/Sub request.
+     */
+    public void submitOp(PubSubData pubSubData);
+
+    /**
+     * Submit a pub/sub request to the default server.
+     *
+     * @param pubSubData
+     *           Pub/Sub request.
+     */
+    public void submitOpToDefaultServer(PubSubData pubSubData);
+
+    /**
+     * Submit a pub/sub request to a given host.
+     *
+     * @param pubSubData
+     *          Pub/Sub request.
+     * @param host
+     *          Address of the given host.
+     */
+    public void redirectToHost(PubSubData pubSubData, InetSocketAddress host);
+
+    /**
+     * Generate the next transaction id for a pub/sub request sent through this manager.
+     *
+     * @return next transaction id.
+     */
+    public long nextTxnId();
+
+    /**
+     * Schedule a timer task to run after a given <code>delay</code>.
+     *
+     * @param task
+     *          A timer task
+     * @param delay
+     *          Delay time in ms.
+     */
+    public void schedule(TimerTask task, long delay);
+
+    /**
+     * Get the subscribe response handler that manages the given <code>topicSubscriber</code>.
+     *
+     * @param topicSubscriber
+     *          Topic Subscriber
+     * @return the subscribe response handler managing it, or null if there is none.
+     */
+    public SubscribeResponseHandler getSubscribeResponseHandler(
+                                    TopicSubscriber topicSubscriber);
+
+    /**
+     * Start delivering messages for a given topic subscriber.
+     *
+     * @param topicSubscriber
+     *            Topic Subscriber
+     * @param messageHandler
+     *            MessageHandler to register for this ResponseHandler instance.
+     * @throws ClientNotSubscribedException
+     *            If the client is not currently subscribed to the topic
+     * @throws AlreadyStartDeliveryException
+     *            If delivery is started with a message handler before the existing one is stopped.
+     */
+    public void startDelivery(TopicSubscriber topicSubscriber,
+                              MessageHandler messageHandler)
+    throws ClientNotSubscribedException, AlreadyStartDeliveryException;
+
+    /**
+     * Stop delivering messages for a given topic subscriber.
+     *
+     * @param topicSubscriber
+     *            Topic Subscriber
+     * @throws ClientNotSubscribedException
+     *             If the client is not currently subscribed to the topic
+     */
+    public void stopDelivery(TopicSubscriber topicSubscriber)
+    throws ClientNotSubscribedException;
+
+    /**
+     * Asynchronously close the subscription of the given <code>topicSubscriber</code>.
+     *
+     * @param topicSubscriber
+     *          Topic Subscriber
+     * @param callback
+     *          Callback
+     * @param context
+     *          Callback context
+     */
+    public void asyncCloseSubscription(TopicSubscriber topicSubscriber,
+                                       Callback<ResponseBody> callback,
+                                       Object context);
+
+    /**
+     * Return the subscription event emitter used to emit subscription events.
+     *
+     * @return subscription event emitter.
+     */
+    public SubscriptionEventEmitter getSubscriptionEventEmitter();
+
+    /**
+     * Check whether the channel manager is closed.
+     *
+     * @return true if the channel manager is closed, otherwise return false.
+     */
+    public boolean isClosed();
+
+    /**
+     * Close the channel manager.
+     */
+    public void close();
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java Wed Sep 26 23:52:18 2012
@@ -17,33 +17,18 @@
  */
 package org.apache.hedwig.client.netty;
 
-import java.net.InetSocketAddress;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 
 import com.google.protobuf.ByteString;
 
-import org.apache.bookkeeper.util.MathUtils;
 import org.apache.hedwig.client.api.Client;
 import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.handlers.MessageConsumeCallback;
-import org.apache.hedwig.client.ssl.SslClientContextFactory;
-import org.apache.hedwig.exceptions.PubSubException.UncertainStateException;
+import org.apache.hedwig.client.netty.impl.simple.SimpleHChannelManager;
 
 /**
  * This is a top level Hedwig Client class that encapsulates the common
@@ -54,49 +39,18 @@ public class HedwigClientImpl implements
 
     private static final Logger logger = LoggerFactory.getLogger(HedwigClientImpl.class);
 
-    // Empty Topic List
-    private ConcurrentLinkedQueue<ByteString> EMPTY_TOPIC_LIST =
-        new ConcurrentLinkedQueue<ByteString>();
-
-    // Global counter used for generating unique transaction ID's for
-    // publish and subscribe requests
-    protected final AtomicLong globalCounter = new AtomicLong();
-    // Static String constants
-    protected static final String COLON = ":";
-
     // The Netty socket factory for making connections to the server.
     protected final ChannelFactory socketFactory;
     // Whether the socket factory is one we created or is owned by whoever
     // instantiated us.
     protected boolean ownChannelFactory = false;
 
-    // PipelineFactory to create netty client channels to the appropriate server
-    private ClientChannelPipelineFactory pipelineFactory;
-
-    // Concurrent Map to store the mapping from the Topic to the Host.
-    // This could change over time since servers can drop mastership of topics
-    // for load balancing or failover. If a server host ever goes down, we'd
-    // also want to remove all topic mappings the host was responsible for.
-    // The second Map is used as the inverted version of the first one.
-    protected final ConcurrentMap<ByteString, InetSocketAddress> topic2Host = new ConcurrentHashMap<ByteString, InetSocketAddress>();
-    private final ConcurrentMap<InetSocketAddress, ConcurrentLinkedQueue<ByteString>> host2Topics =
-        new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<ByteString>>();
-
-    // Each client instantiation will have a Timer for running recurring
-    // threads. One such timer task thread to is to timeout long running
-    // PubSubRequests that are waiting for an ack response from the server.
-    private final Timer clientTimer = new Timer(true);
-
-    // Boolean indicating if the client is running or has stopped.
-    // Once we stop the client, we should sidestep all of the connect,
-    // write callback and channel disconnected logic.
-    private boolean isStopped = false;
+    // channel manager manages all the channels established by the client
+    protected final HChannelManager channelManager;
 
     private HedwigSubscriber sub;
     private final HedwigPublisher pub;
     private final ClientConfiguration cfg;
-    private final MessageConsumeCallback consumeCb;
-    private SslClientContextFactory sslFactory = null;
 
     public static Client create(ClientConfiguration cfg) {
         return new HedwigClientImpl(cfg);
@@ -109,7 +63,8 @@ public class HedwigClientImpl implements
     // Base constructor that takes in a Configuration object.
     // This will create its own client socket channel factory.
     protected HedwigClientImpl(ClientConfiguration cfg) {
-        this(cfg, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
+        this(cfg, new NioClientSocketChannelFactory(
+                  Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
         ownChannelFactory = true;
     }
 
@@ -118,23 +73,20 @@ public class HedwigClientImpl implements
     protected HedwigClientImpl(ClientConfiguration cfg, ChannelFactory socketFactory) {
         this.cfg = cfg;
         this.socketFactory = socketFactory;
+        channelManager = new SimpleHChannelManager(cfg, socketFactory);
+
         pub = new HedwigPublisher(this);
         sub = new HedwigSubscriber(this);
-        pipelineFactory = new ClientChannelPipelineFactory(this);
-        consumeCb = new MessageConsumeCallback(this);
-        if (cfg.isSSLEnabled()) {
-            sslFactory = new SslClientContextFactory(cfg);
-        }
-        // Schedule all of the client timer tasks. Currently we only have the
-        // Request Timeout task.
-        clientTimer.schedule(new PubSubRequestTimeoutTask(), 0, cfg.getTimeoutThreadRunInterval());
     }
 
-    // Public getters for the various components of a client.
     public ClientConfiguration getConfiguration() {
         return cfg;
     }
 
+    public HChannelManager getHChannelManager() {
+        return channelManager;
+    }
+
     public HedwigSubscriber getSubscriber() {
         return sub;
     }
@@ -149,96 +101,14 @@ public class HedwigClientImpl implements
         return pub;
     }
 
-    public MessageConsumeCallback getConsumeCallback() {
-        return consumeCb;
-    }
-
-    public SslClientContextFactory getSslFactory() {
-        return sslFactory;
-    }
-
-    // We need to deal with the possible problem of a PubSub request being
-    // written to successfully to the server host but for some reason, the
-    // ack message back never comes. What could happen is that the VoidCallback
-    // stored in the ResponseHandler.txn2PublishData map will never be called.
-    // We should have a configured timeout so if that passes from the time a
-    // write was successfully done to the server, we can fail this async PubSub
-    // transaction. The caller could possibly redo the transaction if needed at
-    // a later time. Creating a timeout cleaner TimerTask to do this here.
-    class PubSubRequestTimeoutTask extends TimerTask {
-        /**
-         * Implement the TimerTask's abstract run method.
-         */
-        @Override
-        public void run() {
-            logger.debug("Running the PubSubRequest Timeout Task");
-            // Loop through all outstanding PubSubData requests and check if
-            // the requestWriteTime has timed out compared to the current time.
-            long curTime = MathUtils.now();
-            long timeoutInterval = cfg.getServerAckResponseTimeout();
-
-            // First check the ResponseHandlers associated with cached
-            // channels in HedwigPublisher.host2Channel. This stores the
-            // channels used for Publish and Unsubscribe requests.
-            for (Channel channel : pub.host2Channel.values()) {
-                ResponseHandler responseHandler = null;
-                try {
-                    responseHandler = getResponseHandlerFromChannel(channel);
-                } catch (NoResponseHandlerException e) {
-                    logger.warn("No response handler found for channel" + channel + " in the retry timeout task.", e);
-                    continue;
-                }
-                for (PubSubData pubSubData : responseHandler.txn2PubSubData.values()) {
-                    checkPubSubDataToTimeOut(pubSubData, responseHandler, curTime, timeoutInterval);
-                }
-            }
-            // Now do the same for the cached channels in
-            // HedwigSubscriber.topicSubscriber2Channel. This stores the
-            // channels used exclusively for Subscribe requests.
-            for (Channel channel : sub.topicSubscriber2Channel.values()) {
-                ResponseHandler responseHandler = null;
-                try {
-                    responseHandler = getResponseHandlerFromChannel(channel);
-                } catch (NoResponseHandlerException e) {
-                    logger.warn("No response handler found for channel" + channel + " in the retry timeout task.", e);
-                    continue;
-                }
-                for (PubSubData pubSubData : responseHandler.txn2PubSubData.values()) {
-                    checkPubSubDataToTimeOut(pubSubData, responseHandler, curTime, timeoutInterval);
-                }
-            }
-        }
-
-        private void checkPubSubDataToTimeOut(PubSubData pubSubData, ResponseHandler responseHandler, long curTime,
-                                              long timeoutInterval) {
-            if (curTime > pubSubData.requestWriteTime + timeoutInterval) {
-                // Current PubSubRequest has timed out so remove it from the
-                // ResponseHandler's map and invoke the VoidCallback's
-                // operationFailed method.
-                logger.error("Current PubSubRequest has timed out for pubSubData: " + pubSubData);
-                responseHandler.txn2PubSubData.remove(pubSubData.txnId);
-                pubSubData.getCallback().operationFailed(pubSubData.context,
-                    new UncertainStateException("Server ack response never received so PubSubRequest has timed out!"));
-            }
-        }
-    }
-
     // When we are done with the client, this is a clean way to gracefully close
     // all channels/sockets created by the client and to also release all
     // resources used by netty.
     public void close() {
         logger.info("Stopping the client!");
-        // Set the client boolean flag to indicate the client has stopped.
-        isStopped = true;
-        // Stop the timer and all timer task threads.
-        clientTimer.cancel();
-
-        pub.close();
-        sub.close();
-
-        // Clear out all Maps.
-        topic2Host.clear();
-        host2Topics.clear();
+
+        // close channel manager to release all channels
+        channelManager.close(); 
 
         // Release resources used by the ChannelFactory on the client if we are
         // the owner that created it.
@@ -248,170 +118,4 @@ public class HedwigClientImpl implements
         logger.info("Completed stopping the client!");
     }
 
-    /**
-     * This is a helper method to do the connect attempt to the server given the
-     * inputted host/port. This can be used to connect to the default server
-     * host/port which is the VIP. That will pick a server in the cluster at
-     * random to connect to for the initial PubSub attempt (with redirect logic
-     * being done at the server side). Additionally, this could be called after
-     * the client makes an initial PubSub attempt at a server, and is redirected
-     * to the one that is responsible for the topic. Once the connect to the
-     * server is done, we will perform the corresponding PubSub write on that
-     * channel.
-     *
-     * @param pubSubData
-     *            PubSub call's data wrapper object
-     * @param serverHost
-     *            Input server host to connect to of type InetSocketAddress
-     */
-    public void doConnect(PubSubData pubSubData, InetSocketAddress serverHost) {
-        logger.debug("Connecting to host: {} with pubSubData: {}", serverHost, pubSubData);
-        // Set up the ClientBootStrap so we can create a new Channel connection
-        // to the server.
-        ClientBootstrap bootstrap = new ClientBootstrap(socketFactory);
-        bootstrap.setPipelineFactory(pipelineFactory);
-        bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("keepAlive", true);
-
-        // Start the connection attempt to the input server host.
-        ChannelFuture future = bootstrap.connect(serverHost);
-        future.addListener(new ConnectCallback(pubSubData, serverHost, this));
-    }
-
-    /**
-     * Helper method to store the topic2Host mapping in the HedwigClient cache
-     * map. This method is assumed to be called when we've done a successful
-     * connection to the correct server topic master.
-     *
-     * @param pubSubData
-     *            PubSub wrapper data
-     * @param channel
-     *            Netty Channel
-     */
-    protected void storeTopic2HostMapping(PubSubData pubSubData, Channel channel) {
-        // Retrieve the server host that we've connected to and store the
-        // mapping from the topic to this host. For all other non-redirected
-        // server statuses, we consider that as a successful connection to the
-        // correct topic master.
-        InetSocketAddress host = getHostFromChannel(channel);
-        InetSocketAddress existingHost = topic2Host.get(pubSubData.topic);
-        if (existingHost != null && existingHost.equals(host)) {
-            // Entry in map exists for the topic but it is the same as the
-            // current host. In this case there is nothing to do.
-            return;
-        }
-
-        // Store the relevant mappings for this topic and host combination.
-        if (topic2Host.putIfAbsent(pubSubData.topic, host) == null) {
-            if (logger.isDebugEnabled())
-                logger.debug("Stored info for topic: " + pubSubData.topic.toStringUtf8() + ", old host: "
-                            + existingHost + ", new host: " + host);
-            ConcurrentLinkedQueue<ByteString> topicsForHost = host2Topics.get(host);
-            if (topicsForHost == null) {
-                ConcurrentLinkedQueue<ByteString> newTopicsList = new ConcurrentLinkedQueue<ByteString>();
-                topicsForHost = host2Topics.putIfAbsent(host, newTopicsList);
-                if (topicsForHost == null) {
-                  topicsForHost = newTopicsList;
-                }
-            }
-            topicsForHost.add(pubSubData.topic);
-        }
-    }
-
-    /**
-     * Helper static method to get the String Hostname:Port from a netty
-     * Channel. Assumption is that the netty Channel was originally created with
-     * an InetSocketAddress. This is true with the Hedwig netty implementation.
-     *
-     * @param channel
-     *            Netty channel to extract the hostname and port from.
-     * @return String representation of the Hostname:Port from the Netty Channel
-     */
-    public static InetSocketAddress getHostFromChannel(Channel channel) {
-        return (InetSocketAddress) channel.getRemoteAddress();
-    }
-
-    /**
-     * Helper static method to get the ResponseHandler instance from a Channel
-     * via the ChannelPipeline it is associated with. The assumption is that the
-     * last ChannelHandler tied to the ChannelPipeline is the ResponseHandler.
-     *
-     * @param channel
-     *            Channel we are retrieving the ResponseHandler instance for
-     * @return ResponseHandler Instance tied to the Channel's Pipeline
-     * @throws NoResponseHandlerException if the response handler found for the channel is null.
-     */
-    public static ResponseHandler getResponseHandlerFromChannel(Channel channel) throws NoResponseHandlerException {
-        if (null == channel) {
-            throw new NoResponseHandlerException("Received a null value for the channel. Cannot retrieve the response handler");
-        }
-        ResponseHandler handler = (ResponseHandler) channel.getPipeline().getLast();
-        if (null == handler) {
-            throw new NoResponseHandlerException("Could not retrieve the response handler from the channel's pipeline.");
-        }
-        return handler;
-    }
-
-    // Public getter for entries in the topic2Host Map.
-    public InetSocketAddress getHostForTopic(ByteString topic) {
-        return topic2Host.get(topic);
-    }
-
-    // If a server host goes down or the channel to it gets disconnected,
-    // we want to clear out all relevant cached information. We'll
-    // need to remove all of the topic mappings that the host was
-    // responsible for.
-    public void clearAllTopicsForHost(InetSocketAddress host) {
-        logger.debug("Clearing all topics for host: {}", host);
-        // For each of the topics that the host was responsible for,
-        // remove it from the topic2Host mapping.
-        ConcurrentLinkedQueue<ByteString> topicsForHost = host2Topics.get(host);
-        if (topicsForHost != null) {
-            for (ByteString topic : topicsForHost) {
-                if (logger.isDebugEnabled())
-                    logger.debug("Removing mapping for topic: " + topic.toStringUtf8() + " from host: " + host);
-                topic2Host.remove(topic, host);
-            }
-            // Now it is safe to remove the host2Topics mapping entry.
-            host2Topics.remove(host, topicsForHost);
-        }
-    }
-
-    // If a subscribe channel goes down, the topic might have moved.
-    // We only clear out that topic for the host and not all cached information.
-    public void clearHostForTopic(ByteString topic, InetSocketAddress host) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Clearing topic: " + topic.toStringUtf8() + " for host: "
-                    + host);
-        }
-        if (topic2Host.remove(topic, host)) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Removed topic to host mapping for topic: " + topic.toStringUtf8()
-                           + " and host: " + host);
-            }
-        }
-        ConcurrentLinkedQueue<ByteString> topicsForHost = host2Topics.get(host);
-        if (null != topicsForHost && topicsForHost.remove(topic)) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Removed topic: " + topic.toStringUtf8() + " from host: " + host);
-            }
-            if (topicsForHost.isEmpty()) {
-                // remove only topic list is empty
-                host2Topics.remove(host, EMPTY_TOPIC_LIST);
-            }
-        }
-    }
-
-    // Public getter to see if the client has been stopped.
-    public boolean hasStopped() {
-        return isStopped;
-    }
-
-    // Public getter to get the client's Timer object.
-    // This is so we can reuse this and not have to create multiple Timer
-    // objects.
-    public Timer getClientTimer() {
-        return clientTimer;
-    }
-
 }