You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/27 01:52:20 UTC
svn commit: r1390777 [1/4] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/java/org/apache/hedwig/client/data/
hedwig-client/src/main/java/org/apache/hedwig/client/handlers/
hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig...
Author: ivank
Date: Wed Sep 26 23:52:18 2012
New Revision: 1390777
URL: http://svn.apache.org/viewvc?rev=1390777&view=rev
Log:
BOOKKEEPER-364: re-factor hedwig java client to support both one-subscription-per-channel and multiplex-subscriptions-per-channel. (sijie via ivank)
Added:
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/AbstractResponseHandler.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java
- copied, changed from r1390401, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java
- copied, changed from r1390401, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SubscribeReconnectCallback.java
- copied, changed from r1390401, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/package-info.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/VarArgs.java
Removed:
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Sep 26 23:52:18 2012
@@ -174,6 +174,8 @@ Trunk (unreleased changes)
BOOKKEEPER-335: client-side message filter for cpp client. (sijie via ivank)
+ BOOKKEEPER-364: re-factor hedwig java client to support both one-subscription-per-channel and multiplex-subscriptions-per-channel. (sijie via ivank)
+
Release 4.1.0 - 2012-06-07
Non-backward compatible changes:
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java Wed Sep 26 23:52:18 2012
@@ -17,7 +17,7 @@
*/
package org.apache.hedwig.client.data;
-import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
/**
@@ -31,28 +31,26 @@ import org.apache.hedwig.protocol.PubSub
public class MessageConsumeData {
// Member variables
- public final ByteString topic;
- public final ByteString subscriberId;
+ public final TopicSubscriber topicSubscriber;
// This is the Message sent from the server for Subscribes for consumption
// by the client.
public final Message msg;
// Constructor
- public MessageConsumeData(final ByteString topic, final ByteString subscriberId, final Message msg) {
- this.topic = topic;
- this.subscriberId = subscriberId;
+ public MessageConsumeData(final TopicSubscriber topicSubscriber, final Message msg) {
+ this.topicSubscriber = topicSubscriber;
this.msg = msg;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- if (topic != null)
- sb.append("Topic: " + topic.toStringUtf8());
- if (subscriberId != null)
- sb.append(PubSubData.COMMA).append("SubscriberId: " + subscriberId.toStringUtf8());
- if (msg != null)
- sb.append(PubSubData.COMMA).append("Message: " + msg);
+ if (topicSubscriber != null) {
+ sb.append("Subscription: ").append(topicSubscriber);
+ }
+ if (msg != null) {
+ sb.append(PubSubData.COMMA).append("Message: ").append(msg);
+ }
return sb.toString();
}
}
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/AbstractResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/AbstractResponseHandler.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/AbstractResponseHandler.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/AbstractResponseHandler.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.handlers;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+
+import com.google.protobuf.ByteString;
+
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.netty.HChannelManager;
+import org.apache.hedwig.client.exceptions.ServerRedirectLoopException;
+import org.apache.hedwig.client.exceptions.TooManyServerRedirectsException;
+import org.apache.hedwig.client.netty.NetUtils;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.util.HedwigSocketAddress;
+import static org.apache.hedwig.util.VarArgs.va;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractResponseHandler {
+
+ private static Logger logger = LoggerFactory.getLogger(AbstractResponseHandler.class);
+
+ protected final ClientConfiguration cfg;
+ protected final HChannelManager channelManager;
+
+ protected AbstractResponseHandler(ClientConfiguration cfg,
+ HChannelManager channelManager) {
+ this.cfg = cfg;
+ this.channelManager = channelManager;
+ }
+
+ /**
+ * Logic to handle received response.
+ *
+ * @param response
+ * PubSubResponse received from hub server.
+ * @param pubSubData
+ * PubSubData for the pub/sub request.
+ * @param channel
+ * Channel we used to make the request.
+ */
+ public abstract void handleResponse(PubSubResponse response, PubSubData pubSubData,
+ Channel channel) throws Exception;
+
+ /**
+ * Logic to repost a PubSubRequest when the server responds with a redirect
+ * indicating that it is not the topic master.
+ *
+ * @param response
+ * PubSubResponse from the server for the redirect
+ * @param pubSubData
+ * PubSubData for the original PubSubRequest made
+ * @param channel
+ * Channel we used to make the original PubSubRequest
+ * @throws Exception
+ * Throws an exception if there was an error in doing the
+ * redirect repost of the PubSubRequest
+ */
+ protected void handleRedirectResponse(PubSubResponse response, PubSubData pubSubData,
+ Channel channel)
+ throws Exception {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Handling a redirect from host: {}, response: {}, pubSubData: {}",
+ va(NetUtils.getHostFromChannel(channel), response, pubSubData));
+ }
+ // In this case, the PubSub request was done to a server that is not
+ // responsible for the topic. First make sure that we haven't
+ // exceeded the maximum number of server redirects.
+ int curNumServerRedirects = (pubSubData.triedServers == null) ? 0 : pubSubData.triedServers.size();
+ if (curNumServerRedirects >= cfg.getMaximumServerRedirects()) {
+ // We've already exceeded the maximum number of server redirects
+ // so consider this as an error condition for the client.
+ // Invoke the operationFailed callback and just return.
+ logger.debug("Exceeded the number of server redirects ({}) so error out.",
+ curNumServerRedirects);
+ PubSubException exception = new ServiceDownException(
+ new TooManyServerRedirectsException("Already reached max number of redirects: "
+ + curNumServerRedirects));
+ pubSubData.getCallback().operationFailed(pubSubData.context, exception);
+ return;
+ }
+
+ // We will redirect and try to connect to the correct server
+ // stored in the StatusMsg of the response. First store the
+ // server that we sent the PubSub request to for the topic.
+ ByteString triedServer = ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(
+ NetUtils.getHostFromChannel(channel)));
+ if (pubSubData.triedServers == null) {
+ pubSubData.triedServers = new LinkedList<ByteString>();
+ }
+ pubSubData.shouldClaim = true;
+ pubSubData.triedServers.add(triedServer);
+
+ // Now get the redirected server host (expected format is
+ // Hostname:Port:SSLPort) from the server's response message. If one is
+ // not given for some reason, then redirect to the default server
+ // host/VIP to repost the request.
+ String statusMsg = response.getStatusMsg();
+ InetSocketAddress redirectedHost;
+ boolean redirectToDefaultServer;
+ if (statusMsg != null && statusMsg.length() > 0) {
+ if (cfg.isSSLEnabled()) {
+ redirectedHost = new HedwigSocketAddress(statusMsg).getSSLSocketAddress();
+ } else {
+ redirectedHost = new HedwigSocketAddress(statusMsg).getSocketAddress();
+ }
+ redirectToDefaultServer = false;
+ } else {
+ redirectedHost = cfg.getDefaultServerHost();
+ redirectToDefaultServer = true;
+ }
+
+ // Make sure the redirected server is not one we've already attempted
+ // in this PubSub request.
+ if (pubSubData.triedServers.contains(ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(redirectedHost)))) {
+ logger.error("We've already sent this PubSubRequest before to redirectedHost: {}, pubSubData: {}",
+ va(redirectedHost, pubSubData));
+ PubSubException exception = new ServiceDownException(
+ new ServerRedirectLoopException("Already made the request before to redirected host: "
+ + redirectedHost));
+ pubSubData.getCallback().operationFailed(pubSubData.context, exception);
+ return;
+ }
+
+ // submit the pub/sub request to redirected host
+ if (redirectToDefaultServer) {
+ channelManager.submitOpToDefaultServer(pubSubData);
+ } else {
+ channelManager.redirectToHost(pubSubData, redirectedHost);
+ }
+ }
+
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java Wed Sep 26 23:52:18 2012
@@ -19,17 +19,16 @@ package org.apache.hedwig.client.handler
import java.util.TimerTask;
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
-import org.apache.hedwig.client.netty.ResponseHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
+import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.MessageConsumeData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.netty.HedwigClientImpl;
+import org.apache.hedwig.client.netty.HChannelManager;
import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protoextensions.MessageIdUtils;
import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
/**
* This is the Callback used by the MessageHandlers on the client app when
@@ -37,83 +36,82 @@ import org.apache.hedwig.util.Callback;
* asynchronously. This callback back to the client libs will be stateless so we
* can use a singleton for the class. The object context used should be the
* MessageConsumeData type. That will contain all of the information needed to
- * call the message consume logic in the client lib ResponseHandler.
+ * call the message consume logic in the client lib HChannelHandler.
*
*/
public class MessageConsumeCallback implements Callback<Void> {
private static Logger logger = LoggerFactory.getLogger(MessageConsumeCallback.class);
- private final HedwigClientImpl client;
+ private final HChannelManager channelManager;
+ private final long consumeRetryWaitTime;
- public MessageConsumeCallback(HedwigClientImpl client) {
- this.client = client;
+ public MessageConsumeCallback(ClientConfiguration cfg,
+ HChannelManager channelManager) {
+ this.channelManager = channelManager;
+ this.consumeRetryWaitTime =
+ cfg.getMessageConsumeRetryWaitTime();
}
class MessageConsumeRetryTask extends TimerTask {
private final MessageConsumeData messageConsumeData;
- private final TopicSubscriber topicSubscriber;
- public MessageConsumeRetryTask(MessageConsumeData messageConsumeData, TopicSubscriber topicSubscriber) {
+ public MessageConsumeRetryTask(MessageConsumeData messageConsumeData) {
this.messageConsumeData = messageConsumeData;
- this.topicSubscriber = topicSubscriber;
}
@Override
public void run() {
// Try to consume the message again
- Channel topicSubscriberChannel = client.getSubscriber().getChannelForTopic(topicSubscriber);
- ResponseHandler handler = null;
- try {
- handler = HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel);
- } catch (NoResponseHandlerException e) {
- logger.debug("No response handler found while invoking asyncMessageConsumed in the Message"
- + " consume retry task.", e);
- // Explicitly close the channel
- if (null != topicSubscriberChannel) {
- topicSubscriberChannel.close();
- }
+ SubscribeResponseHandler subscribeHChannelHandler =
+ channelManager.getSubscribeResponseHandler(messageConsumeData.topicSubscriber);
+ if (null == subscribeHChannelHandler ||
+ !subscribeHChannelHandler.hasSubscription(messageConsumeData.topicSubscriber)) {
+ logger.warn("No subscription {} found to retry delivering message {}.",
+ va(messageConsumeData.topicSubscriber,
+ MessageIdUtils.msgIdToReadableString(messageConsumeData.msg.getMsgId())));
return;
}
- handler.getSubscribeResponseHandler().asyncMessageConsume(messageConsumeData.msg);
+
+ subscribeHChannelHandler.asyncMessageDeliver(messageConsumeData.topicSubscriber,
+ messageConsumeData.msg);
}
}
public void operationFinished(Object ctx, Void resultOfOperation) {
MessageConsumeData messageConsumeData = (MessageConsumeData) ctx;
- TopicSubscriber topicSubscriber = new TopicSubscriber(messageConsumeData.topic, messageConsumeData.subscriberId);
- // Message has been successfully consumed by the client app so callback
- // to the ResponseHandler indicating that the message is consumed.
- Channel topicSubscriberChannel = client.getSubscriber().getChannelForTopic(topicSubscriber);
- ResponseHandler handler = null;
- try {
- handler = HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel);
- } catch (NoResponseHandlerException e) {
- logger.debug("No response handler found while invoking messageConsumed.", e);
- // Explicitly close the channel
- if (null != topicSubscriberChannel) {
- topicSubscriberChannel.close();
- }
+
+ SubscribeResponseHandler subscribeHChannelHandler =
+ channelManager.getSubscribeResponseHandler(messageConsumeData.topicSubscriber);
+ if (null == subscribeHChannelHandler ||
+ !subscribeHChannelHandler.hasSubscription(messageConsumeData.topicSubscriber)) {
+ logger.warn("No subscription {} found to consume message {}.",
+ va(messageConsumeData.topicSubscriber,
+ MessageIdUtils.msgIdToReadableString(messageConsumeData.msg.getMsgId())));
return;
}
- handler.getSubscribeResponseHandler().messageConsumed(messageConsumeData.msg);
+
+ // Message has been successfully consumed by the client app so callback
+ // to the HChannelHandler indicating that the message is consumed.
+ subscribeHChannelHandler.messageConsumed(messageConsumeData.topicSubscriber,
+ messageConsumeData.msg);
}
public void operationFailed(Object ctx, PubSubException exception) {
// Message has NOT been successfully consumed by the client app so
- // callback to the ResponseHandler to try the async MessageHandler
+ // callback to the HChannelHandler to try the async MessageHandler
// Consume logic again.
MessageConsumeData messageConsumeData = (MessageConsumeData) ctx;
- TopicSubscriber topicSubscriber = new TopicSubscriber(messageConsumeData.topic, messageConsumeData.subscriberId);
- logger.error("Message was not consumed successfully by client MessageHandler: " + messageConsumeData);
+ logger.error("Message was not consumed successfully by client MessageHandler: {}",
+ messageConsumeData);
// Sleep a pre-configured amount of time (in milliseconds) before we
// do the retry. In the future, we can have more dynamic logic on
// what duration to sleep based on how many times we've retried, or
// perhaps what the last amount of time we slept was. We could stick
// some of this meta-data into the MessageConsumeData when we retry.
- client.getClientTimer().schedule(new MessageConsumeRetryTask(messageConsumeData, topicSubscriber),
- client.getConfiguration().getMessageConsumeRetryWaitTime());
+ channelManager.schedule(new MessageConsumeRetryTask(messageConsumeData),
+ consumeRetryWaitTime);
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java Wed Sep 26 23:52:18 2012
@@ -17,32 +17,29 @@
*/
package org.apache.hedwig.client.handlers;
-import org.apache.hedwig.protocol.PubSubProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.jboss.netty.channel.Channel;
+import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.netty.HedwigClientImpl;
-import org.apache.hedwig.client.netty.ResponseHandler;
+import org.apache.hedwig.client.netty.HChannelManager;
import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-public class PublishResponseHandler {
+public class PublishResponseHandler extends AbstractResponseHandler {
private static Logger logger = LoggerFactory.getLogger(PublishResponseHandler.class);
- private final ResponseHandler responseHandler;
-
- public PublishResponseHandler(ResponseHandler responseHandler) {
- this.responseHandler = responseHandler;
+ public PublishResponseHandler(ClientConfiguration cfg,
+ HChannelManager channelManager) {
+ super(cfg, channelManager);
}
- // Main method to handle Publish Response messages from the server.
- public void handlePublishResponse(PubSubResponse response, PubSubData pubSubData, Channel channel) throws Exception {
- if (logger.isDebugEnabled())
- logger.debug("Handling a Publish response: " + response + ", pubSubData: " + pubSubData + ", host: "
- + HedwigClientImpl.getHostFromChannel(channel));
+ @Override
+ public void handleResponse(PubSubResponse response, PubSubData pubSubData,
+ Channel channel) throws Exception {
switch (response.getStatusCode()) {
case SUCCESS:
// Response was success so invoke the callback's operationFinished
@@ -59,7 +56,7 @@ public class PublishResponseHandler {
case NOT_RESPONSIBLE_FOR_TOPIC:
// Redirect response so we'll need to repost the original Publish
// Request
- responseHandler.handleRedirectResponse(response, pubSubData, channel);
+ handleRedirectResponse(response, pubSubData, channel);
break;
default:
// Consider all other status codes as errors, operation failed
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Wed Sep 26 23:52:18 2012
@@ -17,244 +17,59 @@
*/
package org.apache.hedwig.client.handlers;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
+import java.net.InetSocketAddress;
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.jboss.netty.channel.Channel;
import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.data.MessageConsumeData;
-import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.netty.HedwigClientImpl;
-import org.apache.hedwig.client.netty.ResponseHandler;
-import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
+import org.apache.hedwig.client.netty.HChannelManager;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.util.Callback;
-public class SubscribeResponseHandler {
-
- private static Logger logger = LoggerFactory.getLogger(SubscribeResponseHandler.class);
-
- private final ResponseHandler responseHandler;
-
- // Member variables used when this ResponseHandler is for a Subscribe
- // channel. We need to be able to consume messages sent back to us from
- // the server, and to also recreate the Channel connection if it ever goes
- // down. For that, we need to store the original PubSubData for the
- // subscribe request, and also the MessageHandler that was registered when
- // delivery of messages started for the subscription.
- private PubSubData origSubData;
- private Channel subscribeChannel;
- private MessageHandler messageHandler;
- // Counter for the number of consumed messages so far to buffer up before we
- // send the Consume message back to the server along with the last/largest
- // message seq ID seen so far in that batch.
- private int numConsumedMessagesInBuffer = 0;
- private MessageSeqId lastMessageSeqId;
- // Queue used for subscribes when the MessageHandler hasn't been registered
- // yet but we've already received subscription messages from the server.
- // This will be lazily created as needed.
- private Queue<Message> subscribeMsgQueue;
- // Set to store all of the outstanding subscribed messages that are pending
- // to be consumed by the client app's MessageHandler. If this ever grows too
- // big (e.g. problem at the client end for message consumption), we can
- // throttle things by temporarily setting the Subscribe Netty Channel
- // to not be readable. When the Set has shrunk sufficiently, we can turn the
- // channel back on to read new messages.
- private Set<Message> outstandingMsgSet;
-
- public SubscribeResponseHandler(ResponseHandler responseHandler) {
- this.responseHandler = responseHandler;
- }
-
- // Public getter to retrieve the original PubSubData used for the Subscribe
- // request.
- synchronized public PubSubData getOrigSubData() {
- return origSubData;
- }
+/**
+ * An abstract response handler that manages all subscriptions on a channel.
+ *
+ * Its responsibility is to handle all subscribe responses received on that channel,
+ * clear up subscriptions and retry reconnecting subscriptions when the channel is disconnected,
+ * and handle delivering messages to {@link MessageHandler} and sending consume messages
+ * back to hub servers.
+ */
+public abstract class SubscribeResponseHandler extends AbstractResponseHandler {
- // Main method to handle Subscribe responses from the server that we sent
- // a Subscribe Request to.
- public void handleSubscribeResponse(PubSubResponse response, PubSubData pubSubData, Channel channel)
- throws Exception {
- // If this was not a successful response to the Subscribe request, we
- // won't be using the Netty Channel created so just close it.
- if (!response.getStatusCode().equals(StatusCode.SUCCESS)) {
- try {
- HedwigClientImpl.getResponseHandlerFromChannel(channel).handleChannelClosedExplicitly();
- } catch (NoResponseHandlerException e) {
- // Log an error. But should we also return and not process anything further?
- logger.error("No response handler found while trying to close channel explicitly while handling a " +
- "failed subscription response.", e);
- // Continue closing the channel because this is an unexpected event and state should be reset.
- }
- channel.close();
- }
-
- if (logger.isDebugEnabled())
- logger.debug("Handling a Subscribe response: " + response + ", pubSubData: " + pubSubData + ", host: "
- + HedwigClientImpl.getHostFromChannel(channel));
- switch (response.getStatusCode()) {
- case SUCCESS:
- synchronized(this) {
- // For successful Subscribe requests, store this Channel locally
- // and set it to not be readable initially.
- // This way we won't be delivering messages for this topic
- // subscription until the client explicitly says so.
- subscribeChannel = channel;
- subscribeChannel.setReadable(false);
- // Store the original PubSubData used to create this successful
- // Subscribe request.
- origSubData = pubSubData;
-
- SubscriptionPreferences preferences = null;
- if (response.hasResponseBody()) {
- ResponseBody respBody = response.getResponseBody();
- if (respBody.hasSubscribeResponse()) {
- SubscribeResponse resp = respBody.getSubscribeResponse();
- if (resp.hasPreferences()) {
- preferences = resp.getPreferences();
- if (logger.isDebugEnabled()) {
- logger.debug("Receive subscription preferences for (topic:" + pubSubData.topic.toStringUtf8()
- + ", subscriber:" + pubSubData.subscriberId.toStringUtf8() + ") :"
- + SubscriptionStateUtils.toString(preferences));
- }
- }
- }
- }
-
- // Store the mapping for the TopicSubscriber to the Channel.
- // This is so we can control the starting and stopping of
- // message deliveries from the server on that Channel. Store
- // this only on a successful ack response from the server.
- TopicSubscriber topicSubscriber = new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId);
- responseHandler.getSubscriber().setChannelAndPreferencesForTopic(topicSubscriber, channel, preferences);
- // Lazily create the Set (from a concurrent hashmap) to keep track
- // of outstanding Messages to be consumed by the client app. At this
- // stage, delivery for that topic hasn't started yet so creation of
- // this Set should be thread safe. We'll create the Set with an initial
- // capacity equal to the configured parameter for the maximum number of
- // outstanding messages to allow. The load factor will be set to
- // 1.0f which means we'll only rehash and allocate more space if
- // we ever exceed the initial capacity. That should be okay
- // because when that happens, things are slow already and piling
- // up on the client app side to consume messages.
- outstandingMsgSet = Collections.newSetFromMap(
- new ConcurrentHashMap<Message,Boolean>(
- responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f));
- }
- // Response was success so invoke the callback's operationFinished
- // method.
- pubSubData.getCallback().operationFinished(pubSubData.context, null);
- break;
- case CLIENT_ALREADY_SUBSCRIBED:
- // For Subscribe requests, the server says that the client is
- // already subscribed to it.
- pubSubData.getCallback().operationFailed(pubSubData.context, new ClientAlreadySubscribedException(
- "Client is already subscribed for topic: " +
- pubSubData.topic.toStringUtf8() + ", subscriberId: " +
- pubSubData.subscriberId.toStringUtf8()));
- break;
- case SERVICE_DOWN:
- // Response was service down failure so just invoke the callback's
- // operationFailed method.
- pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
- "Server responded with a SERVICE_DOWN status"));
- break;
- case NOT_RESPONSIBLE_FOR_TOPIC:
- // Redirect response so we'll need to repost the original Subscribe
- // Request
- responseHandler.handleRedirectResponse(response, pubSubData, channel);
- break;
- default:
- // Consider all other status codes as errors, operation failed
- // cases.
- logger.error("Unexpected error response from server for PubSubResponse: " + response);
- pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
- "Server responded with a status code of: " +
- response.getStatusCode()));
- break;
- }
+ protected SubscribeResponseHandler(ClientConfiguration cfg,
+ HChannelManager channelManager) {
+ super(cfg, channelManager);
}
- // Main method to handle consuming a message for a topic that the client is
- // subscribed to.
- public void handleSubscribeMessage(PubSubResponse response) {
- if (logger.isDebugEnabled()) {
- logger.debug("Handling a Subscribe message in response: {}, topic: {}, subscriberId: {}",
- new Object[] { response, getOrigSubData().topic.toStringUtf8(),
- getOrigSubData().subscriberId.toStringUtf8() });
- }
- Message message = response.getMessage();
-
- synchronized (this) {
- // Consume the message asynchronously that the client is subscribed
- // to. Do this only if delivery for the subscription has started and
- // a MessageHandler has been registered for the TopicSubscriber.
- if (messageHandler != null) {
- asyncMessageConsume(message);
- } else {
- // MessageHandler has not yet been registered so queue up these
- // messages for the Topic Subscription. Make the initial lazy
- // creation of the message queue thread safe just so we don't
- // run into a race condition where two simultaneous threads process
- // a received message and both try to create a new instance of
- // the message queue. Performance overhead should be okay
- // because the delivery of the topic has not even started yet
- // so these messages are not consumed and just buffered up here.
- if (subscribeMsgQueue == null)
- subscribeMsgQueue = new LinkedList<Message>();
- logger.debug("Message has arrived but Subscribe channel does not have a registered "
- + "MessageHandler yet so queueing up the message: {}", message);
- subscribeMsgQueue.add(message);
- }
- }
- }
+ /**
+ * Handle Message delivered by the server.
+ *
+ * @param response
+ * Message received from the server.
+ */
+ public abstract void handleSubscribeMessage(PubSubResponse response);
/**
* Method called when a message arrives for a subscribe Channel and we want
- * to consume it asynchronously via the registered MessageHandler (should
+ * to deliver it asynchronously via the registered MessageHandler (should
* not be null when called here).
*
* @param message
* Message from Subscribe Channel we want to consume.
*/
- protected void asyncMessageConsume(Message message) {
- if (logger.isDebugEnabled())
- logger.debug("Call the client app's MessageHandler asynchronously to consume the message: " + message
- + ", topic: " + origSubData.topic.toStringUtf8() + ", subscriberId: "
- + origSubData.subscriberId.toStringUtf8());
- // Add this "pending to be consumed" message to the outstandingMsgSet.
- outstandingMsgSet.add(message);
- // Check if we've exceeded the max size for the outstanding message set.
- if (outstandingMsgSet.size() >= responseHandler.getConfiguration().getMaximumOutstandingMessages()
- && subscribeChannel.isReadable()) {
- // Too many outstanding messages so throttle it by setting the Netty
- // Channel to not be readable.
- if (logger.isDebugEnabled())
- logger.debug("Too many outstanding messages (" + outstandingMsgSet.size()
- + ") so throttling the subscribe netty Channel");
- subscribeChannel.setReadable(false);
- }
- MessageConsumeData messageConsumeData = new MessageConsumeData(origSubData.topic, origSubData.subscriberId,
- message);
- messageHandler.deliver(origSubData.topic, origSubData.subscriberId, message, responseHandler.getClient()
- .getConsumeCallback(), messageConsumeData);
- }
+ protected abstract void asyncMessageDeliver(TopicSubscriber topicSubscriber,
+ Message message);
/**
* Method called when the client app's MessageHandler has asynchronously
@@ -267,100 +82,85 @@ public class SubscribeResponseHandler {
* could be consumed by the client app and then called back to here, make
* this method synchronized.
*
+ * @param topicSubscriber
+ * Topic Subscriber
* @param message
* Message sent from server for topic subscription that has been
* consumed by the client.
*/
- protected synchronized void messageConsumed(Message message) {
- if (logger.isDebugEnabled())
- logger.debug("Message has been successfully consumed by the client app for message: " + message
- + ", topic: " + origSubData.topic.toStringUtf8() + ", subscriberId: "
- + origSubData.subscriberId.toStringUtf8());
- // Update the consumed messages buffer variables
- if (responseHandler.getConfiguration().isAutoSendConsumeMessageEnabled()) {
- // Update these variables only if we are auto-sending consume
- // messages to the server. Otherwise the onus is on the client app
- // to call the Subscriber consume API to let the server know which
- // messages it has successfully consumed.
- numConsumedMessagesInBuffer++;
- lastMessageSeqId = message.getMsgId();
- }
- // Remove this consumed message from the outstanding Message Set.
- outstandingMsgSet.remove(message);
-
- // For consume response to server, there is a config param on how many
- // messages to consume and buffer up before sending the consume request.
- // We just need to keep a count of the number of messages consumed
- // and the largest/latest msg ID seen so far in this batch. Messages
- // should be delivered in order and without gaps. Do this only if
- // auto-sending of consume messages is enabled.
- if (responseHandler.getConfiguration().isAutoSendConsumeMessageEnabled()
- && numConsumedMessagesInBuffer >= responseHandler.getConfiguration().getConsumedMessagesBufferSize()) {
- // Send the consume request and reset the consumed messages buffer
- // variables. We will use the same Channel created from the
- // subscribe request for the TopicSubscriber.
- logger.debug("Consumed message buffer limit reached so send the Consume Request to the "
- + "server with lastMessageSeqId: {}", lastMessageSeqId);
- responseHandler.getSubscriber().doConsume(origSubData, subscribeChannel, lastMessageSeqId);
- numConsumedMessagesInBuffer = 0;
- lastMessageSeqId = null;
- }
-
- // Check if we throttled message consumption previously when the
- // outstanding message limit was reached. For now, only turn the
- // delivery back on if there are no more outstanding messages to
- // consume. We could make this a configurable parameter if needed.
- if (!subscribeChannel.isReadable() && outstandingMsgSet.isEmpty()) {
- if (logger.isDebugEnabled())
- logger.debug("Message consumption has caught up so okay to turn off throttling of " +
- "messages on the subscribe channel for topic: " + origSubData.topic.toStringUtf8()
- + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
- subscribeChannel.setReadable(true);
- }
- }
+ protected abstract void messageConsumed(TopicSubscriber topicSubscriber,
+ Message message);
/**
- * Setter used for Subscribe flows when delivery for the subscription is
- * started. This is used to register the MessageHandler needed to consumer
- * the subscribed messages for the topic.
+ * Start delivering messages for a given topic subscriber.
*
+ * @param topicSubscriber
+ * Topic Subscriber
* @param messageHandler
* MessageHandler to register for this ResponseHandler instance.
+ * @throws ClientNotSubscribedException
+ * If the client is not currently subscribed to the topic
+ * @throws AlreadyStartDeliveryException
+ * If delivery is started with a new message handler before the existing one is stopped.
*/
- public void setMessageHandler(MessageHandler messageHandler) {
- if (logger.isDebugEnabled()) {
- logger.debug("Setting the messageHandler for topic: {}, subscriberId: {}",
- getOrigSubData().topic.toStringUtf8(),
- getOrigSubData().subscriberId.toStringUtf8());
- }
- synchronized (this) {
- this.messageHandler = messageHandler;
- // Once the MessageHandler is registered, see if we have any queued up
- // subscription messages sent to us already from the server. If so,
- // consume those first. Do this only if the MessageHandler registered is
- // not null (since that would be the HedwigSubscriber.stopDelivery
- // call).
- if (messageHandler != null && subscribeMsgQueue != null && subscribeMsgQueue.size() > 0) {
- if (logger.isDebugEnabled())
- logger.debug("Consuming " + subscribeMsgQueue.size() + " queued up messages for topic: "
- + origSubData.topic.toStringUtf8() + ", subscriberId: "
- + origSubData.subscriberId.toStringUtf8());
- for (Message message : subscribeMsgQueue) {
- asyncMessageConsume(message);
- }
- // Now we can remove the queued up messages since they are all
- // consumed.
- subscribeMsgQueue.clear();
- }
- }
- }
+ public abstract void startDelivery(TopicSubscriber topicSubscriber,
+ MessageHandler messageHandler)
+ throws ClientNotSubscribedException, AlreadyStartDeliveryException;
/**
- * Getter for the MessageHandler that is set for this subscribe channel.
+ * Stop delivering messages for a given topic subscriber.
*
- * @return The MessageHandler for consuming messages
+ * @param topicSubscriber
+ * Topic Subscriber
+ * @throws ClientNotSubscribedException
+ * If the client is not currently subscribed to the topic
*/
- public MessageHandler getMessageHandler() {
- return messageHandler;
- }
+ public abstract void stopDelivery(TopicSubscriber topicSubscriber)
+ throws ClientNotSubscribedException;
+
+ /**
+ * Whether the given topic subscriber is subscribed through this handler.
+ *
+ * @param topicSubscriber
+ * Topic Subscriber
+ * @return whether the given topic subscriber is subscribed through this handler.
+ */
+ public abstract boolean hasSubscription(TopicSubscriber topicSubscriber);
+
+ /**
+ * Close subscription from this handler.
+ *
+ * @param topicSubscriber
+ * Topic Subscriber
+ * @param callback
+ * Callback when the subscription is closed.
+ * @param context
+ * Callback context.
+ */
+ public abstract void asyncCloseSubscription(TopicSubscriber topicSubscriber,
+ Callback<ResponseBody> callback,
+ Object context);
+
+ /**
+ * Consume a given message for the given topic subscriber through this handler.
+ *
+ * @param topicSubscriber
+ * Topic Subscriber
+ * @param messageSeqId
+ * Sequence id of the message to consume.
+ */
+ public abstract void consume(TopicSubscriber topicSubscriber,
+ MessageSeqId messageSeqId);
+
+ /**
+ * This method is called when the underlying channel is disconnected due to server failure.
+ *
+ * The implementation should take the responsibility to clear subscriptions and retry
+ * reconnecting subscriptions to new hub servers.
+ *
+ * @param host
+ * Host that channel connected to has disconnected.
+ * @param channel
+ * Channel connected to.
+ */
+ public abstract void onChannelDisconnected(InetSocketAddress host,
+ Channel channel);
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java Wed Sep 26 23:52:18 2012
@@ -21,36 +21,35 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.jboss.netty.channel.Channel;
+import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.netty.HedwigClientImpl;
-import org.apache.hedwig.client.netty.ResponseHandler;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.netty.HChannelManager;
+import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
-public class UnsubscribeResponseHandler {
+public class UnsubscribeResponseHandler extends AbstractResponseHandler {
private static Logger logger = LoggerFactory.getLogger(UnsubscribeResponseHandler.class);
- private final ResponseHandler responseHandler;
-
- public UnsubscribeResponseHandler(ResponseHandler responseHandler) {
- this.responseHandler = responseHandler;
+ public UnsubscribeResponseHandler(ClientConfiguration cfg,
+ HChannelManager channelManager) {
+ super(cfg, channelManager);
}
- // Main method to handle Unsubscribe Response messages from the server.
- public void handleUnsubscribeResponse(PubSubResponse response, PubSubData pubSubData, Channel channel)
+ @Override
+ public void handleResponse(final PubSubResponse response, final PubSubData pubSubData,
+ final Channel channel)
throws Exception {
- if (logger.isDebugEnabled())
- logger.debug("Handling an Unsubscribe response: " + response + ", pubSubData: " + pubSubData + ", host: "
- + HedwigClientImpl.getHostFromChannel(channel));
switch (response.getStatusCode()) {
case SUCCESS:
- // For successful Unsubscribe requests, we can now safely close the
- // Subscribe Channel and any cached data for that TopicSubscriber.
- responseHandler.getSubscriber().closeSubscription(pubSubData.topic, pubSubData.subscriberId);
- // Response was success so invoke the callback's operationFinished
- // method.
+ // For an unsubscribe request the subscription has already been closed first,
+ // so nothing else needs to be done here besides completing the callback.
pubSubData.getCallback().operationFinished(pubSubData.context, null);
break;
case CLIENT_NOT_SUBSCRIBED:
@@ -70,7 +69,7 @@ public class UnsubscribeResponseHandler
case NOT_RESPONSIBLE_FOR_TOPIC:
// Redirect response so we'll need to repost the original
// Unsubscribe Request
- responseHandler.handleRedirectResponse(response, pubSubData, channel);
+ handleRedirectResponse(response, pubSubData, channel);
break;
default:
// Consider all other status codes as errors, operation failed
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CleanupChannelMap<T> {
+
+ private static Logger logger = LoggerFactory.getLogger(CleanupChannelMap.class);
+
+ private final ConcurrentHashMap<T, HChannel> channels;
+
+ // Boolean indicating if the channel map is closed or not.
+ protected boolean closed = false;
+ protected final ReentrantReadWriteLock closedLock =
+ new ReentrantReadWriteLock();
+
+ public CleanupChannelMap() {
+ channels = new ConcurrentHashMap<T, HChannel>();
+ }
+
+ /**
+ * Add a channel to the map. If another channel is already bound
+ * to <code>key</code>, the given <code>channel</code> is closed
+ * immediately and the existing channel is returned. Otherwise,
+ * the <code>channel</code> is put in the map for future usage.
+ *
+ * If the channel map has been closed, the channel is closed
+ * immediately and returned without being stored.
+ *
+ * @param key
+ * Key
+ * @param channel
+ * Channel
+ * @return the channel instance to use.
+ */
+ public HChannel addChannel(T key, HChannel channel) {
+ this.closedLock.readLock().lock();
+ try {
+ if (closed) {
+ channel.close();
+ return channel;
+ }
+ HChannel oldChannel = channels.putIfAbsent(key, channel);
+ if (null != oldChannel) {
+ logger.info("Channel for {} already exists, so no need to store it.", key);
+ channel.close();
+ return oldChannel;
+ } else {
+ logger.debug("Storing a new channel for {}.", key);
+ return channel;
+ }
+ } finally {
+ this.closedLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Returns the channel bound with <code>key</code>.
+ *
+ * @param key Key
+ * @return the channel bound with <code>key</code>.
+ */
+ public HChannel getChannel(T key) {
+ return channels.get(key);
+ }
+
+ /**
+ * Remove the channel bound with <code>key</code>.
+ *
+ * @param key Key
+ * @return the channel bound with <code>key</code>, null if no channel
+ * is bound with <code>key</code>.
+ */
+ public HChannel removeChannel(T key) {
+ return channels.remove(key);
+ }
+
+ /**
+ * Remove the channel bound with <code>key</code> only if it is the given <code>channel</code>.
+ *
+ * @param key Key
+ * @param channel The channel expected to be bound with <code>key</code>.
+ * @return true if the channel is removed, false otherwise.
+ */
+ public boolean removeChannel(T key, HChannel channel) {
+ return channels.remove(key, channel);
+ }
+
+ /**
+ * Return the channels in the map.
+ *
+ * @return the collection of channels.
+ */
+ public Collection<HChannel> getChannels() {
+ return channels.values();
+ }
+
+ /**
+ * Close the channels map.
+ */
+ public void close() {
+ closedLock.writeLock().lock();
+ try {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ } finally {
+ closedLock.writeLock().unlock();
+ }
+ logger.debug("Closing channels map.");
+ for (HChannel channel : channels.values()) {
+ channel.close(true);
+ }
+ channels.clear();
+ logger.debug("Closed channels map.");
+ }
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java Wed Sep 26 23:52:18 2012
@@ -27,7 +27,7 @@ import org.apache.hedwig.util.Callback;
/**
* Handlers used by a subscription.
*/
-class FilterableMessageHandler implements MessageHandler {
+public class FilterableMessageHandler implements MessageHandler {
MessageHandler msgHandler;
ClientMessageFilter msgFilter;
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.apache.hedwig.client.data.PubSubData;
+
+/**
+ * A wrapper interface over netty {@link Channel} to submit hedwig's
+ * {@link PubSubData} requests.
+ */
+public interface HChannel {
+
+ /**
+ * Submit a pub/sub request.
+ *
+ * @param op
+ * Pub/Sub Request.
+ */
+ public void submitOp(PubSubData op);
+
+ /**
+ * @return underlying netty channel
+ */
+ public Channel getChannel();
+
+ /**
+ * Close the channel without waiting.
+ */
+ public void close();
+
+ /**
+ * Close the channel
+ *
+ * @param wait
+ * Whether to wait until the channel is closed.
+ */
+ public void close(boolean wait);
+}
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.net.InetSocketAddress;
+import java.util.TimerTask;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * A channel manager is responsible for 1) all channels established to hub
+ * servers, and 2) the actions taken on behalf of topic subscribers.
+ */
+public interface HChannelManager {
+
+ /**
+ * Submit a pub/sub request after a given <code>delay</code>.
+ *
+ * @param op
+ * Pub/sub request to submit.
+ * @param delay
+ * Delay before submitting, in milliseconds.
+ */
+ public void submitOpAfterDelay(PubSubData op, long delay);
+
+ /**
+ * Submit a pub/sub request.
+ *
+ * @param pubSubData
+ * Pub/sub request to submit.
+ */
+ public void submitOp(PubSubData pubSubData);
+
+ /**
+ * Submit a pub/sub request to the default server.
+ *
+ * @param pubSubData
+ * Pub/sub request to submit.
+ */
+ public void submitOpToDefaultServer(PubSubData pubSubData);
+
+ /**
+ * Submit a pub/sub request to a given host.
+ *
+ * @param pubSubData
+ * Pub/sub request to submit.
+ * @param host
+ * Address of the host to send the request to.
+ */
+ public void redirectToHost(PubSubData pubSubData, InetSocketAddress host);
+
+ /**
+ * Generate the next transaction id for pub/sub requests sent through this manager.
+ *
+ * @return the next transaction id.
+ */
+ public long nextTxnId();
+
+ /**
+ * Schedule a timer task to run after a given <code>delay</code>.
+ *
+ * @param task
+ * The timer task to schedule.
+ * @param delay
+ * Delay before the task runs, in milliseconds.
+ */
+ public void schedule(TimerTask task, long delay);
+
+ /**
+ * Get the subscribe response handler that manages the given <code>topicSubscriber</code>.
+ *
+ * @param topicSubscriber
+ * Topic subscriber to look up.
+ * @return the subscribe response handler managing it, or null if there is none.
+ */
+ public SubscribeResponseHandler getSubscribeResponseHandler(
+ TopicSubscriber topicSubscriber);
+
+ /**
+ * Start delivering messages for a given topic subscriber.
+ *
+ * @param topicSubscriber
+ * Topic subscriber to start delivery for.
+ * @param messageHandler
+ * Message handler to register for the given topic subscriber.
+ * @throws ClientNotSubscribedException
+ * If the client is not currently subscribed to the topic.
+ * @throws AlreadyStartDeliveryException
+ * If delivery is started with a message handler before the existing one is stopped.
+ */
+ public void startDelivery(TopicSubscriber topicSubscriber,
+ MessageHandler messageHandler)
+ throws ClientNotSubscribedException, AlreadyStartDeliveryException;
+
+ /**
+ * Stop delivering messages for a given topic subscriber.
+ *
+ * @param topicSubscriber
+ * Topic subscriber to stop delivery for.
+ * @throws ClientNotSubscribedException
+ * If the client is not currently subscribed to the topic.
+ */
+ public void stopDelivery(TopicSubscriber topicSubscriber)
+ throws ClientNotSubscribedException;
+
+ /**
+ * Asynchronously close the subscription of the given <code>topicSubscriber</code>.
+ *
+ * @param topicSubscriber
+ * Topic subscriber whose subscription is closed.
+ * @param callback
+ * Callback passed along with the close request.
+ * @param context
+ * Context object associated with the callback.
+ */
+ public void asyncCloseSubscription(TopicSubscriber topicSubscriber,
+ Callback<ResponseBody> callback,
+ Object context);
+
+ /**
+ * Return the subscription event emitter used to emit subscription events.
+ *
+ * @return the subscription event emitter.
+ */
+ public SubscriptionEventEmitter getSubscriptionEventEmitter();
+
+ /**
+ * Check whether the channel manager is closed.
+ *
+ * @return true if the channel manager is closed, otherwise false.
+ */
+ public boolean isClosed();
+
+ /**
+ * Close the channel manager.
+ */
+ public void close();
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java Wed Sep 26 23:52:18 2012
@@ -17,33 +17,18 @@
*/
package org.apache.hedwig.client.netty;
-import java.net.InetSocketAddress;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import com.google.protobuf.ByteString;
-import org.apache.bookkeeper.util.MathUtils;
import org.apache.hedwig.client.api.Client;
import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.handlers.MessageConsumeCallback;
-import org.apache.hedwig.client.ssl.SslClientContextFactory;
-import org.apache.hedwig.exceptions.PubSubException.UncertainStateException;
+import org.apache.hedwig.client.netty.impl.simple.SimpleHChannelManager;
/**
* This is a top level Hedwig Client class that encapsulates the common
@@ -54,49 +39,18 @@ public class HedwigClientImpl implements
private static final Logger logger = LoggerFactory.getLogger(HedwigClientImpl.class);
- // Empty Topic List
- private ConcurrentLinkedQueue<ByteString> EMPTY_TOPIC_LIST =
- new ConcurrentLinkedQueue<ByteString>();
-
- // Global counter used for generating unique transaction ID's for
- // publish and subscribe requests
- protected final AtomicLong globalCounter = new AtomicLong();
- // Static String constants
- protected static final String COLON = ":";
-
// The Netty socket factory for making connections to the server.
protected final ChannelFactory socketFactory;
// Whether the socket factory is one we created or is owned by whoever
// instantiated us.
protected boolean ownChannelFactory = false;
- // PipelineFactory to create netty client channels to the appropriate server
- private ClientChannelPipelineFactory pipelineFactory;
-
- // Concurrent Map to store the mapping from the Topic to the Host.
- // This could change over time since servers can drop mastership of topics
- // for load balancing or failover. If a server host ever goes down, we'd
- // also want to remove all topic mappings the host was responsible for.
- // The second Map is used as the inverted version of the first one.
- protected final ConcurrentMap<ByteString, InetSocketAddress> topic2Host = new ConcurrentHashMap<ByteString, InetSocketAddress>();
- private final ConcurrentMap<InetSocketAddress, ConcurrentLinkedQueue<ByteString>> host2Topics =
- new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<ByteString>>();
-
- // Each client instantiation will have a Timer for running recurring
- // threads. One such timer task thread to is to timeout long running
- // PubSubRequests that are waiting for an ack response from the server.
- private final Timer clientTimer = new Timer(true);
-
- // Boolean indicating if the client is running or has stopped.
- // Once we stop the client, we should sidestep all of the connect,
- // write callback and channel disconnected logic.
- private boolean isStopped = false;
+ // channel manager manages all the channels established by the client
+ protected final HChannelManager channelManager;
private HedwigSubscriber sub;
private final HedwigPublisher pub;
private final ClientConfiguration cfg;
- private final MessageConsumeCallback consumeCb;
- private SslClientContextFactory sslFactory = null;
public static Client create(ClientConfiguration cfg) {
return new HedwigClientImpl(cfg);
@@ -109,7 +63,8 @@ public class HedwigClientImpl implements
// Base constructor that takes in a Configuration object.
// This will create its own client socket channel factory.
protected HedwigClientImpl(ClientConfiguration cfg) {
- this(cfg, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
+ this(cfg, new NioClientSocketChannelFactory(
+ Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
ownChannelFactory = true;
}
@@ -118,23 +73,20 @@ public class HedwigClientImpl implements
protected HedwigClientImpl(ClientConfiguration cfg, ChannelFactory socketFactory) {
this.cfg = cfg;
this.socketFactory = socketFactory;
+ channelManager = new SimpleHChannelManager(cfg, socketFactory);
+
pub = new HedwigPublisher(this);
sub = new HedwigSubscriber(this);
- pipelineFactory = new ClientChannelPipelineFactory(this);
- consumeCb = new MessageConsumeCallback(this);
- if (cfg.isSSLEnabled()) {
- sslFactory = new SslClientContextFactory(cfg);
- }
- // Schedule all of the client timer tasks. Currently we only have the
- // Request Timeout task.
- clientTimer.schedule(new PubSubRequestTimeoutTask(), 0, cfg.getTimeoutThreadRunInterval());
}
- // Public getters for the various components of a client.
public ClientConfiguration getConfiguration() {
return cfg;
}
+ public HChannelManager getHChannelManager() {
+ return channelManager;
+ }
+
public HedwigSubscriber getSubscriber() {
return sub;
}
@@ -149,96 +101,14 @@ public class HedwigClientImpl implements
return pub;
}
- public MessageConsumeCallback getConsumeCallback() {
- return consumeCb;
- }
-
- public SslClientContextFactory getSslFactory() {
- return sslFactory;
- }
-
- // We need to deal with the possible problem of a PubSub request being
- // written to successfully to the server host but for some reason, the
- // ack message back never comes. What could happen is that the VoidCallback
- // stored in the ResponseHandler.txn2PublishData map will never be called.
- // We should have a configured timeout so if that passes from the time a
- // write was successfully done to the server, we can fail this async PubSub
- // transaction. The caller could possibly redo the transaction if needed at
- // a later time. Creating a timeout cleaner TimerTask to do this here.
- class PubSubRequestTimeoutTask extends TimerTask {
- /**
- * Implement the TimerTask's abstract run method.
- */
- @Override
- public void run() {
- logger.debug("Running the PubSubRequest Timeout Task");
- // Loop through all outstanding PubSubData requests and check if
- // the requestWriteTime has timed out compared to the current time.
- long curTime = MathUtils.now();
- long timeoutInterval = cfg.getServerAckResponseTimeout();
-
- // First check the ResponseHandlers associated with cached
- // channels in HedwigPublisher.host2Channel. This stores the
- // channels used for Publish and Unsubscribe requests.
- for (Channel channel : pub.host2Channel.values()) {
- ResponseHandler responseHandler = null;
- try {
- responseHandler = getResponseHandlerFromChannel(channel);
- } catch (NoResponseHandlerException e) {
- logger.warn("No response handler found for channel" + channel + " in the retry timeout task.", e);
- continue;
- }
- for (PubSubData pubSubData : responseHandler.txn2PubSubData.values()) {
- checkPubSubDataToTimeOut(pubSubData, responseHandler, curTime, timeoutInterval);
- }
- }
- // Now do the same for the cached channels in
- // HedwigSubscriber.topicSubscriber2Channel. This stores the
- // channels used exclusively for Subscribe requests.
- for (Channel channel : sub.topicSubscriber2Channel.values()) {
- ResponseHandler responseHandler = null;
- try {
- responseHandler = getResponseHandlerFromChannel(channel);
- } catch (NoResponseHandlerException e) {
- logger.warn("No response handler found for channel" + channel + " in the retry timeout task.", e);
- continue;
- }
- for (PubSubData pubSubData : responseHandler.txn2PubSubData.values()) {
- checkPubSubDataToTimeOut(pubSubData, responseHandler, curTime, timeoutInterval);
- }
- }
- }
-
- private void checkPubSubDataToTimeOut(PubSubData pubSubData, ResponseHandler responseHandler, long curTime,
- long timeoutInterval) {
- if (curTime > pubSubData.requestWriteTime + timeoutInterval) {
- // Current PubSubRequest has timed out so remove it from the
- // ResponseHandler's map and invoke the VoidCallback's
- // operationFailed method.
- logger.error("Current PubSubRequest has timed out for pubSubData: " + pubSubData);
- responseHandler.txn2PubSubData.remove(pubSubData.txnId);
- pubSubData.getCallback().operationFailed(pubSubData.context,
- new UncertainStateException("Server ack response never received so PubSubRequest has timed out!"));
- }
- }
- }
-
// When we are done with the client, this is a clean way to gracefully close
// all channels/sockets created by the client and to also release all
// resources used by netty.
public void close() {
logger.info("Stopping the client!");
- // Set the client boolean flag to indicate the client has stopped.
- isStopped = true;
- // Stop the timer and all timer task threads.
- clientTimer.cancel();
-
- pub.close();
- sub.close();
-
- // Clear out all Maps.
- topic2Host.clear();
- host2Topics.clear();
+
+ // close channel manager to release all channels
+ channelManager.close();
// Release resources used by the ChannelFactory on the client if we are
// the owner that created it.
@@ -248,170 +118,4 @@ public class HedwigClientImpl implements
logger.info("Completed stopping the client!");
}
- /**
- * This is a helper method to do the connect attempt to the server given the
- * inputted host/port. This can be used to connect to the default server
- * host/port which is the VIP. That will pick a server in the cluster at
- * random to connect to for the initial PubSub attempt (with redirect logic
- * being done at the server side). Additionally, this could be called after
- * the client makes an initial PubSub attempt at a server, and is redirected
- * to the one that is responsible for the topic. Once the connect to the
- * server is done, we will perform the corresponding PubSub write on that
- * channel.
- *
- * @param pubSubData
- * PubSub call's data wrapper object
- * @param serverHost
- * Input server host to connect to of type InetSocketAddress
- */
- public void doConnect(PubSubData pubSubData, InetSocketAddress serverHost) {
- logger.debug("Connecting to host: {} with pubSubData: {}", serverHost, pubSubData);
- // Set up the ClientBootStrap so we can create a new Channel connection
- // to the server.
- ClientBootstrap bootstrap = new ClientBootstrap(socketFactory);
- bootstrap.setPipelineFactory(pipelineFactory);
- bootstrap.setOption("tcpNoDelay", true);
- bootstrap.setOption("keepAlive", true);
-
- // Start the connection attempt to the input server host.
- ChannelFuture future = bootstrap.connect(serverHost);
- future.addListener(new ConnectCallback(pubSubData, serverHost, this));
- }
-
- /**
- * Helper method to store the topic2Host mapping in the HedwigClient cache
- * map. This method is assumed to be called when we've done a successful
- * connection to the correct server topic master.
- *
- * @param pubSubData
- * PubSub wrapper data
- * @param channel
- * Netty Channel
- */
- protected void storeTopic2HostMapping(PubSubData pubSubData, Channel channel) {
- // Retrieve the server host that we've connected to and store the
- // mapping from the topic to this host. For all other non-redirected
- // server statuses, we consider that as a successful connection to the
- // correct topic master.
- InetSocketAddress host = getHostFromChannel(channel);
- InetSocketAddress existingHost = topic2Host.get(pubSubData.topic);
- if (existingHost != null && existingHost.equals(host)) {
- // Entry in map exists for the topic but it is the same as the
- // current host. In this case there is nothing to do.
- return;
- }
-
- // Store the relevant mappings for this topic and host combination.
- if (topic2Host.putIfAbsent(pubSubData.topic, host) == null) {
- if (logger.isDebugEnabled())
- logger.debug("Stored info for topic: " + pubSubData.topic.toStringUtf8() + ", old host: "
- + existingHost + ", new host: " + host);
- ConcurrentLinkedQueue<ByteString> topicsForHost = host2Topics.get(host);
- if (topicsForHost == null) {
- ConcurrentLinkedQueue<ByteString> newTopicsList = new ConcurrentLinkedQueue<ByteString>();
- topicsForHost = host2Topics.putIfAbsent(host, newTopicsList);
- if (topicsForHost == null) {
- topicsForHost = newTopicsList;
- }
- }
- topicsForHost.add(pubSubData.topic);
- }
- }
-
- /**
- * Helper static method to get the String Hostname:Port from a netty
- * Channel. Assumption is that the netty Channel was originally created with
- * an InetSocketAddress. This is true with the Hedwig netty implementation.
- *
- * @param channel
- * Netty channel to extract the hostname and port from.
- * @return String representation of the Hostname:Port from the Netty Channel
- */
- public static InetSocketAddress getHostFromChannel(Channel channel) {
- return (InetSocketAddress) channel.getRemoteAddress();
- }
-
- /**
- * Helper static method to get the ResponseHandler instance from a Channel
- * via the ChannelPipeline it is associated with. The assumption is that the
- * last ChannelHandler tied to the ChannelPipeline is the ResponseHandler.
- *
- * @param channel
- * Channel we are retrieving the ResponseHandler instance for
- * @return ResponseHandler Instance tied to the Channel's Pipeline
- * @throws NoResponseHandlerException if the response handler found for the channel is null.
- */
- public static ResponseHandler getResponseHandlerFromChannel(Channel channel) throws NoResponseHandlerException {
- if (null == channel) {
- throw new NoResponseHandlerException("Received a null value for the channel. Cannot retrieve the response handler");
- }
- ResponseHandler handler = (ResponseHandler) channel.getPipeline().getLast();
- if (null == handler) {
- throw new NoResponseHandlerException("Could not retrieve the response handler from the channel's pipeline.");
- }
- return handler;
- }
-
- // Public getter for entries in the topic2Host Map.
- public InetSocketAddress getHostForTopic(ByteString topic) {
- return topic2Host.get(topic);
- }
-
- // If a server host goes down or the channel to it gets disconnected,
- // we want to clear out all relevant cached information. We'll
- // need to remove all of the topic mappings that the host was
- // responsible for.
- public void clearAllTopicsForHost(InetSocketAddress host) {
- logger.debug("Clearing all topics for host: {}", host);
- // For each of the topics that the host was responsible for,
- // remove it from the topic2Host mapping.
- ConcurrentLinkedQueue<ByteString> topicsForHost = host2Topics.get(host);
- if (topicsForHost != null) {
- for (ByteString topic : topicsForHost) {
- if (logger.isDebugEnabled())
- logger.debug("Removing mapping for topic: " + topic.toStringUtf8() + " from host: " + host);
- topic2Host.remove(topic, host);
- }
- // Now it is safe to remove the host2Topics mapping entry.
- host2Topics.remove(host, topicsForHost);
- }
- }
-
- // If a subscribe channel goes down, the topic might have moved.
- // We only clear out that topic for the host and not all cached information.
- public void clearHostForTopic(ByteString topic, InetSocketAddress host) {
- if (logger.isDebugEnabled()) {
- logger.debug("Clearing topic: " + topic.toStringUtf8() + " for host: "
- + host);
- }
- if (topic2Host.remove(topic, host)) {
- if (logger.isDebugEnabled()) {
- logger.debug("Removed topic to host mapping for topic: " + topic.toStringUtf8()
- + " and host: " + host);
- }
- }
- ConcurrentLinkedQueue<ByteString> topicsForHost = host2Topics.get(host);
- if (null != topicsForHost && topicsForHost.remove(topic)) {
- if (logger.isDebugEnabled()) {
- logger.debug("Removed topic: " + topic.toStringUtf8() + " from host: " + host);
- }
- if (topicsForHost.isEmpty()) {
- // remove only topic list is empty
- host2Topics.remove(host, EMPTY_TOPIC_LIST);
- }
- }
- }
-
- // Public getter to see if the client has been stopped.
- public boolean hasStopped() {
- return isStopped;
- }
-
- // Public getter to get the client's Timer object.
- // This is so we can reuse this and not have to create multiple Timer
- // objects.
- public Timer getClientTimer() {
- return clientTimer;
- }
-
}