You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/11/02 17:15:01 UTC

svn commit: r1405028 [1/3] - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/java/org/apache/hedwig/client/conf/ hedwig-client/src/main/java/org/apache/hedwig/client/handlers/ hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig...

Author: ivank
Date: Fri Nov  2 16:14:59 2012
New Revision: 1405028

URL: http://svn.apache.org/viewvc?rev=1405028&view=rev
Log:
BOOKKEEPER-368 Implementing multiplexing java client. (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/ResubscribeCallback.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyCloseSubscriptionHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java
Removed:
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestCloseSubscription.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1405028&r1=1405027&r2=1405028&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Nov  2 16:14:59 2012
@@ -224,6 +224,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-369: re-factor hedwig cpp client to provide better interface to support both one-subscription-per-channel and multiple-subscriptions-per-channel. (sijie via ivank)
 
+        BOOKKEEPER-368: Implementing multiplexing java client. (sijie via ivank)
+
 Release 4.1.0 - 2012-06-07
 
   Non-backward compatible changes:

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java?rev=1405028&r1=1405027&r2=1405028&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java Fri Nov  2 16:14:59 2012
@@ -42,6 +42,7 @@ public class ClientConfiguration extends
     protected static final String TIMEOUT_THREAD_RUN_INTERVAL = "timeout_thread_run_interval";
     protected static final String SSL_ENABLED = "ssl_enabled";
     protected static final String SUBSCRIPTION_MESSAGE_BOUND = "subscription_message_bound";
+    protected static final String MULTIPLEXING_ENABLED = "multiplexing_enabled";
 
     // Singletons we want to instantiate only once per ClientConfiguration
     protected HedwigSocketAddress myDefaultServerAddress = null;
@@ -140,6 +141,14 @@ public class ClientConfiguration extends
     }
 
     /**
+     * This parameter is a boolean flag indicating whether subscription
+     * channels are multiplexed.
+     */
+    public boolean isMultiplexingEnabled() {
+        return conf.getBoolean(MULTIPLEXING_ENABLED, false);
+    }
+
+    /**
      * The maximum number of messages the hub will queue for subscriptions
      * created using this configuration. The hub will always queue the most
      * recent messages. If there are enough publishes to the topic to hit

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=1405028&r1=1405027&r2=1405028&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Fri Nov  2 16:14:59 2012
@@ -23,6 +23,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.jboss.netty.channel.Channel;
 
+import com.google.protobuf.ByteString;
+
 import org.apache.hedwig.client.api.MessageHandler;
 import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.data.TopicSubscriber;
@@ -34,6 +36,7 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
 import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
 import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
 import org.apache.hedwig.util.Callback;
 
@@ -61,6 +64,20 @@ public abstract class SubscribeResponseH
     public abstract void handleSubscribeMessage(PubSubResponse response);
 
     /**
+     * Handle a subscription event delivered by the server.
+     *
+     * @param topic
+     *          Topic Name
+     * @param subscriberId
+     *          Subscriber Id
+     * @param event
+     *          Subscription Event describes its status
+     */
+    public abstract void handleSubscriptionEvent(ByteString topic,
+                                                 ByteString subscriberId,
+                                                 SubscriptionEvent event);
+
+    /**
      * Method called when a message arrives for a subscribe Channel and we want
      * to deliver it asynchronously via the registered MessageHandler (should
      * not be null when called here).

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java?rev=1405028&r1=1405027&r2=1405028&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java Fri Nov  2 16:14:59 2012
@@ -29,6 +29,7 @@ import com.google.protobuf.ByteString;
 import org.apache.hedwig.client.api.Client;
 import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.netty.impl.simple.SimpleHChannelManager;
+import org.apache.hedwig.client.netty.impl.multiplex.MultiplexHChannelManager;
 
 /**
  * This is a top level Hedwig Client class that encapsulates the common
@@ -73,8 +74,11 @@ public class HedwigClientImpl implements
     protected HedwigClientImpl(ClientConfiguration cfg, ChannelFactory socketFactory) {
         this.cfg = cfg;
         this.socketFactory = socketFactory;
-        channelManager = new SimpleHChannelManager(cfg, socketFactory);
-
+        if (cfg.isMultiplexingEnabled()) {
+            channelManager = new MultiplexHChannelManager(cfg, socketFactory);
+        } else {
+            channelManager = new SimpleHChannelManager(cfg, socketFactory);
+        }
         pub = new HedwigPublisher(this);
         sub = new HedwigSubscriber(this);
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java?rev=1405028&r1=1405027&r2=1405028&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java Fri Nov  2 16:14:59 2012
@@ -43,7 +43,9 @@ import org.apache.hedwig.exceptions.PubS
 import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException;
 import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
 import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEventResponse;
 import static org.apache.hedwig.util.VarArgs.va;
 
 @ChannelPipelineCoverage("all")
@@ -119,6 +121,27 @@ public class HChannelHandler extends Sim
             return;
         }
 
+        // Process Subscription Events
+        if (response.hasResponseBody()) {
+            ResponseBody resp = response.getResponseBody();
+            // A special subscription event indicates the state of a subscriber
+            if (resp.hasSubscriptionEvent()) {
+                if (null == subHandler) {
+                    logger.error("Received subscription event from a non-subscription channel : {}",
+                                 response); 
+                } else {
+                    SubscriptionEventResponse eventResp = resp.getSubscriptionEvent();
+                    logger.debug("Received subscription event {} for (topic:{}, subscriber:{}).",
+                                 va(eventResp.getEvent(), response.getTopic(),
+                                    response.getSubscriberId()));
+                    subHandler.handleSubscriptionEvent(response.getTopic(),
+                                                       response.getSubscriberId(),
+                                                       eventResp.getEvent());
+                }
+                return;
+            }
+        }
+
         // Response is an ack to a prior PubSubRequest so first retrieve the
         // PubSub data for this txn.
         PubSubData pubSubData = txn2PubSubData.remove(response.getTxnId());

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java?rev=1405028&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java Fri Nov  2 16:14:59 2012
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl.multiplex;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
+import org.apache.hedwig.client.netty.CleanupChannelMap;
+import org.apache.hedwig.client.netty.HChannel;
+import org.apache.hedwig.client.netty.NetUtils;
+import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
+import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
+import org.apache.hedwig.client.netty.impl.HChannelHandler;
+import org.apache.hedwig.client.netty.impl.HChannelImpl;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
+
+
+/**
+ * Multiplex HChannel Manager which establishes a connection shared by multiple subscriptions.
+ */
+public class MultiplexHChannelManager extends AbstractHChannelManager {
+
+    static final Logger logger = LoggerFactory.getLogger(MultiplexHChannelManager.class);
+
+    // Find which HChannel a given TopicSubscriber uses.
+    protected final CleanupChannelMap<InetSocketAddress> subscriptionChannels;
+
+    // Concurrent Map to store Message handler for each topic + sub id combination.
+    // Store it here instead of in SubscribeResponseHandler as we don't want to lose the handler
+    // the user set when the connection is recovered
+    protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler
+        = new ConcurrentHashMap<TopicSubscriber, MessageHandler>();
+
+    // PipelineFactory to create subscription netty channels to the appropriate server
+    private final ClientChannelPipelineFactory subscriptionChannelPipelineFactory;
+
+    public MultiplexHChannelManager(ClientConfiguration cfg,
+                                    ChannelFactory socketFactory) {
+        super(cfg, socketFactory);
+        subscriptionChannels = new CleanupChannelMap<InetSocketAddress>();
+        subscriptionChannelPipelineFactory =
+            new MultiplexSubscriptionChannelPipelineFactory(cfg, this);
+    }
+
+    @Override
+    protected ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory() {
+        return subscriptionChannelPipelineFactory;
+    }
+
+    @Override
+    protected HChannel createAndStoreSubscriptionChannel(Channel channel) {
+        // store the channel connected to target host for future usage
+        InetSocketAddress host = NetUtils.getHostFromChannel(channel);
+        HChannel newHChannel = new HChannelImpl(host, channel, this,
+                                                getSubscriptionChannelPipelineFactory());
+        return storeSubscriptionChannel(host, newHChannel);
+    }
+
+    @Override
+    protected HChannel createAndStoreSubscriptionChannel(InetSocketAddress host) {
+        HChannel newHChannel = new HChannelImpl(host, this,
+                                                getSubscriptionChannelPipelineFactory());
+        return storeSubscriptionChannel(host, newHChannel);
+    }
+
+    private HChannel storeSubscriptionChannel(InetSocketAddress host,
+                                              HChannel newHChannel) {
+        // here, we guarantee there is only one channel used to communicate with target
+        // host.
+        return subscriptionChannels.addChannel(host, newHChannel);
+    }
+
+    @Override
+    protected HChannel getSubscriptionChannel(InetSocketAddress host) {
+        return subscriptionChannels.getChannel(host);
+    }
+
+    protected HChannel getSubscriptionChannel(TopicSubscriber subscriber) {
+        InetSocketAddress host = topic2Host.get(subscriber.getTopic());
+        if (null == host) {
+            // we don't know which host owns the topic
+            return null;
+        } else {
+            return getSubscriptionChannel(host);
+        }
+    }
+
+    @Override
+    protected HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber subscriber) {
+        InetSocketAddress host = topic2Host.get(subscriber.getTopic());
+        if (null == host) {
+            // we don't know which host owns the topic
+            return null;
+        } else {
+            // we already know which server owns the topic
+            HChannel channel = getSubscriptionChannel(host);
+            if (null == channel) {
+                // create a channel to connect to the specified host
+                channel = createAndStoreSubscriptionChannel(host);
+            }
+            return channel;
+        }
+    }
+
+    @Override
+    protected void onSubscriptionChannelDisconnected(InetSocketAddress host,
+                                                     Channel channel) {
+        HChannel hChannel = subscriptionChannels.getChannel(host);
+        if (null == hChannel) {
+            return;
+        }
+        Channel underlyingChannel = hChannel.getChannel();
+        if (null == underlyingChannel ||
+            !underlyingChannel.equals(channel)) {
+            return;
+        }
+        logger.info("Subscription Channel {} disconnected from {}.",
+                    va(channel, host));
+        // remove the existing channel
+        if (subscriptionChannels.removeChannel(host, hChannel)) {
+            try {
+                HChannelHandler channelHandler =
+                    HChannelImpl.getHChannelHandlerFromChannel(channel);
+                channelHandler.getSubscribeResponseHandler()
+                              .onChannelDisconnected(host, channel);
+            } catch (NoResponseHandlerException nrhe) {
+                logger.warn("No Channel Handler found for channel {} when it disconnected.",
+                            channel);
+            }
+        }
+    }
+
+    @Override
+    public SubscribeResponseHandler getSubscribeResponseHandler(TopicSubscriber topicSubscriber) {
+        HChannel hChannel = getSubscriptionChannel(topicSubscriber);
+        if (null == hChannel) {
+            return null;
+        }
+        Channel channel = hChannel.getChannel();
+        if (null == channel) {
+            return null;
+        }
+        try {
+            HChannelHandler channelHandler =
+                HChannelImpl.getHChannelHandlerFromChannel(channel);
+            return channelHandler.getSubscribeResponseHandler();
+        } catch (NoResponseHandlerException nrhe) {
+            logger.warn("No Channel Handler found for channel {}, topic subscriber {}.",
+                        channel, topicSubscriber);
+            return null;
+        }
+    }
+
+    @Override
+    public void startDelivery(TopicSubscriber topicSubscriber,
+                              MessageHandler messageHandler)
+        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+        startDelivery(topicSubscriber, messageHandler, false);
+    }
+
+    protected void restartDelivery(TopicSubscriber topicSubscriber)
+        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+        startDelivery(topicSubscriber, null, true);
+    }
+
+    private void startDelivery(TopicSubscriber topicSubscriber,
+                               MessageHandler messageHandler,
+                               boolean restart)
+        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+        // Make sure this topic subscription exists on the client side.
+        // The assumption is that the client should have in memory the
+        // Channel created for the TopicSubscriber once the server has sent
+        // an ack response to the initial subscribe request.
+        SubscribeResponseHandler subscribeResponseHandler =
+            getSubscribeResponseHandler(topicSubscriber);
+        if (null == subscribeResponseHandler ||
+            !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
+            logger.error("Client is not yet subscribed to {}.", topicSubscriber);
+            throw new ClientNotSubscribedException("Client is not yet subscribed to "
+                                                   + topicSubscriber);
+        }
+
+        MessageHandler existedMsgHandler = topicSubscriber2MessageHandler.get(topicSubscriber);
+        if (restart) {
+            // restart using existing msg handler 
+            messageHandler = existedMsgHandler;
+        } else {
+            // someone has started delivery but has not stopped it
+            if (null != existedMsgHandler) {
+                throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
+            }
+            if (messageHandler != null) {
+                if (null != topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler)) {
+                    throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
+                }
+            }
+        }
+
+        // tell subscribe response handler to start delivering messages for topicSubscriber
+        subscribeResponseHandler.startDelivery(topicSubscriber, messageHandler);
+    }
+
+    public void stopDelivery(TopicSubscriber topicSubscriber)
+    throws ClientNotSubscribedException {
+        // Make sure this topic subscription exists on the client side.
+        // The assumption is that the client should have in memory the
+        // Channel created for the TopicSubscriber once the server has sent
+        // an ack response to the initial subscribe request.
+        SubscribeResponseHandler subscribeResponseHandler =
+            getSubscribeResponseHandler(topicSubscriber);
+        if (null == subscribeResponseHandler ||
+            !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
+            logger.error("Client is not yet subscribed to {}.", topicSubscriber);
+            throw new ClientNotSubscribedException("Client is not yet subscribed to "
+                                                   + topicSubscriber);
+        }
+
+        // tell subscribe response handler to stop delivering messages for a given topic subscriber
+        topicSubscriber2MessageHandler.remove(topicSubscriber);
+        subscribeResponseHandler.stopDelivery(topicSubscriber);
+    }
+                            
+
+    @Override
+    public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
+                                       final Callback<ResponseBody> callback,
+                                       final Object context) {
+        SubscribeResponseHandler subscribeResponseHandler =
+            getSubscribeResponseHandler(topicSubscriber);
+        if (null == subscribeResponseHandler ||
+            !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
+            logger.warn("Trying to close a subscription when we don't have a subscription channel cached for {}",
+                        topicSubscriber);
+            callback.operationFinished(context, (ResponseBody)null);
+            return;
+        }
+        subscribeResponseHandler.asyncCloseSubscription(topicSubscriber, callback, context);
+    }
+
+    @Override
+    protected void checkTimeoutRequestsOnSubscriptionChannels() {
+        // timeout task may be started before constructing subscriptionChannels
+        if (null == subscriptionChannels) {
+            return;
+        }
+        for (HChannel channel : subscriptionChannels.getChannels()) {
+            try {
+                HChannelHandler channelHandler =
+                    HChannelImpl.getHChannelHandlerFromChannel(channel.getChannel());
+                channelHandler.checkTimeoutRequests();
+            } catch (NoResponseHandlerException nrhe) {
+                continue;
+            }
+        }
+    }
+
+    @Override
+    protected void closeSubscriptionChannels() {
+        subscriptionChannels.close();
+    }
+
+}

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java?rev=1405028&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java Fri Nov  2 16:14:59 2012
@@ -0,0 +1,622 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl.multiplex;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.protobuf.ByteString;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.MessageConsumeData;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
+import org.apache.hedwig.client.netty.HChannelManager;
+import org.apache.hedwig.client.netty.HChannel;
+import org.apache.hedwig.client.netty.NetUtils;
+import org.apache.hedwig.client.netty.FilterableMessageHandler;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException;
+import org.apache.hedwig.filter.ClientMessageFilter;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
+import org.apache.hedwig.protoextensions.MessageIdUtils;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.SubscriptionListener;
+import static org.apache.hedwig.util.VarArgs.va;
+
+public class MultiplexSubscribeResponseHandler extends SubscribeResponseHandler {
+
+    private static Logger logger =
+        LoggerFactory.getLogger(MultiplexSubscribeResponseHandler.class);
+
+    class ActiveSubscriber implements SubscriptionListener {
+        private final TopicSubscriber topicSubscriber;
+        private final PubSubData op;
+        private final SubscriptionPreferences preferences;
+
+        // the underlying netty channel to send request
+        private final Channel channel;
+
+        // Counter for the number of consumed messages so far to buffer up before we
+        // send the Consume message back to the server along with the last/largest
+        // message seq ID seen so far in that batch.
+        private int numConsumedMessagesInBuffer = 0;
+        private MessageSeqId lastMessageSeqId;
+
+        // Message Handler
+        private MessageHandler msgHandler;
+
+        // Queue buffering messages received from the server before a
+        // MessageHandler has been registered for this subscription. It is
+        // created eagerly together with the subscriber.
+        private Queue<Message> msgQueue = new LinkedList<Message>();
+
+        /**
+         * Record the state of one active subscription carried by the channel.
+         *
+         * @param ts topic subscriber this state belongs to
+         * @param op original subscribe request data
+         * @param preferences subscription preferences returned by the server,
+         *        may be null
+         * @param channel netty channel used to send requests
+         */
+        ActiveSubscriber(TopicSubscriber ts, PubSubData op,
+                        SubscriptionPreferences preferences,
+                        Channel channel) {
+            this.channel = channel;
+            this.preferences = preferences;
+            this.op = op;
+            this.topicSubscriber = ts;
+        }
+
+        /** @return the subscribe request data this subscriber was created with */
+        PubSubData getPubSubData() {
+            return op;
+        }
+
+        /** @return the topic subscriber this state belongs to */
+        TopicSubscriber getTopicSubscriber() {
+            return topicSubscriber;
+        }
+
+        /**
+         * Account for a consumed message and report whether a Consume request
+         * should be sent now.
+         *
+         * @param seqId sequence id of the message that was just consumed
+         * @return true when the number of buffered consumed messages reached
+         *         the configured buffer size (the buffer state is reset in
+         *         that case); false otherwise, including when seqId is not
+         *         larger than the last recorded one
+         */
+        synchronized boolean updateLastMessageSeqId(MessageSeqId seqId) {
+            boolean notNewer = lastMessageSeqId != null
+                && seqId.getLocalComponent() <= lastMessageSeqId.getLocalComponent();
+            if (notNewer) {
+                return false;
+            }
+            lastMessageSeqId = seqId;
+            numConsumedMessagesInBuffer += 1;
+            boolean bufferFull =
+                numConsumedMessagesInBuffer >= cfg.getConsumedMessagesBufferSize();
+            if (bufferFull) {
+                // start a fresh batch
+                lastMessageSeqId = null;
+                numConsumedMessagesInBuffer = 0;
+            }
+            return bufferFull;
+        }
+
+        /**
+         * Register the message handler for this subscriber and deliver any
+         * messages that were queued while no handler was registered.
+         *
+         * @param messageHandler handler to register; when it is a
+         *        FilterableMessageHandler with a filter, the subscription
+         *        preferences are handed to the filter, or the wrapped handler
+         *        is used instead when no preferences are known
+         * @throws AlreadyStartDeliveryException if a handler is already registered
+         */
+        synchronized void startDelivery(MessageHandler messageHandler)
+        throws AlreadyStartDeliveryException, ClientNotSubscribedException {
+            if (this.msgHandler != null) {
+                throw new AlreadyStartDeliveryException("A message handler " + msgHandler
+                    + " has been started for " + topicSubscriber);
+            }
+            MessageHandler handlerToUse = messageHandler;
+            if (messageHandler instanceof FilterableMessageHandler) {
+                FilterableMessageHandler filterable =
+                    (FilterableMessageHandler) messageHandler;
+                if (filterable.hasMessageFilter()) {
+                    if (preferences != null) {
+                        // hand the subscription preferences over to the message filter
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Start delivering messages with filter on {}, preferences: {}",
+                                         va(topicSubscriber,
+                                            SubscriptionStateUtils.toString(preferences)));
+                        }
+                        ClientMessageFilter filter = filterable.getMessageFilter();
+                        filter.setSubscriptionPreferences(topicSubscriber.getTopic(),
+                                                          topicSubscriber.getSubscriberId(),
+                                                          preferences);
+                    } else {
+                        // no preferences means talking to an old version hub server
+                        logger.warn("Start delivering messages with filter but no subscription "
+                                  + "preferences found. It might due to talking to an old version"
+                                  + " hub server.");
+                        // fall back to the handler wrapped by the filterable one
+                        handlerToUse = filterable.getMessageHandler();
+                    }
+                }
+            }
+
+            this.msgHandler = handlerToUse;
+            // Without a handler, or without queued messages, there is nothing
+            // to replay.
+            if (null == handlerToUse || msgQueue.isEmpty()) {
+                return;
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("Consuming {} queued up messages for {}",
+                             va(msgQueue.size(), topicSubscriber));
+            }
+            for (Message queued : msgQueue) {
+                asyncMessageDeliver(queued);
+            }
+            // Every queued message has been handed to the handler, so drop them.
+            msgQueue.clear();
+        }
+
+        /** Unregister the message handler; later messages are queued again. */
+        synchronized void stopDelivery() {
+            msgHandler = null;
+        }
+
+        /**
+         * Handle a message received for this subscriber: deliver it when a
+         * message handler is registered, otherwise buffer it in the queue.
+         *
+         * @param message message received from the server
+         */
+        synchronized void handleMessage(Message message) {
+            if (null == msgHandler) {
+                // No MessageHandler has been registered yet, so buffer the
+                // message; startDelivery replays the queue once a handler is
+                // registered.
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Message {} has arrived but no MessageHandler provided for {}"
+                                 + " yet so queueing up the message.",
+                                 va(MessageIdUtils.msgIdToReadableString(message.getMsgId()),
+                                    topicSubscriber));
+                }
+                msgQueue.add(message);
+                return;
+            }
+            asyncMessageDeliver(message);
+        }
+
+        /**
+         * Hand a message to the registered message handler, passing the
+         * channel manager's consume callback. Logs an error and drops the
+         * message when no handler is registered.
+         *
+         * @param message message to deliver
+         */
+        synchronized void asyncMessageDeliver(Message message) {
+            if (null != msgHandler) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Call the client app's MessageHandler asynchronously to deliver the message {} to {}",
+                                 va(message, topicSubscriber));
+                }
+                MessageConsumeData consumeData =
+                    new MessageConsumeData(topicSubscriber, message);
+                msgHandler.deliver(topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(),
+                                   message, sChannelManager.getConsumeCallback(),
+                                   consumeData);
+                return;
+            }
+            logger.error("No message handler found to deliver message {} to {}.",
+                         va(MessageIdUtils.msgIdToReadableString(message.getMsgId()),
+                            topicSubscriber));
+        }
+
+        /**
+         * Send a Consume request for this subscriber over its channel.
+         *
+         * The request is fire-and-forget: no response handler is registered
+         * and no callback is invoked; a failed write is only logged.
+         *
+         * @param messageSeqId sequence id to report as consumed
+         */
+        void consume(final MessageSeqId messageSeqId) {
+            PubSubRequest.Builder pubsubRequestBuilder =
+                NetUtils.buildConsumeRequest(sChannelManager.nextTxnId(),
+                                             topicSubscriber, messageSeqId);
+
+            // Resolve the host once from the channel the request is written
+            // to, so that the debug and the failure log report the same host
+            // instead of relying on the handler-level host field.
+            final InetSocketAddress channelHost = NetUtils.getHostFromChannel(channel);
+            if (logger.isDebugEnabled()) {
+                logger.debug("Writing a Consume request to host: {} with messageSeqId: {} for {}",
+                             va(channelHost, messageSeqId, topicSubscriber));
+            }
+            ChannelFuture future = channel.write(pubsubRequestBuilder.build());
+            future.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if (!future.isSuccess()) {
+                        logger.error("Error writing a Consume request to host: {} with messageSeqId: {} for {}",
+                                     va(channelHost, messageSeqId, topicSubscriber));
+                    }
+                }
+            });
+        }
+
+        /**
+         * React to a subscription event for this subscriber. Both known
+         * events trigger a resubscribe attempt; TOPIC_MOVED additionally
+         * clears the cached host for the topic first.
+         */
+        @Override
+        public void processEvent(ByteString topic, ByteString subscriberId,
+                                 SubscriptionEvent event) {
+            switch (event) {
+            case TOPIC_MOVED:
+                sChannelManager.clearHostForTopic(topic, NetUtils.getHostFromChannel(channel));
+                // fall through: the resubscribe step is shared by both events
+            case SUBSCRIPTION_FORCED_CLOSED:
+                resubscribeIfNecessary(event);
+                break;
+            default:
+                logger.error("Receive unknown subscription event {} for {}.",
+                             va(event, topicSubscriber));
+            }
+        }
+
+        /**
+         * Remove this subscriber from the handler and either resubmit its
+         * original subscribe request or, when resubscribe is disabled in the
+         * subscription options, forward the event to the subscription event
+         * emitter.
+         *
+         * @param event the subscription event that triggered this call
+         */
+        private void resubscribeIfNecessary(SubscriptionEvent event) {
+            // Only proceed if this exact subscriber was still registered; if
+            // it has been changed in the meantime there is nothing to do.
+            boolean removed = subscriptions.remove(topicSubscriber, this);
+            if (!removed) {
+                return;
+            }
+            if (!op.options.getEnableResubscribe()) {
+                sChannelManager.getSubscriptionEventEmitter().emitSubscriptionEvent(
+                    topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(), event);
+                return;
+            }
+            // Start a "fresh" subscribe request: forget every server that was
+            // contacted or attempted for the original request.
+            op.clearServersList();
+            // Swap in a ResubscribeCallback so that follow-up work (such as
+            // restarting delivery) can run once the resubscribe completes.
+            ResubscribeCallback callbackAfterResubscribe =
+                new ResubscribeCallback(topicSubscriber, op, sChannelManager,
+                                        cfg.getSubscribeReconnectRetryWaitTime());
+            op.setCallback(callbackAfterResubscribe);
+            op.context = null;
+            if (logger.isDebugEnabled()) {
+                logger.debug("Resubscribe {} with origSubData {}",
+                             va(topicSubscriber, op));
+            }
+            // hand the request back to the channel manager
+            sChannelManager.submitOp(op);
+        }
+    }
+
+    protected final ReentrantReadWriteLock disconnectLock =
+        new ReentrantReadWriteLock();
+
+    // the underlying subscription channel
+    volatile HChannel hChannel;
+    InetSocketAddress host; 
+    protected final ConcurrentMap<TopicSubscriber, ActiveSubscriber> subscriptions
+        = new ConcurrentHashMap<TopicSubscriber, ActiveSubscriber>();
+    private final MultiplexHChannelManager sChannelManager;
+
+    /**
+     * @param cfg client configuration
+     * @param channelManager channel manager; must be a MultiplexHChannelManager
+     */
+    protected MultiplexSubscribeResponseHandler(ClientConfiguration cfg,
+                                                HChannelManager channelManager) {
+        super(cfg, channelManager);
+        this.sChannelManager = (MultiplexHChannelManager) channelManager;
+    }
+
+    /** @return the channel manager this handler was created with */
+    protected HChannelManager getHChannelManager() {
+        return sChannelManager;
+    }
+
+    /** @return the client configuration used by this handler */
+    protected ClientConfiguration getConfiguration() {
+        return this.cfg;
+    }
+
+    /** @return the active subscriber registered for ts, or null if there is none */
+    private ActiveSubscriber getActiveSubscriber(TopicSubscriber ts) {
+        ActiveSubscriber subscriber = subscriptions.get(ts);
+        return subscriber;
+    }
+
+    /**
+     * Handle the response to a Subscribe request.
+     *
+     * On SUCCESS an ActiveSubscriber is registered for the topic subscriber
+     * and the request callback is completed; if one is already registered
+     * the callback fails with ClientAlreadySubscribedException. Redirects are
+     * passed to handleRedirectResponse; every other status fails the callback.
+     *
+     * @param response response received from the server
+     * @param pubSubData the subscribe request this response belongs to
+     * @param channel netty channel the response arrived on
+     */
+    @Override
+    public void handleResponse(PubSubResponse response, PubSubData pubSubData,
+                               Channel channel) throws Exception {
+        if (null == hChannel) {
+            // Validate in locals first and only publish host/hChannel once the
+            // lookup matched this channel. Caching an unvalidated channel
+            // would let later responses skip this check and let close
+            // requests be submitted to the wrong channel.
+            InetSocketAddress channelHost = NetUtils.getHostFromChannel(channel);
+            HChannel subChannel = sChannelManager.getSubscriptionChannel(channelHost);
+            if (null == subChannel ||
+                !channel.equals(subChannel.getChannel())) {
+                PubSubException pse =
+                    new UnexpectedConditionException("Failed to get subscription channel of " + channelHost);
+                pubSubData.getCallback().operationFailed(pubSubData.context, pse);
+                return;
+            }
+            // write host before the volatile hChannel so readers that observe
+            // hChannel also observe host
+            host = channelHost;
+            hChannel = subChannel;
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("Handling a Subscribe response: {}, pubSubData: {}, host: {}.",
+                         va(response, pubSubData, NetUtils.getHostFromChannel(channel)));
+        }
+        switch (response.getStatusCode()) {
+        case SUCCESS:
+            TopicSubscriber ts = new TopicSubscriber(pubSubData.topic,
+                                                     pubSubData.subscriberId);
+            SubscriptionPreferences preferences = null;
+            if (response.hasResponseBody()) {
+                ResponseBody respBody = response.getResponseBody();
+                if (respBody.hasSubscribeResponse()) {
+                    SubscribeResponse resp = respBody.getSubscribeResponse();
+                    if (resp.hasPreferences()) {
+                        preferences = resp.getPreferences();
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Receive subscription preferences for {} : {}",
+                                         va(ts,
+                                            SubscriptionStateUtils.toString(preferences)));
+                        }
+                    }
+                }
+            }
+
+            ActiveSubscriber ss = new ActiveSubscriber(ts, pubSubData, preferences,
+                                                       channel);
+
+            boolean success = false;
+            // Store the Subscribe state. The read lock keeps this from
+            // interleaving with the disconnect handling, which holds the
+            // write lock.
+            disconnectLock.readLock().lock();
+            try {
+                ActiveSubscriber oldSS = subscriptions.putIfAbsent(ts, ss);
+                if (null != oldSS) {
+                    logger.warn("Subscribe {} has existed in channel {}.",
+                                va(ts, channel));
+                    success = false;
+                } else {
+                    logger.debug("Succeed to add subscription {} in channel {}.",
+                                 va(ts, channel));
+                    success = true;
+                }
+            } finally {
+                disconnectLock.readLock().unlock();
+            }
+            if (success) {
+                // Response was success so invoke the callback's operationFinished
+                // method.
+                pubSubData.getCallback().operationFinished(pubSubData.context, null);
+            } else {
+                ClientAlreadySubscribedException exception =
+                    new ClientAlreadySubscribedException("Client is already subscribed for " + ts);
+                pubSubData.getCallback().operationFailed(pubSubData.context, exception);
+            }
+            break;
+        case CLIENT_ALREADY_SUBSCRIBED:
+            // For Subscribe requests, the server says that the client is
+            // already subscribed to it.
+            pubSubData.getCallback().operationFailed(pubSubData.context, new ClientAlreadySubscribedException(
+                                                     "Client is already subscribed for topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: "
+                                                     + pubSubData.subscriberId.toStringUtf8()));
+            break;
+        case SERVICE_DOWN:
+            // Response was service down failure so just invoke the callback's
+            // operationFailed method.
+            pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
+                                                     "Server responded with a SERVICE_DOWN status"));
+            break;
+        case NOT_RESPONSIBLE_FOR_TOPIC:
+            // Redirect response so we'll need to repost the original Subscribe
+            // Request
+            handleRedirectResponse(response, pubSubData, channel);
+            break;
+        default:
+            // Consider all other status codes as errors, operation failed
+            // cases.
+            logger.error("Unexpected error response from server for PubSubResponse: " + response);
+            pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
+                                                     "Server responded with a status code of: " + response.getStatusCode()));
+            break;
+        }
+    }
+
+    /**
+     * Route a message pushed by the server to the active subscriber it is
+     * addressed to; the message is dropped with an error log when no such
+     * subscriber is registered.
+     */
+    @Override
+    public void handleSubscribeMessage(PubSubResponse response) {
+        Message message = response.getMessage();
+        TopicSubscriber target = new TopicSubscriber(response.getTopic(),
+                                                     response.getSubscriberId());
+        if (logger.isDebugEnabled()) {
+            logger.debug("Handling a Subscribe message in response: {}, {}",
+                         va(response, target));
+        }
+        ActiveSubscriber subscriber = getActiveSubscriber(target);
+        if (null != subscriber) {
+            subscriber.handleMessage(message);
+            return;
+        }
+        logger.error("Subscriber {} is not found receiving its message {}.",
+                     va(target, MessageIdUtils.msgIdToReadableString(message.getMsgId())));
+    }
+
+    /**
+     * Deliver a message to the message handler of the given topic subscriber;
+     * logs an error when the subscriber is not registered.
+     */
+    @Override
+    protected void asyncMessageDeliver(TopicSubscriber topicSubscriber,
+                                       Message message) {
+        ActiveSubscriber subscriber = getActiveSubscriber(topicSubscriber);
+        if (null != subscriber) {
+            subscriber.asyncMessageDeliver(message);
+            return;
+        }
+        logger.error("Subscriber {} is not found delivering its message {}.",
+                     va(topicSubscriber,
+                        MessageIdUtils.msgIdToReadableString(message.getMsgId())));
+    }
+
+    /**
+     * Called after the client application consumed a message. When
+     * auto-sending of consume messages is enabled, counts the message and
+     * sends a Consume request once the configured buffer size is reached.
+     */
+    @Override
+    protected void messageConsumed(TopicSubscriber topicSubscriber,
+                                   Message message) {
+        ActiveSubscriber subscriber = getActiveSubscriber(topicSubscriber);
+        if (null == subscriber) {
+            logger.warn("Subscriber {} is not found consumed its message {}.",
+                        va(topicSubscriber,
+                           MessageIdUtils.msgIdToReadableString(message.getMsgId())));
+            return;
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("Message has been successfully consumed by the client app : {}, {}",
+                         va(message, topicSubscriber));
+        }
+        // Without auto-send the client app is responsible for calling the
+        // Subscriber consume API itself, so no bookkeeping happens here.
+        if (!cfg.isAutoSendConsumeMessageEnabled()) {
+            return;
+        }
+        // Consumed messages are counted per subscriber; a Consume request is
+        // only sent when the subscriber reports that the buffer limit was hit.
+        if (!subscriber.updateLastMessageSeqId(message.getMsgId())) {
+            return;
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("Consume message {} when reaching consumed message buffer limit.",
+                         message.getMsgId());
+        }
+        subscriber.consume(message.getMsgId());
+    }
+
+    /**
+     * Forward a subscription event to the active subscriber it concerns;
+     * logs a warning when no such subscriber is registered.
+     */
+    @Override
+    public void handleSubscriptionEvent(ByteString topic, ByteString subscriberId,
+                                        SubscriptionEvent event) {
+        TopicSubscriber target = new TopicSubscriber(topic, subscriberId);
+        ActiveSubscriber subscriber = getActiveSubscriber(target);
+        if (null != subscriber) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Received subscription event {} for ({}).",
+                             va(event, target));
+            }
+            subscriber.processEvent(topic, subscriberId, event);
+            return;
+        }
+        logger.warn("No subscription {} found receiving subscription event {}.",
+                    va(target, event));
+    }
+
+    /**
+     * Start delivering messages of a subscription to the given handler.
+     *
+     * @throws ClientNotSubscribedException if the subscription is not registered here
+     * @throws AlreadyStartDeliveryException if a handler is already registered
+     */
+    @Override
+    public void startDelivery(final TopicSubscriber topicSubscriber,
+                              MessageHandler messageHandler)
+    throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+        ActiveSubscriber subscriber = getActiveSubscriber(topicSubscriber);
+        if (subscriber == null) {
+            throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("Start delivering message for {} using message handler {}",
+                         va(topicSubscriber, messageHandler));
+        }
+        subscriber.startDelivery(messageHandler);
+    }
+
+    /**
+     * Stop delivering messages of a subscription.
+     *
+     * @throws ClientNotSubscribedException if the subscription is not registered here
+     */
+    @Override
+    public void stopDelivery(final TopicSubscriber topicSubscriber)
+    throws ClientNotSubscribedException {
+        ActiveSubscriber subscriber = getActiveSubscriber(topicSubscriber);
+        if (subscriber == null) {
+            throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("Stop delivering messages for {}", topicSubscriber);
+        }
+        subscriber.stopDelivery();
+    }
+
+    @Override
+    public boolean hasSubscription(TopicSubscriber topicSubscriber) {
+        return subscriptions.containsKey(topicSubscriber);
+    }
+
+    /**
+     * Close a subscription by submitting a CLOSESUBSCRIPTION request on the
+     * subscription channel. When the subscription or the channel is unknown
+     * the callback is completed right away.
+     *
+     * @param topicSubscriber subscription to close
+     * @param callback invoked with the outcome of the close request
+     * @param context context handed back to the callback
+     */
+    @Override
+    public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
+                                       final Callback<ResponseBody> callback,
+                                       final Object context) {
+        final ActiveSubscriber subscriber = getActiveSubscriber(topicSubscriber);
+        final HChannel subChannel = hChannel;
+        if (null == subscriber || null == subChannel) {
+            logger.debug("No subscription {} found when closing its subscription from {}.",
+                         va(topicSubscriber, subChannel));
+            callback.operationFinished(context, (ResponseBody)null);
+            return;
+        }
+        Callback<ResponseBody> onClosed = new Callback<ResponseBody>() {
+            @Override
+            public void operationFinished(Object ctx, ResponseBody respBody) {
+                // Drop the subscriber only if it is still the one being
+                // closed; the read lock is shared with the subscribe path.
+                disconnectLock.readLock().lock();
+                try {
+                    subscriptions.remove(topicSubscriber, subscriber);
+                } finally {
+                    disconnectLock.readLock().unlock();
+                }
+                callback.operationFinished(context, null);
+            }
+
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) {
+                callback.operationFailed(context, exception);
+            }
+        };
+        PubSubData closeRequest = new PubSubData(topicSubscriber.getTopic(), null,
+                                                 topicSubscriber.getSubscriberId(),
+                                                 OperationType.CLOSESUBSCRIPTION,
+                                                 null, onClosed, context);
+        subChannel.submitOp(closeRequest);
+    }
+
+    /**
+     * Send a Consume request for the given subscription; logs a warning when
+     * the subscription is not registered with this handler.
+     */
+    @Override
+    public void consume(final TopicSubscriber topicSubscriber,
+                        final MessageSeqId messageSeqId) {
+        ActiveSubscriber subscriber = getActiveSubscriber(topicSubscriber);
+        if (null != subscriber) {
+            subscriber.consume(messageSeqId);
+            return;
+        }
+        logger.warn("Subscriber {} is not found consuming message {}.",
+                    va(topicSubscriber,
+                       MessageIdUtils.msgIdToReadableString(messageSeqId)));
+    }
+
+    /**
+     * Handle a disconnect of the subscription channel while holding the
+     * write side of disconnectLock.
+     */
+    @Override
+    public void onChannelDisconnected(InetSocketAddress host, Channel channel) {
+        final ReentrantReadWriteLock.WriteLock exclusive = disconnectLock.writeLock();
+        exclusive.lock();
+        try {
+            onDisconnect(host);
+        } finally {
+            exclusive.unlock();
+        }
+    }
+
+    /** Run the per-subscriber disconnect handling for every registered subscriber. */
+    private void onDisconnect(InetSocketAddress host) {
+        for (ActiveSubscriber subscriber : subscriptions.values()) {
+            onDisconnect(subscriber, host);
+        }
+    }
+
+    /**
+     * Treat a channel disconnect as a TOPIC_MOVED event for the given
+     * subscriber.
+     */
+    private void onDisconnect(ActiveSubscriber ss, InetSocketAddress host) {
+        TopicSubscriber disconnected = ss.getTopicSubscriber();
+        logger.info("Subscription channel for ({}) is disconnected.", disconnected);
+        ss.processEvent(disconnected.getTopic(), disconnected.getSubscriberId(),
+                        SubscriptionEvent.TOPIC_MOVED);
+    }
+
+}

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java?rev=1405028&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java Fri Nov  2 16:14:59 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl.multiplex;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.handlers.AbstractResponseHandler;
+import org.apache.hedwig.client.handlers.CloseSubscriptionResponseHandler;
+import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
+import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
+import org.apache.hedwig.client.netty.impl.HChannelHandler;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+
+/**
+ * Pipeline factory for multiplexed subscription channels. It registers the
+ * response handlers for SUBSCRIBE and CLOSESUBSCRIPTION operations.
+ */
+public class MultiplexSubscriptionChannelPipelineFactory extends ClientChannelPipelineFactory {
+
+    public MultiplexSubscriptionChannelPipelineFactory(ClientConfiguration cfg,
+                                                       MultiplexHChannelManager channelManager) {
+        super(cfg, channelManager);
+    }
+
+    /**
+     * @return a new map holding one handler per operation type supported on
+     *         a multiplexed subscription channel
+     */
+    @Override
+    protected Map<OperationType, AbstractResponseHandler> createResponseHandlers() {
+        AbstractResponseHandler subscribeHandler =
+            new MultiplexSubscribeResponseHandler(cfg, channelManager);
+        AbstractResponseHandler closeSubscriptionHandler =
+            new CloseSubscriptionResponseHandler(cfg, channelManager);
+
+        Map<OperationType, AbstractResponseHandler> byOperation =
+            new HashMap<OperationType, AbstractResponseHandler>();
+        byOperation.put(OperationType.SUBSCRIBE, subscribeHandler);
+        byOperation.put(OperationType.CLOSESUBSCRIPTION, closeSubscriptionHandler);
+        return byOperation;
+    }
+
+}

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/ResubscribeCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/ResubscribeCallback.java?rev=1405028&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/ResubscribeCallback.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/ResubscribeCallback.java Fri Nov  2 16:14:59 2012
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl.multiplex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
+
+/**
+ * Callback used when a subscribe channel gets disconnected and the client
+ * resubmits the subscribe requests that existed on that channel. Once the
+ * resubscribe for a topic has completed, delivery is restarted for that
+ * topic; if the resubscribe fails, the subscribe request is resubmitted
+ * after a delay.
+ */
+class ResubscribeCallback implements Callback<ResponseBody> {
+
+    private static final Logger logger = LoggerFactory.getLogger(ResubscribeCallback.class);
+
+    // Subscriber whose delivery is restarted after a successful resubscribe.
+    private final TopicSubscriber origTopicSubscriber;
+    // Original subscribe request; resubmitted as-is on failure.
+    private final PubSubData origSubData;
+    // Channel manager used to restart delivery and to resubmit the request.
+    private final MultiplexHChannelManager channelManager;
+    // Delay before a failed subscribe request is resubmitted (logged as ms).
+    private final long retryWaitTime;
+
+    /**
+     * @param origTopicSubscriber
+     *            topic subscriber being resubscribed
+     * @param origSubData
+     *            original subscribe request data
+     * @param channelManager
+     *            channel manager used for restarting delivery and retrying
+     * @param retryWaitTime
+     *            delay before a retry is submitted
+     */
+    ResubscribeCallback(TopicSubscriber origTopicSubscriber,
+                        PubSubData origSubData,
+                        MultiplexHChannelManager channelManager,
+                        long retryWaitTime) {
+        this.origTopicSubscriber = origTopicSubscriber;
+        this.origSubData = origSubData;
+        this.channelManager = channelManager;
+        this.retryWaitTime = retryWaitTime;
+    }
+
+    /**
+     * Called when the resubscribe succeeded; asks the channel manager to
+     * restart delivery for the resubscribed topic subscriber.
+     */
+    @Override
+    public void operationFinished(Object ctx, ResponseBody resultOfOperation) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Resubscribe succeeded for origSubData: " + origSubData);
+        }
+        // Now we want to restart delivery for the subscription channel only
+        // if delivery was started at the time the original subscribe channel
+        // was disconnected.
+        try {
+            channelManager.restartDelivery(origTopicSubscriber);
+        } catch (ClientNotSubscribedException e) {
+            // This exception should never be thrown here but just in case,
+            // log an error and just keep retrying the subscribe request.
+            logger.error("Subscribe was successful but error starting delivery for {} : {}",
+                         va(origTopicSubscriber, e.getMessage()));
+            retrySubscribeRequest();
+        } catch (AlreadyStartDeliveryException asde) {
+            // Should not reach here. There is nothing to retry in this case,
+            // but leave a trace instead of swallowing the exception silently.
+            logger.warn("Delivery was already started when restarting it after resubscribe for {} : {}",
+                        va(origTopicSubscriber, asde.getMessage()));
+        }
+    }
+
+    /**
+     * Called when the resubscribe failed; schedules another attempt.
+     */
+    @Override
+    public void operationFailed(Object ctx, PubSubException exception) {
+        // If the resubscribe fails, just keep retrying the subscribe
+        // request. There isn't a way to flag to the application layer that
+        // a topic subscription has failed. So instead, we'll just keep
+        // retrying in the background until success.
+        logger.error("Resubscribe failed with error: " + exception.getMessage());
+        retrySubscribeRequest();
+    }
+
+    /**
+     * Clears the servers list recorded on the original request and resubmits
+     * the request through the channel manager after {@code retryWaitTime}.
+     */
+    private void retrySubscribeRequest() {
+        origSubData.clearServersList();
+        logger.debug("Resubmit subscribe request for {} in {} ms later.",
+                     va(origTopicSubscriber, retryWaitTime));
+        channelManager.submitOpAfterDelay(origSubData, retryWaitTime);
+    }
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java?rev=1405028&r1=1405027&r2=1405028&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java Fri Nov  2 16:14:59 2012
@@ -247,6 +247,21 @@ public class SimpleSubscribeResponseHand
     }
 
     @Override
+    public void handleSubscriptionEvent(ByteString topic, ByteString subscriberId,
+                                        SubscriptionEvent event) {
+        // Take a snapshot of the subscription channel while holding this
+        // handler's monitor; the rest of the work runs outside the lock.
+        final Channel currentChannel;
+        synchronized (this) {
+            currentChannel = subscribeChannel;
+        }
+        if (currentChannel != null) {
+            // Hand the event over together with the channel and its host.
+            processSubscriptionEvent(event, NetUtils.getHostFromChannel(currentChannel),
+                                     currentChannel);
+        } else {
+            // Without a channel there is nothing to process; just report it.
+            logger.warn("No subscription channel found when receiving subscription event {} for (topic:{}, subscriber:{}).",
+                        va(event, topic, subscriberId));
+        }
+    }
+
+    @Override
     protected void asyncMessageDeliver(TopicSubscriber topicSubscriber,
                                        Message message) {
         if (logger.isDebugEnabled()) {
@@ -488,7 +503,20 @@ public class SimpleSubscribeResponseHand
         if (origTopicSubscriber == null) {
             return;
         }
-        sChannelManager.clearHostForTopic(origTopicSubscriber.getTopic(), host);
+        processSubscriptionEvent(SubscriptionEvent.TOPIC_MOVED, host, channel);
+    }
+
+    private void processSubscriptionEvent(final SubscriptionEvent event, InetSocketAddress host,
+                                          final Channel channel) {
+        if (SubscriptionEvent.TOPIC_MOVED != event &&
+            SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED != event) {
+            logger.warn("Ignore subscription event {} received from channel {}.",
+                        event, channel);
+            return;
+        }
+        if (SubscriptionEvent.TOPIC_MOVED == event) {
+            sChannelManager.clearHostForTopic(origTopicSubscriber.getTopic(), host);
+        }
         // clear subscription status
         sChannelManager.asyncCloseSubscription(origTopicSubscriber, new Callback<ResponseBody>() {
 
@@ -530,10 +558,10 @@ public class SimpleSubscribeResponseHand
                                  origTopicSubscriber, origSubData);
                     sChannelManager.submitOpToDefaultServer(origSubData);
                 } else {
-                    logger.info("Subscription channel for ({}) is disconnected.",
-                                origTopicSubscriber);
+                    logger.info("Subscription channel {} for ({}) is disconnected.",
+                                channel, origTopicSubscriber);
                     sChannelManager.getSubscriptionEventEmitter().emitSubscriptionEvent(
-                        origSubData.topic, origSubData.subscriberId, SubscriptionEvent.TOPIC_MOVED);
+                        origSubData.topic, origSubData.subscriberId, event);
                 }
             }
         }, null);