You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/27 01:52:20 UTC

svn commit: r1390777 [4/4] - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/java/org/apache/hedwig/client/data/ hedwig-client/src/main/java/org/apache/hedwig/client/handlers/ hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig...

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,538 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl.simple;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import com.google.protobuf.ByteString;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.MessageConsumeData;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
+import org.apache.hedwig.client.netty.HChannelManager;
+import org.apache.hedwig.client.netty.HChannel;
+import org.apache.hedwig.client.netty.NetUtils;
+import org.apache.hedwig.client.netty.FilterableMessageHandler;
+import org.apache.hedwig.client.netty.impl.HChannelImpl;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.filter.ClientMessageFilter;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
+
+public class SimpleSubscribeResponseHandler extends SubscribeResponseHandler {
+
+    // Class-wide SLF4J logger.
+    private static Logger logger = LoggerFactory.getLogger(SimpleSubscribeResponseHandler.class);
+
+    // Member variables used when this ResponseHandler is for a Subscribe
+    // channel. We need to be able to consume messages sent back to us from
+    // the server, and to also recreate the Channel connection if it ever goes
+    // down. For that, we need to store the original PubSubData for the
+    // subscribe request, and also the MessageHandler that was registered when
+    // delivery of messages started for the subscription.
+    // Both stay null until handleResponse sees a SUCCESS status.
+    private volatile PubSubData origSubData;
+    private volatile TopicSubscriber origTopicSubscriber;
+    // Subscription preferences taken from the SubscribeResponse body, if the
+    // server sent any; null otherwise.
+    private SubscriptionPreferences preferences;
+    // Netty channel the successful subscribe response arrived on. Assigned
+    // inside a synchronized(this) block in handleResponse.
+    private Channel subscribeChannel;
+    // Handler registered through startDelivery and cleared by stopDelivery;
+    // null means delivery is not currently started.
+    private MessageHandler messageHandler;
+    // Counter for the number of consumed messages so far to buffer up before we
+    // send the Consume message back to the server along with the last/largest
+    // message seq ID seen so far in that batch.
+    private int numConsumedMessagesInBuffer = 0;
+    // Seq ID of the most recently consumed message in the current batch;
+    // reset to null after a Consume request is sent.
+    private MessageSeqId lastMessageSeqId;
+    // Queue used for subscribes when the MessageHandler hasn't been registered
+    // yet but we've already received subscription messages from the server.
+    // This will be lazily created as needed.
+    private Queue<Message> subscribeMsgQueue;
+    // Set to store all of the outstanding subscribed messages that are pending
+    // to be consumed by the client app's MessageHandler. If this ever grows too
+    // big (e.g. problem at the client end for message consumption), we can
+    // throttle things by temporarily setting the Subscribe Netty Channel
+    // to not be readable. When the Set has shrunk sufficiently, we can turn the
+    // channel back on to read new messages.
+    private Set<Message> outstandingMsgSet;
+
+    // The channel manager passed to the constructor, kept under its concrete
+    // SimpleHChannelManager type.
+    private SimpleHChannelManager sChannelManager;
+
+    /**
+     * Creates the subscribe response handler.
+     *
+     * @param cfg
+     *            client configuration, handed to the superclass.
+     * @param channelManager
+     *            channel manager, handed to the superclass. It is also cast
+     *            to {@link SimpleHChannelManager}, so passing any other
+     *            implementation fails with a ClassCastException.
+     */
+    protected SimpleSubscribeResponseHandler(ClientConfiguration cfg,
+                                             HChannelManager channelManager) {
+        super(cfg, channelManager);
+        sChannelManager = (SimpleHChannelManager) channelManager;
+    }
+
+    /**
+     * @return the channel manager supplied at construction time.
+     */
+    protected HChannelManager getHChannelManager() {
+        return this.sChannelManager;
+    }
+
+    /**
+     * @return the client configuration this handler was created with.
+     */
+    protected ClientConfiguration getConfiguration() {
+        return cfg;
+    }
+
+    /**
+     * @return the currently registered message handler, or null when
+     *         delivery has not been started (or has been stopped). The field
+     *         is read here without synchronization.
+     */
+    protected MessageHandler getMessageHandler() {
+        return messageHandler;
+    }
+
+    /**
+     * Handles the server's response to a Subscribe request.
+     *
+     * On SUCCESS the request data, the topic subscriber, the channel and any
+     * subscription preferences are recorded, the channel is made
+     * non-readable until delivery is started, and the request callback's
+     * operationFinished is invoked. CLIENT_ALREADY_SUBSCRIBED, SERVICE_DOWN
+     * and all unexpected status codes invoke the callback's operationFailed;
+     * NOT_RESPONSIBLE_FOR_TOPIC is passed to handleRedirectResponse. For every
+     * non-SUCCESS status the channel is closed first.
+     *
+     * @param response
+     *            response received from the server.
+     * @param pubSubData
+     *            data of the original Subscribe request, including its
+     *            callback and context.
+     * @param channel
+     *            channel the response arrived on.
+     */
+    @Override
+    public void handleResponse(PubSubResponse response, PubSubData pubSubData,
+                               Channel channel) throws Exception {
+        // If this was not a successful response to the Subscribe request, we
+        // won't be using the Netty Channel created so just close it.
+        if (!response.getStatusCode().equals(StatusCode.SUCCESS)) {
+            HChannelImpl.getHChannelHandlerFromChannel(channel).closeExplicitly();
+            channel.close();
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Handling a Subscribe response: {}, pubSubData: {}, host: {}.",
+                         va(response, pubSubData, NetUtils.getHostFromChannel(channel)));
+        }
+        switch (response.getStatusCode()) {
+        case SUCCESS:
+            // Store the original PubSubData used to create this successful
+            // Subscribe request.
+            origSubData = pubSubData;
+            origTopicSubscriber = new TopicSubscriber(pubSubData.topic,
+                                                      pubSubData.subscriberId);
+            synchronized(this) {
+                // For successful Subscribe requests, store this Channel locally
+                // and set it to not be readable initially.
+                // This way we won't be delivering messages for this topic
+                // subscription until the client explicitly says so.
+                subscribeChannel = channel;
+                subscribeChannel.setReadable(false);
+                // Preferences are optional in the response; the field is only
+                // assigned when the server actually sent them.
+                if (response.hasResponseBody()) {
+                    ResponseBody respBody = response.getResponseBody(); 
+                    if (respBody.hasSubscribeResponse()) {
+                        SubscribeResponse resp = respBody.getSubscribeResponse();
+                        if (resp.hasPreferences()) {
+                            preferences = resp.getPreferences();
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("Receive subscription preferences for {} : {}",
+                                             va(origTopicSubscriber,
+                                                SubscriptionStateUtils.toString(preferences)));
+                            }
+                        }
+                    }
+                }
+
+                // Store the mapping for the TopicSubscriber to the Channel.
+                // This is so we can control the starting and stopping of
+                // message deliveries from the server on that Channel. Store
+                // this only on a successful ack response from the server.
+                sChannelManager.storeSubscriptionChannel(origTopicSubscriber,
+                                                                    channel);
+
+                // Lazily create the Set (from a concurrent hashmap) to keep track
+                // of outstanding Messages to be consumed by the client app. At this
+                // stage, delivery for that topic hasn't started yet so creation of
+                // this Set should be thread safe. We'll create the Set with an initial
+                // capacity equal to the configured parameter for the maximum number of
+                // outstanding messages to allow. The load factor will be set to
+                // 1.0f which means we'll only rehash and allocate more space if
+                // we ever exceed the initial capacity. That should be okay
+                // because when that happens, things are slow already and piling
+                // up on the client app side to consume messages.
+                outstandingMsgSet = Collections.newSetFromMap(
+                        new ConcurrentHashMap<Message,Boolean>(
+                                cfg.getMaximumOutstandingMessages(), 1.0f));
+            }
+            // Response was success so invoke the callback's operationFinished
+            // method. This happens after the synchronized block has been left.
+            pubSubData.getCallback().operationFinished(pubSubData.context, null);
+            break;
+        case CLIENT_ALREADY_SUBSCRIBED:
+            // For Subscribe requests, the server says that the client is
+            // already subscribed to it.
+            pubSubData.getCallback().operationFailed(pubSubData.context, new ClientAlreadySubscribedException(
+                                                     "Client is already subscribed for topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: "
+                                                     + pubSubData.subscriberId.toStringUtf8()));
+            break;
+        case SERVICE_DOWN:
+            // Response was service down failure so just invoke the callback's
+            // operationFailed method.
+            pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
+                                                     "Server responded with a SERVICE_DOWN status"));
+            break;
+        case NOT_RESPONSIBLE_FOR_TOPIC:
+            // Redirect response so we'll need to repost the original Subscribe
+            // Request
+            handleRedirectResponse(response, pubSubData, channel);
+            break;
+        default:
+            // Consider all other status codes as errors, operation failed
+            // cases. They are reported to the caller as ServiceDownException.
+            logger.error("Unexpected error response from server for PubSubResponse: {}", response);
+            pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
+                                                     "Server responded with a status code of: " + response.getStatusCode()));
+            break;
+        }
+    }
+
+    /**
+     * Handles a message pushed by the server for this subscription.
+     *
+     * When a MessageHandler is registered the message is handed to
+     * asyncMessageDeliver; otherwise it is appended to the lazily created
+     * queue of pending messages until delivery is started.
+     *
+     * @param response
+     *            server response carrying the subscribed message.
+     */
+    @Override
+    public void handleSubscribeMessage(PubSubResponse response) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Handling a Subscribe message in response: {}, {}",
+                         va(response, origTopicSubscriber));
+        }
+        final Message received = response.getMessage();
+
+        synchronized (this) {
+            if (null == messageHandler) {
+                // Delivery has not started for this topic subscription yet,
+                // so the message can only be buffered. The queue is created
+                // under the lock so that two threads receiving messages at
+                // the same time cannot both instantiate it. The locking cost
+                // is acceptable because nothing is being consumed at this
+                // point anyway.
+                if (null == subscribeMsgQueue) {
+                    subscribeMsgQueue = new LinkedList<Message>();
+                }
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Message has arrived but Subscribe channel does not have a registered MessageHandler yet so queueing up the message: {}",
+                                 received);
+                }
+                subscribeMsgQueue.add(received);
+                return;
+            }
+            // A handler is registered, i.e. delivery has been started for
+            // the TopicSubscriber: hand the message over for asynchronous
+            // consumption.
+            asyncMessageDeliver(origTopicSubscriber, received);
+        }
+    }
+
+    /**
+     * Hands a message to the registered MessageHandler and tracks it as
+     * outstanding until the application reports it consumed.
+     *
+     * If the number of outstanding messages reaches the configured maximum
+     * while the subscribe channel is still readable, the channel is made
+     * non-readable to throttle the server.
+     *
+     * @param topicSubscriber
+     *            topic/subscriber pair the message belongs to.
+     * @param message
+     *            message to deliver.
+     */
+    @Override
+    protected void asyncMessageDeliver(TopicSubscriber topicSubscriber,
+                                       Message message) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Call the client app's MessageHandler asynchronously to deliver the message {} to {}",
+                         va(message, topicSubscriber));
+        }
+        synchronized (this) {
+            // Record the message as pending consumption before checking the
+            // throttling threshold.
+            outstandingMsgSet.add(message);
+            final boolean limitReached =
+                outstandingMsgSet.size() >= cfg.getMaximumOutstandingMessages();
+            if (limitReached && subscribeChannel.isReadable()) {
+                // The application is falling behind: stop reading from the
+                // Netty channel until the backlog has been consumed.
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Too many outstanding messages ({}) so throttling the subscribe netty Channel",
+                                 outstandingMsgSet.size());
+                }
+                subscribeChannel.setReadable(false);
+            }
+        }
+        // The consume data travels with the delivery as the callback context.
+        final MessageConsumeData consumeCtx =
+            new MessageConsumeData(topicSubscriber, message);
+        messageHandler.deliver(topicSubscriber.getTopic(),
+                               topicSubscriber.getSubscriberId(),
+                               message,
+                               sChannelManager.getConsumeCallback(),
+                               consumeCtx);
+    }
+
+    /**
+     * Records that the client application has consumed a message.
+     *
+     * The message is removed from the outstanding set. When auto-sending of
+     * consume messages is enabled, the consumed counter and last seen seq ID
+     * are updated and, once the counter reaches the configured buffer size,
+     * a Consume request is sent and both are reset. Finally, if the
+     * subscribe channel is currently non-readable and no messages remain
+     * outstanding, the channel is made readable again.
+     *
+     * @param topicSubscriber
+     *            topic/subscriber pair the message belongs to.
+     * @param message
+     *            message that was consumed.
+     */
+    @Override
+    protected synchronized void messageConsumed(TopicSubscriber topicSubscriber,
+                                                Message message) {
+        logger.debug("Message has been successfully consumed by the client app for message:  {}, {}",
+                     message, topicSubscriber);
+        // Update the consumed messages buffer variables
+        if (cfg.isAutoSendConsumeMessageEnabled()) {
+            // Update these variables only if we are auto-sending consume
+            // messages to the server. Otherwise the onus is on the client app
+            // to call the Subscriber consume API to let the server know which
+            // messages it has successfully consumed.
+            numConsumedMessagesInBuffer++;
+            lastMessageSeqId = message.getMsgId();
+        }
+        // Remove this consumed message from the outstanding Message Set.
+        outstandingMsgSet.remove(message);
+
+        // For consume response to server, there is a config param on how many
+        // messages to consume and buffer up before sending the consume request.
+        // We just need to keep a count of the number of messages consumed
+        // and the largest/latest msg ID seen so far in this batch. Messages
+        // should be delivered in order and without gaps. Do this only if
+        // auto-sending of consume messages is enabled.
+        if (cfg.isAutoSendConsumeMessageEnabled()
+                && numConsumedMessagesInBuffer >= cfg.getConsumedMessagesBufferSize()) {
+            // Send the consume request and reset the consumed messages buffer
+            // variables. We will use the same Channel created from the
+            // subscribe request for the TopicSubscriber.
+            if (logger.isDebugEnabled()) {
+                logger
+                .debug("Consumed message buffer limit reached so send the Consume Request to the server with lastMessageSeqId: {}",
+                       lastMessageSeqId);
+            }
+            consume(topicSubscriber, lastMessageSeqId);
+            numConsumedMessagesInBuffer = 0;
+            lastMessageSeqId = null;
+        }
+
+        // Check if we throttled message consumption previously when the
+        // outstanding message limit was reached. For now, only turn the
+        // delivery back on if there are no more outstanding messages to
+        // consume. We could make this a configurable parameter if needed.
+        // NOTE(review): this check does not look at messageHandler, so it also
+        // fires when the channel was made non-readable by stopDelivery --
+        // confirm that re-enabling reads in that case is intended.
+        if (!subscribeChannel.isReadable() && outstandingMsgSet.size() == 0) {
+            if (logger.isDebugEnabled())
+                logger
+                .debug("Message consumption has caught up so okay to turn off throttling of messages on the subscribe channel for {}",
+                       topicSubscriber);
+            subscribeChannel.setReadable(true);
+        }
+    }
+
+    /**
+     * Starts delivering messages of the subscription to the given handler.
+     *
+     * Registers the handler, passes the subscription preferences to its
+     * message filter when it is a FilterableMessageHandler, delivers any
+     * messages queued before the handler was registered, and makes the
+     * subscribe channel readable.
+     *
+     * @param topicSubscriber
+     *            topic/subscriber pair to start delivery for.
+     * @param messageHandler
+     *            handler that will receive the messages.
+     * @throws ClientNotSubscribedException
+     *             if this handler holds no subscription for topicSubscriber.
+     * @throws AlreadyStartDeliveryException
+     *             if a message handler is already registered.
+     */
+    @Override
+    public void startDelivery(final TopicSubscriber topicSubscriber,
+                              MessageHandler messageHandler)
+    throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Start delivering message for {} using message handler {}",
+                         va(topicSubscriber, messageHandler));
+        }
+        if (!hasSubscription(topicSubscriber)) {
+            throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
+        }
+        synchronized (this) {
+            if (null != this.messageHandler) {
+                    throw new AlreadyStartDeliveryException("A message handler " + this.messageHandler
+                        + " has been started for " + topicSubscriber);
+            }
+            // Set up the message handler: a filterable handler needs the
+            // subscription preferences before it can filter messages.
+            if (null != messageHandler &&
+                messageHandler instanceof FilterableMessageHandler) {
+                FilterableMessageHandler filterMsgHandler =
+                    (FilterableMessageHandler) messageHandler;
+                // pass subscription preferences to message filter
+                if (null == preferences) {
+                    // no preferences means talking to an old version hub server
+                    logger.warn("Start delivering messages with filter but no subscription "
+                              + "preferences found. It might due to talking to an old version"
+                              + " hub server.");
+                    // use the original message handler.
+                    messageHandler = filterMsgHandler.getMessageHandler();
+                } else {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Start delivering messages with filter on {}, preferences: {}",
+                                     va(topicSubscriber,
+                                        SubscriptionStateUtils.toString(preferences)));
+                    }
+                    ClientMessageFilter msgFilter = filterMsgHandler.getMessageFilter();
+                    msgFilter.setSubscriptionPreferences(
+                        topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(),
+                        preferences);
+                }
+            }
+
+            this.messageHandler = messageHandler;
+            // Once the MessageHandler is registered, see if we have any queued up
+            // subscription messages sent to us already from the server. If so,
+            // consume those first. Do this only if the MessageHandler registered is
+            // not null (since that would be the HedwigSubscriber.stopDelivery
+            // call).
+            if (messageHandler != null && subscribeMsgQueue != null && subscribeMsgQueue.size() > 0) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Consuming {} queued up messages for {}",
+                                 va(subscribeMsgQueue.size(), topicSubscriber));
+                }
+                for (Message message : subscribeMsgQueue) {
+                    asyncMessageDeliver(topicSubscriber, message);
+                }
+                // Now we can remove the queued up messages since they are all
+                // consumed.
+                subscribeMsgQueue.clear();
+            }
+            // Now make the TopicSubscriber Channel readable (it is set to not be
+            // readable when the initial subscription is done). Note that this is an
+            // asynchronous call. If this fails (not likely), the futureListener
+            // will just log an error message for now.
+            ChannelFuture future = subscribeChannel.setReadable(true);
+            future.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if (!future.isSuccess()) {
+                        logger.error("Unable to make subscriber Channel readable in startDelivery call for {}",
+                                     topicSubscriber);
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * Stops delivering messages of the subscription to the application.
+     *
+     * The registered handler is dropped and the subscribe channel is made
+     * non-readable so that further messages from the server are held back.
+     *
+     * @param topicSubscriber
+     *            topic/subscriber pair to stop delivery for.
+     * @throws ClientNotSubscribedException
+     *             if this handler holds no subscription for topicSubscriber.
+     */
+    @Override
+    public void stopDelivery(final TopicSubscriber topicSubscriber)
+    throws ClientNotSubscribedException {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Stop delivering messages for {}", topicSubscriber);
+        }
+
+        final boolean subscribed = hasSubscription(topicSubscriber);
+        if (!subscribed) {
+            throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
+        }
+
+        synchronized (this) {
+            this.messageHandler = null;
+            // Switching the channel to non-readable is asynchronous. A
+            // failure is unlikely and, for now, is only reported through the
+            // listener's error log.
+            subscribeChannel.setReadable(false).addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture result) throws Exception {
+                    if (result.isSuccess()) {
+                        return;
+                    }
+                    logger.error("Unable to make subscriber Channel not readable in stopDelivery call for {}",
+                                 topicSubscriber);
+                }
+            });
+        }
+    }
+
+    /**
+     * Tells whether this handler holds the subscription of the given
+     * topic/subscriber pair.
+     *
+     * @param topicSubscriber
+     *            topic/subscriber pair to look up.
+     * @return true only if a subscription has been recorded and it equals
+     *         topicSubscriber.
+     */
+    @Override
+    public boolean hasSubscription(TopicSubscriber topicSubscriber) {
+        return null != origTopicSubscriber
+               && origTopicSubscriber.equals(topicSubscriber);
+    }
+
+    /**
+     * Closes the subscription. This handler has nothing of its own to close,
+     * so it immediately reports success to the callback with a null result.
+     *
+     * @param topicSubscriber
+     *            topic/subscriber pair being closed (not used here).
+     * @param callback
+     *            callback whose operationFinished is invoked.
+     * @param context
+     *            context object passed back to the callback.
+     */
+    @Override
+    public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
+                                       final Callback<ResponseBody> callback,
+                                       final Object context) {
+        // nothing to do just clear status
+        // channel manager takes the responsibility to close the channel
+        callback.operationFinished(context, (ResponseBody)null);
+    }
+
+    /**
+     * Sends a Consume request for the given message seq ID over the
+     * subscribe channel.
+     *
+     * The request is fire-and-forget: no response handler or callback is
+     * registered, and a write failure is only logged.
+     *
+     * @param topicSubscriber
+     *            topic/subscriber pair the consume applies to.
+     * @param messageSeqId
+     *            seq ID reported to the server as consumed.
+     */
+    @Override
+    public synchronized  void consume(final TopicSubscriber topicSubscriber,
+                                      final MessageSeqId messageSeqId) {
+        final PubSubRequest.Builder consumeRequest =
+            NetUtils.buildConsumeRequest(sChannelManager.nextTxnId(),
+                                         topicSubscriber, messageSeqId);
+
+        // The server is not expected to acknowledge Consume requests and the
+        // request is not initiated by the client application, so there is
+        // nothing to register with the ResponseHandler and no callback to
+        // invoke. A failed write is only surfaced through the error log in
+        // the listener below.
+        if (logger.isDebugEnabled()) {
+            logger.debug("Writing a Consume request to host: {} with messageSeqId: {} for {}",
+                         va(NetUtils.getHostFromChannel(subscribeChannel),
+                            messageSeqId, topicSubscriber));
+        }
+        subscribeChannel.write(consumeRequest.build()).addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture writeResult) throws Exception {
+                if (writeResult.isSuccess()) {
+                    return;
+                }
+                logger.error("Error writing a Consume request to host: {} with messageSeqId: {} for {}",
+                             va(NetUtils.getHostFromChannel(subscribeChannel),
+                                messageSeqId, topicSubscriber));
+            }
+        });
+    }
+
+    /**
+     * Reacts to the subscribe channel being disconnected.
+     *
+     * The host is forgotten for the topic and the subscription state is
+     * cleared through the channel manager. Afterwards, if the subscription
+     * enables resubscribe, the original subscribe request is resubmitted to
+     * the default server with a SubscribeReconnectCallback; otherwise a
+     * TOPIC_MOVED subscription event is emitted.
+     *
+     * If no subscription was ever established on this channel there is
+     * nothing to recover and the method returns immediately.
+     *
+     * @param host
+     *            address of the server the channel was connected to.
+     * @param channel
+     *            channel that got disconnected.
+     */
+    @Override
+    public void onChannelDisconnected(InetSocketAddress host,
+                                      Channel channel) {
+        // origTopicSubscriber and origSubData are only assigned once a
+        // SUCCESS subscribe response has been handled. A channel that goes
+        // down before that would otherwise fail with a NullPointerException
+        // on the calls below.
+        if (null == origTopicSubscriber || null == origSubData) {
+            logger.debug("Channel to {} is disconnected before a subscription was established, nothing to recover.",
+                         host);
+            return;
+        }
+        sChannelManager.clearHostForTopic(origTopicSubscriber.getTopic(), host);
+        // clear subscription status
+        sChannelManager.asyncCloseSubscription(origTopicSubscriber, new Callback<ResponseBody>() {
+
+            @Override
+            public void operationFinished(Object ctx, ResponseBody result) {
+                finish();
+            }
+
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) {
+                // Recovery proceeds the same way whether or not the close
+                // succeeded.
+                finish();
+            }
+
+            private void finish() {
+                // Since the connection to the server host that was responsible
+                // for the topic died, we are not sure about the state of that
+                // server. Resend the original subscribe request data to the default
+                // server host/VIP. Also clear out all of the servers we've
+                // contacted or attempted to from this request as we are starting a
+                // "fresh" subscribe request.
+                origSubData.clearServersList();
+                // do resubscribe if the subscription enables it
+                if (origSubData.options.getEnableResubscribe()) {
+                    // Set a new type of VoidCallback for this async call. We need this
+                    // hook so after the subscribe reconnect has completed, delivery for
+                    // that topic subscriber should also be restarted (if it was that
+                    // case before the channel disconnect).
+                    final long retryWaitTime = cfg.getSubscribeReconnectRetryWaitTime();
+                    SubscribeReconnectCallback reconnectCb =
+                        new SubscribeReconnectCallback(origTopicSubscriber,
+                                                       origSubData,
+                                                       sChannelManager,
+                                                       retryWaitTime);
+                    origSubData.setCallback(reconnectCb);
+                    origSubData.context = null;
+                    // Clear the shouldClaim flag
+                    origSubData.shouldClaim = false;
+                    logger.debug("Reconnect {}'s subscription channel with origSubData {}",
+                                 origTopicSubscriber, origSubData);
+                    sChannelManager.submitOpToDefaultServer(origSubData);
+                } else {
+                    logger.info("Subscription channel for ({}) is disconnected.",
+                                origTopicSubscriber);
+                    sChannelManager.getSubscriptionEventEmitter().emitSubscriptionEvent(
+                        origSubData.topic, origSubData.subscriberId, SubscriptionEvent.TOPIC_MOVED);
+                }
+            }
+        }, null);
+    }
+
+}

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl.simple;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.handlers.AbstractResponseHandler;
+import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
+import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
+import org.apache.hedwig.client.netty.impl.HChannelHandler;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+
+/**
+ * Pipeline factory for subscription channels of the simple client
+ * implementation. Its channels only get a handler for SUBSCRIBE responses.
+ */
+public class SimpleSubscriptionChannelPipelineFactory extends ClientChannelPipelineFactory {
+
+    /**
+     * @param cfg
+     *            client configuration, handed to the superclass.
+     * @param channelManager
+     *            channel manager, handed to the superclass.
+     */
+    public SimpleSubscriptionChannelPipelineFactory(ClientConfiguration cfg,
+                                                    SimpleHChannelManager channelManager) {
+        super(cfg, channelManager);
+    }
+
+    /**
+     * @return a new map holding a single entry: a fresh
+     *         SimpleSubscribeResponseHandler registered for
+     *         OperationType.SUBSCRIBE.
+     */
+    @Override
+    protected Map<OperationType, AbstractResponseHandler> createResponseHandlers() {
+        final Map<OperationType, AbstractResponseHandler> byOperation =
+            new HashMap<OperationType, AbstractResponseHandler>();
+        final AbstractResponseHandler subscribeHandler =
+            new SimpleSubscribeResponseHandler(cfg, channelManager);
+        byOperation.put(OperationType.SUBSCRIBE, subscribeHandler);
+        return byOperation;
+    }
+
+}

Copied: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SubscribeReconnectCallback.java (from r1390401, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SubscribeReconnectCallback.java?p2=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SubscribeReconnectCallback.java&p1=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java&r1=1390401&r2=1390777&rev=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SubscribeReconnectCallback.java Wed Sep 26 23:52:18 2012
@@ -15,24 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hedwig.client.handlers;
+package org.apache.hedwig.client.netty.impl.simple;
 
-import java.util.TimerTask;
-
-import org.apache.hedwig.protocol.PubSubProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
 import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.netty.HedwigClientImpl;
-import org.apache.hedwig.client.netty.HedwigSubscriber;
 import org.apache.hedwig.exceptions.PubSubException;
-
 import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
 import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
 
 /**
  * This class is used when a Subscribe channel gets disconnected and we attempt
@@ -42,72 +37,60 @@ import org.apache.hedwig.util.Callback;
  * callback will be the hook for this.
  *
  */
-public class SubscribeReconnectCallback implements Callback<PubSubProtocol.ResponseBody> {
+class SubscribeReconnectCallback implements Callback<ResponseBody> {
 
     private static Logger logger = LoggerFactory.getLogger(SubscribeReconnectCallback.class);
 
     // Private member variables
+    private final TopicSubscriber origTopicSubscriber;
     private final PubSubData origSubData;
-    private final HedwigClientImpl client;
-    private final HedwigSubscriber sub;
-    private final ClientConfiguration cfg;
+    private final SimpleHChannelManager channelManager;
+    private final long retryWaitTime;
 
     // Constructor
-    public SubscribeReconnectCallback(PubSubData origSubData, HedwigClientImpl client) {
+    SubscribeReconnectCallback(TopicSubscriber origTopicSubscriber,
+                               PubSubData origSubData,
+                               SimpleHChannelManager channelManager,
+                               long retryWaitTime) {
+        this.origTopicSubscriber = origTopicSubscriber;
         this.origSubData = origSubData;
-        this.client = client;
-        this.sub = client.getSubscriber();
-        this.cfg = client.getConfiguration();
-    }
-
-    class SubscribeReconnectRetryTask extends TimerTask {
-        @Override
-        public void run() {
-            logger.debug("Retrying subscribe reconnect request for origSubData: {}", origSubData);
-            // Clear out all of the servers we've contacted or attempted to from
-            // this request.
-            origSubData.clearServersList();
-            client.doConnect(origSubData, cfg.getDefaultServerHost());
-        }
+        this.channelManager = channelManager;
+        this.retryWaitTime = retryWaitTime;
     }
 
-    public void operationFinished(Object ctx, PubSubProtocol.ResponseBody resultOfOperation) {
+    @Override
+    public void operationFinished(Object ctx, ResponseBody resultOfOperation) {
         logger.debug("Subscribe reconnect succeeded for origSubData: {}", origSubData);
         // Now we want to restart delivery for the subscription channel only
         // if delivery was started at the time the original subscribe channel
         // was disconnected.
         try {
-            sub.restartDelivery(origSubData.topic, origSubData.subscriberId);
+            channelManager.restartDelivery(origTopicSubscriber);
         } catch (ClientNotSubscribedException e) {
             // This exception should never be thrown here but just in case,
             // log an error and just keep retrying the subscribe request.
-            logger.error("Subscribe was successful but error starting delivery for topic: "
-                         + origSubData.topic.toStringUtf8() + ", subscriberId: "
-                         + origSubData.subscriberId.toStringUtf8(), e);
+            logger.error("Subscribe was successful but error starting delivery for {} : {}",
+                         va(origTopicSubscriber, e.getMessage()));
             retrySubscribeRequest();
         } catch (AlreadyStartDeliveryException asde) {
             // should not reach here
         }
     }
 
+    @Override
     public void operationFailed(Object ctx, PubSubException exception) {
         // If the subscribe reconnect fails, just keep retrying the subscribe
         // request. There isn't a way to flag to the application layer that
         // a topic subscription has failed. So instead, we'll just keep
         // retrying in the background until success.
-        logger.error("Subscribe reconnect failed with error: " + exception.getMessage());
+        logger.error("Subscribe reconnect failed with error: ", exception);
         retrySubscribeRequest();
     }
 
     private void retrySubscribeRequest() {
-        // If the client has stopped, there is no need to proceed with any
-        // callback logic here.
-        if (client.hasStopped())
-            return;
-
-        // Retry the subscribe request but only after waiting for a
-        // preconfigured amount of time.
-        client.getClientTimer().schedule(new SubscribeReconnectRetryTask(),
-                                         client.getConfiguration().getSubscribeReconnectRetryWaitTime());
+        origSubData.clearServersList();
+        logger.debug("Reconnect subscription channel for {} in {} ms later.",
+                     va(origTopicSubscriber, retryWaitTime));
+        channelManager.submitOpAfterDelay(origSubData, retryWaitTime);
     }
 }

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/package-info.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/package-info.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/package-info.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/package-info.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * A Netty based Hedwig client implementation.
+ *
+ * <h3>Components</h3>
+ *
+ * The netty based implementation contains following components:
+ * <ul>
+ *   <li>{@link HChannel}: An interface wrapper of netty {@link org.jboss.netty.channel.Channel}
+ *       to submit hedwig's {@link org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest}s
+ *       to the target host.</li>
+ *   <li>{@link HChannelHandler}: A wrapper of netty {@link org.jboss.netty.channel.ChannelHandler}
+ *       to handle events of its underlying netty channel, such as responses received, channel
+ *       disconnected, etc. A {@link HChannelHandler} is bound with a {@link HChannel}.</li>
+ *   <li>{@link HChannelManager}: A manager that manages all established {@link HChannel}s.
+ *       It provides a clean interface for publisher/subscriber to send
+ *       {@link org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest}s.</li>
+ * </ul>
+ *
+ * <h3>Main Flow</h3>
+ *
+ * <ul>
+ *   <li>{@link HedwigPublisher}/{@link HedwigSubscriber} delegates {@link HChannelManager}
+ *       to submit pub/sub requests.</li>
+ *   <li>{@link HChannelManager} finds the owner hubs, establishes a {@link HChannel} to hub servers
+ *       and sends the requests to them.</li>
+ *   <li>{@link HChannelHandler} dispatches responses to target
+ *       {@link org.apache.hedwig.client.handlers.AbstractResponseHandler} to process.</li>
+ *   <li>{@link HChannelHandler} detects an underlying netty {@link org.jboss.netty.channel.Channel}
+ *       disconnected. It calls {@link HChannelManager} to clear the cached {@link HChannel} that
+ *       it is bound with. For non-subscription channels, it would fail all pending requests;
+ *       for subscription channels, it would fail all pending requests and retry to reconnect
+ *       those successful subscriptions.</li>
+ * </ul>
+ *
+ * <h3>HChannel</h3>
+ *
+ * Two kinds of {@link HChannel}s are provided in the current implementation. {@link HChannelImpl}
+ * provides the ability to multiplex pub/sub requests in an underlying netty
+ * {@link org.jboss.netty.channel.Channel}, while {@link DefaultServerChannel} provides the
+ * ability to establish a netty {@link org.jboss.netty.channel.Channel} for a pub/sub
+ * request. After the underlying netty channel is established, it would be converted into
+ * a {@link HChannelImpl} by {@link HChannelManager#submitOpThruChannel(pubSubData, channel)}.
+ *
+ * Although {@link HChannelImpl} provides multiplexing ability, it still could be used for
+ * the one-channel-per-subscription case, which just sends one subscribe request through the
+ * underlying channel.
+ *
+ * <h3>HChannelHandler</h3>
+ *
+ * {@link HChannelHandler} is a generic netty {@link org.jboss.netty.channel.ChannelHandler},
+ * which handles events from the underlying channel. A <i>HChannelHandler</i> is bound with
+ * a {@link HChannel} as channel pipeline when the underlying channel is established. It
+ * takes the responsibility of dispatching responses to the target response handler. For a
+ * non-subscription channel, it just handles <b>PUBLISH</b> and <b>UNSUBSCRIBE</b> responses.
+ * For a subscription channel, it handles <b>SUBSCRIBE</b> responses. For consume requests,
+ * we treat them in a fire-and-forget way, so they do not need to be handled by any response
+ * handler.
+ *
+ * <h3>HChannelManager</h3>
+ *
+ * {@link HChannelManager} manages all outstanding connections to target hub servers for a client.
+ * Since a subscription channel acts quite differently from a non-subscription channel, the basic
+ * implementation {@link AbstractHChannelManager} manages non-subscription channels and
+ * subscription channels in different channel sets. Currently the hedwig client provides
+ * {@link SimpleHChannelManager}, which manages subscription channels in a one-channel-per-subscription
+ * way. In future, if we want to multiplex multiple subscriptions in one channel, we just need
+ * to provide a multiplexing version of {@link AbstractHChannelManager} which manages channels
+ * in a multiplexing way, and a multiplexing version of {@link org.apache.hedwig.client.handlers.SubscribeResponseHandler}
+ * which handles multiple subscriptions in one channel.
+ */
+package org.apache.hedwig.client.netty;

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/VarArgs.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/VarArgs.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/VarArgs.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/VarArgs.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.util;
+
+public class VarArgs {
+
+    public static Object[] va(Object...args) {
+        return args;
+    }
+
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java Wed Sep 26 23:52:18 2012
@@ -348,6 +348,33 @@ public class TestPubSubClient extends Pu
     }
 
     @Test
+    public void testStartDeliveryAfterCloseSub() throws Exception {
+        ByteString topic = ByteString.copyFromUtf8("testStartDeliveryAfterCloseSub");
+        ByteString subid = ByteString.copyFromUtf8("mysubid");
+        subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+
+        // Start delivery for the subscriber
+        subscriber.startDelivery(topic, subid, new TestMessageHandler());
+
+        // Now publish some messages for the topic to be consumed by the
+        // subscriber.
+        publisher.publish(topic, Message.newBuilder()
+                                .setBody(ByteString.copyFromUtf8("Message #1")).build());
+        assertTrue(consumeQueue.take());
+
+        // Close the subscription for the subscriber
+        subscriber.closeSubscription(topic, subid);
+
+        // subscribe again
+        subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+        subscriber.startDelivery(topic, subid, new TestMessageHandler());
+
+        publisher.publish(topic, Message.newBuilder()
+                                .setBody(ByteString.copyFromUtf8("Message #2")).build());
+        assertTrue(consumeQueue.take());
+    }
+
+    @Test
     public void testSubscribeAndConsume() throws Exception {
         ByteString topic = ByteString.copyFromUtf8("myConsumeTopic");
         ByteString subscriberId = ByteString.copyFromUtf8("1");