You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/12/07 12:21:54 UTC
svn commit: r1418280 [1/2] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/
hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/
hedwig-client/src/main/java/org/apache/hedwig/cl...
Author: ivank
Date: Fri Dec 7 11:21:52 2012
New Revision: 1418280
URL: http://svn.apache.org/viewvc?rev=1418280&view=rev
Log:
BOOKKEEPER-453: Extract commonality from MultiplexSubscribeResponseHandler and SimpleSubscribeResponseHandler and put into an abstract class (sijie via ivank)
Added:
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java
- copied, changed from r1417190, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/ResubscribeCallback.java
Removed:
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/ResubscribeCallback.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SubscribeReconnectCallback.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1418280&r1=1418279&r2=1418280&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Dec 7 11:21:52 2012
@@ -284,6 +284,8 @@ Trunk (unreleased changes)
BOOKKEEPER-370: implement multiplexing cpp client. (sijie via ivank)
+ BOOKKEEPER-453: Extract commonality from MultiplexSubscribeResponseHandler and SimpleSubscribeResponseHandler and put into an abstract class (sijie via ivank)
+
Release 4.1.0 - 2012-06-07
Non-backward compatible changes:
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java?rev=1418280&r1=1418279&r2=1418280&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java Fri Dec 7 11:21:52 2012
@@ -573,6 +573,9 @@ public abstract class AbstractHChannelMa
}
}
+ protected abstract void restartDelivery(TopicSubscriber topicSubscriber)
+ throws ClientNotSubscribedException, AlreadyStartDeliveryException;
+
/**
* Chekout the pub/sub requests on subscription channels.
*/
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java?rev=1418280&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java Fri Dec 7 11:21:52 2012
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.protobuf.ByteString;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.MessageConsumeData;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
+import org.apache.hedwig.client.netty.HChannelManager;
+import org.apache.hedwig.client.netty.HChannel;
+import org.apache.hedwig.client.netty.NetUtils;
+import org.apache.hedwig.client.netty.FilterableMessageHandler;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException;
+import org.apache.hedwig.filter.ClientMessageFilter;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
+import org.apache.hedwig.protoextensions.MessageIdUtils;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.SubscriptionListener;
+import static org.apache.hedwig.util.VarArgs.va;
+
+public abstract class AbstractSubscribeResponseHandler extends SubscribeResponseHandler {
+
+ private static Logger logger =
+ LoggerFactory.getLogger(AbstractSubscribeResponseHandler.class);
+
+ protected final ReentrantReadWriteLock disconnectLock =
+ new ReentrantReadWriteLock();
+
+ protected final ConcurrentMap<TopicSubscriber, ActiveSubscriber> subscriptions
+ = new ConcurrentHashMap<TopicSubscriber, ActiveSubscriber>();
+ protected final AbstractHChannelManager aChannelManager;
+
+ protected AbstractSubscribeResponseHandler(ClientConfiguration cfg,
+ HChannelManager channelManager) {
+ super(cfg, channelManager);
+ this.aChannelManager = (AbstractHChannelManager) channelManager;
+ }
+
+ protected HChannelManager getHChannelManager() {
+ return this.channelManager;
+ }
+
+ protected ClientConfiguration getConfiguration() {
+ return cfg;
+ }
+
+ protected ActiveSubscriber getActiveSubscriber(TopicSubscriber ts) {
+ return subscriptions.get(ts);
+ }
+
+ protected ActiveSubscriber createActiveSubscriber(
+ ClientConfiguration cfg, AbstractHChannelManager channelManager,
+ TopicSubscriber ts, PubSubData op, SubscriptionPreferences preferences,
+ Channel channel) {
+ return new ActiveSubscriber(cfg, channelManager, ts, op, preferences, channel);
+ }
+
+ @Override
+ public void handleResponse(PubSubResponse response, PubSubData pubSubData,
+ Channel channel) throws Exception {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Handling a Subscribe response: {}, pubSubData: {}, host: {}.",
+ va(response, pubSubData, NetUtils.getHostFromChannel(channel)));
+ }
+ switch (response.getStatusCode()) {
+ case SUCCESS:
+ TopicSubscriber ts = new TopicSubscriber(pubSubData.topic,
+ pubSubData.subscriberId);
+ SubscriptionPreferences preferences = null;
+ if (response.hasResponseBody()) {
+ ResponseBody respBody = response.getResponseBody();
+ if (respBody.hasSubscribeResponse()) {
+ SubscribeResponse resp = respBody.getSubscribeResponse();
+ if (resp.hasPreferences()) {
+ preferences = resp.getPreferences();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Receive subscription preferences for {} : {}",
+ va(ts,
+ SubscriptionStateUtils.toString(preferences)));
+ }
+ }
+ }
+ }
+
+ ActiveSubscriber ss = createActiveSubscriber(cfg, aChannelManager, ts,
+ pubSubData, preferences, channel);
+ boolean success = false;
+ // Store the Subscribe state
+ disconnectLock.readLock().lock();
+ try {
+ ActiveSubscriber oldSS = subscriptions.putIfAbsent(ts, ss);
+ if (null != oldSS) {
+ logger.warn("Subscribe {} has existed in channel {}.",
+ va(ts, channel));
+ success = false;
+ } else {
+ logger.debug("Succeed to add subscription {} in channel {}.",
+ va(ts, channel));
+ success = true;
+ }
+ } finally {
+ disconnectLock.readLock().unlock();
+ }
+ if (success) {
+ handleSuccessResponse(ts, ss, channel);
+ // Response was success so invoke the callback's operationFinished
+ // method.
+ pubSubData.getCallback().operationFinished(pubSubData.context, null);
+ } else {
+ ClientAlreadySubscribedException exception =
+ new ClientAlreadySubscribedException("Client is already subscribed for " + ts);
+ pubSubData.getCallback().operationFailed(pubSubData.context, exception);
+ }
+ break;
+ case CLIENT_ALREADY_SUBSCRIBED:
+ // For Subscribe requests, the server says that the client is
+ // already subscribed to it.
+ pubSubData.getCallback().operationFailed(pubSubData.context,
+ new ClientAlreadySubscribedException("Client is already subscribed for topic: "
+ + pubSubData.topic.toStringUtf8() + ", subscriberId: "
+ + pubSubData.subscriberId.toStringUtf8()));
+ break;
+ case SERVICE_DOWN:
+ // Response was service down failure so just invoke the callback's
+ // operationFailed method.
+ pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
+ "Server responded with a SERVICE_DOWN status"));
+ break;
+ case NOT_RESPONSIBLE_FOR_TOPIC:
+ // Redirect response so we'll need to repost the original Subscribe
+ // Request
+ handleRedirectResponse(response, pubSubData, channel);
+ break;
+ default:
+ // Consider all other status codes as errors, operation failed
+ // cases.
+ logger.error("Unexpected error response from server for PubSubResponse: " + response);
+ pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
+ "Server responded with a status code of: "
+ + response.getStatusCode()));
+ break;
+ }
+ }
+
+ /**
+ * Handle success response for a specific TopicSubscriber <code>ts</code>. The method
+ * is triggered after subscribed successfully.
+ *
+ * @param ts
+ * Topic Subscriber.
+ * @param ss
+ * Active Subscriber Object handle subscription actions for the subscriber.
+ * @param channel
+ * Subscription Channel.
+ */
+ protected abstract void handleSuccessResponse(TopicSubscriber ts, ActiveSubscriber as,
+ Channel channel);
+
+ @Override
+ public void handleSubscribeMessage(PubSubResponse response) {
+ Message message = response.getMessage();
+ TopicSubscriber ts = new TopicSubscriber(response.getTopic(),
+ response.getSubscriberId());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Handling a Subscribe message in response: {}, {}",
+ va(response, ts));
+ }
+ ActiveSubscriber ss = getActiveSubscriber(ts);
+ if (null == ss) {
+ logger.error("Subscriber {} is not found receiving its message {}.",
+ va(ts, MessageIdUtils.msgIdToReadableString(message.getMsgId())));
+ return;
+ }
+ ss.handleMessage(message);
+ }
+
+ @Override
+ protected void asyncMessageDeliver(TopicSubscriber topicSubscriber,
+ Message message) {
+ ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
+ if (null == ss) {
+ logger.error("Subscriber {} is not found delivering its message {}.",
+ va(topicSubscriber,
+ MessageIdUtils.msgIdToReadableString(message.getMsgId())));
+ return;
+ }
+ ss.asyncMessageDeliver(message);
+ }
+
+ @Override
+ protected void messageConsumed(TopicSubscriber topicSubscriber,
+ Message message) {
+ ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
+ if (null == ss) {
+ logger.warn("Subscriber {} is not found consumed its message {}.",
+ va(topicSubscriber,
+ MessageIdUtils.msgIdToReadableString(message.getMsgId())));
+ return;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Message has been successfully consumed by the client app : {}, {}",
+ va(message, topicSubscriber));
+ }
+ ss.messageConsumed(message);
+ }
+
+ @Override
+ public void handleSubscriptionEvent(ByteString topic, ByteString subscriberId,
+ SubscriptionEvent event) {
+ TopicSubscriber ts = new TopicSubscriber(topic, subscriberId);
+ ActiveSubscriber ss = getActiveSubscriber(ts);
+ if (null == ss) {
+ logger.warn("No subscription {} found receiving subscription event {}.",
+ va(ts, event));
+ return;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received subscription event {} for ({}).",
+ va(event, ts));
+ }
+ processSubscriptionEvent(ss, event);
+ }
+
+ protected void processSubscriptionEvent(ActiveSubscriber as, SubscriptionEvent event) {
+ switch (event) {
+ // for all cases we need to resubscribe for the subscription
+ case TOPIC_MOVED:
+ case SUBSCRIPTION_FORCED_CLOSED:
+ resubscribeIfNecessary(as, event);
+ break;
+ default:
+ logger.error("Receive unknown subscription event {} for {}.",
+ va(event, as.getTopicSubscriber()));
+ }
+ }
+
+ @Override
+ public void startDelivery(final TopicSubscriber topicSubscriber,
+ MessageHandler messageHandler)
+ throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+ ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
+ if (null == ss) {
+ throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Start delivering message for {} using message handler {}",
+ va(topicSubscriber, messageHandler));
+ }
+ ss.startDelivery(messageHandler);
+ }
+
+ @Override
+ public void stopDelivery(final TopicSubscriber topicSubscriber)
+ throws ClientNotSubscribedException {
+ ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
+ if (null == ss) {
+ throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Stop delivering messages for {}", topicSubscriber);
+ }
+ ss.stopDelivery();
+ }
+
+ @Override
+ public boolean hasSubscription(TopicSubscriber topicSubscriber) {
+ return subscriptions.containsKey(topicSubscriber);
+ }
+
+ @Override
+ public void consume(final TopicSubscriber topicSubscriber,
+ final MessageSeqId messageSeqId) {
+ ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
+ if (null == ss) {
+ logger.warn("Subscriber {} is not found consuming message {}.",
+ va(topicSubscriber,
+ MessageIdUtils.msgIdToReadableString(messageSeqId)));
+ return;
+ }
+ ss.consume(messageSeqId);
+ }
+
+ @Override
+ public void onChannelDisconnected(InetSocketAddress host, Channel channel) {
+ disconnectLock.writeLock().lock();
+ try {
+ onDisconnect(host);
+ } finally {
+ disconnectLock.writeLock().unlock();
+ }
+ }
+
+ private void onDisconnect(InetSocketAddress host) {
+ for (ActiveSubscriber ss : subscriptions.values()) {
+ onDisconnect(ss, host);
+ }
+ }
+
+ private void onDisconnect(ActiveSubscriber ss, InetSocketAddress host) {
+ logger.info("Subscription channel for ({}) is disconnected.", ss);
+ resubscribeIfNecessary(ss, SubscriptionEvent.TOPIC_MOVED);
+ }
+
+ protected boolean removeSubscription(TopicSubscriber ts, ActiveSubscriber ss) {
+ return subscriptions.remove(ts, ss);
+ }
+
+ protected void resubscribeIfNecessary(ActiveSubscriber ss, SubscriptionEvent event) {
+ // if subscriber has been changed, we don't need to resubscribe
+ if (!removeSubscription(ss.getTopicSubscriber(), ss)) {
+ return;
+ }
+ ss.resubscribeIfNecessary(event);
+ }
+
+}
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java?rev=1418280&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java Fri Dec 7 11:21:52 2012
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+import com.google.protobuf.ByteString;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.MessageConsumeData;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.client.netty.NetUtils;
+import org.apache.hedwig.client.netty.FilterableMessageHandler;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.filter.ClientMessageFilter;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
+import org.apache.hedwig.protoextensions.MessageIdUtils;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import static org.apache.hedwig.util.VarArgs.va;
+
+/**
+ * an active subscriber handles subscription actions in a channel.
+ */
+public class ActiveSubscriber {
+
+ private static final Logger logger = LoggerFactory.getLogger(ActiveSubscriber.class);
+
+ protected final ClientConfiguration cfg;
+ protected final AbstractHChannelManager channelManager;
+
+ // Subscriber related variables
+ protected final TopicSubscriber topicSubscriber;
+ protected final PubSubData op;
+ protected final SubscriptionPreferences preferences;
+
+ // the underlying netty channel to send request
+ protected final Channel channel;
+
+ // Counter for the number of consumed messages so far to buffer up before we
+ // send the Consume message back to the server along with the last/largest
+ // message seq ID seen so far in that batch.
+ private int numConsumedMessagesInBuffer = 0;
+ private MessageSeqId lastMessageSeqId = null;
+
+ // Message Handler
+ private MessageHandler msgHandler = null;
+
+ // Queue used for subscribes when the MessageHandler hasn't been registered
+ // yet but we've already received subscription messages from the server.
+ // This will be lazily created as needed.
+ private final Queue<Message> msgQueue = new LinkedList<Message>();
+
+ /**
+ * Construct an active subscriber instance.
+ *
+ * @param cfg
+ * Client configuration object.
+ * @param channelManager
+ * Channel manager instance.
+ * @param ts
+ * Topic subscriber.
+ * @param op
+ * Pub/Sub request.
+ * @param preferences
+ * Subscription preferences for the subscriber.
+ * @param channel
+ * Netty channel the subscriber lived.
+ */
+ public ActiveSubscriber(ClientConfiguration cfg,
+ AbstractHChannelManager channelManager,
+ TopicSubscriber ts, PubSubData op,
+ SubscriptionPreferences preferences,
+ Channel channel) {
+ this.cfg = cfg;
+ this.channelManager = channelManager;
+ this.topicSubscriber = ts;
+ this.op = op;
+ this.preferences = preferences;
+ this.channel = channel;
+ }
+
+ /**
+ * @return pub/sub request for the subscription.
+ */
+ public PubSubData getPubSubData() {
+ return this.op;
+ }
+
+ /**
+ * @return topic subscriber id for the active subscriber.
+ */
+ public TopicSubscriber getTopicSubscriber() {
+ return this.topicSubscriber;
+ }
+
+ /**
+ * Start delivering messages using given message handler.
+ *
+ * @param messageHandler
+ * Message handler to deliver messages
+ * @throws AlreadyStartDeliveryException if someone already started delivery.
+ * @throws ClientNotSubscribedException when start delivery before subscribe.
+ */
+ public synchronized void startDelivery(MessageHandler messageHandler)
+ throws AlreadyStartDeliveryException, ClientNotSubscribedException {
+ if (null != this.msgHandler) {
+ throw new AlreadyStartDeliveryException("A message handler " + msgHandler
+ + " has been started for " + topicSubscriber);
+ }
+ if (null != messageHandler && messageHandler instanceof FilterableMessageHandler) {
+ FilterableMessageHandler filterMsgHandler =
+ (FilterableMessageHandler) messageHandler;
+ if (filterMsgHandler.hasMessageFilter()) {
+ if (null == preferences) {
+ // no preferences means talking to an old version hub server
+ logger.warn("Start delivering messages with filter but no subscription "
+ + "preferences found. It might due to talking to an old version"
+ + " hub server.");
+ // use the original message handler.
+ messageHandler = filterMsgHandler.getMessageHandler();
+ } else {
+ // pass subscription preferences to message filter
+ if (logger.isDebugEnabled()) {
+ logger.debug("Start delivering messages with filter on {}, preferences: {}",
+ va(topicSubscriber,
+ SubscriptionStateUtils.toString(preferences)));
+ }
+ ClientMessageFilter msgFilter = filterMsgHandler.getMessageFilter();
+ msgFilter.setSubscriptionPreferences(topicSubscriber.getTopic(),
+ topicSubscriber.getSubscriberId(),
+ preferences);
+ }
+ }
+ }
+
+ this.msgHandler = messageHandler;
+ // Once the MessageHandler is registered, see if we have any queued up
+ // subscription messages sent to us already from the server. If so,
+ // consume those first. Do this only if the MessageHandler registered is
+ // not null (since that would be the HedwigSubscriber.stopDelivery
+ // call).
+ if (null == msgHandler) {
+ return;
+ }
+ if (msgQueue.size() > 0) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Consuming {} queued up messages for {}",
+ va(msgQueue.size(), topicSubscriber));
+ }
+ for (Message message : msgQueue) {
+ asyncMessageDeliver(message);
+ }
+ // Now we can remove the queued up messages since they are all
+ // consumed.
+ msgQueue.clear();
+ }
+ }
+
+ /**
+ * Stop delivering messages to the subscriber.
+ */
+ public synchronized void stopDelivery() {
+ this.msgHandler = null;
+ }
+
+ /**
+ * Handle received message.
+ *
+ * @param message
+ * Received message.
+ */
+ public synchronized void handleMessage(Message message) {
+ if (null != msgHandler) {
+ asyncMessageDeliver(message);
+ } else {
+ // MessageHandler has not yet been registered so queue up these
+ // messages for the Topic Subscription. Make the initial lazy
+ // creation of the message queue thread safe just so we don't
+ // run into a race condition where two simultaneous threads process
+ // a received message and both try to create a new instance of
+ // the message queue. Performance overhead should be okay
+ // because the delivery of the topic has not even started yet
+ // so these messages are not consumed and just buffered up here.
+ if (logger.isDebugEnabled()) {
+ logger.debug("Message {} has arrived but no MessageHandler provided for {}"
+ + " yet so queueing up the message.",
+ va(MessageIdUtils.msgIdToReadableString(message.getMsgId()),
+ topicSubscriber));
+ }
+ msgQueue.add(message);
+ }
+ }
+
+ /**
+ * Deliver message to the client.
+ *
+ * @param message
+ * Message to deliver.
+ */
+ public synchronized void asyncMessageDeliver(Message message) {
+ if (null == msgHandler) {
+ logger.error("No message handler found to deliver message {} to {}.",
+ va(MessageIdUtils.msgIdToReadableString(message.getMsgId()),
+ topicSubscriber));
+ return;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Call the client app's MessageHandler asynchronously to deliver the message {} to {}",
+ va(message, topicSubscriber));
+ }
+ unsafeDeliverMessage(message);
+ }
+
+ /**
+ * Unsafe version to deliver message to a message handler.
+ * Caller need to handle synchronization issue.
+ *
+ * @param message
+ * Message to deliver.
+ */
+ protected void unsafeDeliverMessage(Message message) {
+ MessageConsumeData messageConsumeData =
+ new MessageConsumeData(topicSubscriber, message);
+ msgHandler.deliver(topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(),
+ message, channelManager.getConsumeCallback(),
+ messageConsumeData);
+ }
+
+ private synchronized boolean updateLastMessageSeqId(MessageSeqId seqId) {
+ if (null != lastMessageSeqId &&
+ seqId.getLocalComponent() <= lastMessageSeqId.getLocalComponent()) {
+ return false;
+ }
+ ++numConsumedMessagesInBuffer;
+ lastMessageSeqId = seqId;
+ if (numConsumedMessagesInBuffer >= cfg.getConsumedMessagesBufferSize()) {
+ numConsumedMessagesInBuffer = 0;
+ lastMessageSeqId = null;
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Consume a specific message.
+ *
+ * @param messageSeqId
+ * Message seq id.
+ */
+ public void consume(final MessageSeqId messageSeqId) {
+ PubSubRequest.Builder pubsubRequestBuilder =
+ NetUtils.buildConsumeRequest(channelManager.nextTxnId(),
+ topicSubscriber, messageSeqId);
+
+ // For Consume requests, we will send them from the client in a fire and
+ // forget manner. We are not expecting the server to send back an ack
+ // response so no need to register this in the ResponseHandler. There
+ // are no callbacks to invoke since this isn't a client initiated
+ // action. Instead, just have a future listener that will log an error
+ // message if there was a problem writing the consume request.
+ if (logger.isDebugEnabled()) {
+ logger.debug("Writing a Consume request to channel: {} with messageSeqId: {} for {}",
+ va(channel, messageSeqId, topicSubscriber));
+ }
+ ChannelFuture future = channel.write(pubsubRequestBuilder.build());
+ future.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ logger.error("Error writing a Consume request to channel: {} with messageSeqId: {} for {}",
+ va(channel, messageSeqId, topicSubscriber));
+ }
+ }
+ });
+ }
+
+ /**
+ * Application acked to consume message.
+ *
+ * @param message
+ * Message consumed by application.
+ */
+ public void messageConsumed(Message message) {
+ // For consume response to server, there is a config param on how many
+ // messages to consume and buffer up before sending the consume request.
+ // We just need to keep a count of the number of messages consumed
+ // and the largest/latest msg ID seen so far in this batch. Messages
+ // should be delivered in order and without gaps. Do this only if
+ // auto-sending of consume messages is enabled.
+ if (cfg.isAutoSendConsumeMessageEnabled()) {
+ // Update these variables only if we are auto-sending consume
+ // messages to the server. Otherwise the onus is on the client app
+ // to call the Subscriber consume API to let the server know which
+ // messages it has successfully consumed.
+ if (updateLastMessageSeqId(message.getMsgId())) {
+ // Send the consume request and reset the consumed messages buffer
+ // variables. We will use the same Channel created from the
+ // subscribe request for the TopicSubscriber.
+ if (logger.isDebugEnabled()) {
+ logger.debug("Consume message {} when reaching consumed message buffer limit.",
+ message.getMsgId());
+ }
+ consume(message.getMsgId());
+ }
+ }
+ }
+
+ /**
+ * Resubscribe a subscriber if necessary.
+ *
+ * @param event
+ * Subscription Event.
+ */
+ public void resubscribeIfNecessary(SubscriptionEvent event) {
+ // clear topic ownership
+ if (SubscriptionEvent.TOPIC_MOVED == event) {
+ channelManager.clearHostForTopic(topicSubscriber.getTopic(),
+ NetUtils.getHostFromChannel(channel));
+ }
+ if (!op.options.getEnableResubscribe()) {
+ channelManager.getSubscriptionEventEmitter().emitSubscriptionEvent(
+ topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(), event);
+ return;
+ }
+ // Since the connection to the server host that was responsible
+ // for the topic died, we are not sure about the state of that
+ // server. Resend the original subscribe request data to the default
+ // server host/VIP. Also clear out all of the servers we've
+ // contacted or attempted to from this request as we are starting a
+ // "fresh" subscribe request.
+ op.clearServersList();
+ // Set a new type of VoidCallback for this async call. We need this
+ // hook so after the resubscribe has completed, delivery for
+ // that topic subscriber should also be restarted (if it was that
+ // case before the channel disconnect).
+ final long retryWaitTime = cfg.getSubscribeReconnectRetryWaitTime();
+ ResubscribeCallback resubscribeCb =
+ new ResubscribeCallback(topicSubscriber, op,
+ channelManager, retryWaitTime);
+ op.setCallback(resubscribeCb);
+ op.context = null;
+ if (logger.isDebugEnabled()) {
+ logger.debug("Resubscribe {} with origSubData {}",
+ va(topicSubscriber, op));
+ }
+ // resubmit the request
+ channelManager.submitOp(op);
+ }
+}
Copied: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java (from r1417190, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/ResubscribeCallback.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java?p2=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java&p1=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/ResubscribeCallback.java&r1=1417190&r2=1418280&rev=1418280&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/ResubscribeCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java Fri Dec 7 11:21:52 2012
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hedwig.client.netty.impl.multiplex;
+package org.apache.hedwig.client.netty.impl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,13 +41,13 @@ class ResubscribeCallback implements Cal
// Private member variables
private final TopicSubscriber origTopicSubscriber;
private final PubSubData origSubData;
- private final MultiplexHChannelManager channelManager;
+ private final AbstractHChannelManager channelManager;
private final long retryWaitTime;
// Constructor
ResubscribeCallback(TopicSubscriber origTopicSubscriber,
PubSubData origSubData,
- MultiplexHChannelManager channelManager,
+ AbstractHChannelManager channelManager,
long retryWaitTime) {
this.origTopicSubscriber = origTopicSubscriber;
this.origSubData = origSubData;
@@ -90,6 +90,9 @@ class ResubscribeCallback implements Cal
}
private void retrySubscribeRequest() {
+ if (channelManager.isClosed()) {
+ return;
+ }
origSubData.clearServersList();
logger.debug("Resubmit subscribe request for {} in {} ms later.",
va(origTopicSubscriber, retryWaitTime));
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java?rev=1418280&r1=1418279&r2=1418280&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java Fri Dec 7 11:21:52 2012
@@ -194,6 +194,7 @@ public class MultiplexHChannelManager ex
startDelivery(topicSubscriber, messageHandler, false);
}
+ @Override
protected void restartDelivery(TopicSubscriber topicSubscriber)
throws ClientNotSubscribedException, AlreadyStartDeliveryException {
startDelivery(topicSubscriber, null, true);
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java?rev=1418280&r1=1418279&r2=1418280&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java Fri Dec 7 11:21:52 2012
@@ -18,301 +18,34 @@
package org.apache.hedwig.client.netty.impl.multiplex;
import java.net.InetSocketAddress;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.MessageConsumeData;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hedwig.client.netty.HChannelManager;
import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.client.netty.FilterableMessageHandler;
+import org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler;
+import org.apache.hedwig.client.netty.impl.ActiveSubscriber;
import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException;
-import org.apache.hedwig.filter.ClientMessageFilter;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protoextensions.MessageIdUtils;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.SubscriptionListener;
import static org.apache.hedwig.util.VarArgs.va;
-public class MultiplexSubscribeResponseHandler extends SubscribeResponseHandler {
+public class MultiplexSubscribeResponseHandler extends AbstractSubscribeResponseHandler {
private static Logger logger =
LoggerFactory.getLogger(MultiplexSubscribeResponseHandler.class);
- class ActiveSubscriber implements SubscriptionListener {
- private final TopicSubscriber topicSubscriber;
- private final PubSubData op;
- private final SubscriptionPreferences preferences;
-
- // the underlying netty channel to send request
- private final Channel channel;
-
- // Counter for the number of consumed messages so far to buffer up before we
- // send the Consume message back to the server along with the last/largest
- // message seq ID seen so far in that batch.
- private int numConsumedMessagesInBuffer = 0;
- private MessageSeqId lastMessageSeqId;
-
- // Message Handler
- private MessageHandler msgHandler;
-
- // Queue used for subscribes when the MessageHandler hasn't been registered
- // yet but we've already received subscription messages from the server.
- // This will be lazily created as needed.
- private Queue<Message> msgQueue = new LinkedList<Message>();
-
- ActiveSubscriber(TopicSubscriber ts, PubSubData op,
- SubscriptionPreferences preferences,
- Channel channel) {
- this.topicSubscriber = ts;
- this.op = op;
- this.preferences = preferences;
- this.channel = channel;
- }
-
- PubSubData getPubSubData() {
- return this.op;
- }
-
- TopicSubscriber getTopicSubscriber() {
- return this.topicSubscriber;
- }
-
- synchronized boolean updateLastMessageSeqId(MessageSeqId seqId) {
- if (null != lastMessageSeqId &&
- seqId.getLocalComponent() <= lastMessageSeqId.getLocalComponent()) {
- return false;
- }
- ++numConsumedMessagesInBuffer;
- lastMessageSeqId = seqId;
- if (numConsumedMessagesInBuffer >= cfg.getConsumedMessagesBufferSize()) {
- numConsumedMessagesInBuffer = 0;
- lastMessageSeqId = null;
- return true;
- }
- return false;
- }
-
- synchronized void startDelivery(MessageHandler messageHandler)
- throws AlreadyStartDeliveryException, ClientNotSubscribedException {
- if (null != this.msgHandler) {
- throw new AlreadyStartDeliveryException("A message handler " + msgHandler
- + " has been started for " + topicSubscriber);
- }
- if (null != messageHandler && messageHandler instanceof FilterableMessageHandler) {
- FilterableMessageHandler filterMsgHandler =
- (FilterableMessageHandler) messageHandler;
- if (filterMsgHandler.hasMessageFilter()) {
- if (null == preferences) {
- // no preferences means talking to an old version hub server
- logger.warn("Start delivering messages with filter but no subscription "
- + "preferences found. It might due to talking to an old version"
- + " hub server.");
- // use the original message handler.
- messageHandler = filterMsgHandler.getMessageHandler();
- } else {
- // pass subscription preferences to message filter
- if (logger.isDebugEnabled()) {
- logger.debug("Start delivering messages with filter on {}, preferences: {}",
- va(topicSubscriber,
- SubscriptionStateUtils.toString(preferences)));
- }
- ClientMessageFilter msgFilter = filterMsgHandler.getMessageFilter();
- msgFilter.setSubscriptionPreferences(topicSubscriber.getTopic(),
- topicSubscriber.getSubscriberId(),
- preferences);
- }
- }
- }
-
- this.msgHandler = messageHandler;
- // Once the MessageHandler is registered, see if we have any queued up
- // subscription messages sent to us already from the server. If so,
- // consume those first. Do this only if the MessageHandler registered is
- // not null (since that would be the HedwigSubscriber.stopDelivery
- // call).
- if (null == msgHandler) {
- return;
- }
- if (msgQueue.size() > 0) {
- if (logger.isDebugEnabled()) {
- logger.debug("Consuming {} queued up messages for {}",
- va(msgQueue.size(), topicSubscriber));
- }
- for (Message message : msgQueue) {
- asyncMessageDeliver(message);
- }
- // Now we can remove the queued up messages since they are all
- // consumed.
- msgQueue.clear();
- }
- }
-
- synchronized void stopDelivery() {
- this.msgHandler = null;
- }
-
- synchronized void handleMessage(Message message) {
- if (null != msgHandler) {
- asyncMessageDeliver(message);
- } else {
- // MessageHandler has not yet been registered so queue up these
- // messages for the Topic Subscription. Make the initial lazy
- // creation of the message queue thread safe just so we don't
- // run into a race condition where two simultaneous threads process
- // a received message and both try to create a new instance of
- // the message queue. Performance overhead should be okay
- // because the delivery of the topic has not even started yet
- // so these messages are not consumed and just buffered up here.
- if (logger.isDebugEnabled()) {
- logger.debug("Message {} has arrived but no MessageHandler provided for {}"
- + " yet so queueing up the message.",
- va(MessageIdUtils.msgIdToReadableString(message.getMsgId()),
- topicSubscriber));
- }
- msgQueue.add(message);
- }
- }
-
- synchronized void asyncMessageDeliver(Message message) {
- if (null == msgHandler) {
- logger.error("No message handler found to deliver message {} to {}.",
- va(MessageIdUtils.msgIdToReadableString(message.getMsgId()),
- topicSubscriber));
- return;
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Call the client app's MessageHandler asynchronously to deliver the message {} to {}",
- va(message, topicSubscriber));
- }
- MessageConsumeData messageConsumeData =
- new MessageConsumeData(topicSubscriber, message);
- msgHandler.deliver(topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(),
- message, sChannelManager.getConsumeCallback(),
- messageConsumeData);
- }
-
- void consume(final MessageSeqId messageSeqId) {
- PubSubRequest.Builder pubsubRequestBuilder =
- NetUtils.buildConsumeRequest(sChannelManager.nextTxnId(),
- topicSubscriber, messageSeqId);
-
- // For Consume requests, we will send them from the client in a fire and
- // forget manner. We are not expecting the server to send back an ack
- // response so no need to register this in the ResponseHandler. There
- // are no callbacks to invoke since this isn't a client initiated
- // action. Instead, just have a future listener that will log an error
- // message if there was a problem writing the consume request.
- if (logger.isDebugEnabled()) {
- logger.debug("Writing a Consume request to host: {} with messageSeqId: {} for {}",
- va(NetUtils.getHostFromChannel(channel),
- messageSeqId, topicSubscriber));
- }
- ChannelFuture future = channel.write(pubsubRequestBuilder.build());
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- logger.error("Error writing a Consume request to host: {} with messageSeqId: {} for {}",
- va(host, messageSeqId, topicSubscriber));
- }
- }
- });
- }
-
- @Override
- public void processEvent(ByteString topic, ByteString subscriberId,
- SubscriptionEvent event) {
- switch (event) {
- // for all cases we need to resubscribe for the subscription
- case TOPIC_MOVED:
- sChannelManager.clearHostForTopic(topic, NetUtils.getHostFromChannel(channel));
- resubscribeIfNecessary(event);
- break;
- case SUBSCRIPTION_FORCED_CLOSED:
- resubscribeIfNecessary(event);
- break;
- default:
- logger.error("Receive unknown subscription event {} for {}.",
- va(event, topicSubscriber));
- }
- }
-
- private void resubscribeIfNecessary(SubscriptionEvent event) {
- // if subscriber has been changed, we don't need to resubscribe
- if (!subscriptions.remove(topicSubscriber, this)) {
- return;
- }
- if (!op.options.getEnableResubscribe()) {
- sChannelManager.getSubscriptionEventEmitter().emitSubscriptionEvent(
- topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(), event);
- return;
- }
- // Since the connection to the server host that was responsible
- // for the topic died, we are not sure about the state of that
- // server. Resend the original subscribe request data to the default
- // server host/VIP. Also clear out all of the servers we've
- // contacted or attempted to from this request as we are starting a
- // "fresh" subscribe request.
- op.clearServersList();
- // Set a new type of VoidCallback for this async call. We need this
- // hook so after the resubscribe has completed, delivery for
- // that topic subscriber should also be restarted (if it was that
- // case before the channel disconnect).
- final long retryWaitTime = cfg.getSubscribeReconnectRetryWaitTime();
- ResubscribeCallback resubscribeCb =
- new ResubscribeCallback(topicSubscriber, op,
- sChannelManager, retryWaitTime);
- op.setCallback(resubscribeCb);
- op.context = null;
- if (logger.isDebugEnabled()) {
- logger.debug("Resubscribe {} with origSubData {}",
- va(topicSubscriber, op));
- }
- // resubmit the request
- sChannelManager.submitOp(op);
- }
- }
-
- protected final ReentrantReadWriteLock disconnectLock =
- new ReentrantReadWriteLock();
-
// the underlying subscription channel
volatile HChannel hChannel;
- InetSocketAddress host;
- protected final ConcurrentMap<TopicSubscriber, ActiveSubscriber> subscriptions
- = new ConcurrentHashMap<TopicSubscriber, ActiveSubscriber>();
private final MultiplexHChannelManager sChannelManager;
protected MultiplexSubscribeResponseHandler(ClientConfiguration cfg,
@@ -321,23 +54,11 @@ public class MultiplexSubscribeResponseH
sChannelManager = (MultiplexHChannelManager) channelManager;
}
- protected HChannelManager getHChannelManager() {
- return this.sChannelManager;
- }
-
- protected ClientConfiguration getConfiguration() {
- return cfg;
- }
-
- private ActiveSubscriber getActiveSubscriber(TopicSubscriber ts) {
- return subscriptions.get(ts);
- }
-
@Override
public void handleResponse(PubSubResponse response, PubSubData pubSubData,
Channel channel) throws Exception {
if (null == hChannel) {
- host = NetUtils.getHostFromChannel(channel);
+ InetSocketAddress host = NetUtils.getHostFromChannel(channel);
hChannel = sChannelManager.getSubscriptionChannel(host);
if (null == hChannel ||
!channel.equals(hChannel.getChannel())) {
@@ -347,205 +68,13 @@ public class MultiplexSubscribeResponseH
return;
}
}
- if (logger.isDebugEnabled()) {
- logger.debug("Handling a Subscribe response: {}, pubSubData: {}, host: {}.",
- va(response, pubSubData, NetUtils.getHostFromChannel(channel)));
- }
- switch (response.getStatusCode()) {
- case SUCCESS:
- TopicSubscriber ts = new TopicSubscriber(pubSubData.topic,
- pubSubData.subscriberId);
- SubscriptionPreferences preferences = null;
- if (response.hasResponseBody()) {
- ResponseBody respBody = response.getResponseBody();
- if (respBody.hasSubscribeResponse()) {
- SubscribeResponse resp = respBody.getSubscribeResponse();
- if (resp.hasPreferences()) {
- preferences = resp.getPreferences();
- if (logger.isDebugEnabled()) {
- logger.debug("Receive subscription preferences for {} : {}",
- va(ts,
- SubscriptionStateUtils.toString(preferences)));
- }
- }
- }
- }
-
- ActiveSubscriber ss = new ActiveSubscriber(ts, pubSubData, preferences,
- channel);
-
- boolean success = false;
- // Store the Subscribe state
- disconnectLock.readLock().lock();
- try {
- ActiveSubscriber oldSS = subscriptions.putIfAbsent(ts, ss);
- if (null != oldSS) {
- logger.warn("Subscribe {} has existed in channel {}.",
- va(ts, channel));
- success = false;
- } else {
- logger.debug("Succeed to add subscription {} in channel {}.",
- va(ts, channel));
- success = true;
- }
- } finally {
- disconnectLock.readLock().unlock();
- }
- if (success) {
- // Response was success so invoke the callback's operationFinished
- // method.
- pubSubData.getCallback().operationFinished(pubSubData.context, null);
- } else {
- ClientAlreadySubscribedException exception =
- new ClientAlreadySubscribedException("Client is already subscribed for " + ts);
- pubSubData.getCallback().operationFailed(pubSubData.context, exception);
- }
- break;
- case CLIENT_ALREADY_SUBSCRIBED:
- // For Subscribe requests, the server says that the client is
- // already subscribed to it.
- pubSubData.getCallback().operationFailed(pubSubData.context, new ClientAlreadySubscribedException(
- "Client is already subscribed for topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: "
- + pubSubData.subscriberId.toStringUtf8()));
- break;
- case SERVICE_DOWN:
- // Response was service down failure so just invoke the callback's
- // operationFailed method.
- pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
- "Server responded with a SERVICE_DOWN status"));
- break;
- case NOT_RESPONSIBLE_FOR_TOPIC:
- // Redirect response so we'll need to repost the original Subscribe
- // Request
- handleRedirectResponse(response, pubSubData, channel);
- break;
- default:
- // Consider all other status codes as errors, operation failed
- // cases.
- logger.error("Unexpected error response from server for PubSubResponse: " + response);
- pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
- "Server responded with a status code of: " + response.getStatusCode()));
- break;
- }
- }
-
- @Override
- public void handleSubscribeMessage(PubSubResponse response) {
- Message message = response.getMessage();
- TopicSubscriber ts = new TopicSubscriber(response.getTopic(),
- response.getSubscriberId());
- if (logger.isDebugEnabled()) {
- logger.debug("Handling a Subscribe message in response: {}, {}",
- va(response, ts));
- }
- ActiveSubscriber ss = getActiveSubscriber(ts);
- if (null == ss) {
- logger.error("Subscriber {} is not found receiving its message {}.",
- va(ts, MessageIdUtils.msgIdToReadableString(message.getMsgId())));
- return;
- }
- ss.handleMessage(message);
+ super.handleResponse(response, pubSubData, channel);
}
@Override
- protected void asyncMessageDeliver(TopicSubscriber topicSubscriber,
- Message message) {
- ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
- if (null == ss) {
- logger.error("Subscriber {} is not found delivering its message {}.",
- va(topicSubscriber,
- MessageIdUtils.msgIdToReadableString(message.getMsgId())));
- return;
- }
- ss.asyncMessageDeliver(message);
- }
-
- @Override
- protected void messageConsumed(TopicSubscriber topicSubscriber,
- Message message) {
- ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
- if (null == ss) {
- logger.warn("Subscriber {} is not found consumed its message {}.",
- va(topicSubscriber,
- MessageIdUtils.msgIdToReadableString(message.getMsgId())));
- return;
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Message has been successfully consumed by the client app : {}, {}",
- va(message, topicSubscriber));
- }
- // For consume response to server, there is a config param on how many
- // messages to consume and buffer up before sending the consume request.
- // We just need to keep a count of the number of messages consumed
- // and the largest/latest msg ID seen so far in this batch. Messages
- // should be delivered in order and without gaps. Do this only if
- // auto-sending of consume messages is enabled.
- if (cfg.isAutoSendConsumeMessageEnabled()) {
- // Update these variables only if we are auto-sending consume
- // messages to the server. Otherwise the onus is on the client app
- // to call the Subscriber consume API to let the server know which
- // messages it has successfully consumed.
- if (ss.updateLastMessageSeqId(message.getMsgId())) {
- // Send the consume request and reset the consumed messages buffer
- // variables. We will use the same Channel created from the
- // subscribe request for the TopicSubscriber.
- if (logger.isDebugEnabled()) {
- logger.debug("Consume message {} when reaching consumed message buffer limit.",
- message.getMsgId());
- }
- ss.consume(message.getMsgId());
- }
- }
- }
-
- @Override
- public void handleSubscriptionEvent(ByteString topic, ByteString subscriberId,
- SubscriptionEvent event) {
- TopicSubscriber ts = new TopicSubscriber(topic, subscriberId);
- ActiveSubscriber ss = getActiveSubscriber(ts);
- if (null == ss) {
- logger.warn("No subscription {} found receiving subscription event {}.",
- va(ts, event));
- return;
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Received subscription event {} for ({}).",
- va(event, ts));
- }
- ss.processEvent(topic, subscriberId, event);
- }
-
- @Override
- public void startDelivery(final TopicSubscriber topicSubscriber,
- MessageHandler messageHandler)
- throws ClientNotSubscribedException, AlreadyStartDeliveryException {
- ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
- if (null == ss) {
- throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Start delivering message for {} using message handler {}",
- va(topicSubscriber, messageHandler));
- }
- ss.startDelivery(messageHandler);
- }
-
- @Override
- public void stopDelivery(final TopicSubscriber topicSubscriber)
- throws ClientNotSubscribedException {
- ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
- if (null == ss) {
- throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Stop delivering messages for {}", topicSubscriber);
- }
- ss.stopDelivery();
- }
-
- @Override
- public boolean hasSubscription(TopicSubscriber topicSubscriber) {
- return subscriptions.containsKey(topicSubscriber);
+ protected void handleSuccessResponse(TopicSubscriber ts, ActiveSubscriber as,
+ Channel channel) {
+ // do nothing now
}
@Override
@@ -562,12 +91,7 @@ public class MultiplexSubscribeResponseH
Callback<ResponseBody> closeCb = new Callback<ResponseBody>() {
@Override
public void operationFinished(Object ctx, ResponseBody respBody) {
- disconnectLock.readLock().lock();
- try {
- subscriptions.remove(topicSubscriber, ss);
- } finally {
- disconnectLock.readLock().unlock();
- }
+ removeSubscription(topicSubscriber, ss);
callback.operationFinished(context, null);
}
@@ -583,40 +107,4 @@ public class MultiplexSubscribeResponseH
hChannel.submitOp(closeOp);
}
- @Override
- public void consume(final TopicSubscriber topicSubscriber,
- final MessageSeqId messageSeqId) {
- ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
- if (null == ss) {
- logger.warn("Subscriber {} is not found consuming message {}.",
- va(topicSubscriber,
- MessageIdUtils.msgIdToReadableString(messageSeqId)));
- return;
- }
- ss.consume(messageSeqId);
- }
-
- @Override
- public void onChannelDisconnected(InetSocketAddress host, Channel channel) {
- disconnectLock.writeLock().lock();
- try {
- onDisconnect(host);
- } finally {
- disconnectLock.writeLock().unlock();
- }
- }
-
- private void onDisconnect(InetSocketAddress host) {
- for (ActiveSubscriber ss : subscriptions.values()) {
- onDisconnect(ss, host);
- }
- }
-
- private void onDisconnect(ActiveSubscriber ss, InetSocketAddress host) {
- TopicSubscriber ts = ss.getTopicSubscriber();
- logger.info("Subscription channel for ({}) is disconnected.", ts);
- ss.processEvent(ts.getTopic(), ts.getSubscriberId(),
- SubscriptionEvent.TOPIC_MOVED);
- }
-
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java?rev=1418280&r1=1418279&r2=1418280&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java Fri Dec 7 11:21:52 2012
@@ -182,6 +182,7 @@ public class SimpleHChannelManager exten
startDelivery(topicSubscriber, messageHandler, false);
}
+ @Override
protected void restartDelivery(TopicSubscriber topicSubscriber)
throws ClientNotSubscribedException, AlreadyStartDeliveryException {
startDelivery(topicSubscriber, null, true);