You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/27 01:52:20 UTC

svn commit: r1390777 [2/4] - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/java/org/apache/hedwig/client/data/ hedwig-client/src/main/java/org/apache/hedwig/client/handlers/ hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig...

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java Wed Sep 26 23:52:18 2012
@@ -17,22 +17,12 @@
  */
 package org.apache.hedwig.client.netty;
 
-import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
-import org.apache.hedwig.protocol.PubSubProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
 
 import com.google.protobuf.ByteString;
 
-import org.apache.bookkeeper.util.MathUtils;
 import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.data.PubSubData;
 import org.apache.hedwig.client.handlers.PubSubCallback;
 import org.apache.hedwig.exceptions.PubSubException;
@@ -40,9 +30,8 @@ import org.apache.hedwig.exceptions.PubS
 import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.PublishRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PublishResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
 import org.apache.hedwig.util.Callback;
 
 /**
@@ -53,27 +42,19 @@ public class HedwigPublisher implements 
 
     private static Logger logger = LoggerFactory.getLogger(HedwigPublisher.class);
 
-    // Concurrent Map to store the mappings for a given Host (Hostname:Port) to
-    // the Channel that has been established for it previously. This channel
-    // will be used whenever we publish on a topic that the server is the master
-    // of currently. The channels used here will only be used for publish and
-    // unsubscribe requests.
-    protected final ConcurrentMap<InetSocketAddress, Channel> host2Channel = new ConcurrentHashMap<InetSocketAddress, Channel>();
-
-    private final HedwigClientImpl client;
-    private final ClientConfiguration cfg;
-    private boolean closed = false;
+    private final HChannelManager channelManager;
 
     protected HedwigPublisher(HedwigClientImpl client) {
-        this.client = client;
-        this.cfg = client.getConfiguration();
+        this.channelManager = client.getHChannelManager();
     }
 
-    public PubSubProtocol.PublishResponse publish(ByteString topic, Message msg)
+    public PublishResponse publish(ByteString topic, Message msg)
         throws CouldNotConnectException, ServiceDownException {
 
-        if (logger.isDebugEnabled())
-            logger.debug("Calling a sync publish for topic: " + topic.toStringUtf8() + ", msg: " + msg);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Calling a sync publish for topic: {}, msg: {}.",
+                         topic.toStringUtf8(), msg);
+        }
         PubSubData pubSubData = new PubSubData(topic, msg, null, OperationType.PUBLISH, null, null, null);
         synchronized (pubSubData) {
             PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
@@ -105,192 +86,61 @@ public class HedwigPublisher implements 
                 } else {
                     // For other types of PubSubExceptions, just throw a generic
                     // ServiceDownException but log a warning message.
-                    logger.error("Unexpected exception type when a sync publish operation failed: " + failureException);
+                    logger.error("Unexpected exception type when a sync publish operation failed: ",
+                                 failureException);
                     throw new ServiceDownException("Server ack response to publish request is not successful");
                 }
             }
 
-            PubSubProtocol.ResponseBody respBody = pubSubCallback.getResponseBody();
-            if (null == respBody) return null;
+            ResponseBody respBody = pubSubCallback.getResponseBody();
+            if (null == respBody) {
+                return null;
+            }
             return respBody.hasPublishResponse() ? respBody.getPublishResponse() : null;
         }
     }
 
-    public void asyncPublish(ByteString topic, Message msg, final Callback<Void> callback, Object context) {
+    public void asyncPublish(ByteString topic, Message msg,
+                             final Callback<Void> callback, Object context) {
         asyncPublishWithResponseImpl(topic, msg,
-            new VoidCallbackAdapter<PubSubProtocol.ResponseBody>(callback), context);
+                                     new VoidCallbackAdapter<ResponseBody>(callback), context);
     }
 
     public void asyncPublishWithResponse(ByteString topic, Message msg,
-                                         Callback<PubSubProtocol.PublishResponse> _callback, Object context) {
+                                         Callback<PublishResponse> callback,
+                                         Object context) {
         // adapt the callback.
-        asyncPublishWithResponseImpl(topic, msg, new PublishResponseCallbackAdapter(_callback), context);
+        asyncPublishWithResponseImpl(topic, msg,
+                                     new PublishResponseCallbackAdapter(callback), context);
     }
 
     private void asyncPublishWithResponseImpl(ByteString topic, Message msg,
-                                              Callback<PubSubProtocol.ResponseBody> callback, Object context) {
-
-        if (logger.isDebugEnabled())
-            logger.debug("Calling an async publish for topic: " + topic.toStringUtf8() + ", msg: " + msg);
-        // Check if we already have a Channel connection set up to the server
-        // for the given Topic.
-        PubSubData pubSubData = new PubSubData(topic, msg, null, OperationType.PUBLISH, null, callback, context);
-        InetSocketAddress host = client.topic2Host.get(topic);
-        if (host != null) {
-            Channel channel = host2Channel.get(host);
-            if (channel != null) {
-                // We already have the Channel connection for the server host so
-                // do the publish directly. We will deal with redirect logic
-                // later on if that server is no longer the current host for
-                // the topic.
-                doPublish(pubSubData, channel);
-            } else {
-                // We have a mapping for the topic to host but don't have a
-                // Channel for that server. This can happen if the Channel
-                // is disconnected for some reason. Do the connect then to
-                // the specified server host to create a new Channel connection.
-                client.doConnect(pubSubData, host);
-            }
-        } else {
-            // Server host for the given topic is not known yet so use the
-            // default server host/port as defined in the configs. This should
-            // point to the server VIP which would redirect to a random server
-            // (which might not be the server hosting the topic).
-            host = cfg.getDefaultServerHost();
-            Channel channel = host2Channel.get(host);
-            if (channel != null) {
-                // if there is a channel to default server, use it!
-                doPublish(pubSubData, channel);
-                return;
-            }
-            client.doConnect(pubSubData, host);
-        }
-    }
-
-    /**
-     * This is a helper method to write the actual publish message once the
-     * client is connected to the server and a Channel is available.
-     *
-     * @param pubSubData
-     *            Publish call's data wrapper object
-     * @param channel
-     *            Netty I/O channel for communication between the client and
-     *            server
-     */
-    protected void doPublish(PubSubData pubSubData, Channel channel) {
-        // Create a PubSubRequest
-        PubSubRequest.Builder pubsubRequestBuilder = PubSubRequest.newBuilder();
-        pubsubRequestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE);
-        pubsubRequestBuilder.setType(OperationType.PUBLISH);
-        if (pubSubData.triedServers != null && pubSubData.triedServers.size() > 0) {
-            pubsubRequestBuilder.addAllTriedServers(pubSubData.triedServers);
-        }
-        long txnId = client.globalCounter.incrementAndGet();
-        pubsubRequestBuilder.setTxnId(txnId);
-        pubsubRequestBuilder.setShouldClaim(pubSubData.shouldClaim);
-        pubsubRequestBuilder.setTopic(pubSubData.topic);
-
-        // Now create the PublishRequest
-        PublishRequest.Builder publishRequestBuilder = PublishRequest.newBuilder();
-
-        publishRequestBuilder.setMsg(pubSubData.msg);
-
-        // Set the PublishRequest into the outer PubSubRequest
-        pubsubRequestBuilder.setPublishRequest(publishRequestBuilder);
-
-        // Update the PubSubData with the txnId and the requestWriteTime
-        pubSubData.txnId = txnId;
-        pubSubData.requestWriteTime = MathUtils.now();
-
-        // Before we do the write, store this information into the
-        // ResponseHandler so when the server responds, we know what
-        // appropriate Callback Data to invoke for the given txn ID.
-        try {
-            HedwigClientImpl.getResponseHandlerFromChannel(channel).txn2PubSubData.put(txnId, pubSubData);
-        } catch (NoResponseHandlerException e) {
-            logger.error("No response handler found while storing the publish callback.");
-            // Callback on pubsubdata indicating failure.
-            pubSubData.getCallback().operationFailed(pubSubData.context, new CouldNotConnectException("No response " +
-                    "handler found while attempting to publish. So could not connect."));
-            return;
-        }
-
-        // Finally, write the Publish request through the Channel.
-        if (logger.isDebugEnabled())
-            logger.debug("Writing a Publish request to host: " + HedwigClientImpl.getHostFromChannel(channel)
-                         + " for pubSubData: " + pubSubData);
-        ChannelFuture future = channel.write(pubsubRequestBuilder.build());
-        future.addListener(new WriteCallback(pubSubData, client));
-    }
-
-    // Synchronized method to store the host2Channel mapping (if it doesn't
-    // exist yet). Retrieve the hostname info from the Channel created via the
-    // RemoteAddress tied to it.
-    protected synchronized void storeHost2ChannelMapping(Channel channel) {
-        InetSocketAddress host = HedwigClientImpl.getHostFromChannel(channel);
-        if (!closed && host2Channel.putIfAbsent(host, channel) == null) {
-            if (logger.isDebugEnabled())
-                logger.debug("Stored a new Channel mapping for host: " + host);
-        } else {
-            // If we've reached here, that means we already have a Channel
-            // mapping for the given host. This should ideally not happen
-            // and it means we are creating another Channel to a server host
-            // to publish on when we could have used an existing one. This could
-            // happen due to a race condition if initially multiple concurrent
-            // threads are publishing on the same topic and no Channel exists
-            // currently to the server. We are not synchronizing this initial
-            // creation of Channels to a given host for performance.
-            // Another possible way to have redundant Channels created is if
-            // a new topic is being published to, we connect to the default
-            // server host which should be a VIP that redirects to a "real"
-            // server host. Since we don't know beforehand what is the full
-            // set of server hosts, we could be redirected to a server that
-            // we already have a channel connection to from a prior existing
-            // topic. Close these redundant channels as they won't be used.
-            logger.debug("Channel mapping to host: {} already exists so no need to store it.", host);
-            try {
-                HedwigClientImpl.getResponseHandlerFromChannel(channel).handleChannelClosedExplicitly();
-            } catch (NoResponseHandlerException e) {
-                logger.error("Could not get response handler while closing channel.");
-            }
-            channel.close();
-        }
-    }
-
-    // Public getter for entries in the host2Channel Map.
-    // This is used for classes that need this information but are not in the
-    // same classpath.
-    public Channel getChannelForHost(InetSocketAddress host) {
-        return host2Channel.get(host);
-    }
-
-    void close() {
-        synchronized(this) {
-            closed = true;
+                                              Callback<ResponseBody> callback,
+                                              Object context) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Calling an async publish for topic: {}, msg: {}.",
+                         topic.toStringUtf8(), msg);
         }
-        for (Channel channel : host2Channel.values()) {
-            try {
-                client.getResponseHandlerFromChannel(channel).handleChannelClosedExplicitly();
-            } catch (NoResponseHandlerException e) {
-                logger.error("No response handler while trying explicitly close Publisher channel " + channel);
-            }
-            channel.close().awaitUninterruptibly();
-        }
-        host2Channel.clear();
+        PubSubData pubSubData = new PubSubData(topic, msg, null, OperationType.PUBLISH, null,
+                                               callback, context);
+        channelManager.submitOp(pubSubData);
     }
 
-    private static class PublishResponseCallbackAdapter implements Callback<PubSubProtocol.ResponseBody>{
+    private static class PublishResponseCallbackAdapter implements Callback<ResponseBody>{
 
-        private final Callback<PubSubProtocol.PublishResponse> delegate;
+        private final Callback<PublishResponse> delegate;
 
-        private PublishResponseCallbackAdapter(Callback<PubSubProtocol.PublishResponse> delegate) {
+        private PublishResponseCallbackAdapter(Callback<PublishResponse> delegate) {
             this.delegate = delegate;
         }
 
         @Override
-        public void operationFinished(Object ctx, PubSubProtocol.ResponseBody resultOfOperation) {
-            if (null == resultOfOperation) delegate.operationFinished(ctx, null);
-            else delegate.operationFinished(ctx, resultOfOperation.getPublishResponse());
+        public void operationFinished(Object ctx, ResponseBody resultOfOperation) {
+            if (null == resultOfOperation) {
+                delegate.operationFinished(ctx, null);
+            } else {
+                delegate.operationFinished(ctx, resultOfOperation.getPublishResponse());
+            }
         }
 
         @Override

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Wed Sep 26 23:52:18 2012
@@ -17,25 +17,13 @@
  */
 package org.apache.hedwig.client.netty;
 
-import java.net.InetSocketAddress;
-import java.util.Collections;
 import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
-import org.apache.hedwig.protocol.PubSubProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
 
 import com.google.protobuf.ByteString;
 
-import org.apache.bookkeeper.util.MathUtils;
 import org.apache.hedwig.client.api.MessageHandler;
 import org.apache.hedwig.client.api.Subscriber;
 import org.apache.hedwig.client.conf.ClientConfiguration;
@@ -44,22 +32,17 @@ import org.apache.hedwig.client.data.Top
 import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
 import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
 import org.apache.hedwig.client.handlers.PubSubCallback;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
 import org.apache.hedwig.filter.ClientMessageFilter;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
 import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
 import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
 import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
 import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
 import org.apache.hedwig.util.Callback;
@@ -73,46 +56,22 @@ public class HedwigSubscriber implements
 
     private static Logger logger = LoggerFactory.getLogger(HedwigSubscriber.class);
 
-    // Concurrent Map to store the cached Channel connections on the client side
-    // to a server host for a given Topic + SubscriberId combination. For each
-    // TopicSubscriber, we want a unique Channel connection to the server for
-    // it. We can also get the ResponseHandler tied to the Channel via the
-    // Channel Pipeline.
-    protected final ConcurrentMap<TopicSubscriber, Channel> topicSubscriber2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
-    protected final ConcurrentMap<TopicSubscriber, SubscriptionPreferences> topicSubscriber2Preferences =
-        new ConcurrentHashMap<TopicSubscriber, SubscriptionPreferences>();
-
-    // Concurrent Map to store Message handler for each topic + sub id combination.
-    // Store it here instead of in SubscriberResponseHandler as we don't want to lose the handler
-    // user set when connection is recovered
-    protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler= new ConcurrentHashMap<TopicSubscriber, MessageHandler>();
-
-    protected final CopyOnWriteArraySet<SubscriptionListener> listeners;
-
-    protected final HedwigClientImpl client;
     protected final ClientConfiguration cfg;
-    private final Object closeLock = new Object();
-    private boolean closed = false;
+    protected final HChannelManager channelManager;
 
     public HedwigSubscriber(HedwigClientImpl client) {
-        this.client = client;
         this.cfg = client.getConfiguration();
-        this.listeners = new CopyOnWriteArraySet<SubscriptionListener>();
+        this.channelManager = client.getHChannelManager();
     }
 
     public void addSubscriptionListener(SubscriptionListener listener) {
-        listeners.add(listener); 
+        channelManager.getSubscriptionEventEmitter()
+                      .addSubscriptionListener(listener);
     }
 
     public void removeSubscriptionListener(SubscriptionListener listener) {
-        listeners.remove(listener);
-    }
-
-    void emitSubscriptionEvent(ByteString topic, ByteString subscriberId,
-                               SubscriptionEvent event) {
-        for (SubscriptionListener listener : listeners) {
-            listener.processEvent(topic, subscriberId, event);
-        }
+        channelManager.getSubscriptionEventEmitter()
+                      .removeSubscriptionListener(listener);
     }
 
     // Private method that holds the common logic for doing synchronous
@@ -167,7 +126,7 @@ public class HedwigSubscriber implements
                 else if (failureException instanceof ServiceDownException)
                     throw (ServiceDownException) failureException;
                 else {
-                    logger.error("Unexpected PubSubException thrown: " + failureException.toString());
+                    logger.error("Unexpected PubSubException thrown: ", failureException);
                     // Throw a generic ServiceDownException but wrap the
                     // original PubSubException within it.
                     throw new ServiceDownException(failureException);
@@ -181,7 +140,7 @@ public class HedwigSubscriber implements
     // flows are very similar. The assumption is that the input OperationType is
     // either SUBSCRIBE or UNSUBSCRIBE.
     private void asyncSubUnsub(ByteString topic, ByteString subscriberId,
-                               Callback<PubSubProtocol.ResponseBody> callback, Object context,
+                               Callback<ResponseBody> callback, Object context,
                                OperationType operationType, SubscriptionOptions options) {
         if (logger.isDebugEnabled()) {
             StringBuilder debugMsg = new StringBuilder().append("Calling a async subUnsub request for topic: ")
@@ -194,41 +153,18 @@ public class HedwigSubscriber implements
             }
             logger.debug(debugMsg.toString());
         }
-        // Check if we know which server host is the master for the topic we are
-        // subscribing to.
-        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, options, callback,
-                                               context);
-
-        InetSocketAddress host = client.topic2Host.get(topic);
-        if (host != null) {
-            Channel existingChannel = null;
-            if (operationType.equals(OperationType.UNSUBSCRIBE) &&
-                (existingChannel = client.getPublisher().host2Channel.get(host)) != null) {
-                // For unsubscribes, we can reuse the channel connections to the
-                // server host that are cached for publishes. For publish and
-                // unsubscribe flows, we will thus use the same Channels and
-                // will cache and store them during the ConnectCallback.
-                doSubUnsub(pubSubData, existingChannel);
-            } else {
-                // We know which server host is the master for the topic so
-                // connect to that first. For subscribes, we want a new channel
-                // connection each time for the TopicSubscriber. If the
-                // TopicSubscriber is already connected and subscribed,
-                // we assume the server will respond with an appropriate status
-                // indicating this. For unsubscribes, it is possible that the
-                // client is subscribed to the topic already but does not
-                // have a Channel connection yet to the server host. e.g. Client
-                // goes down and comes back up but client side soft state memory
-                // does not have the netty Channel connection anymore.
-                client.doConnect(pubSubData, host);
+        if (OperationType.SUBSCRIBE.equals(operationType)) {
+            if (options.getMessageBound() <= 0 &&
+                cfg.getSubscriptionMessageBound() > 0) {
+                SubscriptionOptions.Builder soBuilder =
+                    SubscriptionOptions.newBuilder(options).setMessageBound(
+                        cfg.getSubscriptionMessageBound());
+                options = soBuilder.build();
             }
-        } else {
-            // Server host for the given topic is not known yet so use the
-            // default server host/port as defined in the configs. This should
-            // point to the server VIP which would redirect to a random server
-            // (which might not be the server hosting the topic).
-            client.doConnect(pubSubData, cfg.getDefaultServerHost());
         }
+        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType,
+                                               options, callback, context);
+        channelManager.submitOp(pubSubData);
     }
 
     public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
@@ -256,7 +192,7 @@ public class HedwigSubscriber implements
         try {
             subUnsub(topic, subscriberId, OperationType.SUBSCRIBE, options);
         } catch (ClientNotSubscribedException e) {
-            logger.error("Unexpected Exception thrown: " + e.toString());
+            logger.error("Unexpected Exception thrown: ", e);
             // This exception should never be thrown here. But just in case,
             // throw a generic ServiceDownException but wrap the original
             // Exception within it.
@@ -285,8 +221,9 @@ public class HedwigSubscriber implements
                                          "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
             return;
         }
-        asyncSubUnsub(topic, subscriberId, new VoidCallbackAdapter<PubSubProtocol.ResponseBody>(callback),
-            context, OperationType.SUBSCRIBE, options);
+        asyncSubUnsub(topic, subscriberId,
+                      new VoidCallbackAdapter<ResponseBody>(callback), context,
+                      OperationType.SUBSCRIBE, options);
     }
 
     public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
@@ -312,7 +249,7 @@ public class HedwigSubscriber implements
         try {
             subUnsub(topic, subscriberId, OperationType.UNSUBSCRIBE, null);
         } catch (ClientAlreadySubscribedException e) {
-            logger.error("Unexpected Exception thrown: " + e.toString());
+            logger.error("Unexpected Exception thrown: ", e);
             // This exception should never be thrown here. But just in case,
             // throw a generic ServiceDownException but wrap the original
             // Exception within it.
@@ -320,20 +257,22 @@ public class HedwigSubscriber implements
         }
     }
 
-    public void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId, final Callback<Void> callback,
-                                 final Object context) {
+    public void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
+                                 final Callback<Void> callback, final Object context) {
         doAsyncUnsubscribe(topic, subscriberId,
-            new VoidCallbackAdapter<PubSubProtocol.ResponseBody>(callback), context, false);
+                           new VoidCallbackAdapter<ResponseBody>(callback),
+                           context, false);
     }
 
     protected void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
                                     final Callback<Void> callback, final Object context, boolean isHub) {
         doAsyncUnsubscribe(topic, subscriberId,
-            new VoidCallbackAdapter<PubSubProtocol.ResponseBody>(callback), context, isHub);
+                           new VoidCallbackAdapter<ResponseBody>(callback),
+                           context, isHub);
     }
 
     private void doAsyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
-                                    final Callback<PubSubProtocol.ResponseBody> callback,
+                                    final Callback<ResponseBody> callback,
                                     final Object context, boolean isHub) {
         // Validate that the format of the subscriberId is valid either as a
         // local or hub subscriber.
@@ -344,9 +283,9 @@ public class HedwigSubscriber implements
         }
         // Asynchronously close the subscription. On the callback to that
         // operation once it completes, post the async unsubscribe request.
-        doAsyncCloseSubscription(topic, subscriberId, new Callback<PubSubProtocol.ResponseBody>() {
+        doAsyncCloseSubscription(topic, subscriberId, new Callback<ResponseBody>() {
             @Override
-            public void operationFinished(Object ctx, PubSubProtocol.ResponseBody resultOfOperation) {
+            public void operationFinished(Object ctx, ResponseBody resultOfOperation) {
                 asyncSubUnsub(topic, subscriberId, callback, context, OperationType.UNSUBSCRIBE, null);
             }
 
@@ -369,184 +308,22 @@ public class HedwigSubscriber implements
 
     public void consume(ByteString topic, ByteString subscriberId, MessageSeqId messageSeqId)
             throws ClientNotSubscribedException {
-        if (logger.isDebugEnabled())
-            logger.debug("Calling consume for topic: " + topic.toStringUtf8() + ", subscriberId: "
-                         + subscriberId.toStringUtf8() + ", messageSeqId: " + messageSeqId);
         TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
+        logger.debug("Calling consume for {}, messageSeqId: {}.",
+                     topicSubscriber, messageSeqId);
+
+        SubscribeResponseHandler subscribeResponseHandler =
+            channelManager.getSubscribeResponseHandler(topicSubscriber);
         // Check that this topic subscription on the client side exists.
-        Channel channel = topicSubscriber2Channel.get(topicSubscriber);
-        if (channel == null) {
+        if (null == subscribeResponseHandler ||
+            !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
             throw new ClientNotSubscribedException(
-                "Cannot send consume message since client is not subscribed to topic: " + topic.toStringUtf8()
-                + ", subscriberId: " + subscriberId.toStringUtf8());
+                "Cannot send consume message since client is not subscribed to topic: "
+                + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
         }
-        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, OperationType.CONSUME, null, null, null);
         // Send the consume message to the server using the same subscribe
         // channel that the topic subscription uses.
-        doConsume(pubSubData, channel, messageSeqId);
-    }
-
-    /**
-     * Convert client-side subscription options to subscription preferences
-     *
-     * @param options
-     *          Client-Side subscription options
-     */
-    protected SubscriptionPreferences.Builder options2Preferences(SubscriptionOptions options) {
-        // prepare subscription preferences
-        SubscriptionPreferences.Builder preferencesBuilder = SubscriptionPreferences.newBuilder();
-
-        // set message bound
-        if (options.getMessageBound() > 0) {
-            preferencesBuilder.setMessageBound(options.getMessageBound());
-        } else if (cfg.getSubscriptionMessageBound() > 0) {
-            preferencesBuilder.setMessageBound(cfg.getSubscriptionMessageBound());
-        }
-
-        // set message filter
-        if (options.hasMessageFilter()) {
-            preferencesBuilder.setMessageFilter(options.getMessageFilter());
-        }
-
-        // set user options
-        if (options.hasOptions()) {
-            preferencesBuilder.setOptions(options.getOptions());
-        }
-
-        return preferencesBuilder;
-    }
-
-    /**
-     * This is a helper method to write the actual subscribe/unsubscribe message
-     * once the client is connected to the server and a Channel is available.
-     *
-     * @param pubSubData
-     *            Subscribe/Unsubscribe call's data wrapper object. We assume
-     *            that the operationType field is either SUBSCRIBE or
-     *            UNSUBSCRIBE.
-     * @param channel
-     *            Netty I/O channel for communication between the client and
-     *            server
-     */
-    protected void doSubUnsub(PubSubData pubSubData, Channel channel) {
-        // Create a PubSubRequest
-        PubSubRequest.Builder pubsubRequestBuilder = PubSubRequest.newBuilder();
-        pubsubRequestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE);
-        pubsubRequestBuilder.setType(pubSubData.operationType);
-        if (pubSubData.triedServers != null && pubSubData.triedServers.size() > 0) {
-            pubsubRequestBuilder.addAllTriedServers(pubSubData.triedServers);
-        }
-        long txnId = client.globalCounter.incrementAndGet();
-        pubsubRequestBuilder.setTxnId(txnId);
-        pubsubRequestBuilder.setShouldClaim(pubSubData.shouldClaim);
-        pubsubRequestBuilder.setTopic(pubSubData.topic);
-
-        // Create either the Subscribe or Unsubscribe Request
-        if (pubSubData.operationType.equals(OperationType.SUBSCRIBE)) {
-            // Create the SubscribeRequest
-            SubscribeRequest.Builder subscribeRequestBuilder = SubscribeRequest.newBuilder();
-            subscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
-            subscribeRequestBuilder.setCreateOrAttach(pubSubData.options.getCreateOrAttach());
-            subscribeRequestBuilder.setForceAttach(pubSubData.options.getForceAttach());
-            // For now, all subscribes should wait for all cross-regional
-            // subscriptions to be established before returning.
-            subscribeRequestBuilder.setSynchronous(true);
-            // set subscription preferences
-            SubscriptionPreferences.Builder preferencesBuilder =
-                options2Preferences(pubSubData.options);
-            // backward compatable with 4.1.0
-            if (preferencesBuilder.hasMessageBound()) {
-                subscribeRequestBuilder.setMessageBound(preferencesBuilder.getMessageBound());
-            }
-            subscribeRequestBuilder.setPreferences(preferencesBuilder);
-
-            // Set the SubscribeRequest into the outer PubSubRequest
-            pubsubRequestBuilder.setSubscribeRequest(subscribeRequestBuilder);
-        } else {
-            // Create the UnSubscribeRequest
-            UnsubscribeRequest.Builder unsubscribeRequestBuilder = UnsubscribeRequest.newBuilder();
-            unsubscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
-
-            // Set the UnsubscribeRequest into the outer PubSubRequest
-            pubsubRequestBuilder.setUnsubscribeRequest(unsubscribeRequestBuilder);
-        }
-
-        // Update the PubSubData with the txnId and the requestWriteTime
-        pubSubData.txnId = txnId;
-        pubSubData.requestWriteTime = MathUtils.now();
-
-        // Before we do the write, store this information into the
-        // ResponseHandler so when the server responds, we know what
-        // appropriate Callback Data to invoke for the given txn ID.
-        try {
-            HedwigClientImpl.getResponseHandlerFromChannel(channel).txn2PubSubData.put(txnId, pubSubData);
-        } catch (Exception e) {
-            logger.error("No response handler found while storing the subscribe callback.");
-            // Call operationFailed on the pubsubdata callback to indicate failure
-            pubSubData.getCallback().operationFailed(pubSubData.context, new CouldNotConnectException("No response " +
-                    "handler found while attempting to subscribe."));
-            return;
-        }
-
-        // Finally, write the Subscribe request through the Channel.
-        if (logger.isDebugEnabled())
-            logger.debug("Writing a SubUnsub request to host: " + HedwigClientImpl.getHostFromChannel(channel)
-                         + " for pubSubData: " + pubSubData);
-        ChannelFuture future = channel.write(pubsubRequestBuilder.build());
-        future.addListener(new WriteCallback(pubSubData, client));
-    }
-
-    /**
-     * This is a helper method to write a consume message to the server after a
-     * subscribe Channel connection is made to the server and messages are being
-     * consumed by the client.
-     *
-     * @param pubSubData
-     *            Consume call's data wrapper object. We assume that the
-     *            operationType field is CONSUME.
-     * @param channel
-     *            Netty I/O channel for communication between the client and
-     *            server
-     * @param messageSeqId
-     *            Message Seq ID for the latest/last message the client has
-     *            consumed.
-     */
-    public void doConsume(final PubSubData pubSubData, final Channel channel, final MessageSeqId messageSeqId) {
-        // Create a PubSubRequest
-        PubSubRequest.Builder pubsubRequestBuilder = PubSubRequest.newBuilder();
-        pubsubRequestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE);
-        pubsubRequestBuilder.setType(OperationType.CONSUME);
-        long txnId = client.globalCounter.incrementAndGet();
-        pubsubRequestBuilder.setTxnId(txnId);
-        pubsubRequestBuilder.setTopic(pubSubData.topic);
-
-        // Create the ConsumeRequest
-        ConsumeRequest.Builder consumeRequestBuilder = ConsumeRequest.newBuilder();
-        consumeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
-        consumeRequestBuilder.setMsgId(messageSeqId);
-
-        // Set the ConsumeRequest into the outer PubSubRequest
-        pubsubRequestBuilder.setConsumeRequest(consumeRequestBuilder);
-
-        // For Consume requests, we will send them from the client in a fire and
-        // forget manner. We are not expecting the server to send back an ack
-        // response so no need to register this in the ResponseHandler. There
-        // are no callbacks to invoke since this isn't a client initiated
-        // action. Instead, just have a future listener that will log an error
-        // message if there was a problem writing the consume request.
-        if (logger.isDebugEnabled())
-            logger.debug("Writing a Consume request to host: " + HedwigClientImpl.getHostFromChannel(channel)
-                         + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
-        ChannelFuture future = channel.write(pubsubRequestBuilder.build());
-        future.addListener(new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-                if (!future.isSuccess()) {
-                    logger.error("Error writing a Consume request to host: " + HedwigClientImpl.getHostFromChannel(channel)
-                                 + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
-                }
-            }
-        });
+        subscribeResponseHandler.consume(topicSubscriber, messageSeqId);
     }
 
     public boolean hasSubscription(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
@@ -559,7 +336,11 @@ public class HedwigSubscriber implements
         // correct way to contact the server to get this info is then.
         // The client side just has soft memory state for client subscription
         // information.
-        return topicSubscriber2Channel.containsKey(new TopicSubscriber(topic, subscriberId));
+        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
+        SubscribeResponseHandler subscribeResponseHandler =
+            channelManager.getSubscribeResponseHandler(topicSubscriber);
+        return !(null == subscribeResponseHandler ||
+                 !subscribeResponseHandler.hasSubscription(topicSubscriber));
     }
 
     public List<ByteString> getSubscriptionList(ByteString subscriberId) throws CouldNotConnectException,
@@ -569,9 +350,12 @@ public class HedwigSubscriber implements
         return null;
     }
 
-    public void startDelivery(final ByteString topic, final ByteString subscriberId, MessageHandler messageHandler)
+    public void startDelivery(final ByteString topic, final ByteString subscriberId,
+                              MessageHandler messageHandler)
             throws ClientNotSubscribedException, AlreadyStartDeliveryException {
-        startDelivery(topic, subscriberId, messageHandler, false);
+        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
+        logger.debug("Starting delivery for {}.", topicSubscriber);
+        channelManager.startDelivery(topicSubscriber, messageHandler); 
     }
 
     public void startDeliveryWithFilter(final ByteString topic, final ByteString subscriberId,
@@ -579,147 +363,19 @@ public class HedwigSubscriber implements
                                         ClientMessageFilter messageFilter)
             throws ClientNotSubscribedException, AlreadyStartDeliveryException {
         if (null == messageHandler || null == messageFilter) {
-            throw new NullPointerException("Null message handler or message filter is provided.");
+            throw new NullPointerException("Null message handler or message filter is       provided.");
         }
         TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
-        SubscriptionPreferences preferences = topicSubscriber2Preferences.get(topicSubscriber);
-        if (null == preferences) {
-            throw new ClientNotSubscribedException("No subscription preferences found to filter messages for topic: "
-                    + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
-        }
-        // pass subscription preferences to message filter
-        if (logger.isDebugEnabled()) {
-            logger.debug("Start delivering messages with filter on topic: " + topic.toStringUtf8()
-                         + ", subscriberId: " + subscriberId.toStringUtf8() + ", preferences: "
-                         + SubscriptionStateUtils.toString(preferences));
-        }
-        messageFilter.setSubscriptionPreferences(topic, subscriberId, preferences);
         messageHandler = new FilterableMessageHandler(messageHandler, messageFilter);
-        startDelivery(topic, subscriberId, messageHandler, false);
-    }
-
-    public void restartDelivery(final ByteString topic, final ByteString subscriberId)
-        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
-        startDelivery(topic, subscriberId, null, true);
+        logger.debug("Starting delivery with filter for {}.", topicSubscriber);
+        channelManager.startDelivery(topicSubscriber, messageHandler);
     }
 
-    private void startDelivery(final ByteString topic, final ByteString subscriberId,
-                               MessageHandler messageHandler, boolean restart)
-        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
-        if (logger.isDebugEnabled())
-            logger.debug("Starting delivery for topic: " + topic.toStringUtf8() + ", subscriberId: "
-                         + subscriberId.toStringUtf8());
+    public void stopDelivery(final ByteString topic, final ByteString subscriberId)
+    throws ClientNotSubscribedException {
         TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
-        // Make sure we know about this topic subscription on the client side
-        // exists. The assumption is that the client should have in memory the
-        // Channel created for the TopicSubscriber once the server has sent
-        // an ack response to the initial subscribe request.
-        Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
-        if (topicSubscriberChannel == null) {
-            logger.error("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: "
-                         + subscriberId.toStringUtf8());
-            throw new ClientNotSubscribedException("Client is not yet subscribed to topic: " + topic.toStringUtf8()
-                                                   + ", subscriberId: " + subscriberId.toStringUtf8());
-        }
-
-        // Need to ensure the setting of handler and the readability of channel is in sync
-        // as there's a race condition that connection recovery and user might call this at the same time
-        MessageHandler existedMsgHandler = topicSubscriber2MessageHandler.get(topicSubscriber);
-        if (restart) {
-            // restart using existing msg handler 
-            messageHandler = existedMsgHandler;
-        } else {
-            // some has started delivery but not stop it
-            if (null != existedMsgHandler) {
-                throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
-            }
-            if (messageHandler != null) {
-                if (null != topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler)) {
-                    throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
-                }
-            }
-        }
-        try {
-            HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
-            .setMessageHandler(messageHandler);
-        } catch (NoResponseHandlerException e) {
-            // We did not find a response handler. So remove this subscription handler and throw an exception.
-            topicSubscriber2MessageHandler.remove(topicSubscriber, existedMsgHandler);
-            asyncCloseSubscription(topic, subscriberId, new Callback<Void>() {
-                @Override
-                public void operationFinished(Object ctx, Void resultOfOperation) {
-                    logger.warn("Closed subscription for topic " + topic.toStringUtf8() + " and subscriber " +
-                    subscriberId.toStringUtf8());
-                }
-
-                @Override
-                public void operationFailed(Object ctx, PubSubException exception) {
-                    logger.warn("Error while closing subscription for topic " + topic.toStringUtf8() + " and subscriber " +
-                            subscriberId.toStringUtf8());
-                }
-            }, null);
-
-            // We should tell the client to resubscribe.
-            throw new ClientNotSubscribedException("Closed subscription for topic " + topic.toStringUtf8() + " and" +
-                    "subscriber Id "  + subscriberId.toStringUtf8());
-        }
-        // Now make the TopicSubscriber Channel readable (it is set to not be
-        // readable when the initial subscription is done). Note that this is an
-        // asynchronous call. If this fails (not likely), the futureListener
-        // will just log an error message for now.
-        ChannelFuture future = topicSubscriberChannel.setReadable(true);
-        future.addListener(new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-                if (!future.isSuccess()) {
-                    logger.error("Unable to make subscriber Channel readable in startDelivery call for topic: "
-                                 + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
-                }
-            }
-        });
-    }
-
-    public void stopDelivery(final ByteString topic, final ByteString subscriberId) throws ClientNotSubscribedException {
-        if (logger.isDebugEnabled())
-            logger.debug("Stopping delivery for topic: " + topic.toStringUtf8() + ", subscriberId: "
-                         + subscriberId.toStringUtf8());
-        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
-        // Make sure we know that this topic subscription on the client side
-        // exists. The assumption is that the client should have in memory the
-        // Channel created for the TopicSubscriber once the server has sent
-        // an ack response to the initial subscribe request.
-        Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
-        if (topicSubscriberChannel == null) {
-            logger.error("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: "
-                         + subscriberId.toStringUtf8());
-            throw new ClientNotSubscribedException("Client is not yet subscribed to topic: " + topic.toStringUtf8()
-                                                   + ", subscriberId: " + subscriberId.toStringUtf8());
-        }
-
-        // Unregister the MessageHandler for the subscribe Channel's
-        // Response Handler.
-        try {
-            HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
-                .setMessageHandler(null);
-        } catch (NoResponseHandlerException e) {
-            // Here it's okay if we can't set the response handler's message handler to null. We should just remove it.
-            logger.warn("Could not set message handler to null for subscription channel " + topicSubscriberChannel + ", ignoring.");
-        }
-        this.topicSubscriber2MessageHandler.remove(topicSubscriber);
-        // Now make the TopicSubscriber channel not-readable. This will buffer
-        // up messages if any are sent from the server. Note that this is an
-        // asynchronous call. If this fails (not likely), the futureListener
-        // will just log an error message for now.
-        ChannelFuture future = topicSubscriberChannel.setReadable(false);
-        future.addListener(new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-                if (!future.isSuccess()) {
-                    logger.error("Unable to make subscriber Channel not readable in stopDelivery call for topic: "
-                                 + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
-                }
-            }
-        });
+        logger.debug("Stopping delivery for {}.", topicSubscriber);
+        channelManager.stopDelivery(topicSubscriber); 
     }
 
     public void closeSubscription(ByteString topic, ByteString subscriberId) throws ServiceDownException {
@@ -744,94 +400,22 @@ public class HedwigSubscriber implements
     public void asyncCloseSubscription(final ByteString topic, final ByteString subscriberId,
                                        final Callback<Void> callback, final Object context) {
         doAsyncCloseSubscription(topic, subscriberId,
-            new VoidCallbackAdapter<PubSubProtocol.ResponseBody> (callback), context);
+                                 new VoidCallbackAdapter<ResponseBody>(callback), context);
     }
 
     private void doAsyncCloseSubscription(final ByteString topic, final ByteString subscriberId,
-                                       final Callback<PubSubProtocol.ResponseBody> callback, final Object context) {
-        if (logger.isDebugEnabled())
-            logger.debug("Closing subscription asynchronously for topic: " + topic.toStringUtf8() + ", subscriberId: "
-                         + subscriberId.toStringUtf8());
+                                          final Callback<ResponseBody> callback, final Object context) {
         TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
-        // Remove all cached references for the TopicSubscriber
-        Channel channel = topicSubscriber2Channel.remove(topicSubscriber);
-        if (channel != null) {
-            // Close the subscribe channel asynchronously.
-            try {
-                HedwigClientImpl.getResponseHandlerFromChannel(channel).handleChannelClosedExplicitly();
-            } catch (NoResponseHandlerException e) {
-                // Don't close the channel if you can't find the handler.
-                logger.warn("No response handler found, so could not close subscription channel " + channel);
-            }
-            // We still close the channel as this is an unexpected event and should be handled as one.
-            ChannelFuture future = channel.close();
-            future.addListener(new ChannelFutureListener() {
-                @Override
-                public void operationComplete(ChannelFuture future) throws Exception {
-                    if (!future.isSuccess()) {
-                        logger.error("Failed to close the subscription channel for topic: " + topic.toStringUtf8()
-                                     + ", subscriberId: " + subscriberId.toStringUtf8());
-                        callback.operationFailed(context, new ServiceDownException(
-                                                     "Failed to close the subscription channel for topic: " + topic.toStringUtf8()
-                                                     + ", subscriberId: " + subscriberId.toStringUtf8()));
-                    } else {
-                        callback.operationFinished(context, null);
-                    }
-                }
-            });
-        } else {
-            logger.warn("Trying to close a subscription when we don't have a subscribe channel cached for topic: "
-                        + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
-            callback.operationFinished(context, null);
-        }
-    }
-
-    // Public getter and setters for entries in the topic2Host Map.
-    // This is used for classes that need this information but are not in the
-    // same classpath.
-    public Channel getChannelForTopic(TopicSubscriber topic) {
-        return topicSubscriber2Channel.get(topic);
-    }
-
-    public void setChannelAndPreferencesForTopic(TopicSubscriber topic, Channel channel,
-                                                 SubscriptionPreferences preferences) {
-        synchronized (closeLock) {
-            if (closed) {
-                channel.close();
-                return;
-            }
-            Channel oldc = topicSubscriber2Channel.putIfAbsent(topic, channel);
-            if (oldc != null) {
-                logger.warn("Dropping new channel for " + topic + ", due to existing channel: " + oldc);
-                channel.close();
-            }
-            if (null != preferences) {
-                topicSubscriber2Preferences.put(topic, preferences);
-            }
-        }
-    }
-
-    public void removeTopicSubscriber(TopicSubscriber topic) {
-        synchronized (topic) {
-            topicSubscriber2Preferences.remove(topic);
-            topicSubscriber2Channel.remove(topic);
-        }
-    }
-
-    void close() {
-        synchronized (closeLock) {
-            closed = true;
-        }
-
-        // Close all of the open Channels.
-        for (Channel channel : topicSubscriber2Channel.values()) {
-            try {
-                client.getResponseHandlerFromChannel(channel).handleChannelClosedExplicitly();
-            } catch (NoResponseHandlerException e) {
-                logger.error("No response handler found while trying to close subscription channel.");
-            }
-            channel.close().awaitUninterruptibly();
+        logger.debug("Stopping delivery for {} before closing subscription.", topicSubscriber);
+        // We stop delivery here rather than in the channel manager,
+        // because channelManager#asyncCloseSubscription will be called
+        // when the subscription channel is disconnected, to clear the local subscription.
+        try {
+            channelManager.stopDelivery(topicSubscriber); 
+        } catch (ClientNotSubscribedException cnse) {
+            // it is OK to ignore the exception when closing subscription
         }
-        topicSubscriber2Channel.clear();
+        logger.debug("Closing subscription asynchronously for {}.", topicSubscriber);
+        channelManager.asyncCloseSubscription(topicSubscriber, callback, context);
     }
 }

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.net.InetSocketAddress;
+
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.PublishRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
+import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
+
+/**
+ * Utilities for network operations.
+ */
+public class NetUtils {
+
+    /**
+     * Returns the remote address of a netty Channel as an
+     * {@link InetSocketAddress}. The cast assumes the channel was originally
+     * created with an InetSocketAddress, which is stated to hold for the
+     * Hedwig netty implementation.
+     *
+     * @param channel
+     *            Netty channel to read the remote address from.
+     * @return the channel's remote address, cast to InetSocketAddress
+     */
+    public static InetSocketAddress getHostFromChannel(Channel channel) {
+        return (InetSocketAddress) channel.getRemoteAddress();
+    }
+
+    /**
+     * Assembles the outer pub/sub request for the operation described by the
+     * given data wrapper and records the transaction id and the current time
+     * on that wrapper.
+     *
+     * @param txnId
+     *            Transaction Id.
+     * @param pubSubData
+     *            Data wrapper of the call; its txnId and requestWriteTime
+     *            fields are overwritten by this method.
+     * @return builder of the pub/sub request to send
+     */
+    public static PubSubRequest.Builder buildPubSubRequest(long txnId,
+                                                           PubSubData pubSubData) {
+        final OperationType opType = pubSubData.operationType;
+
+        PubSubRequest.Builder request = PubSubRequest.newBuilder()
+            .setProtocolVersion(ProtocolVersion.VERSION_ONE)
+            .setType(opType);
+
+        // The tried servers list is not needed for a consume request.
+        if (OperationType.CONSUME != opType
+            && pubSubData.triedServers != null
+            && pubSubData.triedServers.size() > 0) {
+            request.addAllTriedServers(pubSubData.triedServers);
+        }
+
+        request.setTxnId(txnId)
+            .setShouldClaim(pubSubData.shouldClaim)
+            .setTopic(pubSubData.topic);
+
+        // Attach the operation specific inner request, if there is one.
+        switch (opType) {
+        case PUBLISH:
+            request.setPublishRequest(buildPublishRequest(pubSubData));
+            break;
+        case SUBSCRIBE:
+            request.setSubscribeRequest(buildSubscribeRequest(pubSubData));
+            break;
+        case UNSUBSCRIBE:
+            request.setUnsubscribeRequest(buildUnsubscribeRequest(pubSubData));
+            break;
+        default:
+            // no inner request is attached for any other operation type
+            break;
+        }
+
+        // Remember on the data wrapper which transaction this is and when
+        // the request was built.
+        pubSubData.txnId = txnId;
+        pubSubData.requestWriteTime = System.currentTimeMillis();
+
+        return request;
+    }
+
+    // Wraps the message carried by the data wrapper into a publish request.
+    private static PublishRequest.Builder buildPublishRequest(PubSubData pubSubData) {
+        return PublishRequest.newBuilder().setMsg(pubSubData.msg);
+    }
+
+    // Builds the subscribe request from the subscriber id and the
+    // subscription options carried by the data wrapper.
+    private static SubscribeRequest.Builder buildSubscribeRequest(PubSubData pubSubData) {
+        final SubscriptionOptions options = pubSubData.options;
+
+        // For now, all subscribes are synchronous: they should wait for all
+        // cross-regional subscriptions to be established before returning.
+        SubscribeRequest.Builder subscribe = SubscribeRequest.newBuilder()
+            .setSubscriberId(pubSubData.subscriberId)
+            .setCreateOrAttach(options.getCreateOrAttach())
+            .setForceAttach(options.getForceAttach())
+            .setSynchronous(true);
+
+        SubscriptionPreferences.Builder preferences = options2Preferences(options);
+        // Backward compatible with 4.1.0: the message bound is also set
+        // directly on the subscribe request.
+        if (preferences.hasMessageBound()) {
+            subscribe.setMessageBound(preferences.getMessageBound());
+        }
+        return subscribe.setPreferences(preferences);
+    }
+
+    // Builds the unsubscribe request for the subscriber id of the data wrapper.
+    private static UnsubscribeRequest.Builder buildUnsubscribeRequest(PubSubData pubSubData) {
+        return UnsubscribeRequest.newBuilder().setSubscriberId(pubSubData.subscriberId);
+    }
+
+    /**
+     * Assembles a consume request for the given topic subscriber and message
+     * sequence id.
+     *
+     * @param txnId
+     *          Transaction Id.
+     * @param topicSubscriber
+     *          Topic Subscriber supplying the topic and the subscriber id.
+     * @param messageSeqId
+     *          Message Seq Id to put into the consume request.
+     * @return builder of the pub/sub request.
+     */
+    public static PubSubRequest.Builder buildConsumeRequest(long txnId,
+                                                            TopicSubscriber topicSubscriber,
+                                                            MessageSeqId messageSeqId) {
+        // Outer request first ...
+        PubSubRequest.Builder request = PubSubRequest.newBuilder()
+            .setProtocolVersion(ProtocolVersion.VERSION_ONE)
+            .setType(OperationType.CONSUME)
+            .setTxnId(txnId)
+            .setTopic(topicSubscriber.getTopic());
+
+        // ... then the inner consume request that is attached to it.
+        ConsumeRequest.Builder consume = ConsumeRequest.newBuilder()
+            .setSubscriberId(topicSubscriber.getSubscriberId())
+            .setMsgId(messageSeqId);
+
+        return request.setConsumeRequest(consume);
+    }
+
+    /**
+     * Translates client-side subscription options into subscription
+     * preferences. The message bound is copied only when it is positive; the
+     * message filter and the user options are copied only when present.
+     *
+     * @param options
+     *          Client-Side subscription options
+     * @return builder of the subscription preferences
+     */
+    private static SubscriptionPreferences.Builder options2Preferences(SubscriptionOptions options) {
+        SubscriptionPreferences.Builder preferences = SubscriptionPreferences.newBuilder();
+
+        if (options.getMessageBound() > 0) {
+            preferences.setMessageBound(options.getMessageBound());
+        }
+        if (options.hasMessageFilter()) {
+            preferences.setMessageFilter(options.getMessageFilter());
+        }
+        if (options.hasOptions()) {
+            preferences.setOptions(options.getOptions());
+        }
+
+        return preferences;
+    }
+
+}

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
+import org.apache.hedwig.util.SubscriptionListener;
+
+/**
+ * Keeps a set of subscription listeners and forwards subscription events to
+ * each of them.
+ */
+public class SubscriptionEventEmitter {
+
+    // Registered listeners, held in a CopyOnWriteArraySet.
+    private final CopyOnWriteArraySet<SubscriptionListener> listeners =
+        new CopyOnWriteArraySet<SubscriptionListener>();
+
+    /**
+     * Creates an emitter with no registered listeners.
+     */
+    public SubscriptionEventEmitter() {
+    }
+
+    /**
+     * Registers a listener for subscription events.
+     *
+     * @param listener
+     *          Listener to add to the listener set.
+     */
+    public void addSubscriptionListener(SubscriptionListener listener) {
+        this.listeners.add(listener);
+    }
+
+    /**
+     * Unregisters a listener.
+     *
+     * @param listener
+     *          Listener to remove from the listener set.
+     */
+    public void removeSubscriptionListener(SubscriptionListener listener) {
+        this.listeners.remove(listener);
+    }
+
+    /**
+     * Passes a subscription event to every registered listener, one after
+     * another. Nothing is caught here, so an exception thrown by a listener
+     * reaches the caller and the listeners after it are not invoked.
+     *
+     * @param topic
+     *          Topic handed to each listener.
+     * @param subscriberId
+     *          Subscriber id handed to each listener.
+     * @param event
+     *          Subscription event handed to each listener.
+     */
+    public void emitSubscriptionEvent(ByteString topic, ByteString subscriberId,
+                                      SubscriptionEvent event) {
+        for (SubscriptionListener registered : this.listeners) {
+            registered.processEvent(topic, subscriberId, event);
+        }
+    }
+
+}