You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/12/07 12:21:54 UTC
svn commit: r1418280 [2/2] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/
hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/
hedwig-client/src/main/java/org/apache/hedwig/cl...
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java?rev=1418280&r1=1418279&r2=1418280&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java Fri Dec 7 11:21:52 2012
@@ -18,13 +18,9 @@
package org.apache.hedwig.client.netty.impl.simple;
import java.net.InetSocketAddress;
+import java.util.Set;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
-
-import com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,380 +28,103 @@ import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
+import com.google.protobuf.ByteString;
+
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.MessageConsumeData;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.client.netty.FilterableMessageHandler;
+import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
+import org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler;
+import org.apache.hedwig.client.netty.impl.ActiveSubscriber;
import org.apache.hedwig.client.netty.impl.HChannelImpl;
import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.filter.ClientMessageFilter;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.protoextensions.MessageIdUtils;
import org.apache.hedwig.util.Callback;
-import static org.apache.hedwig.util.VarArgs.va;
-public class SimpleSubscribeResponseHandler extends SubscribeResponseHandler {
+public class SimpleSubscribeResponseHandler extends AbstractSubscribeResponseHandler {
private static Logger logger = LoggerFactory.getLogger(SimpleSubscribeResponseHandler.class);
- // Member variables used when this ResponseHandler is for a Subscribe
- // channel. We need to be able to consume messages sent back to us from
- // the server, and to also recreate the Channel connection if it ever goes
- // down. For that, we need to store the original PubSubData for the
- // subscribe request, and also the MessageHandler that was registered when
- // delivery of messages started for the subscription.
- private volatile PubSubData origSubData;
- private volatile TopicSubscriber origTopicSubscriber;
- private SubscriptionPreferences preferences;
- private Channel subscribeChannel;
- private MessageHandler messageHandler;
- // Counter for the number of consumed messages so far to buffer up before we
- // send the Consume message back to the server along with the last/largest
- // message seq ID seen so far in that batch.
- private int numConsumedMessagesInBuffer = 0;
- private MessageSeqId lastMessageSeqId;
- // Queue used for subscribes when the MessageHandler hasn't been registered
- // yet but we've already received subscription messages from the server.
- // This will be lazily created as needed.
- private Queue<Message> subscribeMsgQueue;
- // Set to store all of the outstanding subscribed messages that are pending
- // to be consumed by the client app's MessageHandler. If this ever grows too
- // big (e.g. problem at the client end for message consumption), we can
- // throttle things by temporarily setting the Subscribe Netty Channel
- // to not be readable. When the Set has shrunk sufficiently, we can turn the
- // channel back on to read new messages.
- private Set<Message> outstandingMsgSet;
-
- private SimpleHChannelManager sChannelManager;
-
- protected SimpleSubscribeResponseHandler(ClientConfiguration cfg,
- HChannelManager channelManager) {
- super(cfg, channelManager);
- sChannelManager = (SimpleHChannelManager) channelManager;
- origTopicSubscriber = null;
- }
-
- protected HChannelManager getHChannelManager() {
- return this.sChannelManager;
- }
-
- protected ClientConfiguration getConfiguration() {
- return cfg;
- }
-
- protected MessageHandler getMessageHandler() {
- return messageHandler;
- }
-
- @Override
- public void handleResponse(PubSubResponse response, PubSubData pubSubData,
- Channel channel) throws Exception {
- // If this was not a successful response to the Subscribe request, we
- // won't be using the Netty Channel created so just close it.
- if (!response.getStatusCode().equals(StatusCode.SUCCESS)) {
- HChannelImpl.getHChannelHandlerFromChannel(channel).closeExplicitly();
- channel.close();
- }
-
- if (logger.isDebugEnabled()) {
- logger.debug("Handling a Subscribe response: {}, pubSubData: {}, host: {}.",
- va(response, pubSubData, NetUtils.getHostFromChannel(channel)));
- }
- switch (response.getStatusCode()) {
- case SUCCESS:
- // Store the original PubSubData used to create this successful
- // Subscribe request.
- origSubData = pubSubData;
- origTopicSubscriber = new TopicSubscriber(pubSubData.topic,
- pubSubData.subscriberId);
- synchronized(this) {
- // For successful Subscribe requests, store this Channel locally
- // and set it to not be readable initially.
- // This way we won't be delivering messages for this topic
- // subscription until the client explicitly says so.
- subscribeChannel = channel;
- subscribeChannel.setReadable(false);
- if (response.hasResponseBody()) {
- ResponseBody respBody = response.getResponseBody();
- if (respBody.hasSubscribeResponse()) {
- SubscribeResponse resp = respBody.getSubscribeResponse();
- if (resp.hasPreferences()) {
- preferences = resp.getPreferences();
- if (logger.isDebugEnabled()) {
- logger.debug("Receive subscription preferences for {} : {}",
- va(origTopicSubscriber,
- SubscriptionStateUtils.toString(preferences)));
- }
- }
- }
- }
-
- // Store the mapping for the TopicSubscriber to the Channel.
- // This is so we can control the starting and stopping of
- // message deliveries from the server on that Channel. Store
- // this only on a successful ack response from the server.
- sChannelManager.storeSubscriptionChannel(origTopicSubscriber,
- channel);
-
- // Lazily create the Set (from a concurrent hashmap) to keep track
- // of outstanding Messages to be consumed by the client app. At this
- // stage, delivery for that topic hasn't started yet so creation of
- // this Set should be thread safe. We'll create the Set with an initial
- // capacity equal to the configured parameter for the maximum number of
- // outstanding messages to allow. The load factor will be set to
- // 1.0f which means we'll only rehash and allocate more space if
- // we ever exceed the initial capacity. That should be okay
- // because when that happens, things are slow already and piling
- // up on the client app side to consume messages.
- outstandingMsgSet = Collections.newSetFromMap(
- new ConcurrentHashMap<Message,Boolean>(
- cfg.getMaximumOutstandingMessages(), 1.0f));
- }
- // Response was success so invoke the callback's operationFinished
- // method.
- pubSubData.getCallback().operationFinished(pubSubData.context, null);
- break;
- case CLIENT_ALREADY_SUBSCRIBED:
- // For Subscribe requests, the server says that the client is
- // already subscribed to it.
- pubSubData.getCallback().operationFailed(pubSubData.context, new ClientAlreadySubscribedException(
- "Client is already subscribed for topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: "
- + pubSubData.subscriberId.toStringUtf8()));
- break;
- case SERVICE_DOWN:
- // Response was service down failure so just invoke the callback's
- // operationFailed method.
- pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
- "Server responded with a SERVICE_DOWN status"));
- break;
- case NOT_RESPONSIBLE_FOR_TOPIC:
- // Redirect response so we'll need to repost the original Subscribe
- // Request
- handleRedirectResponse(response, pubSubData, channel);
- break;
- default:
- // Consider all other status codes as errors, operation failed
- // cases.
- logger.error("Unexpected error response from server for PubSubResponse: {}", response);
- pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
- "Server responded with a status code of: " + response.getStatusCode()));
- break;
- }
- }
-
- @Override
- public void handleSubscribeMessage(PubSubResponse response) {
- if (logger.isDebugEnabled()) {
- logger.debug("Handling a Subscribe message in response: {}, {}",
- va(response, origTopicSubscriber));
- }
- Message message = response.getMessage();
-
- synchronized (this) {
- // Consume the message asynchronously that the client is subscribed
- // to. Do this only if delivery for the subscription has started and
- // a MessageHandler has been registered for the TopicSubscriber.
- if (messageHandler != null) {
- asyncMessageDeliver(origTopicSubscriber, message);
- } else {
- // MessageHandler has not yet been registered so queue up these
- // messages for the Topic Subscription. Make the initial lazy
- // creation of the message queue thread safe just so we don't
- // run into a race condition where two simultaneous threads process
- // a received message and both try to create a new instance of
- // the message queue. Performance overhead should be okay
- // because the delivery of the topic has not even started yet
- // so these messages are not consumed and just buffered up here.
- if (subscribeMsgQueue == null)
- subscribeMsgQueue = new LinkedList<Message>();
- if (logger.isDebugEnabled()) {
- logger
- .debug("Message has arrived but Subscribe channel does not have a registered MessageHandler yet so queueing up the message: {}",
- message);
- }
- subscribeMsgQueue.add(message);
- }
- }
- }
-
- @Override
- public void handleSubscriptionEvent(ByteString topic, ByteString subscriberId,
- SubscriptionEvent event) {
- Channel channel;
- synchronized (this) {
- channel = subscribeChannel;
- }
- if (null == channel) {
- logger.warn("No subscription channel found when receiving subscription event {} for (topic:{}, subscriber:{}).",
- va(event, topic, subscriberId));
- return;
+ /**
+ * Simple Active Subscriber enabling client-side throttling.
+ */
+ static class SimpleActiveSubscriber extends ActiveSubscriber {
+
+ // Set to store all of the outstanding subscribed messages that are pending
+ // to be consumed by the client app's MessageHandler. If this ever grows too
+ // big (e.g. problem at the client end for message consumption), we can
+ // throttle things by temporarily setting the Subscribe Netty Channel
+ // to not be readable. When the Set has shrunk sufficiently, we can turn the
+ // channel back on to read new messages.
+ private final Set<Message> outstandingMsgSet;
+
+ public SimpleActiveSubscriber(ClientConfiguration cfg,
+ AbstractHChannelManager channelManager,
+ TopicSubscriber ts, PubSubData op,
+ SubscriptionPreferences preferences,
+ Channel channel) {
+ super(cfg, channelManager, ts, op, preferences, channel);
+ outstandingMsgSet = Collections.newSetFromMap(
+ new ConcurrentHashMap<Message, Boolean>(
+ cfg.getMaximumOutstandingMessages(), 1.0f));
}
- processSubscriptionEvent(event, NetUtils.getHostFromChannel(channel), channel);
- }
- @Override
- protected void asyncMessageDeliver(TopicSubscriber topicSubscriber,
- Message message) {
- if (logger.isDebugEnabled()) {
- logger.debug("Call the client app's MessageHandler asynchronously to deliver the message {} to {}",
- va(message, topicSubscriber));
- }
- synchronized (this) {
+ @Override
+ protected void unsafeDeliverMessage(Message message) {
// Add this "pending to be consumed" message to the outstandingMsgSet.
outstandingMsgSet.add(message);
// Check if we've exceeded the max size for the outstanding message set.
- if (outstandingMsgSet.size() >= cfg.getMaximumOutstandingMessages()
- && subscribeChannel.isReadable()) {
+ if (outstandingMsgSet.size() >= cfg.getMaximumOutstandingMessages() &&
+ channel.isReadable()) {
// Too many outstanding messages so throttle it by setting the Netty
// Channel to not be readable.
if (logger.isDebugEnabled()) {
logger.debug("Too many outstanding messages ({}) so throttling the subscribe netty Channel",
outstandingMsgSet.size());
}
- subscribeChannel.setReadable(false);
+ channel.setReadable(false);
}
+ super.unsafeDeliverMessage(message);
}
- MessageConsumeData messageConsumeData =
- new MessageConsumeData(topicSubscriber, message);
- messageHandler.deliver(topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(),
- message, sChannelManager.getConsumeCallback(),
- messageConsumeData);
- }
- @Override
- protected synchronized void messageConsumed(TopicSubscriber topicSubscriber,
- Message message) {
- logger.debug("Message has been successfully consumed by the client app for message: {}, {}",
- message, topicSubscriber);
- // Update the consumed messages buffer variables
- if (cfg.isAutoSendConsumeMessageEnabled()) {
- // Update these variables only if we are auto-sending consume
- // messages to the server. Otherwise the onus is on the client app
- // to call the Subscriber consume API to let the server know which
- // messages it has successfully consumed.
- numConsumedMessagesInBuffer++;
- lastMessageSeqId = message.getMsgId();
- }
- // Remove this consumed message from the outstanding Message Set.
- outstandingMsgSet.remove(message);
-
- // For consume response to server, there is a config param on how many
- // messages to consume and buffer up before sending the consume request.
- // We just need to keep a count of the number of messages consumed
- // and the largest/latest msg ID seen so far in this batch. Messages
- // should be delivered in order and without gaps. Do this only if
- // auto-sending of consume messages is enabled.
- if (cfg.isAutoSendConsumeMessageEnabled()
- && numConsumedMessagesInBuffer >= cfg.getConsumedMessagesBufferSize()) {
- // Send the consume request and reset the consumed messages buffer
- // variables. We will use the same Channel created from the
- // subscribe request for the TopicSubscriber.
- if (logger.isDebugEnabled()) {
- logger
- .debug("Consumed message buffer limit reached so send the Consume Request to the server with lastMessageSeqId: {}",
- lastMessageSeqId);
+ @Override
+ public synchronized void messageConsumed(Message message) {
+ super.messageConsumed(message);
+ // Remove this consumed message from the outstanding Message Set.
+ outstandingMsgSet.remove(message);
+ // Check if we throttled message consumption previously when the
+ // outstanding message limit was reached. For now, only turn the
+ // delivery back on if there are no more outstanding messages to
+ // consume. We could make this a configurable parameter if needed.
+ if (!channel.isReadable() && outstandingMsgSet.size() == 0) {
+ if (logger.isDebugEnabled())
+ logger.debug("Message consumption has caught up so okay to turn off"
+ + " throttling of messages on the subscribe channel for {}",
+ topicSubscriber);
+ channel.setReadable(true);
}
- consume(topicSubscriber, lastMessageSeqId);
- numConsumedMessagesInBuffer = 0;
- lastMessageSeqId = null;
- }
-
- // Check if we throttled message consumption previously when the
- // outstanding message limit was reached. For now, only turn the
- // delivery back on if there are no more outstanding messages to
- // consume. We could make this a configurable parameter if needed.
- if (!subscribeChannel.isReadable() && outstandingMsgSet.size() == 0) {
- if (logger.isDebugEnabled())
- logger
- .debug("Message consumption has caught up so okay to turn off throttling of messages on the subscribe channel for {}",
- topicSubscriber);
- subscribeChannel.setReadable(true);
}
- }
- @Override
- public void startDelivery(final TopicSubscriber topicSubscriber,
- MessageHandler messageHandler)
- throws ClientNotSubscribedException, AlreadyStartDeliveryException {
- if (logger.isDebugEnabled()) {
- logger.debug("Start delivering message for {} using message handler {}",
- va(topicSubscriber, messageHandler));
- }
- if (!hasSubscription(topicSubscriber)) {
- throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
- }
- synchronized (this) {
- if (null != this.messageHandler) {
- throw new AlreadyStartDeliveryException("A message handler " + this.messageHandler
- + " has been started for " + topicSubscriber);
- }
- // instantiante a message handler
- if (null != messageHandler &&
- messageHandler instanceof FilterableMessageHandler) {
- FilterableMessageHandler filterMsgHandler =
- (FilterableMessageHandler) messageHandler;
- // pass subscription preferences to message filter
- if (null == preferences) {
- // no preferences means talking to an old version hub server
- logger.warn("Start delivering messages with filter but no subscription "
- + "preferences found. It might due to talking to an old version"
- + " hub server.");
- // use the original message handler.
- messageHandler = filterMsgHandler.getMessageHandler();
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("Start delivering messages with filter on {}, preferences: {}",
- va(topicSubscriber,
- SubscriptionStateUtils.toString(preferences)));
- }
- ClientMessageFilter msgFilter = filterMsgHandler.getMessageFilter();
- msgFilter.setSubscriptionPreferences(
- topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(),
- preferences);
- }
- }
-
- this.messageHandler = messageHandler;
- // Once the MessageHandler is registered, see if we have any queued up
- // subscription messages sent to us already from the server. If so,
- // consume those first. Do this only if the MessageHandler registered is
- // not null (since that would be the HedwigSubscriber.stopDelivery
- // call).
- if (messageHandler != null && subscribeMsgQueue != null && subscribeMsgQueue.size() > 0) {
- if (logger.isDebugEnabled()) {
- logger.debug("Consuming {} queued up messages for {}",
- va(subscribeMsgQueue.size(), topicSubscriber));
- }
- for (Message message : subscribeMsgQueue) {
- asyncMessageDeliver(topicSubscriber, message);
- }
- // Now we can remove the queued up messages since they are all
- // consumed.
- subscribeMsgQueue.clear();
- }
+ @Override
+ public synchronized void startDelivery(MessageHandler messageHandler)
+ throws AlreadyStartDeliveryException, ClientNotSubscribedException {
+ super.startDelivery(messageHandler);
// Now make the TopicSubscriber Channel readable (it is set to not be
// readable when the initial subscription is done). Note that this is an
// asynchronous call. If this fails (not likely), the futureListener
// will just log an error message for now.
- ChannelFuture future = subscribeChannel.setReadable(true);
+ ChannelFuture future = channel.setReadable(true);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@@ -416,26 +135,15 @@ public class SimpleSubscribeResponseHand
}
});
}
- }
-
- @Override
- public void stopDelivery(final TopicSubscriber topicSubscriber)
- throws ClientNotSubscribedException {
- if (logger.isDebugEnabled()) {
- logger.debug("Stop delivering messages for {}", topicSubscriber);
- }
- if (!hasSubscription(topicSubscriber)) {
- throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
- }
-
- synchronized (this) {
- this.messageHandler = null;
+ @Override
+ public synchronized void stopDelivery() {
+ super.stopDelivery();
// Now make the TopicSubscriber channel not-readable. This will buffer
// up messages if any are sent from the server. Note that this is an
// asynchronous call. If this fails (not likely), the futureListener
// will just log an error message for now.
- ChannelFuture future = subscribeChannel.setReadable(false);
+ ChannelFuture future = channel.setReadable(false);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@@ -446,79 +154,114 @@ public class SimpleSubscribeResponseHand
}
});
}
+
+ }
+
+ // Track which subscriber is alive in this response handler
+ // Which is used for backward compat, since old version hub
+ // server doesn't carry (topic, subscriberid) in each message.
+ private volatile TopicSubscriber origTopicSubscriber;
+ private volatile ActiveSubscriber origActiveSubscriber;
+
+ private SimpleHChannelManager sChannelManager;
+
+ protected SimpleSubscribeResponseHandler(ClientConfiguration cfg,
+ HChannelManager channelManager) {
+ super(cfg, channelManager);
+ sChannelManager = (SimpleHChannelManager) channelManager;
+ }
+
+ @Override
+ protected ActiveSubscriber createActiveSubscriber(
+ ClientConfiguration cfg, AbstractHChannelManager channelManager,
+ TopicSubscriber ts, PubSubData op, SubscriptionPreferences preferences,
+ Channel channel) {
+ return new SimpleActiveSubscriber(cfg, channelManager, ts, op, preferences, channel);
+ }
+
+ @Override
+ protected synchronized ActiveSubscriber getActiveSubscriber(TopicSubscriber ts) {
+ if (null == origTopicSubscriber || !origTopicSubscriber.equals(ts)) {
+ return null;
+ }
+ return origActiveSubscriber;
+ }
+
+ private synchronized ActiveSubscriber getActiveSubscriber() {
+ return origActiveSubscriber;
}
@Override
- public boolean hasSubscription(TopicSubscriber topicSubscriber) {
+ public synchronized boolean hasSubscription(TopicSubscriber ts) {
if (null == origTopicSubscriber) {
return false;
- } else {
- return origTopicSubscriber.equals(topicSubscriber);
}
+ return origTopicSubscriber.equals(ts);
}
@Override
- public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
- final Callback<ResponseBody> callback,
- final Object context) {
- // nothing to do just clear status
- // channel manager takes the responsibility to close the channel
- callback.operationFinished(context, (ResponseBody)null);
+ protected synchronized boolean removeSubscription(TopicSubscriber ts, ActiveSubscriber ss) {
+ if (null != origTopicSubscriber && !origTopicSubscriber.equals(ts)) {
+ return false;
+ }
+ origTopicSubscriber = null;
+ origActiveSubscriber = null;
+ return super.removeSubscription(ts, ss);
}
@Override
- public synchronized void consume(final TopicSubscriber topicSubscriber,
- final MessageSeqId messageSeqId) {
- PubSubRequest.Builder pubsubRequestBuilder =
- NetUtils.buildConsumeRequest(sChannelManager.nextTxnId(),
- topicSubscriber, messageSeqId);
-
- // For Consume requests, we will send them from the client in a fire and
- // forget manner. We are not expecting the server to send back an ack
- // response so no need to register this in the ResponseHandler. There
- // are no callbacks to invoke since this isn't a client initiated
- // action. Instead, just have a future listener that will log an error
- // message if there was a problem writing the consume request.
- if (logger.isDebugEnabled()) {
- logger.debug("Writing a Consume request to host: {} with messageSeqId: {} for {}",
- va(NetUtils.getHostFromChannel(subscribeChannel),
- messageSeqId, topicSubscriber));
+ public void handleResponse(PubSubResponse response, PubSubData pubSubData,
+ Channel channel) throws Exception {
+ // If this was not a successful response to the Subscribe request, we
+ // won't be using the Netty Channel created so just close it.
+ if (!response.getStatusCode().equals(StatusCode.SUCCESS)) {
+ HChannelImpl.getHChannelHandlerFromChannel(channel).closeExplicitly();
+ channel.close();
}
- ChannelFuture future = subscribeChannel.write(pubsubRequestBuilder.build());
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- logger.error("Error writing a Consume request to host: {} with messageSeqId: {} for {}",
- va(NetUtils.getHostFromChannel(subscribeChannel),
- messageSeqId, topicSubscriber));
- }
- }
- });
+ super.handleResponse(response, pubSubData, channel);
}
@Override
- public void onChannelDisconnected(InetSocketAddress host,
- Channel channel) {
- if (origTopicSubscriber == null) {
+ public void handleSubscribeMessage(PubSubResponse response) {
+ Message message = response.getMessage();
+ ActiveSubscriber ss = getActiveSubscriber();
+ if (null == ss) {
+ logger.error("No Subscriber is alive receiving its message {}.",
+ MessageIdUtils.msgIdToReadableString(message.getMsgId()));
return;
}
- processSubscriptionEvent(SubscriptionEvent.TOPIC_MOVED, host, channel);
+ ss.handleMessage(message);
}
- private void processSubscriptionEvent(final SubscriptionEvent event, InetSocketAddress host,
- final Channel channel) {
- if (SubscriptionEvent.TOPIC_MOVED != event &&
- SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED != event) {
- logger.warn("Ignore subscription event {} received from channel {}.",
- event, channel);
- return;
- }
- if (SubscriptionEvent.TOPIC_MOVED == event) {
- sChannelManager.clearHostForTopic(origTopicSubscriber.getTopic(), host);
+ @Override
+ protected void handleSuccessResponse(TopicSubscriber ts, ActiveSubscriber as,
+ Channel channel) {
+ synchronized (this) {
+ origTopicSubscriber = ts;
+ origActiveSubscriber = as;
}
+ // Store the mapping for the TopicSubscriber to the Channel.
+ // This is so we can control the starting and stopping of
+ // message deliveries from the server on that Channel. Store
+ // this only on a successful ack response from the server.
+ sChannelManager.storeSubscriptionChannel(ts, channel);
+ }
+
+ @Override
+ public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
+ final Callback<ResponseBody> callback,
+ final Object context) {
+ // nothing to do just clear status
+ // channel manager takes the responsibility to close the channel
+ callback.operationFinished(context, (ResponseBody)null);
+ }
+
+ @Override
+ protected void resubscribeIfNecessary(final ActiveSubscriber ss,
+ final SubscriptionEvent event) {
+ final TopicSubscriber ts = ss.getTopicSubscriber();
// clear subscription status
- sChannelManager.asyncCloseSubscription(origTopicSubscriber, new Callback<ResponseBody>() {
+ sChannelManager.asyncCloseSubscription(ts, new Callback<ResponseBody>() {
@Override
public void operationFinished(Object ctx, ResponseBody result) {
@@ -531,39 +274,9 @@ public class SimpleSubscribeResponseHand
}
private void finish() {
- // Since the connection to the server host that was responsible
- // for the topic died, we are not sure about the state of that
- // server. Resend the original subscribe request data to the default
- // server host/VIP. Also clear out all of the servers we've
- // contacted or attempted to from this request as we are starting a
- // "fresh" subscribe request.
- origSubData.clearServersList();
- // do resubscribe if the subscription enables it
- if (origSubData.options.getEnableResubscribe()) {
- // Set a new type of VoidCallback for this async call. We need this
- // hook so after the subscribe reconnect has completed, delivery for
- // that topic subscriber should also be restarted (if it was that
- // case before the channel disconnect).
- final long retryWaitTime = cfg.getSubscribeReconnectRetryWaitTime();
- SubscribeReconnectCallback reconnectCb =
- new SubscribeReconnectCallback(origTopicSubscriber,
- origSubData,
- sChannelManager,
- retryWaitTime);
- origSubData.setCallback(reconnectCb);
- origSubData.context = null;
- // Clear the shouldClaim flag
- origSubData.shouldClaim = false;
- logger.debug("Reconnect {}'s subscription channel with origSubData {}",
- origTopicSubscriber, origSubData);
- sChannelManager.submitOpToDefaultServer(origSubData);
- } else {
- logger.info("Subscription channel {} for ({}) is disconnected.",
- channel, origTopicSubscriber);
- sChannelManager.getSubscriptionEventEmitter().emitSubscriptionEvent(
- origSubData.topic, origSubData.subscriberId, event);
- }
+ SimpleSubscribeResponseHandler.super.resubscribeIfNecessary(ss, event);
}
+
}, null);
}