You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/08/19 23:25:22 UTC
svn commit: r987314 [6/16] - in /hadoop/zookeeper/trunk: ./
src/contrib/hedwig/ src/contrib/hedwig/client/
src/contrib/hedwig/client/src/ src/contrib/hedwig/client/src/main/
src/contrib/hedwig/client/src/main/cpp/
src/contrib/hedwig/client/src/main/cpp...
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigClient.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigClient.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigClient.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.handlers.MessageConsumeCallback;
+import org.apache.hedwig.client.ssl.SslClientContextFactory;
+import org.apache.hedwig.exceptions.PubSubException.UncertainStateException;
+
+/**
+ * This is a top level Hedwig Client class that encapsulates the common
+ * functionality needed for both Publish and Subscribe operations.
+ *
+ */
+public class HedwigClient {
+
+    private static final Logger logger = Logger.getLogger(HedwigClient.class);
+
+    // Global counter used for generating unique transaction ID's for
+    // publish and subscribe requests
+    protected final AtomicLong globalCounter = new AtomicLong();
+    // Static String constants
+    protected static final String COLON = ":";
+
+    // The Netty socket factory for making connections to the server.
+    protected final ChannelFactory socketFactory;
+    // Whether the socket factory is one we created or is owned by whoever
+    // instantiated us. Only a factory we own is released in stop().
+    protected boolean ownChannelFactory = false;
+
+    // PipelineFactory to create netty client channels to the appropriate server
+    private final ClientChannelPipelineFactory pipelineFactory;
+
+    // Concurrent Map to store the mapping from the Topic to the Host.
+    // This could change over time since servers can drop mastership of topics
+    // for load balancing or failover. If a server host ever goes down, we'd
+    // also want to remove all topic mappings the host was responsible for.
+    // The second Map is used as the inverted version of the first one. The
+    // per-host lists are plain LinkedLists, so every access to one of them is
+    // done while holding that list's own monitor.
+    protected final ConcurrentMap<ByteString, InetSocketAddress> topic2Host = new ConcurrentHashMap<ByteString, InetSocketAddress>();
+    private final ConcurrentMap<InetSocketAddress, List<ByteString>> host2Topics = new ConcurrentHashMap<InetSocketAddress, List<ByteString>>();
+
+    // Each client instantiation will have a Timer for running recurring
+    // tasks. One such timer task is to time out long running
+    // PubSubRequests that are waiting for an ack response from the server.
+    private final Timer clientTimer = new Timer(true);
+
+    // Boolean indicating if the client is running or has stopped.
+    // Once we stop the client, we should sidestep all of the connect,
+    // write callback and channel disconnected logic. Declared volatile so
+    // that a stop() issued on one thread is visible to every thread that
+    // polls hasStopped().
+    private volatile boolean isStopped = false;
+
+    private HedwigSubscriber sub;
+    private final HedwigPublisher pub;
+    private final ClientConfiguration cfg;
+    private final MessageConsumeCallback consumeCb;
+    private SslClientContextFactory sslFactory = null;
+
+    /**
+     * Base constructor that takes in a Configuration object. This will create
+     * its own client socket channel factory, which is released again by
+     * {@link #stop()}.
+     *
+     * @param cfg
+     *            Client configuration
+     */
+    public HedwigClient(ClientConfiguration cfg) {
+        this(cfg, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
+        ownChannelFactory = true;
+    }
+
+    /**
+     * Constructor that takes in a Configuration object and a ChannelFactory
+     * that has already been instantiated by the caller. The caller keeps
+     * ownership of that factory; {@link #stop()} will not release it.
+     *
+     * @param cfg
+     *            Client configuration
+     * @param socketFactory
+     *            Netty channel factory used for all connections of this client
+     */
+    public HedwigClient(ClientConfiguration cfg, ChannelFactory socketFactory) {
+        this.cfg = cfg;
+        this.socketFactory = socketFactory;
+        pub = new HedwigPublisher(this);
+        sub = new HedwigSubscriber(this);
+        pipelineFactory = new ClientChannelPipelineFactory(this);
+        consumeCb = new MessageConsumeCallback(this);
+        if (cfg.isSSLEnabled()) {
+            sslFactory = new SslClientContextFactory(cfg);
+        }
+        // Schedule all of the client timer tasks. Currently we only have the
+        // Request Timeout task.
+        clientTimer.schedule(new PubSubRequestTimeoutTask(), 0, cfg.getTimeoutThreadRunInterval());
+    }
+
+    // Public getters for the various components of a client.
+    public ClientConfiguration getConfiguration() {
+        return cfg;
+    }
+
+    public HedwigSubscriber getSubscriber() {
+        return sub;
+    }
+
+    // Protected method to set the subscriber. This is needed currently for hub
+    // versions of the client subscriber.
+    protected void setSubscriber(HedwigSubscriber sub) {
+        this.sub = sub;
+    }
+
+    public HedwigPublisher getPublisher() {
+        return pub;
+    }
+
+    public MessageConsumeCallback getConsumeCallback() {
+        return consumeCb;
+    }
+
+    // Returns null when SSL is not enabled in the configuration.
+    public SslClientContextFactory getSslFactory() {
+        return sslFactory;
+    }
+
+    // We need to deal with the possible problem of a PubSub request being
+    // written successfully to the server host but for some reason, the
+    // ack message back never comes. What could happen is that the VoidCallback
+    // stored in the ResponseHandler.txn2PubSubData map will never be called.
+    // We should have a configured timeout so if that passes from the time a
+    // write was successfully done to the server, we can fail this async PubSub
+    // transaction. The caller could possibly redo the transaction if needed at
+    // a later time. Creating a timeout cleaner TimerTask to do this here.
+    class PubSubRequestTimeoutTask extends TimerTask {
+        /**
+         * Implement the TimerTask's abstract run method. Scans every cached
+         * channel and fails the requests whose ack is overdue.
+         */
+        @Override
+        public void run() {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Running the PubSubRequest Timeout Task");
+            }
+            try {
+                // Loop through all outstanding PubSubData requests and check
+                // if the requestWriteTime has timed out compared to the
+                // current time.
+                long curTime = System.currentTimeMillis();
+                long timeoutInterval = cfg.getServerAckResponseTimeout();
+
+                // First check the ResponseHandlers associated with cached
+                // channels in HedwigPublisher.host2Channel. This stores the
+                // channels used for Publish and Unsubscribe requests.
+                timeOutRequestsOnChannels(pub.host2Channel.values(), curTime, timeoutInterval);
+                // Now do the same for the cached channels in
+                // HedwigSubscriber.topicSubscriber2Channel. This stores the
+                // channels used exclusively for Subscribe requests.
+                timeOutRequestsOnChannels(sub.topicSubscriber2Channel.values(), curTime, timeoutInterval);
+            } catch (RuntimeException e) {
+                // An exception escaping a TimerTask terminates the Timer's
+                // thread, which would silently disable this task and every
+                // other task sharing clientTimer. Log it and wait for the
+                // next scheduled run instead.
+                logger.error("Unexpected error while running the PubSubRequest Timeout Task", e);
+            }
+        }
+
+        // Applies the timeout check to every outstanding request of every
+        // channel in the given collection.
+        private void timeOutRequestsOnChannels(Iterable<? extends Channel> channels, long curTime,
+                long timeoutInterval) {
+            for (Channel channel : channels) {
+                ResponseHandler responseHandler = getResponseHandlerFromChannel(channel);
+                for (PubSubData pubSubData : responseHandler.txn2PubSubData.values()) {
+                    checkPubSubDataToTimeOut(pubSubData, responseHandler, curTime, timeoutInterval);
+                }
+            }
+        }
+
+        private void checkPubSubDataToTimeOut(PubSubData pubSubData, ResponseHandler responseHandler, long curTime,
+                long timeoutInterval) {
+            if (curTime <= pubSubData.requestWriteTime + timeoutInterval) {
+                // Still within the allowed ack window.
+                return;
+            }
+            // Current PubSubRequest has timed out so remove it from the
+            // ResponseHandler's map. Only the caller that actually removed the
+            // entry may complete the request; if it is already gone, someone
+            // else has handled it and invoking the callback again would
+            // notify the user twice.
+            if (responseHandler.txn2PubSubData.remove(pubSubData.txnId) == null) {
+                return;
+            }
+            logger.error("Current PubSubRequest has timed out for pubSubData: " + pubSubData);
+            pubSubData.callback.operationFailed(pubSubData.context, new UncertainStateException(
+                    "Server ack response never received so PubSubRequest has timed out!"));
+        }
+    }
+
+    /**
+     * When we are done with the client, this is a clean way to gracefully
+     * close all channels/sockets created by the client and to also release
+     * all resources used by netty (the latter only if this client created the
+     * channel factory itself).
+     */
+    public void stop() {
+        logger.info("Stopping the client!");
+        // Set the client boolean flag to indicate the client has stopped.
+        isStopped = true;
+        // Stop the timer and all timer task threads.
+        clientTimer.cancel();
+        // Close all of the open Channels.
+        closeChannels(pub.host2Channel.values());
+        closeChannels(sub.topicSubscriber2Channel.values());
+        // Clear out all Maps.
+        topic2Host.clear();
+        host2Topics.clear();
+        pub.host2Channel.clear();
+        sub.topicSubscriber2Channel.clear();
+        // Release resources used by the ChannelFactory on the client if we are
+        // the owner that created it.
+        if (ownChannelFactory) {
+            socketFactory.releaseExternalResources();
+        }
+        logger.info("Completed stopping the client!");
+    }
+
+    // Marks each channel as explicitly closed on its ResponseHandler and then
+    // closes it, blocking until the close has completed.
+    private static void closeChannels(Iterable<? extends Channel> channels) {
+        for (Channel channel : channels) {
+            getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
+            channel.close().awaitUninterruptibly();
+        }
+    }
+
+    /**
+     * This is a helper method to do the connect attempt to the server given the
+     * inputted host/port. This can be used to connect to the default server
+     * host/port which is the VIP. That will pick a server in the cluster at
+     * random to connect to for the initial PubSub attempt (with redirect logic
+     * being done at the server side). Additionally, this could be called after
+     * the client makes an initial PubSub attempt at a server, and is redirected
+     * to the one that is responsible for the topic. Once the connect to the
+     * server is done, we will perform the corresponding PubSub write on that
+     * channel.
+     *
+     * @param pubSubData
+     *            PubSub call's data wrapper object
+     * @param serverHost
+     *            Input server host to connect to of type InetSocketAddress
+     */
+    public void doConnect(PubSubData pubSubData, InetSocketAddress serverHost) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Connecting to host: " + serverHost + " with pubSubData: " + pubSubData);
+        }
+        // Set up the ClientBootStrap so we can create a new Channel connection
+        // to the server.
+        ClientBootstrap bootstrap = new ClientBootstrap(socketFactory);
+        bootstrap.setPipelineFactory(pipelineFactory);
+        bootstrap.setOption("tcpNoDelay", true);
+        bootstrap.setOption("keepAlive", true);
+
+        // Start the connection attempt to the input server host. The
+        // ConnectCallback is notified when the attempt completes.
+        ChannelFuture future = bootstrap.connect(serverHost);
+        future.addListener(new ConnectCallback(pubSubData, serverHost, this));
+    }
+
+    /**
+     * Helper method to store the topic2Host mapping in the HedwigClient cache
+     * map. This method is assumed to be called when we've done a successful
+     * connection to the correct server topic master.
+     *
+     * @param pubSubData
+     *            PubSub wrapper data
+     * @param channel
+     *            Netty Channel
+     */
+    protected void storeTopic2HostMapping(PubSubData pubSubData, Channel channel) {
+        // Retrieve the server host that we've connected to and store the
+        // mapping from the topic to this host. For all other non-redirected
+        // server statuses, we consider that as a successful connection to the
+        // correct topic master.
+        InetSocketAddress host = getHostFromChannel(channel);
+        ByteString topic = pubSubData.topic;
+        // Single lookup: a containsKey() followed by get() could observe the
+        // entry being removed in between and dereference null.
+        InetSocketAddress currentHost = topic2Host.get(topic);
+        if (currentHost != null && currentHost.equals(host)) {
+            // Entry in map exists for the topic but it is the same as the
+            // current host. In this case there is nothing to do.
+            return;
+        }
+
+        // Store the relevant mappings for this topic and host combination.
+        if (logger.isDebugEnabled()) {
+            logger.debug("Storing info for topic: " + topic.toStringUtf8() + ", old host: "
+                    + currentHost + ", new host: " + host);
+        }
+        InetSocketAddress previousHost = topic2Host.put(topic, host);
+        if (previousHost != null) {
+            if (previousHost.equals(host)) {
+                // Another thread stored the very same mapping concurrently
+                // and takes care of the inverted map.
+                return;
+            }
+            // The topic moved: drop it from the old host's list so the
+            // inverted map does not keep a stale entry.
+            removeTopicFromHost(previousHost, topic);
+        }
+        addTopicToHost(host, topic);
+    }
+
+    // Adds the topic to the inverted map entry of the given host, creating
+    // the entry atomically if it does not exist yet.
+    private void addTopicToHost(InetSocketAddress host, ByteString topic) {
+        List<ByteString> topics = host2Topics.get(host);
+        if (topics == null) {
+            List<ByteString> newTopics = new LinkedList<ByteString>();
+            topics = host2Topics.putIfAbsent(host, newTopics);
+            if (topics == null) {
+                topics = newTopics;
+            }
+        }
+        synchronized (topics) {
+            topics.add(topic);
+        }
+    }
+
+    // Removes the topic from the inverted map entry of the given host, if any.
+    private void removeTopicFromHost(InetSocketAddress host, ByteString topic) {
+        List<ByteString> topics = host2Topics.get(host);
+        if (topics != null) {
+            synchronized (topics) {
+                topics.remove(topic);
+            }
+        }
+    }
+
+    /**
+     * Helper static method to get the remote address (host and port) from a
+     * netty Channel. Assumption is that the netty Channel was originally
+     * created with an InetSocketAddress. This is true with the Hedwig netty
+     * implementation.
+     *
+     * @param channel
+     *            Netty channel to extract the hostname and port from.
+     * @return Remote address of the Netty Channel as an InetSocketAddress
+     */
+    public static InetSocketAddress getHostFromChannel(Channel channel) {
+        return (InetSocketAddress) channel.getRemoteAddress();
+    }
+
+    /**
+     * Helper static method to get the ResponseHandler instance from a Channel
+     * via the ChannelPipeline it is associated with. The assumption is that the
+     * last ChannelHandler tied to the ChannelPipeline is the ResponseHandler.
+     *
+     * @param channel
+     *            Channel we are retrieving the ResponseHandler instance for
+     * @return ResponseHandler Instance tied to the Channel's Pipeline
+     */
+    public static ResponseHandler getResponseHandlerFromChannel(Channel channel) {
+        return (ResponseHandler) channel.getPipeline().getLast();
+    }
+
+    // Public getter for entries in the topic2Host Map. Returns null when no
+    // host is cached for the topic.
+    public InetSocketAddress getHostForTopic(ByteString topic) {
+        return topic2Host.get(topic);
+    }
+
+    /**
+     * If a server host goes down or the channel to it gets disconnected, we
+     * want to clear out all relevant cached information. This removes all of
+     * the topic mappings that the host is still responsible for.
+     *
+     * @param host
+     *            Server host whose cached topic mappings should be dropped
+     */
+    public void clearAllTopicsForHost(InetSocketAddress host) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Clearing all topics for host: " + host);
+        }
+        // Atomically detach the host's topic list; we keep working on the
+        // local reference afterwards.
+        List<ByteString> topics = host2Topics.remove(host);
+        if (topics == null) {
+            return;
+        }
+        synchronized (topics) {
+            for (ByteString topic : topics) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Removing mapping for topic: " + topic.toStringUtf8() + " from host: " + host);
+                }
+                // Conditional remove: if the topic has meanwhile been mapped
+                // to a different host, that newer mapping must survive.
+                topic2Host.remove(topic, host);
+            }
+        }
+    }
+
+    // Public getter to see if the client has been stopped.
+    public boolean hasStopped() {
+        return isStopped;
+    }
+
+    // Public getter to get the client's Timer object.
+    // This is so we can reuse this and not have to create multiple Timer
+    // objects.
+    public Timer getClientTimer() {
+        return clientTimer;
+    }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.handlers.PubSubCallback;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PublishRequest;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * This is the Hedwig Netty specific implementation of the Publisher interface.
+ *
+ */
+public class HedwigPublisher implements Publisher {
+
+ private static Logger logger = Logger.getLogger(HedwigPublisher.class);
+
+ // Concurrent Map to store the mappings for a given Host (Hostname:Port) to
+ // the Channel that has been established for it previously. This channel
+ // will be used whenever we publish on a topic that the server is the master
+ // of currently. The channels used here will only be used for publish and
+ // unsubscribe requests.
+ protected final ConcurrentMap<InetSocketAddress, Channel> host2Channel = new ConcurrentHashMap<InetSocketAddress, Channel>();
+
+ private final HedwigClient client;
+ private final ClientConfiguration cfg;
+
+ protected HedwigPublisher(HedwigClient client) {
+ this.client = client;
+ this.cfg = client.getConfiguration();
+ }
+
+ public void publish(ByteString topic, Message msg) throws CouldNotConnectException, ServiceDownException {
+ if (logger.isDebugEnabled())
+ logger.debug("Calling a sync publish for topic: " + topic.toStringUtf8() + ", msg: " + msg);
+ PubSubData pubSubData = new PubSubData(topic, msg, null, OperationType.PUBLISH, null, null, null);
+ synchronized (pubSubData) {
+ PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
+ asyncPublish(topic, msg, pubSubCallback, null);
+ try {
+ while (!pubSubData.isDone)
+ pubSubData.wait();
+ } catch (InterruptedException e) {
+ throw new ServiceDownException("Interrupted Exception while waiting for async publish call");
+ }
+ // Check from the PubSubCallback if it was successful or not.
+ if (!pubSubCallback.getIsCallSuccessful()) {
+ // See what the exception was that was thrown when the operation
+ // failed.
+ PubSubException failureException = pubSubCallback.getFailureException();
+ if (failureException == null) {
+ // This should not happen as the operation failed but a null
+ // PubSubException was passed. Log a warning message but
+ // throw a generic ServiceDownException.
+ logger.error("Sync Publish operation failed but no PubSubException was passed!");
+ throw new ServiceDownException("Server ack response to publish request is not successful");
+ }
+ // For the expected exceptions that could occur, just rethrow
+ // them.
+ else if (failureException instanceof CouldNotConnectException) {
+ throw (CouldNotConnectException) failureException;
+ } else if (failureException instanceof ServiceDownException) {
+ throw (ServiceDownException) failureException;
+ } else {
+ // For other types of PubSubExceptions, just throw a generic
+ // ServiceDownException but log a warning message.
+ logger.error("Unexpected exception type when a sync publish operation failed: " + failureException);
+ throw new ServiceDownException("Server ack response to publish request is not successful");
+ }
+ }
+ }
+ }
+
+ public void asyncPublish(ByteString topic, Message msg, Callback<Void> callback, Object context) {
+ if (logger.isDebugEnabled())
+ logger.debug("Calling an async publish for topic: " + topic.toStringUtf8() + ", msg: " + msg);
+ // Check if we already have a Channel connection set up to the server
+ // for the given Topic.
+ PubSubData pubSubData = new PubSubData(topic, msg, null, OperationType.PUBLISH, null, callback, context);
+ if (client.topic2Host.containsKey(topic)) {
+ InetSocketAddress host = client.topic2Host.get(topic);
+ if (host2Channel.containsKey(host)) {
+ // We already have the Channel connection for the server host so
+ // do the publish directly. We will deal with redirect logic
+ // later on if that server is no longer the current host for
+ // the topic.
+ doPublish(pubSubData, host2Channel.get(host));
+ } else {
+ // We have a mapping for the topic to host but don't have a
+ // Channel for that server. This can happen if the Channel
+ // is disconnected for some reason. Do the connect then to
+ // the specified server host to create a new Channel connection.
+ client.doConnect(pubSubData, host);
+ }
+ } else {
+ // Server host for the given topic is not known yet so use the
+ // default server host/port as defined in the configs. This should
+ // point to the server VIP which would redirect to a random server
+ // (which might not be the server hosting the topic).
+ client.doConnect(pubSubData, cfg.getDefaultServerHost());
+ }
+ }
+
+ /**
+ * This is a helper method to write the actual publish message once the
+ * client is connected to the server and a Channel is available.
+ *
+ * @param pubSubData
+ * Publish call's data wrapper object
+ * @param channel
+ * Netty I/O channel for communication between the client and
+ * server
+ */
+ protected void doPublish(PubSubData pubSubData, Channel channel) {
+ // Create a PubSubRequest
+ PubSubRequest.Builder pubsubRequestBuilder = PubSubRequest.newBuilder();
+ pubsubRequestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE);
+ pubsubRequestBuilder.setType(OperationType.PUBLISH);
+ if (pubSubData.triedServers != null && pubSubData.triedServers.size() > 0) {
+ pubsubRequestBuilder.addAllTriedServers(pubSubData.triedServers);
+ }
+ long txnId = client.globalCounter.incrementAndGet();
+ pubsubRequestBuilder.setTxnId(txnId);
+ pubsubRequestBuilder.setShouldClaim(pubSubData.shouldClaim);
+ pubsubRequestBuilder.setTopic(pubSubData.topic);
+
+ // Now create the PublishRequest
+ PublishRequest.Builder publishRequestBuilder = PublishRequest.newBuilder();
+
+ publishRequestBuilder.setMsg(pubSubData.msg);
+
+ // Set the PublishRequest into the outer PubSubRequest
+ pubsubRequestBuilder.setPublishRequest(publishRequestBuilder);
+
+ // Update the PubSubData with the txnId and the requestWriteTime
+ pubSubData.txnId = txnId;
+ pubSubData.requestWriteTime = System.currentTimeMillis();
+
+ // Before we do the write, store this information into the
+ // ResponseHandler so when the server responds, we know what
+ // appropriate Callback Data to invoke for the given txn ID.
+ HedwigClient.getResponseHandlerFromChannel(channel).txn2PubSubData.put(txnId, pubSubData);
+
+ // Finally, write the Publish request through the Channel.
+ if (logger.isDebugEnabled())
+ logger.debug("Writing a Publish request to host: " + HedwigClient.getHostFromChannel(channel)
+ + " for pubSubData: " + pubSubData);
+ ChannelFuture future = channel.write(pubsubRequestBuilder.build());
+ future.addListener(new WriteCallback(pubSubData, client));
+ }
+
+ // Synchronized method to store the host2Channel mapping (if it doesn't
+ // exist yet). Retrieve the hostname info from the Channel created via the
+ // RemoteAddress tied to it.
+ protected synchronized void storeHost2ChannelMapping(Channel channel) {
+ InetSocketAddress host = HedwigClient.getHostFromChannel(channel);
+ if (!host2Channel.containsKey(host)) {
+ if (logger.isDebugEnabled())
+ logger.debug("Storing a new Channel mapping for host: " + host);
+ host2Channel.put(host, channel);
+ } else {
+ // If we've reached here, that means we already have a Channel
+ // mapping for the given host. This should ideally not happen
+ // and it means we are creating another Channel to a server host
+ // to publish on when we could have used an existing one. This could
+ // happen due to a race condition if initially multiple concurrent
+ // threads are publishing on the same topic and no Channel exists
+ // currently to the server. We are not synchronizing this initial
+ // creation of Channels to a given host for performance.
+ // Another possible way to have redundant Channels created is if
+ // a new topic is being published to, we connect to the default
+ // server host which should be a VIP that redirects to a "real"
+ // server host. Since we don't know beforehand what is the full
+ // set of server hosts, we could be redirected to a server that
+ // we already have a channel connection to from a prior existing
+ // topic. Close these redundant channels as they won't be used.
+ if (logger.isDebugEnabled())
+ logger.debug("Channel mapping to host: " + host + " already exists so no need to store it.");
+ HedwigClient.getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
+ channel.close();
+ }
+ }
+
+ // Public getter for entries in the host2Channel Map.
+ // This is used for classes that need this information but are not in the
+ // same classpath.
+ public Channel getChannelForHost(InetSocketAddress host) {
+ return host2Channel.get(host);
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,585 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
+import org.apache.hedwig.client.handlers.PubSubCallback;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * This is the Hedwig Netty specific implementation of the Subscriber interface.
+ *
+ */
+public class HedwigSubscriber implements Subscriber {
+
+ // Class-wide logger; never reassigned, so it is declared final.
+ private static final Logger logger = Logger.getLogger(HedwigSubscriber.class);
+
+ // Concurrent Map caching, on the client side, the Channel connection to a
+ // server host for each Topic + SubscriberId combination (TopicSubscriber).
+ // Each TopicSubscriber is meant to have its own Channel connection to the
+ // server. The ResponseHandler tied to a Channel can be retrieved via the
+ // Channel Pipeline.
+ protected final ConcurrentMap<TopicSubscriber, Channel> topicSubscriber2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
+
+ // Owning client and the configuration obtained from it at construction.
+ protected final HedwigClient client;
+ protected final ClientConfiguration cfg;
+
+ // Creates a subscriber bound to the given client, caching the
+ // configuration returned by client.getConfiguration().
+ public HedwigSubscriber(HedwigClient client) {
+     this.cfg = client.getConfiguration();
+     this.client = client;
+ }
+
+ // Common logic for the synchronous Subscribe and Unsubscribe requests.
+ // Issues the asynchronous request with a PubSubCallback and then waits on
+ // the PubSubData monitor until pubSubData.isDone is set (presumably by
+ // the PubSubCallback -- confirm against that class). The input
+ // OperationType is assumed to be either SUBSCRIBE or UNSUBSCRIBE.
+ //
+ // On failure, the expected exception types (CouldNotConnect,
+ // ClientAlreadySubscribed, ClientNotSubscribed, ServiceDown) are rethrown
+ // unchanged; anything else is wrapped in a ServiceDownException. If the
+ // waiting thread is interrupted, its interrupt status is restored before
+ // a ServiceDownException is thrown.
+ private void subUnsub(ByteString topic, ByteString subscriberId, OperationType operationType,
+         CreateOrAttach createOrAttach) throws CouldNotConnectException, ClientAlreadySubscribedException,
+         ClientNotSubscribedException, ServiceDownException {
+     if (logger.isDebugEnabled()) {
+         logger.debug("Calling a sync subUnsub request for topic: " + topic.toStringUtf8() + ", subscriberId: "
+                 + subscriberId.toStringUtf8() + ", operationType: " + operationType + ", createOrAttach: "
+                 + createOrAttach);
+     }
+     PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, createOrAttach, null, null);
+     synchronized (pubSubData) {
+         PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
+         asyncSubUnsub(topic, subscriberId, pubSubCallback, null, operationType, createOrAttach);
+         try {
+             // Loop guards against spurious wakeups.
+             while (!pubSubData.isDone) {
+                 pubSubData.wait();
+             }
+         } catch (InterruptedException e) {
+             // Re-assert the interrupt so callers further up the stack can
+             // still observe that this thread was asked to stop.
+             Thread.currentThread().interrupt();
+             throw new ServiceDownException("Interrupted Exception while waiting for async subUnsub call");
+         }
+         // Check from the PubSubCallback if it was successful or not.
+         if (!pubSubCallback.getIsCallSuccessful()) {
+             // See what the exception was that was thrown when the operation
+             // failed.
+             PubSubException failureException = pubSubCallback.getFailureException();
+             if (failureException == null) {
+                 // This should not happen as the operation failed but a null
+                 // PubSubException was passed. Log an error message but
+                 // throw a generic ServiceDownException.
+                 logger.error("Sync SubUnsub operation failed but no PubSubException was passed!");
+                 throw new ServiceDownException("Server ack response to SubUnsub request is not successful");
+             } else if (failureException instanceof CouldNotConnectException) {
+                 // For the expected exceptions that could occur, just
+                 // rethrow them.
+                 throw (CouldNotConnectException) failureException;
+             } else if (failureException instanceof ClientAlreadySubscribedException) {
+                 throw (ClientAlreadySubscribedException) failureException;
+             } else if (failureException instanceof ClientNotSubscribedException) {
+                 throw (ClientNotSubscribedException) failureException;
+             } else if (failureException instanceof ServiceDownException) {
+                 throw (ServiceDownException) failureException;
+             } else {
+                 logger.error("Unexpected PubSubException thrown: " + failureException.toString());
+                 // Throw a generic ServiceDownException but wrap the
+                 // original PubSubException within it.
+                 throw new ServiceDownException(failureException);
+             }
+         }
+     }
+ }
+
+ // Common logic for the asynchronous Subscribe and Unsubscribe requests.
+ // The input OperationType is assumed to be either SUBSCRIBE or
+ // UNSUBSCRIBE.
+ //
+ // Routing:
+ //  - topic's host unknown: connect to the configured default server host.
+ //  - UNSUBSCRIBE and the publisher already has a Channel to the host:
+ //    write the request on that Channel.
+ //  - otherwise: open a new connection to the topic's host.
+ //
+ // Each map is read exactly once and the result null-checked, so a
+ // concurrent removal between a containsKey() and a get() can no longer
+ // hand a null host or Channel to the next step.
+ private void asyncSubUnsub(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object context,
+         OperationType operationType, CreateOrAttach createOrAttach) {
+     if (logger.isDebugEnabled()) {
+         logger.debug("Calling an async subUnsub request for topic: " + topic.toStringUtf8() + ", subscriberId: "
+                 + subscriberId.toStringUtf8() + ", operationType: " + operationType + ", createOrAttach: "
+                 + createOrAttach);
+     }
+     PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, createOrAttach, callback,
+             context);
+     // Check if we know which server host is the master for the topic we
+     // are subscribing to.
+     InetSocketAddress host = client.topic2Host.get(topic);
+     if (host == null) {
+         // Server host for the given topic is not known yet so use the
+         // default server host/port as defined in the configs. This should
+         // point to the server VIP which would redirect to a random server
+         // (which might not be the server hosting the topic).
+         client.doConnect(pubSubData, cfg.getDefaultServerHost());
+         return;
+     }
+     // For unsubscribes, we can reuse the channel connections to the
+     // server host that are cached for publishes. For publish and
+     // unsubscribe flows, we will thus use the same Channels and will cache
+     // and store them during the ConnectCallback.
+     Channel publishChannel = null;
+     if (operationType.equals(OperationType.UNSUBSCRIBE)) {
+         publishChannel = client.getPublisher().host2Channel.get(host);
+     }
+     if (publishChannel != null) {
+         doSubUnsub(pubSubData, publishChannel);
+     } else {
+         // We know which server host is the master for the topic so
+         // connect to that first. For subscribes, we want a new channel
+         // connection each time for the TopicSubscriber. If the
+         // TopicSubscriber is already connected and subscribed, we assume
+         // the server will respond with an appropriate status indicating
+         // this. For unsubscribes, it is possible that the client is
+         // subscribed to the topic already but does not have a Channel
+         // connection yet to the server host. e.g. Client goes down and
+         // comes back up but client side soft state memory does not have
+         // the netty Channel connection anymore.
+         client.doConnect(pubSubData, host);
+     }
+ }
+
+ // Synchronous subscribe entry point; delegates to the isHub-aware
+ // overload with isHub set to false.
+ public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
+         throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
+         InvalidSubscriberIdException {
+     final boolean isHub = false;
+     subscribe(topic, subscriberId, mode, isHub);
+ }
+
+ // Synchronous subscribe. Rejects a subscriberId whose format does not
+ // match the requested kind (local vs. hub), then runs the shared
+ // subUnsub flow with OperationType.SUBSCRIBE.
+ protected void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, boolean isHub)
+         throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
+         InvalidSubscriberIdException {
+     boolean subscriberIdOk = isValidSubscriberId(subscriberId, isHub);
+     if (!subscriberIdOk) {
+         String message = "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: "
+                 + isHub;
+         throw new InvalidSubscriberIdException(message);
+     }
+     try {
+         subUnsub(topic, subscriberId, OperationType.SUBSCRIBE, mode);
+     } catch (ClientNotSubscribedException unexpected) {
+         // Not expected on the subscribe path. Surface it as a generic
+         // ServiceDownException that wraps the original exception.
+         logger.error("Unexpected Exception thrown: " + unexpected.toString());
+         throw new ServiceDownException(unexpected);
+     }
+ }
+
+ // Asynchronous subscribe entry point; delegates to the isHub-aware
+ // overload with isHub set to false.
+ public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
+         Object context) {
+     final boolean isHub = false;
+     asyncSubscribe(topic, subscriberId, mode, callback, context, isHub);
+ }
+
+ // Asynchronous subscribe. A subscriberId whose format does not match the
+ // requested kind (local vs. hub) is reported through
+ // callback.operationFailed as a ServiceDownException wrapping an
+ // InvalidSubscriberIdException; otherwise the shared asyncSubUnsub flow
+ // is started with OperationType.SUBSCRIBE.
+ protected void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode,
+         Callback<Void> callback, Object context, boolean isHub) {
+     if (isValidSubscriberId(subscriberId, isHub)) {
+         asyncSubUnsub(topic, subscriberId, callback, context, OperationType.SUBSCRIBE, mode);
+     } else {
+         InvalidSubscriberIdException invalidId = new InvalidSubscriberIdException(
+                 "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub);
+         callback.operationFailed(context, new ServiceDownException(invalidId));
+     }
+ }
+
+ // Synchronous unsubscribe entry point; delegates to the isHub-aware
+ // overload with isHub set to false.
+ public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
+         ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException {
+     final boolean isHub = false;
+     unsubscribe(topic, subscriberId, isHub);
+ }
+
+ // Synchronous unsubscribe. Validates the subscriberId format, closes the
+ // client-side subscription, and then sends the UNSUBSCRIBE request through
+ // the shared subUnsub flow.
+ protected void unsubscribe(ByteString topic, ByteString subscriberId, boolean isHub)
+         throws CouldNotConnectException, ClientNotSubscribedException, ServiceDownException,
+         InvalidSubscriberIdException {
+     boolean subscriberIdOk = isValidSubscriberId(subscriberId, isHub);
+     if (!subscriberIdOk) {
+         String message = "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: "
+                 + isHub;
+         throw new InvalidSubscriberIdException(message);
+     }
+     // The client-side subscription is closed first and synchronously. Even
+     // when the server-side unsubscribe below errors out, messages for this
+     // subscription are no longer delivered to the client, and the client
+     // may retry the unsubscribe later to become "fully" unsubscribed.
+     closeSubscription(topic, subscriberId);
+     try {
+         subUnsub(topic, subscriberId, OperationType.UNSUBSCRIBE, null);
+     } catch (ClientAlreadySubscribedException unexpected) {
+         // Not expected on the unsubscribe path. Surface it as a generic
+         // ServiceDownException that wraps the original exception.
+         logger.error("Unexpected Exception thrown: " + unexpected.toString());
+         throw new ServiceDownException(unexpected);
+     }
+ }
+
+ // Asynchronous unsubscribe entry point; delegates to the isHub-aware
+ // overload with isHub set to false.
+ public void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId, final Callback<Void> callback,
+         final Object context) {
+     final boolean isHub = false;
+     asyncUnsubscribe(topic, subscriberId, callback, context, isHub);
+ }
+
+ // Asynchronous unsubscribe. An invalid subscriberId is reported through
+ // callback.operationFailed. Otherwise the client-side subscription is
+ // closed asynchronously and, once that succeeds, the UNSUBSCRIBE request
+ // is posted; a failure to close is forwarded to the caller's callback.
+ protected void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
+         final Callback<Void> callback, final Object context, boolean isHub) {
+     if (!isValidSubscriberId(subscriberId, isHub)) {
+         InvalidSubscriberIdException invalidId = new InvalidSubscriberIdException(
+                 "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub);
+         callback.operationFailed(context, new ServiceDownException(invalidId));
+         return;
+     }
+     // Continuation that runs when the close of the subscription completes.
+     Callback<Void> afterClose = new Callback<Void>() {
+         @Override
+         public void operationFinished(Object ctx, Void resultOfOperation) {
+             asyncSubUnsub(topic, subscriberId, callback, context, OperationType.UNSUBSCRIBE, null);
+         }
+
+         @Override
+         public void operationFailed(Object ctx, PubSubException exception) {
+             callback.operationFailed(context, exception);
+         }
+     };
+     asyncCloseSubscription(topic, subscriberId, afterClose, null);
+ }
+
+ // Helper that tells whether a subscriberId is valid for the requested
+ // kind of subscriber: it is valid exactly when the isHub flag agrees with
+ // what SubscriptionStateUtils.isHubSubscriber reports for the id.
+ private boolean isValidSubscriberId(ByteString subscriberId, boolean isHub) {
+     return isHub == SubscriptionStateUtils.isHubSubscriber(subscriberId);
+ }
+
+ // Sends a consume message for the given topic subscription on the same
+ // subscribe Channel that the subscription uses.
+ //
+ // Throws ClientNotSubscribedException when no Channel is cached for the
+ // TopicSubscriber. The Channel is fetched with a single get(): a
+ // containsKey() followed by get() could race with a concurrent close of
+ // the subscription and pass a null Channel on to doConsume.
+ public void consume(ByteString topic, ByteString subscriberId, MessageSeqId messageSeqId)
+         throws ClientNotSubscribedException {
+     if (logger.isDebugEnabled()) {
+         logger.debug("Calling consume for topic: " + topic.toStringUtf8() + ", subscriberId: "
+                 + subscriberId.toStringUtf8() + ", messageSeqId: " + messageSeqId);
+     }
+     TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
+     // Check that this topic subscription on the client side exists.
+     Channel subscribeChannel = topicSubscriber2Channel.get(topicSubscriber);
+     if (subscribeChannel == null) {
+         throw new ClientNotSubscribedException(
+                 "Cannot send consume message since client is not subscribed to topic: " + topic.toStringUtf8()
+                         + ", subscriberId: " + subscriberId.toStringUtf8());
+     }
+     PubSubData pubSubData = new PubSubData(topic, null, subscriberId, OperationType.CONSUME, null, null, null);
+     doConsume(pubSubData, subscribeChannel, messageSeqId);
+ }
+
+ /**
+ * This is a helper method to write the actual subscribe/unsubscribe message
+ * once the client is connected to the server and a Channel is available.
+ *
+ * @param pubSubData
+ * Subscribe/Unsubscribe call's data wrapper object. We assume
+ * that the operationType field is either SUBSCRIBE or
+ * UNSUBSCRIBE.
+ * @param channel
+ * Netty I/O channel for communication between the client and
+ * server
+ */
+ protected void doSubUnsub(PubSubData pubSubData, Channel channel) {
+ // Create a PubSubRequest
+ PubSubRequest.Builder pubsubRequestBuilder = PubSubRequest.newBuilder();
+ pubsubRequestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE);
+ pubsubRequestBuilder.setType(pubSubData.operationType);
+ if (pubSubData.triedServers != null && pubSubData.triedServers.size() > 0) {
+ pubsubRequestBuilder.addAllTriedServers(pubSubData.triedServers);
+ }
+ long txnId = client.globalCounter.incrementAndGet();
+ pubsubRequestBuilder.setTxnId(txnId);
+ pubsubRequestBuilder.setShouldClaim(pubSubData.shouldClaim);
+ pubsubRequestBuilder.setTopic(pubSubData.topic);
+
+ // Create either the Subscribe or Unsubscribe Request
+ if (pubSubData.operationType.equals(OperationType.SUBSCRIBE)) {
+ // Create the SubscribeRequest
+ SubscribeRequest.Builder subscribeRequestBuilder = SubscribeRequest.newBuilder();
+ subscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
+ subscribeRequestBuilder.setCreateOrAttach(pubSubData.createOrAttach);
+ // For now, all subscribes should wait for all cross-regional
+ // subscriptions to be established before returning.
+ subscribeRequestBuilder.setSynchronous(true);
+
+ // Set the SubscribeRequest into the outer PubSubRequest
+ pubsubRequestBuilder.setSubscribeRequest(subscribeRequestBuilder);
+ } else {
+ // Create the UnSubscribeRequest
+ UnsubscribeRequest.Builder unsubscribeRequestBuilder = UnsubscribeRequest.newBuilder();
+ unsubscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
+
+ // Set the UnsubscribeRequest into the outer PubSubRequest
+ pubsubRequestBuilder.setUnsubscribeRequest(unsubscribeRequestBuilder);
+ }
+
+ // Update the PubSubData with the txnId and the requestWriteTime
+ pubSubData.txnId = txnId;
+ pubSubData.requestWriteTime = System.currentTimeMillis();
+
+ // Before we do the write, store this information into the
+ // ResponseHandler so when the server responds, we know what
+ // appropriate Callback Data to invoke for the given txn ID.
+ HedwigClient.getResponseHandlerFromChannel(channel).txn2PubSubData.put(txnId, pubSubData);
+
+ // Finally, write the Subscribe request through the Channel.
+ if (logger.isDebugEnabled())
+ logger.debug("Writing a SubUnsub request to host: " + HedwigClient.getHostFromChannel(channel)
+ + " for pubSubData: " + pubSubData);
+ ChannelFuture future = channel.write(pubsubRequestBuilder.build());
+ future.addListener(new WriteCallback(pubSubData, client));
+ }
+
+ /**
+  * Writes a consume message to the server on the given Channel. The request
+  * is sent fire-and-forget: it is not registered with the ResponseHandler
+  * and no callback is invoked; a failed write is only logged.
+  *
+  * @param pubSubData
+  *            Consume call's data wrapper object. The operationType field
+  *            is assumed to be CONSUME.
+  * @param channel
+  *            Netty I/O channel for communication between the client and
+  *            server
+  * @param messageSeqId
+  *            Message Seq ID for the latest/last message the client has
+  *            consumed.
+  */
+ public void doConsume(final PubSubData pubSubData, final Channel channel, final MessageSeqId messageSeqId) {
+     // Outer request envelope.
+     PubSubRequest.Builder requestBuilder = PubSubRequest.newBuilder();
+     requestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE);
+     requestBuilder.setType(OperationType.CONSUME);
+     final long txnId = client.globalCounter.incrementAndGet();
+     requestBuilder.setTxnId(txnId);
+     requestBuilder.setTopic(pubSubData.topic);
+
+     // Consume payload, attached to the outer request.
+     ConsumeRequest.Builder consumeBuilder = ConsumeRequest.newBuilder();
+     consumeBuilder.setSubscriberId(pubSubData.subscriberId);
+     consumeBuilder.setMsgId(messageSeqId);
+     requestBuilder.setConsumeRequest(consumeBuilder);
+
+     if (logger.isDebugEnabled()) {
+         logger.debug("Writing a Consume request to host: " + HedwigClient.getHostFromChannel(channel)
+                 + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
+     }
+     // No server ack is expected for consume requests, so the only
+     // follow-up is a listener that logs an error if the write fails.
+     ChannelFutureListener logOnFailure = new ChannelFutureListener() {
+         @Override
+         public void operationComplete(ChannelFuture future) throws Exception {
+             if (future.isSuccess()) {
+                 return;
+             }
+             logger.error("Error writing a Consume request to host: " + HedwigClient.getHostFromChannel(channel)
+                     + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
+         }
+     };
+     channel.write(requestBuilder.build()).addListener(logOnFailure);
+ }
+
+ public boolean hasSubscription(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
+ ServiceDownException {
+ // The subscription type of info should be stored on the server end, not
+ // the client side. Eventually, the server will have the Subscription
+ // Manager part that ties into Zookeeper to manage this info.
+ // Commenting out these type of API's related to that here for now until
+ // this data is available on the server. Will figure out what the
+ // correct way to contact the server to get this info is then.
+ // The client side just has soft memory state for client subscription
+ // information.
+ return topicSubscriber2Channel.containsKey(new TopicSubscriber(topic, subscriberId));
+ }
+
+ // Not implemented on the client side: as with hasSubscription, this data
+ // is meant to reside on the server. Always returns null, so callers must
+ // null-check the result.
+ public List<ByteString> getSubscriptionList(ByteString subscriberId) throws CouldNotConnectException,
+         ServiceDownException {
+     return null;
+ }
+
+ // Starts message delivery for a topic subscription: registers the
+ // MessageHandler on the subscribe Channel's SubscribeResponseHandler and
+ // then asks the Channel to become readable.
+ //
+ // Throws ClientNotSubscribedException when no Channel is cached for the
+ // TopicSubscriber. The Channel is fetched with a single get() so that a
+ // concurrent close of the subscription cannot slip in between an
+ // existence check and the lookup and cause a NullPointerException.
+ public void startDelivery(final ByteString topic, final ByteString subscriberId, MessageHandler messageHandler)
+         throws ClientNotSubscribedException {
+     if (logger.isDebugEnabled()) {
+         logger.debug("Starting delivery for topic: " + topic.toStringUtf8() + ", subscriberId: "
+                 + subscriberId.toStringUtf8());
+     }
+     TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
+     // Make sure this topic subscription exists on the client side. The
+     // assumption is that the client should have in memory the Channel
+     // created for the TopicSubscriber once the server has sent an ack
+     // response to the initial subscribe request.
+     Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
+     if (topicSubscriberChannel == null) {
+         logger.error("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: "
+                 + subscriberId.toStringUtf8());
+         throw new ClientNotSubscribedException("Client is not yet subscribed to topic: " + topic.toStringUtf8()
+                 + ", subscriberId: " + subscriberId.toStringUtf8());
+     }
+
+     // Register the MessageHandler with the subscribe Channel's
+     // Response Handler.
+     HedwigClient.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
+             .setMessageHandler(messageHandler);
+     // Now make the TopicSubscriber Channel readable (it is set to not be
+     // readable when the initial subscription is done). Note that this is
+     // an asynchronous call. If this fails (not likely), the futureListener
+     // will just log an error message for now.
+     ChannelFuture future = topicSubscriberChannel.setReadable(true);
+     future.addListener(new ChannelFutureListener() {
+         @Override
+         public void operationComplete(ChannelFuture future) throws Exception {
+             if (!future.isSuccess()) {
+                 logger.error("Unable to make subscriber Channel readable in startDelivery call for topic: "
+                         + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
+             }
+         }
+     });
+ }
+
+ // Stops message delivery for a topic subscription: clears the
+ // MessageHandler on the subscribe Channel's SubscribeResponseHandler and
+ // then asks the Channel to become non-readable.
+ //
+ // Throws ClientNotSubscribedException when no Channel is cached for the
+ // TopicSubscriber. The Channel is fetched with a single get() so that a
+ // concurrent close of the subscription cannot slip in between an
+ // existence check and the lookup and cause a NullPointerException.
+ public void stopDelivery(final ByteString topic, final ByteString subscriberId) throws ClientNotSubscribedException {
+     if (logger.isDebugEnabled()) {
+         logger.debug("Stopping delivery for topic: " + topic.toStringUtf8() + ", subscriberId: "
+                 + subscriberId.toStringUtf8());
+     }
+     TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
+     // Make sure this topic subscription exists on the client side. The
+     // assumption is that the client should have in memory the Channel
+     // created for the TopicSubscriber once the server has sent an ack
+     // response to the initial subscribe request.
+     Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
+     if (topicSubscriberChannel == null) {
+         logger.error("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: "
+                 + subscriberId.toStringUtf8());
+         throw new ClientNotSubscribedException("Client is not yet subscribed to topic: " + topic.toStringUtf8()
+                 + ", subscriberId: " + subscriberId.toStringUtf8());
+     }
+
+     // Unregister the MessageHandler for the subscribe Channel's
+     // Response Handler.
+     HedwigClient.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
+             .setMessageHandler(null);
+     // Now make the TopicSubscriber channel not-readable. This will buffer
+     // up messages if any are sent from the server. Note that this is an
+     // asynchronous call. If this fails (not likely), the futureListener
+     // will just log an error message for now.
+     ChannelFuture future = topicSubscriberChannel.setReadable(false);
+     future.addListener(new ChannelFutureListener() {
+         @Override
+         public void operationComplete(ChannelFuture future) throws Exception {
+             if (!future.isSuccess()) {
+                 logger.error("Unable to make subscriber Channel not readable in stopDelivery call for topic: "
+                         + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
+             }
+         }
+     });
+ }
+
+ // Synchronous variant of asyncCloseSubscription: starts the asynchronous
+ // close with a PubSubCallback and waits on the PubSubData monitor until
+ // pubSubData.isDone is set (presumably by the PubSubCallback -- confirm
+ // against that class).
+ //
+ // Throws ServiceDownException when the close is reported unsuccessful or
+ // when the waiting thread is interrupted; in the latter case the thread's
+ // interrupt status is restored first so callers can still observe it.
+ public void closeSubscription(ByteString topic, ByteString subscriberId) throws ServiceDownException {
+     PubSubData pubSubData = new PubSubData(topic, null, subscriberId, null, null, null, null);
+     synchronized (pubSubData) {
+         PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
+         asyncCloseSubscription(topic, subscriberId, pubSubCallback, null);
+         try {
+             // Loop guards against spurious wakeups.
+             while (!pubSubData.isDone) {
+                 pubSubData.wait();
+             }
+         } catch (InterruptedException e) {
+             Thread.currentThread().interrupt();
+             throw new ServiceDownException("Interrupted Exception while waiting for asyncCloseSubscription call");
+         }
+         // Check from the PubSubCallback if it was successful or not.
+         if (!pubSubCallback.getIsCallSuccessful()) {
+             throw new ServiceDownException("Exception while trying to close the subscription for topic: "
+                     + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
+         }
+     }
+ }
+
+ // Asynchronously closes the client-side subscription for the given topic
+ // + subscriberId: removes the cached Channel, marks its ResponseHandler
+ // as explicitly closed, and closes the Channel. The callback is told
+ // operationFinished when the close succeeds, or immediately when there
+ // was no Channel cached; it is told operationFailed with a
+ // ServiceDownException when the close fails.
+ //
+ // The cached Channel is claimed with a single atomic remove(). With a
+ // separate containsKey()/get()/remove() sequence, two concurrent closes
+ // could both act on the same Channel, or one could read null after the
+ // other had already removed the entry.
+ public void asyncCloseSubscription(final ByteString topic, final ByteString subscriberId,
+         final Callback<Void> callback, final Object context) {
+     if (logger.isDebugEnabled()) {
+         logger.debug("Closing subscription asynchronously for topic: " + topic.toStringUtf8() + ", subscriberId: "
+                 + subscriberId.toStringUtf8());
+     }
+     TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
+     // Remove the cached reference for the TopicSubscriber; only the caller
+     // that actually gets the Channel back goes on to close it.
+     Channel channel = topicSubscriber2Channel.remove(topicSubscriber);
+     if (channel == null) {
+         logger.warn("Trying to close a subscription when we don't have a subscribe channel cached for topic: "
+                 + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
+         callback.operationFinished(context, null);
+         return;
+     }
+     // Close the subscribe channel asynchronously.
+     HedwigClient.getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
+     ChannelFuture future = channel.close();
+     future.addListener(new ChannelFutureListener() {
+         @Override
+         public void operationComplete(ChannelFuture future) throws Exception {
+             if (!future.isSuccess()) {
+                 logger.error("Failed to close the subscription channel for topic: " + topic.toStringUtf8()
+                         + ", subscriberId: " + subscriberId.toStringUtf8());
+                 callback.operationFailed(context, new ServiceDownException(
+                         "Failed to close the subscription channel for topic: " + topic.toStringUtf8()
+                                 + ", subscriberId: " + subscriberId.toStringUtf8()));
+             } else {
+                 callback.operationFinished(context, null);
+             }
+         }
+     });
+ }
+
+ // Public getter and setters for entries in the topicSubscriber2Channel Map.
+ // This is used for classes that need this information but are not in the
+ // same package.
+ public Channel getChannelForTopic(TopicSubscriber topic) {
+ return topicSubscriber2Channel.get(topic);
+ }
+
+ // Caches the given Channel for the TopicSubscriber, replacing any Channel
+ // previously stored under the same key.
+ public void setChannelForTopic(TopicSubscriber topic, Channel channel) {
+     topicSubscriber2Channel.put(topic, channel);
+ }
+
+ // Drops the cached Channel entry for the TopicSubscriber, if any. The
+ // Channel itself is not closed here.
+ public void removeChannelForTopic(TopicSubscriber topic) {
+     topicSubscriber2Channel.remove(topic);
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.exceptions.ServerRedirectLoopException;
+import org.apache.hedwig.client.exceptions.TooManyServerRedirectsException;
+import org.apache.hedwig.client.handlers.PublishResponseHandler;
+import org.apache.hedwig.client.handlers.SubscribeReconnectCallback;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
+import org.apache.hedwig.client.handlers.UnsubscribeResponseHandler;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.exceptions.PubSubException.UncertainStateException;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+ /**
+  * Netty upstream handler for the Hedwig client. It matches PubSubResponse
+  * acks from the server to the pending requests stored in txn2PubSubData and
+  * dispatches them to the publish/subscribe/unsubscribe response handlers. It
+  * also reposts requests on server redirects, cleans up and reconnects when a
+  * channel is disconnected, and starts the SSL handshake when a channel is
+  * connected and SSL is enabled.
+  */
+ @ChannelPipelineCoverage("all")
+ public class ResponseHandler extends SimpleChannelHandler {
+
+     private static final Logger logger = Logger.getLogger(ResponseHandler.class);
+
+     // Concurrent Map to store for each async PubSub request, the txn ID
+     // and the corresponding PubSub call's data which stores the VoidCallback to
+     // invoke when we receive a PubSub ack response from the server.
+     // This is specific to this instance of the ResponseHandler which is
+     // tied to a specific netty Channel Pipeline.
+     protected final ConcurrentMap<Long, PubSubData> txn2PubSubData = new ConcurrentHashMap<Long, PubSubData>();
+
+     // Boolean indicating if we closed the channel this ResponseHandler is
+     // attached to explicitly or not. If so, we do not need to do the
+     // channel disconnected logic here.
+     public boolean channelClosedExplicitly = false;
+
+     private final HedwigClient client;
+     private final HedwigPublisher pub;
+     private final HedwigSubscriber sub;
+     private final ClientConfiguration cfg;
+
+     private final PublishResponseHandler pubHandler;
+     private final SubscribeResponseHandler subHandler;
+     private final UnsubscribeResponseHandler unsubHandler;
+
+     /**
+      * Creates a handler bound to the given client. The publisher, subscriber
+      * and configuration are taken from the client, and one response handler
+      * per operation type is created for this instance.
+      *
+      * @param client
+      *            HedwigClient this handler works on behalf of
+      */
+     public ResponseHandler(HedwigClient client) {
+         this.client = client;
+         this.sub = client.getSubscriber();
+         this.pub = client.getPublisher();
+         this.cfg = client.getConfiguration();
+         this.pubHandler = new PublishResponseHandler(this);
+         this.subHandler = new SubscribeResponseHandler(this);
+         this.unsubHandler = new UnsubscribeResponseHandler(this);
+     }
+
+     // Public getters needed for the private members
+     public HedwigClient getClient() {
+         return client;
+     }
+
+     public HedwigSubscriber getSubscriber() {
+         return sub;
+     }
+
+     public ClientConfiguration getConfiguration() {
+         return cfg;
+     }
+
+     public SubscribeResponseHandler getSubscribeResponseHandler() {
+         return subHandler;
+     }
+
+     /**
+      * Handles a message received from the server. Messages that are not
+      * PubSubResponses are forwarded upstream untouched. Responses carrying a
+      * message are handed to the subscribe handler for consumption. All other
+      * responses are treated as acks and dispatched, by the operation type of
+      * the matching pending request, to the appropriate response handler.
+      *
+      * @param ctx
+      *            Context of the channel the message arrived on
+      * @param e
+      *            Event carrying the received message
+      * @throws Exception
+      *             If one of the response handlers fails
+      */
+     @Override
+     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+         // If the Message is not a PubSubResponse, just send it upstream and let
+         // something else handle it. We must stop here: falling through would
+         // make the cast below fail with a ClassCastException.
+         if (!(e.getMessage() instanceof PubSubResponse)) {
+             ctx.sendUpstream(e);
+             return;
+         }
+         // Retrieve the PubSubResponse from the Message that was sent by the
+         // server.
+         PubSubResponse response = (PubSubResponse) e.getMessage();
+         if (logger.isDebugEnabled()) {
+             logger.debug("Response received from host: " + HedwigClient.getHostFromChannel(ctx.getChannel())
+                     + ", response: " + response);
+         }
+
+         // Determine if this PubSubResponse is an ack response for a PubSub
+         // Request or if it is a message being pushed to the client subscriber.
+         if (response.hasMessage()) {
+             // Subscribed messages being pushed to the client so handle/consume
+             // it and return.
+             subHandler.handleSubscribeMessage(response);
+             return;
+         }
+
+         // Response is an ack to a prior PubSubRequest so retrieve the PubSub
+         // data for this txn and drop it from the Map in one atomic step, so
+         // the same entry can never be handed out twice.
+         PubSubData pubSubData = txn2PubSubData.remove(response.getTxnId());
+         // Validate that the PubSub data for this txn was stored. If not, just
+         // log an error message and return since we don't know how to handle
+         // this.
+         if (pubSubData == null) {
+             logger.error("PubSub Data was not found for PubSubResponse: " + response);
+             return;
+         }
+
+         // Store the topic2Host mapping if this wasn't a server redirect. We'll
+         // assume that if the server was able to have an open Channel connection
+         // to the client, and responded with an ack message other than the
+         // NOT_RESPONSIBLE_FOR_TOPIC one, it is the correct topic master.
+         if (!response.getStatusCode().equals(StatusCode.NOT_RESPONSIBLE_FOR_TOPIC)) {
+             client.storeTopic2HostMapping(pubSubData, ctx.getChannel());
+         }
+
+         // Depending on the operation type, call the appropriate handler.
+         switch (pubSubData.operationType) {
+         case PUBLISH:
+             pubHandler.handlePublishResponse(response, pubSubData, ctx.getChannel());
+             break;
+         case SUBSCRIBE:
+             subHandler.handleSubscribeResponse(response, pubSubData, ctx.getChannel());
+             break;
+         case UNSUBSCRIBE:
+             unsubHandler.handleUnsubscribeResponse(response, pubSubData, ctx.getChannel());
+             break;
+         default:
+             // The above are the only expected PubSubResponse messages received
+             // from the server for the various client side requests made.
+             logger.error("Response received from server is for an unhandled operation type, txnId: "
+                     + response.getTxnId() + ", operationType: " + pubSubData.operationType);
+         }
+     }
+
+     /**
+      * Logic to repost a PubSubRequest when the server responds with a redirect
+      * indicating they are not the topic master.
+      *
+      * @param response
+      *            PubSubResponse from the server for the redirect
+      * @param pubSubData
+      *            PubSubData for the original PubSubRequest made
+      * @param channel
+      *            Channel we used to make the original PubSubRequest
+      * @throws Exception
+      *             Throws an exception if there was an error in doing the
+      *             redirect repost of the PubSubRequest
+      */
+     public void handleRedirectResponse(PubSubResponse response, PubSubData pubSubData, Channel channel)
+             throws Exception {
+         if (logger.isDebugEnabled()) {
+             logger.debug("Handling a redirect from host: " + HedwigClient.getHostFromChannel(channel) + ", response: "
+                     + response + ", pubSubData: " + pubSubData);
+         }
+         // In this case, the PubSub request was done to a server that is not
+         // responsible for the topic. First make sure that we haven't
+         // exceeded the maximum number of server redirects.
+         int curNumServerRedirects = (pubSubData.triedServers == null) ? 0 : pubSubData.triedServers.size();
+         if (curNumServerRedirects >= cfg.getMaximumServerRedirects()) {
+             // We've already exceeded the maximum number of server redirects
+             // so consider this as an error condition for the client.
+             // Invoke the operationFailed callback and just return.
+             if (logger.isDebugEnabled()) {
+                 logger.debug("Exceeded the number of server redirects (" + curNumServerRedirects + ") so error out.");
+             }
+             pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
+                     new TooManyServerRedirectsException("Already reached max number of redirects: "
+                             + curNumServerRedirects)));
+             return;
+         }
+
+         // We will redirect and try to connect to the correct server
+         // stored in the StatusMsg of the response. First store the
+         // server that we sent the PubSub request to for the topic.
+         ByteString triedServer = ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(HedwigClient
+                 .getHostFromChannel(channel)));
+         if (pubSubData.triedServers == null) {
+             pubSubData.triedServers = new LinkedList<ByteString>();
+         }
+         pubSubData.shouldClaim = true;
+         pubSubData.triedServers.add(triedServer);
+
+         // Now get the redirected server host (expected format is
+         // Hostname:Port:SSLPort) from the server's response message. If one is
+         // not given for some reason, then redirect to the default server
+         // host/VIP to repost the request.
+         String statusMsg = response.getStatusMsg();
+         InetSocketAddress redirectedHost;
+         if (statusMsg != null && statusMsg.length() > 0) {
+             if (cfg.isSSLEnabled()) {
+                 redirectedHost = new HedwigSocketAddress(statusMsg).getSSLSocketAddress();
+             } else {
+                 redirectedHost = new HedwigSocketAddress(statusMsg).getSocketAddress();
+             }
+         } else {
+             redirectedHost = cfg.getDefaultServerHost();
+         }
+
+         // Make sure the redirected server is not one we've already attempted
+         // before in this PubSub request.
+         if (pubSubData.triedServers.contains(ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(redirectedHost)))) {
+             logger.error("We've already sent this PubSubRequest before to redirectedHost: " + redirectedHost
+                     + ", pubSubData: " + pubSubData);
+             pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
+                     new ServerRedirectLoopException("Already made the request before to redirected host: "
+                             + redirectedHost)));
+             return;
+         }
+
+         // Check if we already have a Channel open to the redirected server
+         // host. Look the channel up exactly once so that a concurrent removal
+         // from the map cannot hand us a null channel after the existence check.
+         Channel redirectedHostChannel = pub.host2Channel.get(redirectedHost);
+         if (pubSubData.operationType.equals(OperationType.SUBSCRIBE) || redirectedHostChannel == null) {
+             // We don't have an existing channel to the redirected host OR this
+             // is a redirected Subscribe request. For Subscribe requests, we
+             // always want to create a new unique Channel connection to the
+             // topic master server for the TopicSubscriber.
+             client.doConnect(pubSubData, redirectedHost);
+         } else if (pubSubData.operationType.equals(OperationType.PUBLISH)) {
+             // For Publish and Unsubscribe requests, we can just post the
+             // request again directly on the existing cached redirected host
+             // channel.
+             pub.doPublish(pubSubData, redirectedHostChannel);
+         } else if (pubSubData.operationType.equals(OperationType.UNSUBSCRIBE)) {
+             sub.doSubUnsub(pubSubData, redirectedHostChannel);
+         }
+     }
+
+     /**
+      * Logic to deal with what happens when a Channel to a server host is
+      * disconnected. Nothing is done if the channel was closed explicitly or
+      * the client has stopped. Otherwise the cached channel state is cleared
+      * (and, for a subscribe channel, the original subscribe request is resent
+      * to the default server host), and every request still waiting for an ack
+      * on this handler is failed with an UncertainStateException.
+      *
+      * @param ctx
+      *            Context of the channel that was disconnected
+      * @param e
+      *            The channel state event
+      * @throws Exception
+      *             If the cleanup or the reconnect attempt fails
+      */
+     @Override
+     public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+         // If this channel was closed explicitly by the client code,
+         // we do not need to do any of this logic. This could happen
+         // for redundant Publish channels created or redirected subscribe
+         // channels that are not used anymore or when we shutdown the
+         // client and manually close all of the open channels.
+         // Also don't do any of the disconnect logic if the client has stopped.
+         if (channelClosedExplicitly || client.hasStopped()) {
+             return;
+         }
+
+         // Make sure the host retrieved is not null as there could be some weird
+         // channel disconnect events happening during a client shutdown.
+         // If it is, just return as there shouldn't be anything we need to do.
+         InetSocketAddress host = HedwigClient.getHostFromChannel(ctx.getChannel());
+         logger.warn("Channel was disconnected to host: " + host);
+         if (host == null) {
+             return;
+         }
+
+         // Now determine what type of operation this channel was used for. The
+         // subscribe handler only holds original subscribe data when this
+         // channel was used for a subscription.
+         PubSubData origSubData = subHandler.getOrigSubData();
+
+         if (origSubData == null) {
+             // This Channel was used for Publish and Unsubscribe flows, so just
+             // remove it from the HedwigPublisher's host2Channel map. We will
+             // re-establish a Channel connection to that server when the next
+             // publish/unsubscribe request to a topic that the server owns occurs.
+             //
+             // Only remove the Channel from the mapping if this current
+             // disconnected channel is the same as the cached entry.
+             // Due to race concurrency situations, it is possible to
+             // create multiple channels to the same host for publish
+             // and unsubscribe requests. A single get() is used so a concurrent
+             // removal of the entry cannot cause a NullPointerException here.
+             if (ctx.getChannel().equals(pub.host2Channel.get(host))) {
+                 if (logger.isDebugEnabled()) {
+                     logger.debug("Disconnected channel for host: " + host
+                             + " was for Publish/Unsubscribe requests so remove all references to it.");
+                 }
+                 pub.host2Channel.remove(host);
+                 client.clearAllTopicsForHost(host);
+             }
+         } else {
+             // Subscribe channel disconnected so first close and clear all
+             // cached Channel data set up for this topic subscription.
+             sub.closeSubscription(origSubData.topic, origSubData.subscriberId);
+             client.clearAllTopicsForHost(host);
+             // Since the connection to the server host that was responsible
+             // for the topic died, we are not sure about the state of that
+             // server. Resend the original subscribe request data to the default
+             // server host/VIP. Also clear out all of the servers we've
+             // contacted or attempted to from this request as we are starting a
+             // "fresh" subscribe request.
+             origSubData.clearServersList();
+             // Set a new type of VoidCallback for this async call. We need this
+             // hook so after the subscribe reconnect has completed, delivery for
+             // that topic subscriber should also be restarted (if it was that
+             // case before the channel disconnect).
+             origSubData.callback = new SubscribeReconnectCallback(origSubData, client, subHandler.getMessageHandler());
+             origSubData.context = null;
+             if (logger.isDebugEnabled()) {
+                 logger.debug("Disconnected subscribe channel so reconnect with origSubData: " + origSubData);
+             }
+             client.doConnect(origSubData, cfg.getDefaultServerHost());
+         }
+
+         // Finally, all of the PubSubRequests that are still waiting for an ack
+         // response from the server need to be removed and timed out. Invoke the
+         // operationFailed callbacks on all of them. Use the
+         // UncertainStateException since the server did receive the request but
+         // we're not sure of the state of the request since the ack response was
+         // never received.
+         for (PubSubData pubSubData : txn2PubSubData.values()) {
+             if (logger.isDebugEnabled()) {
+                 logger.debug("Channel disconnected so invoking the operationFailed callback for pubSubData: "
+                         + pubSubData);
+             }
+             pubSubData.callback.operationFailed(pubSubData.context, new UncertainStateException(
+                     "Server ack response never received before server connection disconnected!"));
+         }
+         txn2PubSubData.clear();
+     }
+
+     /**
+      * Logic to deal with what happens when a Channel to a server host is
+      * connected. This is needed if the client is using an SSL port to
+      * communicate with the server. If so, we need to do the SSL handshake here
+      * when the channel is first connected.
+      *
+      * @param ctx
+      *            Context of the channel that was connected
+      * @param e
+      *            The channel state event
+      * @throws Exception
+      *             If initiating the SSL handshake fails
+      */
+     @Override
+     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+         // No need to initiate the SSL handshake if we are closing this channel
+         // explicitly or the client has been stopped.
+         if (cfg.isSSLEnabled() && !channelClosedExplicitly && !client.hasStopped()) {
+             if (logger.isDebugEnabled()) {
+                 logger.debug("Initiating the SSL handshake");
+             }
+             ctx.getPipeline().get(SslHandler.class).handshake(e.getChannel());
+         }
+     }
+
+     /**
+      * Logs any exception raised on the channel and closes the channel.
+      *
+      * @param ctx
+      *            Context of the channel the exception was raised on
+      * @param e
+      *            Event carrying the cause of the failure
+      */
+     @Override
+     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+         // Route the failure through the class logger (with its stack trace)
+         // rather than printing it straight to stderr.
+         logger.error("Exception caught on channel: " + e.getChannel() + ", closing it.", e.getCause());
+         e.getChannel().close();
+     }
+
+ }
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+ /**
+  * Listener attached to the write of a PubSubRequest on a channel. A
+  * successful write needs no action here. On a failed write, the pending txn
+  * entry is removed from the channel's ResponseHandler and the request is
+  * either retried on the default server host or failed through its callback
+  * if a write to the same host already failed before.
+  */
+ public class WriteCallback implements ChannelFutureListener {
+
+     private static final Logger logger = Logger.getLogger(WriteCallback.class);
+
+     // Private member variables
+     private final PubSubData pubSubData;
+     private final HedwigClient client;
+     private final ClientConfiguration cfg;
+
+     /**
+      * @param pubSubData
+      *            Data of the PubSubRequest whose write this callback tracks
+      * @param client
+      *            HedwigClient used to retry the request; its configuration
+      *            supplies the default server host
+      */
+     public WriteCallback(PubSubData pubSubData, HedwigClient client) {
+         this.pubSubData = pubSubData;
+         this.client = client;
+         this.cfg = client.getConfiguration();
+     }
+
+     /**
+      * Invoked when the write of the PubSubRequest has completed.
+      *
+      * @param future
+      *            Future of the completed write operation
+      * @throws Exception
+      *             If retrying the request on the default server host fails
+      */
+     @Override
+     public void operationComplete(ChannelFuture future) throws Exception {
+         // If the client has stopped, there is no need to proceed
+         // with any callback logic here.
+         if (client.hasStopped()) {
+             return;
+         }
+
+         // When the write operation to the server is done, we just need to check
+         // if it was successful or not.
+         InetSocketAddress host = HedwigClient.getHostFromChannel(future.getChannel());
+         if (future.isSuccess()) {
+             // Now that the write to the server is done, we have to wait for it
+             // to respond. The ResponseHandler will take care of the ack
+             // response from the server before we can determine if the async
+             // PubSub call has really completed successfully or not.
+             if (logger.isDebugEnabled()) {
+                 logger.debug("Successfully wrote to host: " + host + " for pubSubData: " + pubSubData);
+             }
+             return;
+         }
+
+         logger.error("Error writing on channel to host: " + host);
+         // On a write failure for a PubSubRequest, we also want to remove
+         // the saved txnId to PubSubData in the ResponseHandler. These
+         // requests will not receive an ack response from the server
+         // so there is no point storing that information there anymore.
+         HedwigClient.getResponseHandlerFromChannel(future.getChannel()).txn2PubSubData.remove(pubSubData.txnId);
+
+         // If we were not able to write on the channel to the server host,
+         // the host could have died or something is wrong with the channel
+         // connection where we can connect to the host, but not write to it.
+         ByteString hostString = (host == null) ? null : ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(host));
+         if (pubSubData.writeFailedServers != null && pubSubData.writeFailedServers.contains(hostString)) {
+             // We've already tried to write to this server previously and
+             // failed, so invoke the operationFailed callback.
+             logger.error("Error writing to host more than once so just invoke the operationFailed callback!");
+             pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
+                     "Error while writing message to server: " + hostString));
+             return;
+         }
+
+         if (logger.isDebugEnabled()) {
+             logger.debug("Try to send the PubSubRequest again to the default server host/VIP for pubSubData: "
+                     + pubSubData);
+         }
+         // Keep track of this current server that we failed to write to
+         // but retry the request on the default server host/VIP.
+         if (pubSubData.writeFailedServers == null) {
+             pubSubData.writeFailedServers = new LinkedList<ByteString>();
+         }
+         pubSubData.writeFailedServers.add(hostString);
+         client.doConnect(pubSubData, cfg.getDefaultServerHost());
+     }
+
+ }