You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/27 01:52:20 UTC
svn commit: r1390777 [4/4] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/java/org/apache/hedwig/client/data/
hedwig-client/src/main/java/org/apache/hedwig/client/handlers/
hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig...
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,538 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl.simple;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import com.google.protobuf.ByteString;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.MessageConsumeData;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
+import org.apache.hedwig.client.netty.HChannelManager;
+import org.apache.hedwig.client.netty.HChannel;
+import org.apache.hedwig.client.netty.NetUtils;
+import org.apache.hedwig.client.netty.FilterableMessageHandler;
+import org.apache.hedwig.client.netty.impl.HChannelImpl;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.filter.ClientMessageFilter;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
+
+public class SimpleSubscribeResponseHandler extends SubscribeResponseHandler {
+
+ private static Logger logger = LoggerFactory.getLogger(SimpleSubscribeResponseHandler.class);
+
+ // Member variables used when this ResponseHandler is for a Subscribe
+ // channel. We need to be able to consume messages sent back to us from
+ // the server, and to also recreate the Channel connection if it ever goes
+ // down. For that, we need to store the original PubSubData for the
+ // subscribe request, and also the MessageHandler that was registered when
+ // delivery of messages started for the subscription.
+ private volatile PubSubData origSubData;
+ private volatile TopicSubscriber origTopicSubscriber;
+ private SubscriptionPreferences preferences;
+ private Channel subscribeChannel;
+ private MessageHandler messageHandler;
+ // Counter for the number of consumed messages so far to buffer up before we
+ // send the Consume message back to the server along with the last/largest
+ // message seq ID seen so far in that batch.
+ private int numConsumedMessagesInBuffer = 0;
+ private MessageSeqId lastMessageSeqId;
+ // Queue used for subscribes when the MessageHandler hasn't been registered
+ // yet but we've already received subscription messages from the server.
+ // This will be lazily created as needed.
+ private Queue<Message> subscribeMsgQueue;
+ // Set to store all of the outstanding subscribed messages that are pending
+ // to be consumed by the client app's MessageHandler. If this ever grows too
+ // big (e.g. problem at the client end for message consumption), we can
+ // throttle things by temporarily setting the Subscribe Netty Channel
+ // to not be readable. When the Set has shrunk sufficiently, we can turn the
+ // channel back on to read new messages.
+ private Set<Message> outstandingMsgSet;
+
+ private SimpleHChannelManager sChannelManager;
+
+ protected SimpleSubscribeResponseHandler(ClientConfiguration cfg,
+ HChannelManager channelManager) {
+ super(cfg, channelManager);
+ sChannelManager = (SimpleHChannelManager) channelManager;
+ }
+
+ protected HChannelManager getHChannelManager() {
+ return this.sChannelManager;
+ }
+
+ protected ClientConfiguration getConfiguration() {
+ return cfg;
+ }
+
+ protected MessageHandler getMessageHandler() {
+ return messageHandler;
+ }
+
+ @Override
+ public void handleResponse(PubSubResponse response, PubSubData pubSubData,
+ Channel channel) throws Exception {
+ // If this was not a successful response to the Subscribe request, we
+ // won't be using the Netty Channel created so just close it.
+ if (!response.getStatusCode().equals(StatusCode.SUCCESS)) {
+ HChannelImpl.getHChannelHandlerFromChannel(channel).closeExplicitly();
+ channel.close();
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Handling a Subscribe response: {}, pubSubData: {}, host: {}.",
+ va(response, pubSubData, NetUtils.getHostFromChannel(channel)));
+ }
+ switch (response.getStatusCode()) {
+ case SUCCESS:
+ // Store the original PubSubData used to create this successful
+ // Subscribe request.
+ origSubData = pubSubData;
+ origTopicSubscriber = new TopicSubscriber(pubSubData.topic,
+ pubSubData.subscriberId);
+ synchronized(this) {
+ // For successful Subscribe requests, store this Channel locally
+ // and set it to not be readable initially.
+ // This way we won't be delivering messages for this topic
+ // subscription until the client explicitly says so.
+ subscribeChannel = channel;
+ subscribeChannel.setReadable(false);
+ if (response.hasResponseBody()) {
+ ResponseBody respBody = response.getResponseBody();
+ if (respBody.hasSubscribeResponse()) {
+ SubscribeResponse resp = respBody.getSubscribeResponse();
+ if (resp.hasPreferences()) {
+ preferences = resp.getPreferences();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Receive subscription preferences for {} : {}",
+ va(origTopicSubscriber,
+ SubscriptionStateUtils.toString(preferences)));
+ }
+ }
+ }
+ }
+
+ // Store the mapping for the TopicSubscriber to the Channel.
+ // This is so we can control the starting and stopping of
+ // message deliveries from the server on that Channel. Store
+ // this only on a successful ack response from the server.
+ sChannelManager.storeSubscriptionChannel(origTopicSubscriber,
+ channel);
+
+ // Lazily create the Set (from a concurrent hashmap) to keep track
+ // of outstanding Messages to be consumed by the client app. At this
+ // stage, delivery for that topic hasn't started yet so creation of
+ // this Set should be thread safe. We'll create the Set with an initial
+ // capacity equal to the configured parameter for the maximum number of
+ // outstanding messages to allow. The load factor will be set to
+ // 1.0f which means we'll only rehash and allocate more space if
+ // we ever exceed the initial capacity. That should be okay
+ // because when that happens, things are slow already and piling
+ // up on the client app side to consume messages.
+ outstandingMsgSet = Collections.newSetFromMap(
+ new ConcurrentHashMap<Message,Boolean>(
+ cfg.getMaximumOutstandingMessages(), 1.0f));
+ }
+ // Response was success so invoke the callback's operationFinished
+ // method.
+ pubSubData.getCallback().operationFinished(pubSubData.context, null);
+ break;
+ case CLIENT_ALREADY_SUBSCRIBED:
+ // For Subscribe requests, the server says that the client is
+ // already subscribed to it.
+ pubSubData.getCallback().operationFailed(pubSubData.context, new ClientAlreadySubscribedException(
+ "Client is already subscribed for topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: "
+ + pubSubData.subscriberId.toStringUtf8()));
+ break;
+ case SERVICE_DOWN:
+ // Response was service down failure so just invoke the callback's
+ // operationFailed method.
+ pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
+ "Server responded with a SERVICE_DOWN status"));
+ break;
+ case NOT_RESPONSIBLE_FOR_TOPIC:
+ // Redirect response so we'll need to repost the original Subscribe
+ // Request
+ handleRedirectResponse(response, pubSubData, channel);
+ break;
+ default:
+ // Consider all other status codes as errors, operation failed
+ // cases.
+ logger.error("Unexpected error response from server for PubSubResponse: {}", response);
+ pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
+ "Server responded with a status code of: " + response.getStatusCode()));
+ break;
+ }
+ }
+
+ @Override
+ public void handleSubscribeMessage(PubSubResponse response) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Handling a Subscribe message in response: {}, {}",
+ va(response, origTopicSubscriber));
+ }
+ Message message = response.getMessage();
+
+ synchronized (this) {
+ // Consume the message asynchronously that the client is subscribed
+ // to. Do this only if delivery for the subscription has started and
+ // a MessageHandler has been registered for the TopicSubscriber.
+ if (messageHandler != null) {
+ asyncMessageDeliver(origTopicSubscriber, message);
+ } else {
+ // MessageHandler has not yet been registered so queue up these
+ // messages for the Topic Subscription. Make the initial lazy
+ // creation of the message queue thread safe just so we don't
+ // run into a race condition where two simultaneous threads process
+ // a received message and both try to create a new instance of
+ // the message queue. Performance overhead should be okay
+ // because the delivery of the topic has not even started yet
+ // so these messages are not consumed and just buffered up here.
+ if (subscribeMsgQueue == null)
+ subscribeMsgQueue = new LinkedList<Message>();
+ if (logger.isDebugEnabled()) {
+ logger
+ .debug("Message has arrived but Subscribe channel does not have a registered MessageHandler yet so queueing up the message: {}",
+ message);
+ }
+ subscribeMsgQueue.add(message);
+ }
+ }
+ }
+
+ @Override
+ protected void asyncMessageDeliver(TopicSubscriber topicSubscriber,
+ Message message) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Call the client app's MessageHandler asynchronously to deliver the message {} to {}",
+ va(message, topicSubscriber));
+ }
+ synchronized (this) {
+ // Add this "pending to be consumed" message to the outstandingMsgSet.
+ outstandingMsgSet.add(message);
+ // Check if we've exceeded the max size for the outstanding message set.
+ if (outstandingMsgSet.size() >= cfg.getMaximumOutstandingMessages()
+ && subscribeChannel.isReadable()) {
+ // Too many outstanding messages so throttle it by setting the Netty
+ // Channel to not be readable.
+ if (logger.isDebugEnabled()) {
+ logger.debug("Too many outstanding messages ({}) so throttling the subscribe netty Channel",
+ outstandingMsgSet.size());
+ }
+ subscribeChannel.setReadable(false);
+ }
+ }
+ MessageConsumeData messageConsumeData =
+ new MessageConsumeData(topicSubscriber, message);
+ messageHandler.deliver(topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(),
+ message, sChannelManager.getConsumeCallback(),
+ messageConsumeData);
+ }
+
+ @Override
+ protected synchronized void messageConsumed(TopicSubscriber topicSubscriber,
+ Message message) {
+ logger.debug("Message has been successfully consumed by the client app for message: {}, {}",
+ message, topicSubscriber);
+ // Update the consumed messages buffer variables
+ if (cfg.isAutoSendConsumeMessageEnabled()) {
+ // Update these variables only if we are auto-sending consume
+ // messages to the server. Otherwise the onus is on the client app
+ // to call the Subscriber consume API to let the server know which
+ // messages it has successfully consumed.
+ numConsumedMessagesInBuffer++;
+ lastMessageSeqId = message.getMsgId();
+ }
+ // Remove this consumed message from the outstanding Message Set.
+ outstandingMsgSet.remove(message);
+
+ // For consume response to server, there is a config param on how many
+ // messages to consume and buffer up before sending the consume request.
+ // We just need to keep a count of the number of messages consumed
+ // and the largest/latest msg ID seen so far in this batch. Messages
+ // should be delivered in order and without gaps. Do this only if
+ // auto-sending of consume messages is enabled.
+ if (cfg.isAutoSendConsumeMessageEnabled()
+ && numConsumedMessagesInBuffer >= cfg.getConsumedMessagesBufferSize()) {
+ // Send the consume request and reset the consumed messages buffer
+ // variables. We will use the same Channel created from the
+ // subscribe request for the TopicSubscriber.
+ if (logger.isDebugEnabled()) {
+ logger
+ .debug("Consumed message buffer limit reached so send the Consume Request to the server with lastMessageSeqId: {}",
+ lastMessageSeqId);
+ }
+ consume(topicSubscriber, lastMessageSeqId);
+ numConsumedMessagesInBuffer = 0;
+ lastMessageSeqId = null;
+ }
+
+ // Check if we throttled message consumption previously when the
+ // outstanding message limit was reached. For now, only turn the
+ // delivery back on if there are no more outstanding messages to
+ // consume. We could make this a configurable parameter if needed.
+ if (!subscribeChannel.isReadable() && outstandingMsgSet.size() == 0) {
+ if (logger.isDebugEnabled())
+ logger
+ .debug("Message consumption has caught up so okay to turn off throttling of messages on the subscribe channel for {}",
+ topicSubscriber);
+ subscribeChannel.setReadable(true);
+ }
+ }
+
+ @Override
+ public void startDelivery(final TopicSubscriber topicSubscriber,
+ MessageHandler messageHandler)
+ throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Start delivering message for {} using message handler {}",
+ va(topicSubscriber, messageHandler));
+ }
+ if (!hasSubscription(topicSubscriber)) {
+ throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
+ }
+ synchronized (this) {
+ if (null != this.messageHandler) {
+ throw new AlreadyStartDeliveryException("A message handler " + this.messageHandler
+ + " has been started for " + topicSubscriber);
+ }
+ // instantiante a message handler
+ if (null != messageHandler &&
+ messageHandler instanceof FilterableMessageHandler) {
+ FilterableMessageHandler filterMsgHandler =
+ (FilterableMessageHandler) messageHandler;
+ // pass subscription preferences to message filter
+ if (null == preferences) {
+ // no preferences means talking to an old version hub server
+ logger.warn("Start delivering messages with filter but no subscription "
+ + "preferences found. It might due to talking to an old version"
+ + " hub server.");
+ // use the original message handler.
+ messageHandler = filterMsgHandler.getMessageHandler();
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Start delivering messages with filter on {}, preferences: {}",
+ va(topicSubscriber,
+ SubscriptionStateUtils.toString(preferences)));
+ }
+ ClientMessageFilter msgFilter = filterMsgHandler.getMessageFilter();
+ msgFilter.setSubscriptionPreferences(
+ topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(),
+ preferences);
+ }
+ }
+
+ this.messageHandler = messageHandler;
+ // Once the MessageHandler is registered, see if we have any queued up
+ // subscription messages sent to us already from the server. If so,
+ // consume those first. Do this only if the MessageHandler registered is
+ // not null (since that would be the HedwigSubscriber.stopDelivery
+ // call).
+ if (messageHandler != null && subscribeMsgQueue != null && subscribeMsgQueue.size() > 0) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Consuming {} queued up messages for {}",
+ va(subscribeMsgQueue.size(), topicSubscriber));
+ }
+ for (Message message : subscribeMsgQueue) {
+ asyncMessageDeliver(topicSubscriber, message);
+ }
+ // Now we can remove the queued up messages since they are all
+ // consumed.
+ subscribeMsgQueue.clear();
+ }
+ // Now make the TopicSubscriber Channel readable (it is set to not be
+ // readable when the initial subscription is done). Note that this is an
+ // asynchronous call. If this fails (not likely), the futureListener
+ // will just log an error message for now.
+ ChannelFuture future = subscribeChannel.setReadable(true);
+ future.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ logger.error("Unable to make subscriber Channel readable in startDelivery call for {}",
+ topicSubscriber);
+ }
+ }
+ });
+ }
+ }
+
+ @Override
+ public void stopDelivery(final TopicSubscriber topicSubscriber)
+ throws ClientNotSubscribedException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Stop delivering messages for {}", topicSubscriber);
+ }
+
+ if (!hasSubscription(topicSubscriber)) {
+ throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
+ }
+
+ synchronized (this) {
+ this.messageHandler = null;
+ // Now make the TopicSubscriber channel not-readable. This will buffer
+ // up messages if any are sent from the server. Note that this is an
+ // asynchronous call. If this fails (not likely), the futureListener
+ // will just log an error message for now.
+ ChannelFuture future = subscribeChannel.setReadable(false);
+ future.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ logger.error("Unable to make subscriber Channel not readable in stopDelivery call for {}",
+ topicSubscriber);
+ }
+ }
+ });
+ }
+ }
+
+ @Override
+ public boolean hasSubscription(TopicSubscriber topicSubscriber) {
+ if (null == origTopicSubscriber) {
+ return false;
+ } else {
+ return origTopicSubscriber.equals(topicSubscriber);
+ }
+ }
+
+ @Override
+ public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
+ final Callback<ResponseBody> callback,
+ final Object context) {
+ // nothing to do just clear status
+ // channel manager takes the responsibility to close the channel
+ callback.operationFinished(context, (ResponseBody)null);
+ }
+
+ @Override
+ public synchronized void consume(final TopicSubscriber topicSubscriber,
+ final MessageSeqId messageSeqId) {
+ PubSubRequest.Builder pubsubRequestBuilder =
+ NetUtils.buildConsumeRequest(sChannelManager.nextTxnId(),
+ topicSubscriber, messageSeqId);
+
+ // For Consume requests, we will send them from the client in a fire and
+ // forget manner. We are not expecting the server to send back an ack
+ // response so no need to register this in the ResponseHandler. There
+ // are no callbacks to invoke since this isn't a client initiated
+ // action. Instead, just have a future listener that will log an error
+ // message if there was a problem writing the consume request.
+ if (logger.isDebugEnabled()) {
+ logger.debug("Writing a Consume request to host: {} with messageSeqId: {} for {}",
+ va(NetUtils.getHostFromChannel(subscribeChannel),
+ messageSeqId, topicSubscriber));
+ }
+ ChannelFuture future = subscribeChannel.write(pubsubRequestBuilder.build());
+ future.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ logger.error("Error writing a Consume request to host: {} with messageSeqId: {} for {}",
+ va(NetUtils.getHostFromChannel(subscribeChannel),
+ messageSeqId, topicSubscriber));
+ }
+ }
+ });
+ }
+
+ @Override
+ public void onChannelDisconnected(InetSocketAddress host,
+ Channel channel) {
+ sChannelManager.clearHostForTopic(origTopicSubscriber.getTopic(), host);
+ // clear subscription status
+ sChannelManager.asyncCloseSubscription(origTopicSubscriber, new Callback<ResponseBody>() {
+
+ @Override
+ public void operationFinished(Object ctx, ResponseBody result) {
+ finish();
+ }
+
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ finish();
+ }
+
+ private void finish() {
+ // Since the connection to the server host that was responsible
+ // for the topic died, we are not sure about the state of that
+ // server. Resend the original subscribe request data to the default
+ // server host/VIP. Also clear out all of the servers we've
+ // contacted or attempted to from this request as we are starting a
+ // "fresh" subscribe request.
+ origSubData.clearServersList();
+ // do resubscribe if the subscription enables it
+ if (origSubData.options.getEnableResubscribe()) {
+ // Set a new type of VoidCallback for this async call. We need this
+ // hook so after the subscribe reconnect has completed, delivery for
+ // that topic subscriber should also be restarted (if it was that
+ // case before the channel disconnect).
+ final long retryWaitTime = cfg.getSubscribeReconnectRetryWaitTime();
+ SubscribeReconnectCallback reconnectCb =
+ new SubscribeReconnectCallback(origTopicSubscriber,
+ origSubData,
+ sChannelManager,
+ retryWaitTime);
+ origSubData.setCallback(reconnectCb);
+ origSubData.context = null;
+ // Clear the shouldClaim flag
+ origSubData.shouldClaim = false;
+ logger.debug("Reconnect {}'s subscription channel with origSubData {}",
+ origTopicSubscriber, origSubData);
+ sChannelManager.submitOpToDefaultServer(origSubData);
+ } else {
+ logger.info("Subscription channel for ({}) is disconnected.",
+ origTopicSubscriber);
+ sChannelManager.getSubscriptionEventEmitter().emitSubscriptionEvent(
+ origSubData.topic, origSubData.subscriberId, SubscriptionEvent.TOPIC_MOVED);
+ }
+ }
+ }, null);
+ }
+
+}
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl.simple;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.handlers.AbstractResponseHandler;
+import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
+import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
+import org.apache.hedwig.client.netty.impl.HChannelHandler;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+
+public class SimpleSubscriptionChannelPipelineFactory extends ClientChannelPipelineFactory {
+
+ public SimpleSubscriptionChannelPipelineFactory(ClientConfiguration cfg,
+ SimpleHChannelManager channelManager) {
+ super(cfg, channelManager);
+ }
+
+ @Override
+ protected Map<OperationType, AbstractResponseHandler> createResponseHandlers() {
+ Map<OperationType, AbstractResponseHandler> handlers =
+ new HashMap<OperationType, AbstractResponseHandler>();
+ handlers.put(OperationType.SUBSCRIBE,
+ new SimpleSubscribeResponseHandler(cfg, channelManager));
+ return handlers;
+ }
+
+}
Copied: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SubscribeReconnectCallback.java (from r1390401, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SubscribeReconnectCallback.java?p2=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SubscribeReconnectCallback.java&p1=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java&r1=1390401&r2=1390777&rev=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SubscribeReconnectCallback.java Wed Sep 26 23:52:18 2012
@@ -15,24 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hedwig.client.handlers;
+package org.apache.hedwig.client.netty.impl.simple;
-import java.util.TimerTask;
-
-import org.apache.hedwig.protocol.PubSubProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.netty.HedwigClientImpl;
-import org.apache.hedwig.client.netty.HedwigSubscriber;
import org.apache.hedwig.exceptions.PubSubException;
-
import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
/**
* This class is used when a Subscribe channel gets disconnected and we attempt
@@ -42,72 +37,60 @@ import org.apache.hedwig.util.Callback;
* callback will be the hook for this.
*
*/
-public class SubscribeReconnectCallback implements Callback<PubSubProtocol.ResponseBody> {
+class SubscribeReconnectCallback implements Callback<ResponseBody> {
private static Logger logger = LoggerFactory.getLogger(SubscribeReconnectCallback.class);
// Private member variables
+ private final TopicSubscriber origTopicSubscriber;
private final PubSubData origSubData;
- private final HedwigClientImpl client;
- private final HedwigSubscriber sub;
- private final ClientConfiguration cfg;
+ private final SimpleHChannelManager channelManager;
+ private final long retryWaitTime;
// Constructor
- public SubscribeReconnectCallback(PubSubData origSubData, HedwigClientImpl client) {
+ SubscribeReconnectCallback(TopicSubscriber origTopicSubscriber,
+ PubSubData origSubData,
+ SimpleHChannelManager channelManager,
+ long retryWaitTime) {
+ this.origTopicSubscriber = origTopicSubscriber;
this.origSubData = origSubData;
- this.client = client;
- this.sub = client.getSubscriber();
- this.cfg = client.getConfiguration();
- }
-
- class SubscribeReconnectRetryTask extends TimerTask {
- @Override
- public void run() {
- logger.debug("Retrying subscribe reconnect request for origSubData: {}", origSubData);
- // Clear out all of the servers we've contacted or attempted to from
- // this request.
- origSubData.clearServersList();
- client.doConnect(origSubData, cfg.getDefaultServerHost());
- }
+ this.channelManager = channelManager;
+ this.retryWaitTime = retryWaitTime;
}
- public void operationFinished(Object ctx, PubSubProtocol.ResponseBody resultOfOperation) {
+ @Override
+ public void operationFinished(Object ctx, ResponseBody resultOfOperation) {
logger.debug("Subscribe reconnect succeeded for origSubData: {}", origSubData);
// Now we want to restart delivery for the subscription channel only
// if delivery was started at the time the original subscribe channel
// was disconnected.
try {
- sub.restartDelivery(origSubData.topic, origSubData.subscriberId);
+ channelManager.restartDelivery(origTopicSubscriber);
} catch (ClientNotSubscribedException e) {
// This exception should never be thrown here but just in case,
// log an error and just keep retrying the subscribe request.
- logger.error("Subscribe was successful but error starting delivery for topic: "
- + origSubData.topic.toStringUtf8() + ", subscriberId: "
- + origSubData.subscriberId.toStringUtf8(), e);
+ logger.error("Subscribe was successful but error starting delivery for {} : {}",
+ va(origTopicSubscriber, e.getMessage()));
retrySubscribeRequest();
} catch (AlreadyStartDeliveryException asde) {
// should not reach here
}
}
+ @Override
public void operationFailed(Object ctx, PubSubException exception) {
// If the subscribe reconnect fails, just keep retrying the subscribe
// request. There isn't a way to flag to the application layer that
// a topic subscription has failed. So instead, we'll just keep
// retrying in the background until success.
- logger.error("Subscribe reconnect failed with error: " + exception.getMessage());
+ logger.error("Subscribe reconnect failed with error: ", exception);
retrySubscribeRequest();
}
private void retrySubscribeRequest() {
- // If the client has stopped, there is no need to proceed with any
- // callback logic here.
- if (client.hasStopped())
- return;
-
- // Retry the subscribe request but only after waiting for a
- // preconfigured amount of time.
- client.getClientTimer().schedule(new SubscribeReconnectRetryTask(),
- client.getConfiguration().getSubscribeReconnectRetryWaitTime());
+ origSubData.clearServersList();
+ logger.debug("Reconnect subscription channel for {} in {} ms later.",
+ va(origTopicSubscriber, retryWaitTime));
+ channelManager.submitOpAfterDelay(origSubData, retryWaitTime);
}
}
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/package-info.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/package-info.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/package-info.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/package-info.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * A Netty based Hedwig client implementation.
+ *
+ * <h3>Components</h3>
+ *
+ * The netty based implementation contains following components:
+ * <ul>
+ * <li>{@link HChannel}: A interface wrapper of netty {@link org.jboss.netty.channel.Channel}
+ * to submit hedwig's {@link org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest}s
+ * to target host.</li>
+ * <li>{@link HChanneHandler}: A wrapper of netty {@link org.jboss.netty.channel.ChannelHandler}
+ * to handle events of its underlying netty channel, such as responses received, channel
+ * disconnected, etc. A {@link HChannelHandler} is bound with a {@link HChannel}.</li>
+ * <li>{@link HChannelManager}: A manager manages all established {@link HChannel}s.
+ * It provides a clean interface for publisher/subscriber to send
+ * {@link org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest}s</li>
+ * </ul>
+ *
+ * <h3>Main Flow</h3>
+ *
+ * <ul>
+ * <li>{@link HedwigPublisher}/{@link HedwigSubscriber} delegates {@link HChannelManager}
+ * to submit pub/sub requests.</li>
+ * <li>{@link HChannelManager} find the owner hubs, establish a {@link HChannel} to hub servers
+ * and send the requests to them.</li>
+ * <li>{@link HChannelHandler} dispatches responses to target
+ * {@link org.apache.hedwig.client.handlers.AbstractResponseHandler} to process.</li>
+ * <li>{@link HChannelHandler} detects an underlying netty {@link org.jboss.netty.channel.Channel}
+ * disconnected. It calles {@link HChannelManager} to clear cached {@link HChannel} that
+ * it bound with. For non-subscritpion channels, it would fail all pending requests;
+ * For subscription channels, it would fail all pending requests and retry to reconnect
+ * those successful subscriptions.</li>
+ * </ul>
+ *
+ * <h3>HChannel</h3>
+ *
+ * Two kinds of {@link HChannel}s provided in current implementation. {@link HChannelImpl}
+ * provides the ability to multiplex pub/sub requests in an underlying netty
+ * {@link org.jboss.netty.channel.Channel}, while {@link DefaultServerChannel} provides the
+ * ability to establish a netty channel {@link org.jboss.netty.channel.Channel} for a pub/sub
+ * request. After the underlying netty channel is estabilished, it would be converted into
+ * a {@link HChannelImpl} by {@link HChannelManager#submitOpThruChannel(pubSubData, channel)}.
+ *
+ * Although {@link HChannelImpl} provides multiplexing ability, it still could be used for
+ * one-channel-per-subscription case, which just sent only one subscribe request thru the
+ * underlying channel.
+ *
+ * <h3>HChannelHandler</h3>
+ *
+ * {@link HChannelHandler} is generic netty {@link org.jboss.netty.channel.ChannelHandler},
+ * which handles events from the underlying channel. A <i>HChannelHandler</i> is bound with
+ * a {@link HChannel} as channel pipeplien when the underlying channel is established. It
+ * takes the responsibility of dispatching response to target response handler. For a
+ * non-subscription channel, it just handles <b>PUBLISH</b> and <b>UNSUBSCRIBE</b> responses.
+ * For a subscription channel, it handles <b>SUBSCRIBE</b> response. For consume requests,
+ * we treated them in a fire-and-forget way, so they are not need to be handled by any response
+ * handler.
+ *
+ * <h3>HChannelManager</h3>
+ *
+ * {@link HChannelManager} manages all outstanding connections to target hub servers for a client.
+ * Since a subscription channel acts quite different from a non-subscription channel, the basic
+ * implementation {@link AbstractHChannelManager} manages non-subscription channels and
+ * subscription channels in different channel sets. Currently hedwig client provides
+ * {@link SimpleHChannelManager} which manages subscription channels in one-channel-per-subscription
+ * way. In future, if we want to multiplex multiple subscriptions in one channel, we just need
+ * to provide an multiplexing version of {@link AbstractHChannelManager} which manages channels
+ * in multiplexing way, and a multiplexing version of {@link org.apache.hedwig.client.handlers.SubscribeResponseHandler}
+ * which handles multiple subscriptions in one channel.
+ */
+package org.apache.hedwig.client.netty;
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/VarArgs.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/VarArgs.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/VarArgs.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/VarArgs.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.util;
+
+public class VarArgs {
+
+ public static Object[] va(Object...args) {
+ return args;
+ }
+
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java Wed Sep 26 23:52:18 2012
@@ -348,6 +348,33 @@ public class TestPubSubClient extends Pu
}
@Test
+ public void testStartDeliveryAfterCloseSub() throws Exception {
+ ByteString topic = ByteString.copyFromUtf8("testStartDeliveryAfterCloseSub");
+ ByteString subid = ByteString.copyFromUtf8("mysubid");
+ subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+
+ // Start delivery for the subscriber
+ subscriber.startDelivery(topic, subid, new TestMessageHandler());
+
+ // Now publish some messages for the topic to be consumed by the
+ // subscriber.
+ publisher.publish(topic, Message.newBuilder()
+ .setBody(ByteString.copyFromUtf8("Message #1")).build());
+ assertTrue(consumeQueue.take());
+
+ // Close subscriber for the subscriber
+ subscriber.closeSubscription(topic, subid);
+
+ // subscribe again
+ subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ subscriber.startDelivery(topic, subid, new TestMessageHandler());
+
+ publisher.publish(topic, Message.newBuilder()
+ .setBody(ByteString.copyFromUtf8("Message #2")).build());
+ assertTrue(consumeQueue.take());
+ }
+
+ @Test
public void testSubscribeAndConsume() throws Exception {
ByteString topic = ByteString.copyFromUtf8("myConsumeTopic");
ByteString subscriberId = ByteString.copyFromUtf8("1");