You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/27 01:52:20 UTC
svn commit: r1390777 [3/4] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/java/org/apache/hedwig/client/data/
hedwig-client/src/main/java/org/apache/hedwig/client/handlers/
hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig...
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,621 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
+import org.apache.hedwig.client.handlers.MessageConsumeCallback;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
+import org.apache.hedwig.client.netty.CleanupChannelMap;
+import org.apache.hedwig.client.netty.HChannel;
+import org.apache.hedwig.client.netty.HChannelManager;
+import org.apache.hedwig.client.netty.NetUtils;
+import org.apache.hedwig.client.netty.SubscriptionEventEmitter;
+import org.apache.hedwig.client.ssl.SslClientContextFactory;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.filter.ClientMessageFilter;
+import org.apache.hedwig.protocol.PubSubProtocol;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageHeader;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
+
+/**
+ * Basic HChannel Manager Implementation
+ */
+public abstract class AbstractHChannelManager implements HChannelManager {
+
+ private static Logger logger = LoggerFactory.getLogger(AbstractHChannelManager.class);
+
+ // Empty Topic List
+ private final static Set<ByteString> EMPTY_TOPIC_SET =
+ new HashSet<ByteString>();
+
+ // Boolean indicating if the channel manager is running or has been closed.
+ // Once we stop the manager, we should sidestep all of the connect, write callback
+ // and channel disconnected logic.
+ protected boolean closed = false;
+ protected final ReentrantReadWriteLock closedLock =
+ new ReentrantReadWriteLock();
+
+ // Global counter used for generating unique transaction ID's for
+ // publish and subscribe requests
+ protected final AtomicLong globalCounter = new AtomicLong();
+
+ // Concurrent Map to store the mapping from the Topic to the Host.
+ // This could change over time since servers can drop mastership of topics
+ // for load balancing or failover. If a server host ever goes down, we'd
+ // also want to remove all topic mappings the host was responsible for.
+ // The second Map is used as the inverted version of the first one.
+ protected final ConcurrentMap<ByteString, InetSocketAddress> topic2Host =
+ new ConcurrentHashMap<ByteString, InetSocketAddress>();
+ // The inverse mapping is used only when clearing all topics. For performance
+ // consideration, we don't guarantee host2Topics to be consistent with
+ // topic2Host. it would be better to not rely on this mapping for anything
+ // significant.
+ protected final ConcurrentMap<InetSocketAddress, Set<ByteString>> host2Topics =
+ new ConcurrentHashMap<InetSocketAddress, Set<ByteString>>();
+
+ // These channels are used for publish and unsubscribe requests
+ protected final CleanupChannelMap<InetSocketAddress> host2NonSubscriptionChannels =
+ new CleanupChannelMap<InetSocketAddress>();
+
+ private final ClientConfiguration cfg;
+ // The Netty socket factory for making connections to the server.
+ protected final ChannelFactory socketFactory;
+ // PipelineFactory to create non-subscription netty channels to the appropriate server
+ private final ClientChannelPipelineFactory nonSubscriptionChannelPipelineFactory;
+ // ssl context factory
+ private SslClientContextFactory sslFactory = null;
+
+ // default server channel
+ private final HChannel defaultServerChannel;
+
+ // Each client instantiation has a Timer for running recurring tasks.
+ // One such timer task times out long running PubSubRequests that are
+ // still waiting for an ack response from the server.
+ private final Timer clientTimer = new Timer(true);
+ // a common consume callback for all consume requests.
+ private final MessageConsumeCallback consumeCb;
+ // An event emitter to emit subscription events
+ private final SubscriptionEventEmitter eventEmitter;
+
+ protected AbstractHChannelManager(ClientConfiguration cfg,
+ ChannelFactory socketFactory) {
+ this.cfg = cfg;
+ this.socketFactory = socketFactory;
+ this.nonSubscriptionChannelPipelineFactory =
+ new NonSubscriptionChannelPipelineFactory(cfg, this);
+
+ // create a default server channel
+ defaultServerChannel =
+ new DefaultServerChannel(cfg.getDefaultServerHost(), this);
+
+ if (cfg.isSSLEnabled()) {
+ sslFactory = new SslClientContextFactory(cfg);
+ }
+
+ consumeCb = new MessageConsumeCallback(cfg, this);
+ eventEmitter = new SubscriptionEventEmitter();
+
+ // Schedule Request Timeout task.
+ clientTimer.schedule(new PubSubRequestTimeoutTask(), 0,
+ cfg.getTimeoutThreadRunInterval());
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * @return the subscription event emitter created in this manager's constructor.
+  */
+ @Override
+ public SubscriptionEventEmitter getSubscriptionEventEmitter() {
+     return this.eventEmitter;
+ }
+
+ /**
+  * @return the single consume callback shared by all consume requests.
+  */
+ public MessageConsumeCallback getConsumeCallback() {
+     return this.consumeCb;
+ }
+
+ /**
+  * @return the SSL client context factory, or <code>null</code> when SSL is
+  *         not enabled in the client configuration.
+  */
+ public SslClientContextFactory getSslFactory() {
+     return this.sslFactory;
+ }
+
+ /**
+  * @return the Netty channel factory this manager was constructed with.
+  */
+ protected ChannelFactory getChannelFactory() {
+     return this.socketFactory;
+ }
+
+ /**
+  * @return the pipeline factory used to create non-subscription netty
+  *         channels (those carrying publish and unsubscribe requests).
+  */
+ protected ClientChannelPipelineFactory getNonSubscriptionChannelPipelineFactory() {
+     return nonSubscriptionChannelPipelineFactory;
+ }
+
+ /**
+ * Get the pipeline factory used for subscription channels. The default
+ * server channel uses it when connecting for any request that is neither
+ * a publish nor an unsubscribe.
+ *
+ * @return pipeline factory supplied by the concrete channel manager.
+ */
+ protected abstract ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory();
+
+ /**
+  * Schedule <code>task</code> on the client timer after <code>delay</code>
+  * milliseconds, unless the channel manager has already been closed, in
+  * which case the task is dropped and a warning is logged.
+  *
+  * @param task
+  *          Task to run.
+  * @param delay
+  *          Delay in milliseconds before the task is executed.
+  */
+ @Override
+ public void schedule(final TimerTask task, final long delay) {
+     closedLock.readLock().lock();
+     try {
+         if (!closed) {
+             clientTimer.schedule(task, delay);
+         } else {
+             logger.warn("Task {} is not scheduled due to the channel manager is closed.",
+                         task);
+         }
+     } finally {
+         closedLock.readLock().unlock();
+     }
+ }
+
+ @Override
+ public void submitOpAfterDelay(final PubSubData pubSubData, final long delay) {
+ this.closedLock.readLock().lock();
+ try {
+ if (closed) {
+ pubSubData.getCallback().operationFailed(pubSubData.context,
+ new ServiceDownException("Client has been closed."));
+ return;
+ }
+ clientTimer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ logger.debug("Submit request {} in {} ms later.",
+ va(pubSubData, delay));
+ submitOp(pubSubData);
+ }
+ }, delay);
+ } finally {
+ closedLock.readLock().unlock();
+ }
+ }
+
+ /**
+  * Submit a pub/sub request through the channel that matches it.
+  *
+  * <p>Publish and unsubscribe requests use the non-subscription channel of
+  * the topic; every other request uses the subscription channel of the
+  * (topic, subscriber) pair. When no such channel is found the request is
+  * handed to the default server channel.
+  *
+  * @param pubSubData
+  *          Request to submit.
+  */
+ @Override
+ public void submitOp(PubSubData pubSubData) {
+     final OperationType opType = pubSubData.operationType;
+     final boolean nonSubscriptionOp =
+         OperationType.PUBLISH.equals(opType) || OperationType.UNSUBSCRIBE.equals(opType);
+
+     HChannel target;
+     if (nonSubscriptionOp) {
+         target = getNonSubscriptionChannelByTopic(pubSubData.topic);
+     } else {
+         target = getSubscriptionChannelByTopicSubscriber(
+             new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId));
+     }
+     // no channel found to submit pubsub data: fall back to the default server
+     if (null == target) {
+         target = defaultServerChannel;
+     }
+     target.submitOp(pubSubData);
+ }
+
+ /**
+  * Submit a pub/sub request to a specific host.
+  *
+  * <p>An existing channel to <code>host</code> of the matching kind
+  * (non-subscription for publish/unsubscribe, subscription otherwise) is
+  * reused; if there is none, one is created and stored. Should that still
+  * yield no channel, the default server channel is used.
+  *
+  * @param pubSubData
+  *          Request to submit.
+  * @param host
+  *          Host the request is redirected to.
+  */
+ @Override
+ public void redirectToHost(PubSubData pubSubData, InetSocketAddress host) {
+     logger.debug("Submit operation {} to host {}.",
+                  va(pubSubData, host));
+     final OperationType opType = pubSubData.operationType;
+     HChannel target;
+     if (OperationType.PUBLISH.equals(opType) || OperationType.UNSUBSCRIBE.equals(opType)) {
+         final HChannel cached = getNonSubscriptionChannel(host);
+         // create a channel to connect to specified host when none is cached
+         target = (null != cached) ? cached : createAndStoreNonSubscriptionChannel(host);
+     } else {
+         final HChannel cached = getSubscriptionChannel(host);
+         // create a subscription channel to specified host when none is cached
+         target = (null != cached) ? cached : createAndStoreSubscriptionChannel(host);
+     }
+     // no channel found to submit pubsub data: choose the default server
+     if (null == target) {
+         target = defaultServerChannel;
+     }
+     target.submitOp(pubSubData);
+ }
+
+ void submitOpThruChannel(PubSubData pubSubData, Channel channel) {
+ logger.debug("Submit operation {} to thru channel {}.",
+ va(pubSubData, channel));
+ HChannel hChannel;
+ if (OperationType.PUBLISH.equals(pubSubData.operationType) ||
+ OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) {
+ hChannel = createAndStoreNonSubscriptionChannel(channel);
+ } else {
+ hChannel = createAndStoreSubscriptionChannel(channel);
+ }
+ hChannel.submitOp(pubSubData);
+ }
+
+ /**
+  * Submit a pub/sub request directly to the default server channel.
+  *
+  * @param pubSubData
+  *          Request to submit.
+  */
+ @Override
+ public void submitOpToDefaultServer(PubSubData pubSubData) {
+     final HChannel defaultChannel = this.defaultServerChannel;
+     logger.debug("Submit operation {} to default server {}.",
+                  va(pubSubData, defaultChannel));
+     defaultChannel.submitOp(pubSubData);
+ }
+
+ // Synchronized method to store the host2Channel mapping (if it doesn't
+ // exist yet). Retrieve the hostname info from the Channel created via the
+ // RemoteAddress tied to it.
+ private HChannel createAndStoreNonSubscriptionChannel(Channel channel) {
+ InetSocketAddress host = NetUtils.getHostFromChannel(channel);
+ HChannel newHChannel = new HChannelImpl(host, channel, this,
+ getNonSubscriptionChannelPipelineFactory());
+ return storeNonSubscriptionChannel(host, newHChannel);
+ }
+
+ private HChannel createAndStoreNonSubscriptionChannel(InetSocketAddress host) {
+ HChannel newHChannel = new HChannelImpl(host, this,
+ getNonSubscriptionChannelPipelineFactory());
+ return storeNonSubscriptionChannel(host, newHChannel);
+ }
+
+ private HChannel storeNonSubscriptionChannel(InetSocketAddress host,
+ HChannel newHChannel) {
+ return host2NonSubscriptionChannels.addChannel(host, newHChannel);
+ }
+
+ /**
+ * Is there a {@link HChannel} existed for a given host.
+ *
+ * @param host
+ * Target host address.
+ */
+ private HChannel getNonSubscriptionChannel(InetSocketAddress host) {
+ return host2NonSubscriptionChannels.getChannel(host);
+ }
+
+ /**
+ * Get a non-subscription channel for a given <code>topic</code>.
+ *
+ * @param topic
+ * Topic Name
+ * @return if <code>topic</code>'s owner is unknown, return null.
+ * if <code>topic</code>'s owner is know and there is a channel
+ * existed before, return the existed channel, otherwise created
+ * a new one.
+ */
+ private HChannel getNonSubscriptionChannelByTopic(ByteString topic) {
+ InetSocketAddress host = topic2Host.get(topic);
+ if (null == host) {
+ // we don't know where is the topic
+ return null;
+ } else {
+ // we had know which server owned the topic
+ HChannel channel = getNonSubscriptionChannel(host);
+ if (null == channel) {
+ // create a channel to connect to specified host
+ channel = createAndStoreNonSubscriptionChannel(host);
+ }
+ return channel;
+ }
+ }
+
+ /**
+ * Handle the disconnected event from a non-subscription {@link HChannel}.
+ *
+ * @param host
+ * Which host is disconnected.
+ * @param channel
+ * The underlying established channel.
+ */
+ protected void onNonSubscriptionChannelDisconnected(InetSocketAddress host,
+ Channel channel) {
+ // Only remove the Channel from the mapping if this current
+ // disconnected channel is the same as the cached entry.
+ // Due to race concurrency situations, it is possible to
+ // create multiple channels to the same host for publish
+ // and unsubscribe requests.
+ HChannel hChannel = host2NonSubscriptionChannels.getChannel(host);
+ if (null == hChannel) {
+ return;
+ }
+ Channel underlyingChannel = hChannel.getChannel();
+ if (null == underlyingChannel ||
+ !underlyingChannel.equals(channel)) {
+ return;
+ }
+ logger.info("NonSubscription Channel {} to {} disconnected.",
+ va(channel, host));
+ // remove existed channel
+ if (host2NonSubscriptionChannels.removeChannel(host, hChannel)) {
+ clearAllTopicsForHost(host);
+ }
+ }
+
+ /**
+ * Create and store a subscription {@link HChannel} on top of an already
+ * established netty <code>channel</code>.
+ *
+ * @param channel
+ * The underlying established subscription channel.
+ * @return the {@link HChannel} that the request is then submitted through.
+ */
+ protected abstract HChannel createAndStoreSubscriptionChannel(Channel channel);
+
+ /**
+ * Create and store a subscription {@link HChannel} to the target host.
+ *
+ * @param host
+ * Target host address.
+ * @return the {@link HChannel} to use for <code>host</code>; callers fall
+ * back to the default server channel if this is <code>null</code>.
+ */
+ protected abstract HChannel createAndStoreSubscriptionChannel(InetSocketAddress host);
+
+ /**
+ * Look up the subscription {@link HChannel} that exists for a given host.
+ *
+ * @param host
+ * Target host address.
+ * @return the existing channel; callers treat <code>null</code> as "no
+ * channel exists" and create one.
+ */
+ protected abstract HChannel getSubscriptionChannel(InetSocketAddress host);
+
+ /**
+ * Get a subscription channel for a given <code>topicSubscriber</code>.
+ *
+ * @param topicSubscriber
+ * Topic Subscriber
+ * @return <code>null</code> if the owner of the topic is unknown.
+ * If the owner is known and a channel already exists, the
+ * existing channel; otherwise a channel newly created for
+ * the <code>topicSubscriber</code>.
+ */
+ protected abstract HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber topicSubscriber);
+
+ /**
+ * Handle the disconnected event from a subscription {@link HChannel}.
+ *
+ * @param host
+ * The host that got disconnected.
+ * @param channel
+ * The underlying established channel.
+ */
+ protected abstract void onSubscriptionChannelDisconnected(InetSocketAddress host,
+ Channel channel);
+
+ /**
+  * Write a Consume request for <code>topicSubscriber</code> to the given
+  * netty channel.
+  *
+  * <p>Consume requests are fire and forget: no ack is expected from the
+  * server, so nothing is registered with a response handler and no callback
+  * is invoked. The only follow-up is a future listener that logs an error
+  * when the write fails.
+  *
+  * @param topicSubscriber
+  *          Topic subscriber the consume is sent for.
+  * @param messageSeqId
+  *          Sequence id carried by the consume request.
+  * @param channel
+  *          Netty channel to write the request to.
+  */
+ private void sendConsumeRequest(final TopicSubscriber topicSubscriber,
+                                 final MessageSeqId messageSeqId,
+                                 final Channel channel) {
+     final PubSubRequest.Builder requestBuilder =
+         NetUtils.buildConsumeRequest(nextTxnId(), topicSubscriber, messageSeqId);
+
+     logger.debug("Writing a Consume request to host: {} with messageSeqId: {} for {}",
+                  va(NetUtils.getHostFromChannel(channel), messageSeqId, topicSubscriber));
+     channel.write(requestBuilder.build()).addListener(new ChannelFutureListener() {
+         @Override
+         public void operationComplete(ChannelFuture future) throws Exception {
+             if (future.isSuccess()) {
+                 return;
+             }
+             logger.error("Error writing a Consume request to host: {} with messageSeqId: {} for {}",
+                          va(NetUtils.getHostFromChannel(channel),
+                             messageSeqId, topicSubscriber));
+         }
+     });
+ }
+
+ /**
+ * Helper method to store the topic2Host mapping in the channel manager cache
+ * map. This method is assumed to be called when we've done a successful
+ * connection to the correct server topic master.
+ *
+ * <p>The update is lock-free on <code>topic2Host</code>: if another thread
+ * changes the owner of the topic between the lookup and the replace, this
+ * call logs a warning and gives up without touching <code>host2Topics</code>.
+ *
+ * @param topic
+ * Topic Name
+ * @param host
+ * Host Address
+ */
+ protected void storeTopic2HostMapping(ByteString topic, InetSocketAddress host) {
+ InetSocketAddress oldHost = topic2Host.putIfAbsent(topic, host);
+ if (null != oldHost && oldHost.equals(host)) {
+ // Entry in map exists for the topic but it is the same as the
+ // current host. In this case there is nothing to do.
+ return;
+ }
+
+ if (null != oldHost) {
+ // The topic was owned by a different host: swap owners only if the
+ // mapping still points at the host we just observed.
+ if (topic2Host.replace(topic, oldHost, host)) {
+ // Store the relevant mappings for this topic and host combination.
+ logger.debug("Storing info for topic: {}, old host: {}, new host: {}.",
+ va(topic.toStringUtf8(), oldHost, host));
+ // Drop the topic from the old host's inverse mapping.
+ clearHostForTopic(topic, oldHost);
+ } else {
+ // Somebody else changed the owner in the meantime; keep their value.
+ logger.warn("Ownership of topic: {} has been changed from {} to {} when storeing host: {}",
+ va(topic.toStringUtf8(), oldHost, topic2Host.get(topic), host));
+ return;
+ }
+ } else {
+ // putIfAbsent stored the mapping: the topic had no owner before.
+ logger.debug("Storing info for topic: {}, host: {}.",
+ va(topic.toStringUtf8(), host));
+ }
+ // Record the topic in the inverse (host -> topics) mapping, creating the
+ // per-host set on first use; putIfAbsent resolves a creation race.
+ Set<ByteString> topicsForHost = host2Topics.get(host);
+ if (null == topicsForHost) {
+ Set<ByteString> newTopicsSet = new HashSet<ByteString>();
+ topicsForHost = host2Topics.putIfAbsent(host, newTopicsSet);
+ if (null == topicsForHost) {
+ topicsForHost = newTopicsSet;
+ }
+ }
+ // The per-host set is a plain HashSet, so it is only touched while
+ // holding its monitor.
+ synchronized (topicsForHost) {
+ // check whether the ownership changed, since it might have happened
+ // after the replace succeeded
+ if (host.equals(topic2Host.get(topic))) {
+ topicsForHost.add(topic);
+ }
+ }
+ }
+
+ // If a server host goes down or the channel to it gets disconnected,
+ // we want to clear out all relevant cached information. We'll
+ // need to remove all of the topic mappings that the host was
+ // responsible for.
+ protected void clearAllTopicsForHost(InetSocketAddress host) {
+ logger.debug("Clearing all topics for host: {}", host);
+ // For each of the topics that the host was responsible for,
+ // remove it from the topic2Host mapping.
+ Set<ByteString> topicsForHost = host2Topics.get(host);
+ if (null != topicsForHost) {
+ synchronized (topicsForHost) {
+ for (ByteString topic : topicsForHost) {
+ logger.debug("Removing mapping for topic: {} from host: {}.",
+ va(topic.toStringUtf8(), host));
+ topic2Host.remove(topic, host);
+ }
+ }
+ // Now it is safe to remove the host2Topics mapping entry.
+ host2Topics.remove(host, topicsForHost);
+ }
+ }
+
+ /**
+  * Clear the cached ownership of a single topic by a host. Used when a
+  * subscribe channel goes down: the topic might have moved, so only that
+  * topic is cleared for the host and not all cached information.
+  *
+  * <p>The topic is removed from <code>topic2Host</code> only if it still
+  * maps to <code>host</code>, then dropped from the host's set in
+  * <code>host2Topics</code>; the host's entry is removed when its set has
+  * become empty.
+  *
+  * @param topic
+  *          Topic Name
+  * @param host
+  *          Host Address
+  */
+ public void clearHostForTopic(ByteString topic, InetSocketAddress host) {
+     logger.debug("Clearing topic: {} from host: {}.",
+                  va(topic.toStringUtf8(), host));
+     if (topic2Host.remove(topic, host)) {
+         logger.debug("Removed topic to host mapping for topic: {} and host: {}.",
+                      va(topic.toStringUtf8(), host));
+     }
+     Set<ByteString> topicsForHost = host2Topics.get(host);
+     if (null == topicsForHost) {
+         return;
+     }
+     boolean removed;
+     // The per-host set is a plain HashSet that every writer guards with its
+     // monitor, so the emptiness check and the conditional removal of the
+     // host entry must also happen while holding that monitor.
+     synchronized (topicsForHost) {
+         removed = topicsForHost.remove(topic);
+         if (removed && topicsForHost.isEmpty()) {
+             // remove only if the mapped topic list is empty
+             host2Topics.remove(host, EMPTY_TOPIC_SET);
+         }
+     }
+     if (removed) {
+         logger.debug("Removed topic: {} from host: {}.",
+                      topic.toStringUtf8(), host);
+     }
+ }
+
+ /**
+  * Generate the next transaction id from the manager-wide counter.
+  *
+  * @return the counter value after it has been atomically incremented by one.
+  */
+ @Override
+ public long nextTxnId() {
+     final long txnId = globalCounter.addAndGet(1L);
+     return txnId;
+ }
+
+ /**
+  * Recurring timer task that times out pending PubSub requests.
+  *
+  * <p>A request may be written to the server successfully while its ack
+  * never comes back, in which case the callback stored for the transaction
+  * would never be called. This task asks each channel handler to check its
+  * timed-out requests so such transactions can be failed; the caller may
+  * redo them later if needed.
+  */
+ class PubSubRequestTimeoutTask extends TimerTask {
+     /**
+      * Check non-subscription channels first, then subscription channels.
+      * Does nothing once the channel manager has been closed.
+      */
+     @Override
+     public void run() {
+         if (!isClosed()) {
+             logger.debug("Running the PubSubRequest Timeout Task");
+             // First check those non-subscription channels
+             for (final HChannel hChannel : host2NonSubscriptionChannels.getChannels()) {
+                 try {
+                     HChannelImpl.getHChannelHandlerFromChannel(hChannel.getChannel())
+                                 .checkTimeoutRequests();
+                 } catch (NoResponseHandlerException ignored) {
+                     // no handler could be obtained for this channel; move on
+                     // to the next one
+                 }
+             }
+             // Then check those subscription channels
+             checkTimeoutRequestsOnSubscriptionChannels();
+         }
+     }
+ }
+
+ /**
+ * Check the timed-out pub/sub requests on subscription channels. Invoked
+ * by {@link PubSubRequestTimeoutTask} on the timer thread after the
+ * non-subscription channels have been checked.
+ */
+ protected abstract void checkTimeoutRequestsOnSubscriptionChannels();
+
+ /**
+  * Tell whether the channel manager has been closed.
+  *
+  * @return the <code>closed</code> flag, read while holding the read lock.
+  */
+ @Override
+ public boolean isClosed() {
+     final boolean closedNow;
+     closedLock.readLock().lock();
+     try {
+         closedNow = closed;
+     } finally {
+         closedLock.readLock().unlock();
+     }
+     return closedNow;
+ }
+
+ /**
+ * Close all subscription channels when the channel manager is closed.
+ * Called from {@link #close()} after the non-subscription channels have
+ * been closed.
+ */
+ protected abstract void closeSubscriptionChannels();
+
+ /**
+  * Shut down the channel manager. The <code>closed</code> flag is set under
+  * the write lock; only the first call then cancels the client timer,
+  * closes the non-subscription and subscription channels and clears the
+  * topic/host maps. Later calls return right after the flag check.
+  */
+ @Override
+ public void close() {
+     logger.info("Shutting down the channels manager.");
+     final boolean alreadyClosed;
+     closedLock.writeLock().lock();
+     try {
+         alreadyClosed = closed;
+         closed = true;
+     } finally {
+         closedLock.writeLock().unlock();
+     }
+     if (alreadyClosed) {
+         // Not first time to close
+         return;
+     }
+     clientTimer.cancel();
+     // Clear all existed channels
+     host2NonSubscriptionChannels.close();
+
+     // clear all subscription channels
+     closeSubscriptionChannels();
+
+     // Clear out all Maps
+     topic2Host.clear();
+     host2Topics.clear();
+ }
+
+}
Copied: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java (from r1390401, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java?p2=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java&p1=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java&r1=1390401&r2=1390777&rev=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java Wed Sep 26 23:52:18 2012
@@ -15,7 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hedwig.client.netty;
+package org.apache.hedwig.client.netty.impl;
+
+import java.util.Map;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -26,14 +28,26 @@ import org.jboss.netty.handler.codec.pro
import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
import org.jboss.netty.handler.ssl.SslHandler;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.handlers.AbstractResponseHandler;
import org.apache.hedwig.protocol.PubSubProtocol;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+
+public abstract class ClientChannelPipelineFactory implements ChannelPipelineFactory {
-public class ClientChannelPipelineFactory implements ChannelPipelineFactory {
+ protected ClientConfiguration cfg;
+ protected AbstractHChannelManager channelManager;
+
+ public ClientChannelPipelineFactory(ClientConfiguration cfg,
+ AbstractHChannelManager channelManager) {
+ this.cfg = cfg;
+ this.channelManager = channelManager;
+ }
- private HedwigClientImpl client;
+ protected abstract Map<OperationType, AbstractResponseHandler> createResponseHandlers();
- public ClientChannelPipelineFactory(HedwigClientImpl client) {
- this.client = client;
+ private HChannelHandler createHChannelHandler() {
+ return new HChannelHandler(cfg, channelManager, createResponseHandlers());
}
// Retrieve a ChannelPipeline from the factory.
@@ -41,17 +55,17 @@ public class ClientChannelPipelineFactor
// Create a new ChannelPipline using the factory method from the
// Channels helper class.
ChannelPipeline pipeline = Channels.pipeline();
- if (client.getSslFactory() != null) {
- pipeline.addLast("ssl", new SslHandler(client.getSslFactory().getEngine()));
+ if (channelManager.getSslFactory() != null) {
+ pipeline.addLast("ssl", new SslHandler(channelManager.getSslFactory().getEngine()));
}
- pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(client.getConfiguration()
- .getMaximumMessageSize(), 0, 4, 0, 4));
+ pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(
+ cfg.getMaximumMessageSize(), 0, 4, 0, 4));
pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
pipeline.addLast("protobufdecoder", new ProtobufDecoder(PubSubProtocol.PubSubResponse.getDefaultInstance()));
pipeline.addLast("protobufencoder", new ProtobufEncoder());
- pipeline.addLast("responsehandler", new ResponseHandler(client));
+ pipeline.addLast("responsehandler", createHChannelHandler());
return pipeline;
}
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl;
+
+import java.net.InetSocketAddress;
+
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import static org.apache.hedwig.util.VarArgs.va;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handle requests sent to default hub server. <b>DefaultServerChannel</b> would not
+ * be used as a channel to send requests directly. It just takes the responsibility to
+ * connect to the default server. After the underlying netty channel is established,
+ * it would call
+ * {@link AbstractHChannelManager#submitOpThruChannel(PubSubData, org.jboss.netty.channel.Channel)}
+ * to send requests thru the underlying netty channel.
+ */
+class DefaultServerChannel extends HChannelImpl {
+
+    private static final Logger logger = LoggerFactory.getLogger(DefaultServerChannel.class);
+
+    /**
+     * @param host
+     *          Address of the default hub server.
+     * @param channelManager
+     *          Channel manager owning this channel.
+     */
+    DefaultServerChannel(InetSocketAddress host, AbstractHChannelManager channelManager) {
+        super(host, channelManager);
+    }
+
+    @Override
+    public String toString() {
+        return "[DefaultServer: " + host + "]";
+    }
+
+    /**
+     * Establish a fresh connection to the default server for this request
+     * and, once connected, submit the request through the new netty channel.
+     * Publish and unsubscribe requests connect with the non-subscription
+     * pipeline factory; all other requests use the subscription one.
+     *
+     * @param pubSubData
+     *          Request to submit.
+     */
+    @Override
+    public void submitOp(final PubSubData pubSubData) {
+        // for each pub/sub request sent to default hub server
+        // we would establish a fresh connection for it
+        final OperationType opType = pubSubData.operationType;
+        final ClientChannelPipelineFactory pipelineFactory;
+        if (OperationType.PUBLISH.equals(opType) ||
+            OperationType.UNSUBSCRIBE.equals(opType)) {
+            pipelineFactory = channelManager.getNonSubscriptionChannelPipelineFactory();
+        } else {
+            pipelineFactory = channelManager.getSubscriptionChannelPipelineFactory();
+        }
+        ChannelFuture future = connect(host, pipelineFactory);
+        future.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                // If the channel has been closed, there is no need to proceed with any callback
+                // logic here.
+                if (closed) {
+                    future.getChannel().close();
+                    return;
+                }
+
+                // Check if the connection to the server was done successfully.
+                if (!future.isSuccess()) {
+                    logger.error("Error connecting to host {}.", host);
+                    future.getChannel().close();
+
+                    retryOrFailOp(pubSubData);
+                    // Finished with failure logic so just return.
+                    return;
+                }
+                logger.debug("Connected to host {} for pubSubData: {}",
+                             va(host, pubSubData));
+                channelManager.submitOpThruChannel(pubSubData, future.getChannel());
+            }
+        });
+    }
+
+}
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
+import org.apache.hedwig.client.netty.NetUtils;
+import org.apache.hedwig.client.handlers.AbstractResponseHandler;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
+import org.apache.hedwig.exceptions.PubSubException.UncertainStateException;
+import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import static org.apache.hedwig.util.VarArgs.va;
+
+@ChannelPipelineCoverage("all")
+public class HChannelHandler extends SimpleChannelHandler {
+
+ private static Logger logger = LoggerFactory.getLogger(HChannelHandler.class);
+
+ // Concurrent Map to store for each async PubSub request, the txn ID
+ // and the corresponding PubSub call's data which stores the VoidCallback to
+ // invoke when we receive a PubSub ack response from the server.
+ // This is specific to this instance of the HChannelHandler which is
+ // tied to a specific netty Channel Pipeline.
+ private final ConcurrentMap<Long, PubSubData> txn2PubSubData =
+ new ConcurrentHashMap<Long, PubSubData>();
+
+ // Boolean indicating if we closed the channel this HChannelHandler is
+ // attached to explicitly or not. If so, we do not need to do the
+ // channel disconnected logic here.
+ private volatile boolean channelClosedExplicitly = false;
+
+ private final AbstractHChannelManager channelManager;
+ private final ClientConfiguration cfg;
+
+ private final Map<OperationType, AbstractResponseHandler> handlers;
+ private final SubscribeResponseHandler subHandler;
+
+ public HChannelHandler(ClientConfiguration cfg,
+ AbstractHChannelManager channelManager,
+ Map<OperationType, AbstractResponseHandler> handlers) {
+ this.cfg = cfg;
+ this.channelManager = channelManager;
+ this.handlers = handlers;
+ subHandler = (SubscribeResponseHandler) handlers.get(OperationType.SUBSCRIBE);
+ }
+
+ public SubscribeResponseHandler getSubscribeResponseHandler() {
+ return subHandler;
+ }
+
+ public void removeTxn(long txnId) {
+ txn2PubSubData.remove(txnId);
+ }
+
+ public void addTxn(long txnId, PubSubData pubSubData) {
+ txn2PubSubData.put(txnId, pubSubData);
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ // If the Message is not a PubSubResponse, just send it upstream and let
+ // something else handle it.
+ if (!(e.getMessage() instanceof PubSubResponse)) {
+ ctx.sendUpstream(e);
+ return;
+ }
+ // Retrieve the PubSubResponse from the Message that was sent by the
+ // server.
+ PubSubResponse response = (PubSubResponse) e.getMessage();
+ logger.debug("Response received from host: {}, response: {}.",
+ va(NetUtils.getHostFromChannel(ctx.getChannel()), response));
+
+ // Determine if this PubSubResponse is an ack response for a PubSub
+ // Request or if it is a message being pushed to the client subscriber.
+ if (response.hasMessage()) {
+ // Subscribed messages being pushed to the client so handle/consume
+ // it and return.
+ if (null == subHandler) {
+ logger.error("Received message from a non-subscription channel : {}",
+ response);
+ } else {
+ subHandler.handleSubscribeMessage(response);
+ }
+ return;
+ }
+
+ // Response is an ack to a prior PubSubRequest so first retrieve the
+ // PubSub data for this txn.
+ PubSubData pubSubData = txn2PubSubData.remove(response.getTxnId());
+
+ // Validate that the PubSub data for this txn is stored. If not, just
+ // log an error message and return since we don't know how to handle
+ // this.
+ if (pubSubData == null) {
+ logger.error("PubSub Data was not found for PubSubResponse: {}", response);
+ return;
+ }
+
+ // Store the topic2Host mapping if this wasn't a server redirect. We'll
+ // assume that if the server was able to have an open Channel connection
+ // to the client, and responded with an ack message other than the
+ // NOT_RESPONSIBLE_FOR_TOPIC one, it is the correct topic master.
+ if (!response.getStatusCode().equals(StatusCode.NOT_RESPONSIBLE_FOR_TOPIC)) {
+ // Retrieve the server host that we've connected to and store the
+ // mapping from the topic to this host. For all other non-redirected
+ // server statuses, we consider that as a successful connection to the
+ // correct topic master.
+ InetSocketAddress host = NetUtils.getHostFromChannel(ctx.getChannel());
+ channelManager.storeTopic2HostMapping(pubSubData.topic, host);
+ }
+
+ // Depending on the operation type, call the appropriate handler.
+ logger.debug("Handling a {} response: {}, pubSubData: {}, host: {}.",
+ va(pubSubData.operationType, response, pubSubData, ctx.getChannel()));
+ AbstractResponseHandler respHandler = handlers.get(pubSubData.operationType);
+ if (null == respHandler) {
+ // The above are the only expected PubSubResponse messages received
+ // from the server for the various client side requests made.
+ logger.error("Response received from server is for an unhandled operation {}, txnId: {}.",
+ va(pubSubData.operationType, response.getTxnId()));
+ pubSubData.getCallback().operationFailed(pubSubData.context,
+ new UnexpectedConditionException("Can't find response handler for operation "
+ + pubSubData.operationType));
+ return;
+ }
+ respHandler.handleResponse(response, pubSubData, ctx.getChannel());
+ }
+
+ public void checkTimeoutRequests() {
+ long curTime = System.currentTimeMillis();
+ long timeoutInterval = cfg.getServerAckResponseTimeout();
+ for (PubSubData pubSubData : txn2PubSubData.values()) {
+ checkTimeoutRequest(pubSubData, curTime, timeoutInterval);
+ }
+ }
+
+ private void checkTimeoutRequest(PubSubData pubSubData,
+ long curTime, long timeoutInterval) {
+ if (curTime > pubSubData.requestWriteTime + timeoutInterval) {
+ // Current PubSubRequest has timed out so remove it from the
+ // ResponseHandler's map and invoke the VoidCallback's
+ // operationFailed method.
+ logger.error("Current PubSubRequest has timed out for pubSubData: " + pubSubData);
+ txn2PubSubData.remove(pubSubData.txnId);
+ pubSubData.getCallback().operationFailed(pubSubData.context,
+ new UncertainStateException("Server ack response never received so PubSubRequest has timed out!"));
+ }
+ }
+
+ // Logic to deal with what happens when a Channel to a server host is
+ // disconnected.
+ @Override
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ // If this channel was closed explicitly by the client code,
+ // we do not need to do any of this logic. This could happen
+ // for redundant Publish channels created or redirected subscribe
+ // channels that are not used anymore or when we shutdown the
+ // client and manually close all of the open channels.
+ // Also don't do any of the disconnect logic if the client has stopped.
+ if (channelClosedExplicitly || channelManager.isClosed()) {
+ return;
+ }
+
+ // Make sure the host retrieved is not null as there could be some weird
+ // channel disconnect events happening during a client shutdown.
+ // If it is, just return as there shouldn't be anything we need to do.
+ InetSocketAddress host = NetUtils.getHostFromChannel(ctx.getChannel());
+ if (host == null) {
+ return;
+ }
+
+ logger.info("Channel {} was disconnected to host {}.",
+ va(ctx.getChannel(), host));
+
+ // If this Channel was used for Publish and Unsubscribe flows, just
+ // remove it from the HewdigPublisher's host2Channel map. We will
+ // re-establish a Channel connection to that server when the next
+ // publish/unsubscribe request to a topic that the server owns occurs.
+
+ // Now determine what type of operation this channel was used for.
+ if (null == subHandler) {
+ channelManager.onNonSubscriptionChannelDisconnected(host, ctx.getChannel());
+ } else {
+ channelManager.onSubscriptionChannelDisconnected(host, ctx.getChannel());
+ }
+
+ // Finally, all of the PubSubRequests that are still waiting for an ack
+ // response from the server need to be removed and timed out. Invoke the
+ // operationFailed callbacks on all of them. Use the
+ // UncertainStateException since the server did receive the request but
+ // we're not sure of the state of the request since the ack response was
+ // never received.
+ for (PubSubData pubSubData : txn2PubSubData.values()) {
+ logger.debug("Channel disconnected so invoking the operationFailed callback for pubSubData: {}",
+ pubSubData);
+ pubSubData.getCallback().operationFailed(pubSubData.context, new UncertainStateException(
+ "Server ack response never received before server connection disconnected!"));
+ }
+ txn2PubSubData.clear();
+ }
+
+ // Logic to deal with what happens when a Channel to a server host is
+ // connected. This is needed if the client is using an SSL port to
+ // communicate with the server. If so, we need to do the SSL handshake here
+ // when the channel is first connected.
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ // No need to initiate the SSL handshake if we are closing this channel
+ // explicitly or the client has been stopped.
+ if (cfg.isSSLEnabled() && !channelClosedExplicitly && !channelManager.isClosed()) {
+ logger.debug("Initiating the SSL handshake");
+ ctx.getPipeline().get(SslHandler.class).handshake(e.getChannel());
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ logger.error("Exception caught on client channel", e.getCause());
+ e.getChannel().close();
+ }
+
+ public void closeExplicitly() {
+ // TODO: BOOKKEEPER-350 : Handle consume buffering, etc here - in a different patch
+ channelClosedExplicitly = true;
+ }
+}
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayDeque;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import com.google.protobuf.ByteString;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
+import org.apache.hedwig.client.netty.HChannel;
+import org.apache.hedwig.client.netty.NetUtils;
+import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.util.HedwigSocketAddress;
+import static org.apache.hedwig.util.VarArgs.va;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provide a wrapper over netty channel for Hedwig operations.
+ *
+ * Operations submitted while no connected netty channel is available are
+ * queued and a connection attempt is started; the queue is flushed when
+ * that attempt succeeds, or handed to {@link #retryOrFailOp(PubSubData)}
+ * when it fails.
+ */
+public class HChannelImpl implements HChannel {
+
+    private static final Logger logger = LoggerFactory.getLogger(HChannelImpl.class);
+
+    enum State {
+        DISCONNECTED,
+        CONNECTING,
+        CONNECTED,
+    }
+
+    InetSocketAddress host;
+    final AbstractHChannelManager channelManager;
+    final ClientChannelPipelineFactory pipelineFactory;
+    volatile Channel channel;
+    volatile State state;
+
+    // Indicates whether the channel is closed or not.
+    volatile boolean closed = false;
+    // Queue the pubsub requests when the channel is not connected.
+    Queue<PubSubData> pendingOps = new ArrayDeque<PubSubData>();
+
+    /**
+     * Create a un-established channel with provided target <code>host</code>.
+     *
+     * @param host
+     *          Target host address.
+     * @param channelManager
+     *          Channel manager manages the channels.
+     */
+    protected HChannelImpl(InetSocketAddress host, AbstractHChannelManager channelManager) {
+        this(host, channelManager, null);
+    }
+
+    /**
+     * Create a un-established channel with provided target <code>host</code>.
+     *
+     * @param host
+     *          Target host address.
+     * @param channelManager
+     *          Channel manager manages the channels.
+     * @param pipelineFactory
+     *          Pipeline factory used when the connection is established.
+     */
+    public HChannelImpl(InetSocketAddress host, AbstractHChannelManager channelManager,
+                        ClientChannelPipelineFactory pipelineFactory) {
+        this(host, null, channelManager, pipelineFactory);
+        // The delegated constructor marks the channel CONNECTED; there is no
+        // netty channel yet, so reset the state.
+        state = State.DISCONNECTED;
+    }
+
+    /**
+     * Create a <code>HChannel</code> with an established netty channel.
+     *
+     * @param host
+     *          Target host address.
+     * @param channel
+     *          Established Netty channel.
+     * @param channelManager
+     *          Channel manager manages the channels.
+     * @param pipelineFactory
+     *          Pipeline factory used if a connection has to be established.
+     */
+    public HChannelImpl(InetSocketAddress host, Channel channel,
+                        AbstractHChannelManager channelManager,
+                        ClientChannelPipelineFactory pipelineFactory) {
+        this.host = host;
+        this.channel = channel;
+        this.channelManager = channelManager;
+        this.pipelineFactory = pipelineFactory;
+        state = State.CONNECTED;
+    }
+
+    /**
+     * Execute the operation right away when the channel is connected,
+     * otherwise queue it and start a connection attempt.
+     *
+     * @param pubSubData
+     *          Pub/Sub Operation
+     */
+    @Override
+    public void submitOp(PubSubData pubSubData) {
+        boolean doOpNow = false;
+
+        // common case without lock first
+        if (null != channel && State.CONNECTED == state) {
+            doOpNow = true;
+        } else {
+            synchronized (this) {
+                // check channel & state again under lock
+                if (null != channel && State.CONNECTED == state) {
+                    doOpNow = true;
+                } else {
+                    // if reached here, channel is either null (first connection attempt),
+                    // or the channel is disconnected. Connection attempt is still in progress,
+                    // queue up this op. Op will be executed when connection attempt either
+                    // fails or succeeds
+                    // NOTE(review): after close() the channel is null while the
+                    // state is left untouched, so an op queued here does not look
+                    // like it is ever executed or failed - confirm intended.
+                    pendingOps.add(pubSubData);
+                }
+            }
+            if (!doOpNow) {
+                // start connection attempt to server
+                connect();
+            }
+        }
+        if (doOpNow) {
+            executeOpAfterConnected(pubSubData);
+        }
+    }
+
+    /**
+     * Execute pub/sub operation after the underlying channel is connected.
+     *
+     * @param pubSubData
+     *          Pub/Sub Operation
+     */
+    private void executeOpAfterConnected(PubSubData pubSubData) {
+        PubSubRequest.Builder reqBuilder =
+            NetUtils.buildPubSubRequest(channelManager.nextTxnId(), pubSubData);
+        writePubSubRequest(pubSubData, reqBuilder.build());
+    }
+
+    @Override
+    public Channel getChannel() {
+        return channel;
+    }
+
+    /**
+     * Register the request with the channel's handler and write it.
+     * If the channel is closed, not connected, or has no handler, the
+     * operation is handed to {@link #retryOrFailOp(PubSubData)}.
+     *
+     * @param pubSubData
+     *          Pub/Sub Operation
+     * @param pubSubRequest
+     *          Request built for the operation.
+     */
+    private void writePubSubRequest(PubSubData pubSubData, PubSubRequest pubSubRequest) {
+        // Work on a snapshot: the volatile field can be set to null by a
+        // concurrent close() or connect failure between the check and the
+        // write below.
+        final Channel ch = channel;
+        if (closed || null == ch || State.CONNECTED != state) {
+            retryOrFailOp(pubSubData);
+            return;
+        }
+
+        // Before we do the write, store this information into the
+        // ResponseHandler so when the server responds, we know what
+        // appropriate Callback Data to invoke for the given txn ID.
+        try {
+            getHChannelHandlerFromChannel(ch)
+                .addTxn(pubSubData.txnId, pubSubData);
+        } catch (NoResponseHandlerException nrhe) {
+            logger.warn("No Channel Handler found for channel {} when writing request."
+                        + " It might already disconnect.", ch);
+            // The request was never registered, so nothing else will complete
+            // it. Treat it like the not-connected case instead of dropping it.
+            retryOrFailOp(pubSubData);
+            return;
+        }
+
+        // Finally, write the pub/sub request through the Channel.
+        logger.debug("Writing a {} request to host: {} for pubSubData: {}.",
+                     va(pubSubData.operationType, host, pubSubData));
+        ChannelFuture future = ch.write(pubSubRequest);
+        future.addListener(new WriteCallback(pubSubData, channelManager));
+    }
+
+    /**
+     * Re-submit operation to default server or fail it.
+     *
+     * The operation is failed with a {@link CouldNotConnectException} when
+     * this host is already recorded in its list of connect-failed servers;
+     * otherwise the host is recorded and the operation is submitted to the
+     * default server.
+     *
+     * @param pubSubData
+     *          Pub/Sub Operation
+     */
+    protected void retryOrFailOp(PubSubData pubSubData) {
+        // if we were not able to connect to the host, it could be down
+        ByteString hostString = ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(host));
+        if (pubSubData.connectFailedServers != null &&
+                pubSubData.connectFailedServers.contains(hostString)) {
+            // We've already tried to connect to this host before so just
+            // invoke the operationFailed callback.
+            logger.error("Error connecting to host {} more than once so fail the request: {}",
+                         va(host, pubSubData));
+            pubSubData.getCallback().operationFailed(pubSubData.context,
+                    new CouldNotConnectException("Could not connect to host: " + host));
+        } else {
+            logger.error("Retry to connect to default hub server again for pubSubData: {}",
+                         pubSubData);
+            // Keep track of this current server that we failed to connect
+            // to but retry the request on the default server host/VIP.
+            if (pubSubData.connectFailedServers == null) {
+                pubSubData.connectFailedServers = new LinkedList<ByteString>();
+            }
+            pubSubData.connectFailedServers.add(hostString);
+            channelManager.submitOpToDefaultServer(pubSubData);
+        }
+    }
+
+    /**
+     * Adopt the connected netty channel and execute the queued operations.
+     */
+    private void onChannelConnected(ChannelFuture future) {
+        Queue<PubSubData> oldPendingOps;
+        synchronized (this) {
+            // if the channel is closed by client, do nothing
+            if (closed) {
+                future.getChannel().close();
+                return;
+            }
+            state = State.CONNECTED;
+            channel = future.getChannel();
+            host = NetUtils.getHostFromChannel(channel);
+            // Swap the queue under the lock; the ops run outside of it.
+            oldPendingOps = pendingOps;
+            pendingOps = new ArrayDeque<PubSubData>();
+        }
+        for (PubSubData op : oldPendingOps) {
+            executeOpAfterConnected(op);
+        }
+    }
+
+    /**
+     * Mark the channel disconnected and retry or fail the queued operations.
+     */
+    private void onChannelConnectFailure() {
+        Queue<PubSubData> oldPendingOps;
+        synchronized (this) {
+            state = State.DISCONNECTED;
+            channel = null;
+            oldPendingOps = pendingOps;
+            pendingOps = new ArrayDeque<PubSubData>();
+        }
+        for (PubSubData op : oldPendingOps) {
+            retryOrFailOp(op);
+        }
+    }
+
+    /**
+     * Start a connection attempt unless one is in progress or the channel
+     * is already connected.
+     */
+    private void connect() {
+        synchronized (this) {
+            if (State.CONNECTING == state ||
+                    State.CONNECTED == state) {
+                return;
+            }
+            state = State.CONNECTING;
+        }
+        // Start the connection attempt to the input server host.
+        ChannelFuture future = connect(host, pipelineFactory);
+        future.addListener(new ChannelFutureListener() {
+
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                // If the channel has been closed, there is no need to proceed with any
+                // callback logic here.
+                if (closed) {
+                    future.getChannel().close();
+                    return;
+                }
+
+                if (!future.isSuccess()) {
+                    logger.error("Error connecting to host {}.", host);
+                    future.getChannel().close();
+
+                    // if we were not able to connect to the host, it could be down.
+                    onChannelConnectFailure();
+                    return;
+                }
+                logger.debug("Connected to server {}.", host);
+                // Now that we have connected successfully to the server, execute all queueing
+                // requests.
+                onChannelConnected(future);
+            }
+
+        });
+    }
+
+    /**
+     * This is a helper method to do the connect attempt to the server given the
+     * inputted host/port. This can be used to connect to the default server
+     * host/port which is the VIP. That will pick a server in the cluster at
+     * random to connect to for the initial PubSub attempt (with redirect logic
+     * being done at the server side). Additionally, this could be called after
+     * the client makes an initial PubSub attempt at a server, and is redirected
+     * to the one that is responsible for the topic. Once the connect to the
+     * server is done, we will perform the corresponding PubSub write on that
+     * channel.
+     *
+     * @param serverHost
+     *            Input server host to connect to of type InetSocketAddress
+     * @param pipelineFactory
+     *            PipelineFactory to create response handler to handle responses from
+     *            underlying channel.
+     * @return the future of the connection attempt.
+     */
+    protected ChannelFuture connect(InetSocketAddress serverHost,
+                                    ClientChannelPipelineFactory pipelineFactory) {
+        logger.debug("Connecting to host {} ...", serverHost);
+        // Set up the ClientBootStrap so we can create a new Channel connection
+        // to the server.
+        ClientBootstrap bootstrap = new ClientBootstrap(channelManager.getChannelFactory());
+        bootstrap.setPipelineFactory(pipelineFactory);
+        bootstrap.setOption("tcpNoDelay", true);
+        bootstrap.setOption("keepAlive", true);
+
+        // Start the connection attempt to the input server host.
+        return bootstrap.connect(serverHost);
+    }
+
+    /**
+     * Close this channel. Only the first call has an effect.
+     *
+     * @param wait
+     *          Whether to block until the netty channel has been closed.
+     */
+    @Override
+    public void close(boolean wait) {
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+        }
+        // Work on a snapshot: a concurrent connect failure may set the field
+        // to null between the check and the calls below.
+        final Channel ch = channel;
+        if (null == ch) {
+            return;
+        }
+        try {
+            // Flag the handler first so the upcoming disconnect event is not
+            // treated as an unexpected one.
+            getHChannelHandlerFromChannel(ch).closeExplicitly();
+        } catch (NoResponseHandlerException nrhe) {
+            logger.warn("No channel handler found for channel {} when closing it.",
+                        ch);
+        }
+        if (wait) {
+            ch.close().awaitUninterruptibly();
+        } else {
+            ch.close();
+        }
+        channel = null;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[HChannel: host - ").append(host)
+          .append(", channel - ").append(channel)
+          .append(", pending reqs - ").append(pendingOps.size())
+          .append(", closed - ").append(closed).append("]");
+        return sb.toString();
+    }
+
+    @Override
+    public void close() {
+        close(false);
+    }
+
+    /**
+     * Helper static method to get the HChannelHandler instance from a Channel
+     * via the ChannelPipeline it is associated with. The assumption is that the
+     * last ChannelHandler tied to the ChannelPipeline is the HChannelHandler.
+     *
+     * @param channel
+     *            Channel we are retrieving the HChannelHandler instance for
+     * @return HChannelHandler instance tied to the Channel's Pipeline
+     * @throws NoResponseHandlerException
+     *             if the channel is null, or the last handler of its pipeline
+     *             is missing or is not a HChannelHandler
+     */
+    public static HChannelHandler getHChannelHandlerFromChannel(Channel channel)
+    throws NoResponseHandlerException {
+        if (null == channel) {
+            throw new NoResponseHandlerException("Received a null value for the channel. Cannot retrieve the response handler");
+        }
+
+        // instanceof is false for null as well, so a missing handler and a
+        // handler of another type both surface as the declared exception
+        // instead of a ClassCastException.
+        Object last = channel.getPipeline().getLast();
+        if (!(last instanceof HChannelHandler)) {
+            throw new NoResponseHandlerException("Could not retrieve the response handler from the channel's pipeline.");
+        }
+        return (HChannelHandler) last;
+    }
+
+}
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.handlers.AbstractResponseHandler;
+import org.apache.hedwig.client.handlers.PublishResponseHandler;
+import org.apache.hedwig.client.handlers.UnsubscribeResponseHandler;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+
+/**
+ * Pipeline factory whose response handlers cover the PUBLISH and
+ * UNSUBSCRIBE operations.
+ */
+public class NonSubscriptionChannelPipelineFactory extends ClientChannelPipelineFactory {
+
+    public NonSubscriptionChannelPipelineFactory(ClientConfiguration cfg,
+                                                 AbstractHChannelManager channelManager) {
+        super(cfg, channelManager);
+    }
+
+    /**
+     * @return a new map holding a publish response handler and an
+     *         unsubscribe response handler, keyed by their operation type.
+     */
+    @Override
+    protected Map<OperationType, AbstractResponseHandler> createResponseHandlers() {
+        final Map<OperationType, AbstractResponseHandler> byOperation =
+            new HashMap<OperationType, AbstractResponseHandler>();
+
+        final AbstractResponseHandler publishHandler =
+            new PublishResponseHandler(cfg, channelManager);
+        byOperation.put(OperationType.PUBLISH, publishHandler);
+
+        final AbstractResponseHandler unsubscribeHandler =
+            new UnsubscribeResponseHandler(cfg, channelManager);
+        byOperation.put(OperationType.UNSUBSCRIBE, unsubscribeHandler);
+
+        return byOperation;
+    }
+
+}
Copied: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java (from r1390401, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java?p2=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java&p1=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java&r1=1390401&r2=1390777&rev=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java Wed Sep 26 23:52:18 2012
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hedwig.client.netty;
+package org.apache.hedwig.client.netty.impl;
import java.net.InetSocketAddress;
import java.util.LinkedList;
@@ -27,8 +27,9 @@ import org.jboss.netty.channel.ChannelFu
import org.jboss.netty.channel.ChannelFutureListener;
import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.netty.HChannelManager;
+import org.apache.hedwig.client.netty.NetUtils;
import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
import org.apache.hedwig.util.HedwigSocketAddress;
@@ -38,38 +39,41 @@ public class WriteCallback implements Ch
// Private member variables
private PubSubData pubSubData;
- private final HedwigClientImpl client;
- private final ClientConfiguration cfg;
+ private final HChannelManager channelManager;
// Constructor
- public WriteCallback(PubSubData pubSubData, HedwigClientImpl client) {
+ public WriteCallback(PubSubData pubSubData,
+ HChannelManager channelManager) {
super();
this.pubSubData = pubSubData;
- this.client = client;
- this.cfg = client.getConfiguration();
+ this.channelManager = channelManager;
}
public void operationComplete(ChannelFuture future) throws Exception {
// If the client has stopped, there is no need to proceed
// with any callback logic here.
- if (client.hasStopped()) {
+ if (channelManager.isClosed()) {
future.getChannel().close();
return;
}
// When the write operation to the server is done, we just need to check
// if it was successful or not.
- InetSocketAddress host = HedwigClientImpl.getHostFromChannel(future.getChannel());
+ InetSocketAddress host = NetUtils.getHostFromChannel(future.getChannel());
if (!future.isSuccess()) {
- logger.error("Error writing on channel to host: " + host);
+ logger.error("Error writing on channel to host: {}", host);
// On a write failure for a PubSubRequest, we also want to remove
// the saved txnId to PubSubData in the ResponseHandler. These
// requests will not receive an ack response from the server
// so there is no point storing that information there anymore.
try {
- HedwigClientImpl.getResponseHandlerFromChannel(future.getChannel()).txn2PubSubData.remove(pubSubData.txnId);
+ HChannelHandler channelHandler =
+ HChannelImpl.getHChannelHandlerFromChannel(future.getChannel());
+ channelHandler.removeTxn(pubSubData.txnId);
+ channelHandler.closeExplicitly();
} catch (NoResponseHandlerException e) {
- // We just couldn't remove the transaction ID's mapping. The handler was null, so this has been reset anyway.
+ // We just couldn't remove the transaction ID's mapping.
+ // The handler was null, so this has been reset anyway.
logger.warn("Could not find response handler to remove txnId mapping to pubsub data. Ignoring.");
}
@@ -93,7 +97,7 @@ public class WriteCallback implements Ch
if (pubSubData.writeFailedServers == null)
pubSubData.writeFailedServers = new LinkedList<ByteString>();
pubSubData.writeFailedServers.add(hostString);
- client.doConnect(pubSubData, cfg.getDefaultServerHost());
+ channelManager.submitOpToDefaultServer(pubSubData);
}
} else {
// Now that the write to the server is done, we have to wait for it
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl.simple;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
+
+import org.apache.hedwig.client.netty.CleanupChannelMap;
+import org.apache.hedwig.client.netty.HChannel;
+import org.apache.hedwig.client.netty.NetUtils;
+import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
+import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
+import org.apache.hedwig.client.netty.impl.HChannelHandler;
+import org.apache.hedwig.client.netty.impl.HChannelImpl;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.filter.ClientMessageFilter;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
+
+/**
+ * Simple HChannel manager which establishes a connection for each subscription.
+ */
+public class SimpleHChannelManager extends AbstractHChannelManager {
+
+ private static Logger logger = LoggerFactory.getLogger(SimpleHChannelManager.class);
+
+ // Concurrent Map to store the cached Channel connections on the client side
+ // to a server host for a given Topic + SubscriberId combination. For each
+ // TopicSubscriber, we want a unique Channel connection to the server for
+ // it. We can also get the ResponseHandler tied to the Channel via the
+ // Channel Pipeline.
+ protected final CleanupChannelMap<TopicSubscriber> topicSubscriber2Channel;
+
+ // Concurrent map storing the message handler for each topic + subscriber id
+ // combination. It is kept here rather than in the SubscribeResponseHandler so
+ // that the handler set by the user is not lost when the connection is recovered.
+ protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler
+ = new ConcurrentHashMap<TopicSubscriber, MessageHandler>();
+
+ // PipelineFactory to create subscription netty channels to the appropriate server
+ private final ClientChannelPipelineFactory subscriptionChannelPipelineFactory;
+
+ /**
+  * Builds a manager with an empty topic-subscriber-to-channel map and a
+  * pipeline factory dedicated to subscription channels.
+  *
+  * @param cfg client configuration, handed to the parent manager and to the
+  *            subscription channel pipeline factory
+  * @param socketFactory netty channel factory, handed to the parent manager
+  */
+ public SimpleHChannelManager(ClientConfiguration cfg,
+                              ChannelFactory socketFactory) {
+     super(cfg, socketFactory);
+     this.topicSubscriber2Channel = new CleanupChannelMap<TopicSubscriber>();
+     // The pipeline factory is given a reference back to this manager.
+     this.subscriptionChannelPipelineFactory = new SimpleSubscriptionChannelPipelineFactory(cfg, this);
+ }
+
+ /**
+  * @return the pipeline factory created in the constructor for
+  *         subscription channels
+  */
+ @Override
+ protected ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory() {
+     return this.subscriptionChannelPipelineFactory;
+ }
+
+ /**
+  * Wraps an existing netty channel in a new {@code HChannelImpl}.
+  *
+  * <p>Despite the method name, the simple manager does not put the channel
+  * into {@code topicSubscriber2Channel} here. Per the original note it is only
+  * stored once a success response has been received (presumably through
+  * {@code storeSubscriptionChannel} -- confirm against the caller).
+  *
+  * @param channel netty channel to wrap
+  * @return a new HChannel for the host the channel is connected to
+  */
+ @Override
+ protected HChannel createAndStoreSubscriptionChannel(Channel channel) {
+     return new HChannelImpl(NetUtils.getHostFromChannel(channel), channel, this,
+                             getSubscriptionChannelPipelineFactory());
+ }
+
+ @Override
+ protected HChannel createAndStoreSubscriptionChannel(InetSocketAddress host) {
+ // for simple channel, we don't store subscription channel now
+ // we store it until we received success response
+ return new HChannelImpl(host, this,
+ getSubscriptionChannelPipelineFactory());
+ }
+
+ protected HChannel storeSubscriptionChannel(TopicSubscriber topicSubscriber,
+ Channel channel) {
+ InetSocketAddress host = NetUtils.getHostFromChannel(channel);
+ HChannel newHChannel = new HChannelImpl(host, channel, this,
+ getSubscriptionChannelPipelineFactory());
+ return topicSubscriber2Channel.addChannel(topicSubscriber, newHChannel);
+ }
+
+ /**
+  * Always returns {@code null} in this implementation; the {@code host}
+  * argument is ignored. Presumably this is because subscription channels
+  * here are tracked per topic subscriber rather than per host -- confirm
+  * against the parent class contract.
+  *
+  * @param host server address (unused)
+  * @return always {@code null}
+  */
+ @Override
+ protected HChannel getSubscriptionChannel(InetSocketAddress host) {
+     return null;
+ }
+
+ @Override
+ protected HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber subscriber) {
+ HChannel channel = topicSubscriber2Channel.getChannel(subscriber);
+ if (null != channel) {
+ // there is no channel established for this subscription
+ return channel;
+ } else {
+ InetSocketAddress host = topic2Host.get(subscriber.getTopic());
+ if (null == host) {
+ return null;
+ } else {
+ channel = getSubscriptionChannel(host);
+ if (null == channel) {
+ channel = createAndStoreSubscriptionChannel(host);
+ }
+ return channel;
+ }
+ }
+ }
+
+ /**
+  * Logs the disconnection and forwards it to the subscribe response handler
+  * attached to the channel. If the channel has no handler, only a warning is
+  * logged.
+  *
+  * @param host server address the channel was connected to
+  * @param channel netty channel that got disconnected
+  */
+ @Override
+ protected void onSubscriptionChannelDisconnected(InetSocketAddress host,
+                                                  Channel channel) {
+     logger.info("Subscription Channel {} disconnected from {}.",
+                 va(channel, host));
+     try {
+         // look up the channel's handler and notify its subscribe response handler
+         HChannelImpl.getHChannelHandlerFromChannel(channel)
+             .getSubscribeResponseHandler()
+             .onChannelDisconnected(host, channel);
+     } catch (NoResponseHandlerException nrhe) {
+         logger.warn("No Channel Handler found for channel {} when it disconnected.",
+                     channel);
+     }
+ }
+
+ /**
+  * Returns the subscribe response handler attached to the channel that is
+  * registered for the given topic subscriber.
+  *
+  * @param topicSubscriber topic subscriber whose handler is wanted
+  * @return the handler, or {@code null} when no HChannel is registered, the
+  *         HChannel has no underlying netty channel, or the netty channel
+  *         has no channel handler
+  */
+ @Override
+ public SubscribeResponseHandler getSubscribeResponseHandler(TopicSubscriber topicSubscriber) {
+     final HChannel registered = topicSubscriber2Channel.getChannel(topicSubscriber);
+     final Channel nettyChannel = (null == registered) ? null : registered.getChannel();
+     if (null == nettyChannel) {
+         return null;
+     }
+     try {
+         return HChannelImpl.getHChannelHandlerFromChannel(nettyChannel)
+             .getSubscribeResponseHandler();
+     } catch (NoResponseHandlerException nrhe) {
+         logger.warn("No Channel Handler found for channel {}, topic subscriber {}.",
+                     nettyChannel, topicSubscriber);
+         return null;
+     }
+ }
+
+ /**
+  * Starts delivering messages for the topic subscriber to the given handler.
+  * This is a fresh start, not a restart.
+  *
+  * @param topicSubscriber topic subscriber to start delivery for
+  * @param messageHandler handler that receives the messages
+  * @throws ClientNotSubscribedException if the client is not subscribed
+  * @throws AlreadyStartDeliveryException if delivery was already started
+  */
+ @Override
+ public void startDelivery(TopicSubscriber topicSubscriber,
+                           MessageHandler messageHandler)
+         throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+     final boolean restart = false;
+     startDelivery(topicSubscriber, messageHandler, restart);
+ }
+
+ protected void restartDelivery(TopicSubscriber topicSubscriber)
+ throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+ startDelivery(topicSubscriber, null, true);
+ }
+
+ private void startDelivery(TopicSubscriber topicSubscriber,
+ MessageHandler messageHandler, boolean restart)
+ throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+ // Make sure we know about this topic subscription on the client side
+ // exists. The assumption is that the client should have in memory the
+ // Channel created for the TopicSubscriber once the server has sent
+ // an ack response to the initial subscribe request.
+ SubscribeResponseHandler subscribeResponseHandler =
+ getSubscribeResponseHandler(topicSubscriber);
+ if (null == subscribeResponseHandler ||
+ !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
+ logger.error("Client is not yet subscribed to {}.", topicSubscriber);
+ throw new ClientNotSubscribedException("Client is not yet subscribed to "
+ + topicSubscriber);
+ }
+
+ MessageHandler existedMsgHandler = topicSubscriber2MessageHandler.get(topicSubscriber);
+ if (restart) {
+ // restart using existing msg handler
+ messageHandler = existedMsgHandler;
+ } else {
+ // some has started delivery but not stop it
+ if (null != existedMsgHandler) {
+ throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
+ }
+ if (messageHandler != null) {
+ if (null != topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler)) {
+ throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
+ }
+ }
+ }
+
+ // tell subscribe response handler to start delivering messages for topicSubscriber
+ subscribeResponseHandler.startDelivery(topicSubscriber, messageHandler);
+ }
+
+ /**
+  * Stops message delivery for the topic subscriber and forgets its recorded
+  * message handler.
+  *
+  * @param topicSubscriber topic subscriber to stop delivery for
+  * @throws ClientNotSubscribedException if no subscribe response handler is
+  *         found or it does not know the subscription
+  */
+ public void stopDelivery(TopicSubscriber topicSubscriber)
+         throws ClientNotSubscribedException {
+     // The client must already know about this subscription: the channel for
+     // the TopicSubscriber is expected to be in memory once the server has
+     // acked the initial subscribe request.
+     final SubscribeResponseHandler responseHandler =
+         getSubscribeResponseHandler(topicSubscriber);
+     final boolean subscribed =
+         null != responseHandler && responseHandler.hasSubscription(topicSubscriber);
+     if (!subscribed) {
+         logger.error("Client is not yet subscribed to {}.", topicSubscriber);
+         throw new ClientNotSubscribedException("Client is not yet subscribed to "
+                                                + topicSubscriber);
+     }
+
+     // drop the recorded handler first, then tell the subscribe response
+     // handler to stop delivering messages for this topic subscriber
+     topicSubscriber2MessageHandler.remove(topicSubscriber);
+     responseHandler.stopDelivery(topicSubscriber);
+ }
+
+
+ /**
+  * Closes the subscription channel cached for the given topic subscriber.
+  *
+  * <p>The channel is first removed from {@code topicSubscriber2Channel}. If no
+  * channel was cached, or the cached HChannel has no underlying netty
+  * channel, the callback is completed right away with a {@code null}
+  * response. Otherwise the channel handler (when one is found) is told the
+  * close is explicit, the netty channel is closed, and the callback is
+  * completed from the close future's listener.
+  *
+  * @param topicSubscriber topic subscriber whose subscription channel is closed
+  * @param callback callback notified when the close finished or failed
+  * @param context context object handed back to the callback
+  */
+ @Override
+ public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
+                                    final Callback<ResponseBody> callback,
+                                    final Object context) {
+     HChannel hChannel = topicSubscriber2Channel.removeChannel(topicSubscriber);
+     if (null == hChannel) {
+         logger.warn("Trying to close a subscription when we don't have a subscribe channel cached for {}",
+                     topicSubscriber);
+         callback.operationFinished(context, (ResponseBody)null);
+         return;
+     }
+
+     Channel channel = hChannel.getChannel();
+     if (null == channel) {
+         callback.operationFinished(context, (ResponseBody)null);
+         return;
+     }
+
+     try {
+         HChannelImpl.getHChannelHandlerFromChannel(channel).closeExplicitly();
+     } catch (NoResponseHandlerException nrhe) {
+         // Argument order follows the placeholders: "<subscriber>'s channel <channel>".
+         logger.warn("No Channel Handler found when closing {}'s channel {}.",
+                     topicSubscriber, channel);
+     }
+     ChannelFuture future = channel.close();
+     future.addListener(new ChannelFutureListener() {
+         @Override
+         public void operationComplete(ChannelFuture future) throws Exception {
+             if (!future.isSuccess()) {
+                 logger.error("Failed to close the subscription channel for {}",
+                              topicSubscriber);
+                 callback.operationFailed(context, new ServiceDownException(
+                     "Failed to close the subscription channel for " + topicSubscriber));
+             } else {
+                 callback.operationFinished(context, (ResponseBody)null);
+             }
+         }
+     });
+ }
+
+ /**
+  * Asks the channel handler of every registered subscription channel to
+  * check its timed-out requests. Channels without a handler are skipped.
+  */
+ @Override
+ protected void checkTimeoutRequestsOnSubscriptionChannels() {
+     // timeout task may be started before constructing topicSubscriber2Channel
+     if (null != topicSubscriber2Channel) {
+         for (HChannel hChannel : topicSubscriber2Channel.getChannels()) {
+             try {
+                 HChannelImpl.getHChannelHandlerFromChannel(hChannel.getChannel())
+                     .checkTimeoutRequests();
+             } catch (NoResponseHandlerException ignored) {
+                 // no handler on this channel: nothing to check, move on
+             }
+         }
+     }
+ }
+
+ /**
+  * Closes the subscription channels by delegating to
+  * {@code topicSubscriber2Channel.close()}.
+  */
+ @Override
+ protected void closeSubscriptionChannels() {
+     this.topicSubscriber2Channel.close();
+ }
+}