Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/08/19 23:25:22 UTC

svn commit: r987314 [6/16] - in /hadoop/zookeeper/trunk: ./ src/contrib/hedwig/ src/contrib/hedwig/client/ src/contrib/hedwig/client/src/ src/contrib/hedwig/client/src/main/ src/contrib/hedwig/client/src/main/cpp/ src/contrib/hedwig/client/src/main/cpp...

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigClient.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigClient.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigClient.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.handlers.MessageConsumeCallback;
+import org.apache.hedwig.client.ssl.SslClientContextFactory;
+import org.apache.hedwig.exceptions.PubSubException.UncertainStateException;
+
+/**
+ * This is a top level Hedwig Client class that encapsulates the common
+ * functionality needed for both Publish and Subscribe operations.
+ * 
+ */
+public class HedwigClient {
+
+    private static final Logger logger = Logger.getLogger(HedwigClient.class);
+
+    // Global counter used for generating unique transaction IDs for
+    // publish and subscribe requests.
+    protected final AtomicLong globalCounter = new AtomicLong();
+    // Static String constants
+    protected static final String COLON = ":";
+
+    // The Netty socket factory for making connections to the server.
+    protected final ChannelFactory socketFactory;
+    // Whether the socket factory is one we created or is owned by whoever
+    // instantiated us.
+    protected boolean ownChannelFactory = false;
+
+    // PipelineFactory to create netty client channels to the appropriate server
+    private ClientChannelPipelineFactory pipelineFactory;
+
+    // Concurrent Map to store the mapping from the Topic to the Host.
+    // This could change over time since servers can drop mastership of topics
+    // for load balancing or failover. If a server host ever goes down, we'd
+    // also want to remove all topic mappings the host was responsible for.
+    // The second Map is used as the inverted version of the first one.
+    protected final ConcurrentMap<ByteString, InetSocketAddress> topic2Host = new ConcurrentHashMap<ByteString, InetSocketAddress>();
+    private final ConcurrentMap<InetSocketAddress, List<ByteString>> host2Topics = new ConcurrentHashMap<InetSocketAddress, List<ByteString>>();
+
+    // Each client instantiation will have a Timer for running recurring
+    // tasks. One such timer task is to time out long-running PubSubRequests
+    // that are waiting for an ack response from the server.
+    private final Timer clientTimer = new Timer(true);
+
+    // Boolean indicating if the client is running or has stopped.
+    // Once we stop the client, we should sidestep all of the connect,
+    // write callback and channel disconnected logic.
+    private boolean isStopped = false;
+
+    private HedwigSubscriber sub;
+    private final HedwigPublisher pub;
+    private final ClientConfiguration cfg;
+    private final MessageConsumeCallback consumeCb;
+    private SslClientContextFactory sslFactory = null;
+
+    // Base constructor that takes in a Configuration object.
+    // This will create its own client socket channel factory.
+    public HedwigClient(ClientConfiguration cfg) {
+        this(cfg, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
+        ownChannelFactory = true;
+    }
+
+    // Constructor that takes in a Configuration object and a ChannelFactory
+    // that has already been instantiated by the caller.
+    public HedwigClient(ClientConfiguration cfg, ChannelFactory socketFactory) {
+        this.cfg = cfg;
+        this.socketFactory = socketFactory;
+        pub = new HedwigPublisher(this);
+        sub = new HedwigSubscriber(this);
+        pipelineFactory = new ClientChannelPipelineFactory(this);
+        consumeCb = new MessageConsumeCallback(this);
+        if (cfg.isSSLEnabled()) {
+            sslFactory = new SslClientContextFactory(cfg);
+        }
+        // Schedule all of the client timer tasks. Currently we only have the
+        // Request Timeout task.
+        clientTimer.schedule(new PubSubRequestTimeoutTask(), 0, cfg.getTimeoutThreadRunInterval());
+    }
+
+    // Public getters for the various components of a client.
+    public ClientConfiguration getConfiguration() {
+        return cfg;
+    }
+
+    public HedwigSubscriber getSubscriber() {
+        return sub;
+    }
+
+    // Protected method to set the subscriber. This is needed currently for hub
+    // versions of the client subscriber.
+    protected void setSubscriber(HedwigSubscriber sub) {
+        this.sub = sub;
+    }
+
+    public HedwigPublisher getPublisher() {
+        return pub;
+    }
+
+    public MessageConsumeCallback getConsumeCallback() {
+        return consumeCb;
+    }
+
+    public SslClientContextFactory getSslFactory() {
+        return sslFactory;
+    }
+
+    // We need to deal with the possible problem of a PubSub request being
+    // written successfully to the server host but, for some reason, the
+    // ack message never coming back. What could happen is that the VoidCallback
+    // stored in the ResponseHandler.txn2PubSubData map will never be called.
+    // We should have a configured timeout so that if it passes from the time a
+    // write was successfully done to the server, we can fail this async PubSub
+    // transaction. The caller could possibly redo the transaction if needed at
+    // a later time. The timeout cleaner TimerTask below does this.
+    class PubSubRequestTimeoutTask extends TimerTask {
+        /**
+         * Implement the TimerTask's abstract run method.
+         */
+        @Override
+        public void run() {
+            if (logger.isDebugEnabled())
+                logger.debug("Running the PubSubRequest Timeout Task");
+            // Loop through all outstanding PubSubData requests and check if
+            // the requestWriteTime has timed out compared to the current time.
+            long curTime = System.currentTimeMillis();
+            long timeoutInterval = cfg.getServerAckResponseTimeout();
+
+            // First check the ResponseHandlers associated with cached
+            // channels in HedwigPublisher.host2Channel. This stores the
+            // channels used for Publish and Unsubscribe requests.
+            for (Channel channel : pub.host2Channel.values()) {
+                ResponseHandler responseHandler = getResponseHandlerFromChannel(channel);
+                for (PubSubData pubSubData : responseHandler.txn2PubSubData.values()) {
+                    checkPubSubDataToTimeOut(pubSubData, responseHandler, curTime, timeoutInterval);
+                }
+            }
+            // Now do the same for the cached channels in
+            // HedwigSubscriber.topicSubscriber2Channel. This stores the
+            // channels used exclusively for Subscribe requests.
+            for (Channel channel : sub.topicSubscriber2Channel.values()) {
+                ResponseHandler responseHandler = getResponseHandlerFromChannel(channel);
+                for (PubSubData pubSubData : responseHandler.txn2PubSubData.values()) {
+                    checkPubSubDataToTimeOut(pubSubData, responseHandler, curTime, timeoutInterval);
+                }
+            }
+        }
+
+        private void checkPubSubDataToTimeOut(PubSubData pubSubData, ResponseHandler responseHandler, long curTime,
+                long timeoutInterval) {
+            if (curTime > pubSubData.requestWriteTime + timeoutInterval) {
+                // Current PubSubRequest has timed out so remove it from the
+                // ResponseHandler's map and invoke the VoidCallback's
+                // operationFailed method.
+                logger.error("Current PubSubRequest has timed out for pubSubData: " + pubSubData);
+                responseHandler.txn2PubSubData.remove(pubSubData.txnId);
+                pubSubData.callback.operationFailed(pubSubData.context, new UncertainStateException(
+                        "Server ack response never received so PubSubRequest has timed out!"));
+            }
+        }
+    }
+
+    // When we are done with the client, this is a clean way to gracefully close
+    // all channels/sockets created by the client and to also release all
+    // resources used by netty.
+    public void stop() {
+        logger.info("Stopping the client!");
+        // Set the client boolean flag to indicate the client has stopped.
+        isStopped = true;
+        // Stop the timer and all timer task threads.
+        clientTimer.cancel();
+        // Close all of the open Channels.
+        for (Channel channel : pub.host2Channel.values()) {
+            getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
+            channel.close().awaitUninterruptibly();
+        }
+        for (Channel channel : sub.topicSubscriber2Channel.values()) {
+            getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
+            channel.close().awaitUninterruptibly();
+        }
+        // Clear out all Maps.
+        topic2Host.clear();
+        host2Topics.clear();
+        pub.host2Channel.clear();
+        sub.topicSubscriber2Channel.clear();
+        // Release resources used by the ChannelFactory on the client if we are
+        // the owner that created it.
+        if (ownChannelFactory) {
+            socketFactory.releaseExternalResources();
+        }
+        logger.info("Completed stopping the client!");
+    }
+
+    /**
+     * This is a helper method to do the connect attempt to the server given the
+     * input host/port. This can be used to connect to the default server
+     * host/port which is the VIP. That will pick a server in the cluster at
+     * random to connect to for the initial PubSub attempt (with redirect logic
+     * being done at the server side). Additionally, this could be called after
+     * the client makes an initial PubSub attempt at a server, and is redirected
+     * to the one that is responsible for the topic. Once the connect to the
+     * server is done, we will perform the corresponding PubSub write on that
+     * channel.
+     * 
+     * @param pubSubData
+     *            PubSub call's data wrapper object
+     * @param serverHost
+     *            Input server host to connect to of type InetSocketAddress
+     */
+    public void doConnect(PubSubData pubSubData, InetSocketAddress serverHost) {
+        if (logger.isDebugEnabled())
+            logger.debug("Connecting to host: " + serverHost + " with pubSubData: " + pubSubData);
+        // Set up the ClientBootStrap so we can create a new Channel connection
+        // to the server.
+        ClientBootstrap bootstrap = new ClientBootstrap(socketFactory);
+        bootstrap.setPipelineFactory(pipelineFactory);
+        bootstrap.setOption("tcpNoDelay", true);
+        bootstrap.setOption("keepAlive", true);
+
+        // Start the connection attempt to the input server host.
+        ChannelFuture future = bootstrap.connect(serverHost);
+        future.addListener(new ConnectCallback(pubSubData, serverHost, this));
+    }
+
+    /**
+     * Helper method to store the topic2Host mapping in the HedwigClient cache
+     * map. This method is assumed to be called when we've done a successful
+     * connection to the correct server topic master.
+     * 
+     * @param pubSubData
+     *            PubSub wrapper data
+     * @param channel
+     *            Netty Channel
+     */
+    protected void storeTopic2HostMapping(PubSubData pubSubData, Channel channel) {
+        // Retrieve the server host that we've connected to and store the
+        // mapping from the topic to this host. For all other non-redirected
+        // server statuses, we consider that as a successful connection to the
+        // correct topic master.
+        InetSocketAddress host = getHostFromChannel(channel);
+        if (topic2Host.containsKey(pubSubData.topic) && topic2Host.get(pubSubData.topic).equals(host)) {
+            // Entry in map exists for the topic but it is the same as the
+            // current host. In this case there is nothing to do.
+            return;
+        }
+
+        // Store the relevant mappings for this topic and host combination.
+        if (logger.isDebugEnabled())
+            logger.debug("Storing info for topic: " + pubSubData.topic.toStringUtf8() + ", old host: "
+                    + topic2Host.get(pubSubData.topic) + ", new host: " + host);
+        topic2Host.put(pubSubData.topic, host);
+        if (host2Topics.containsKey(host)) {
+            host2Topics.get(host).add(pubSubData.topic);
+        } else {
+            LinkedList<ByteString> topicsList = new LinkedList<ByteString>();
+            topicsList.add(pubSubData.topic);
+            host2Topics.put(host, topicsList);
+        }
+    }
+
+    /**
+     * Helper static method to get the InetSocketAddress from a netty
+     * Channel. Assumption is that the netty Channel was originally created with
+     * an InetSocketAddress. This is true with the Hedwig netty implementation.
+     * 
+     * @param channel
+     *            Netty channel to extract the host and port from.
+     * @return InetSocketAddress (host and port) from the Netty Channel
+     */
+    public static InetSocketAddress getHostFromChannel(Channel channel) {
+        return (InetSocketAddress) channel.getRemoteAddress();
+    }
+
+    /**
+     * Helper static method to get the ResponseHandler instance from a Channel
+     * via the ChannelPipeline it is associated with. The assumption is that the
+     * last ChannelHandler tied to the ChannelPipeline is the ResponseHandler.
+     * 
+     * @param channel
+     *            Channel we are retrieving the ResponseHandler instance for
+     * @return ResponseHandler Instance tied to the Channel's Pipeline
+     */
+    public static ResponseHandler getResponseHandlerFromChannel(Channel channel) {
+        return (ResponseHandler) channel.getPipeline().getLast();
+    }
+
+    // Public getter for entries in the topic2Host Map.
+    public InetSocketAddress getHostForTopic(ByteString topic) {
+        return topic2Host.get(topic);
+    }
+
+    // If a server host goes down or the channel to it gets disconnected,
+    // we want to clear out all relevant cached information. We'll
+    // need to remove all of the topic mappings that the host was
+    // responsible for.
+    public void clearAllTopicsForHost(InetSocketAddress host) {
+        if (logger.isDebugEnabled())
+            logger.debug("Clearing all topics for host: " + host);
+        // For each of the topics that the host was responsible for,
+        // remove it from the topic2Host mapping.
+        if (host2Topics.containsKey(host)) {
+            for (ByteString topic : host2Topics.get(host)) {
+                if (logger.isDebugEnabled())
+                    logger.debug("Removing mapping for topic: " + topic.toStringUtf8() + " from host: " + host);
+                topic2Host.remove(topic);
+            }
+            // Now it is safe to remove the host2Topics mapping entry.
+            host2Topics.remove(host);
+        }
+    }
+
+    // Public getter to see if the client has been stopped.
+    public boolean hasStopped() {
+        return isStopped;
+    }
+
+    // Public getter to get the client's Timer object.
+    // This is so we can reuse this and not have to create multiple Timer
+    // objects.
+    public Timer getClientTimer() {
+        return clientTimer;
+    }
+
+}
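
A minimal usage sketch (not part of the commit) of the client lifecycle defined above, assuming a ClientConfiguration instance has already been populated elsewhere. The class name ClientLifecycleSketch is hypothetical; the calls it makes (the single-argument constructor, getPublisher(), getSubscriber(), stop()) are the ones shown in HedwigClient.java above.

import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.netty.HedwigClient;
import org.apache.hedwig.client.netty.HedwigPublisher;
import org.apache.hedwig.client.netty.HedwigSubscriber;

public class ClientLifecycleSketch {
    // Builds a client from an already-populated configuration, hands out the
    // publisher/subscriber handles, and shuts everything down when finished.
    public static void run(ClientConfiguration cfg) {
        // The single-argument constructor creates (and therefore owns) its own
        // NioClientSocketChannelFactory, so stop() will release its resources.
        HedwigClient client = new HedwigClient(cfg);
        try {
            HedwigPublisher publisher = client.getPublisher();
            HedwigSubscriber subscriber = client.getSubscriber();
            // ... publish/subscribe calls would go here ...
        } finally {
            // Cancels the client timer, closes all cached channels, and clears
            // the topic-to-host maps, as implemented in HedwigClient.stop().
            client.stop();
        }
    }
}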

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.handlers.PubSubCallback;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PublishRequest;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * This is the Hedwig Netty specific implementation of the Publisher interface.
+ * 
+ */
+public class HedwigPublisher implements Publisher {
+
+    private static Logger logger = Logger.getLogger(HedwigPublisher.class);
+
+    // Concurrent Map to store the mappings for a given Host (Hostname:Port) to
+    // the Channel that has been established for it previously. This channel
+    // will be used whenever we publish on a topic that the server is the master
+    // of currently. The channels used here will only be used for publish and
+    // unsubscribe requests.
+    protected final ConcurrentMap<InetSocketAddress, Channel> host2Channel = new ConcurrentHashMap<InetSocketAddress, Channel>();
+
+    private final HedwigClient client;
+    private final ClientConfiguration cfg;
+
+    protected HedwigPublisher(HedwigClient client) {
+        this.client = client;
+        this.cfg = client.getConfiguration();
+    }
+
+    public void publish(ByteString topic, Message msg) throws CouldNotConnectException, ServiceDownException {
+        if (logger.isDebugEnabled())
+            logger.debug("Calling a sync publish for topic: " + topic.toStringUtf8() + ", msg: " + msg);
+        PubSubData pubSubData = new PubSubData(topic, msg, null, OperationType.PUBLISH, null, null, null);
+        synchronized (pubSubData) {
+            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
+            asyncPublish(topic, msg, pubSubCallback, null);
+            try {
+                while (!pubSubData.isDone)
+                    pubSubData.wait();
+            } catch (InterruptedException e) {
+                throw new ServiceDownException("Interrupted Exception while waiting for async publish call");
+            }
+            // Check from the PubSubCallback if it was successful or not.
+            if (!pubSubCallback.getIsCallSuccessful()) {
+                // See what the exception was that was thrown when the operation
+                // failed.
+                PubSubException failureException = pubSubCallback.getFailureException();
+                if (failureException == null) {
+                    // This should not happen as the operation failed but a null
+                    // PubSubException was passed. Log an error message but
+                    // throw a generic ServiceDownException.
+                    logger.error("Sync Publish operation failed but no PubSubException was passed!");
+                    throw new ServiceDownException("Server ack response to publish request is not successful");
+                }
+                // For the expected exceptions that could occur, just rethrow
+                // them.
+                else if (failureException instanceof CouldNotConnectException) {
+                    throw (CouldNotConnectException) failureException;
+                } else if (failureException instanceof ServiceDownException) {
+                    throw (ServiceDownException) failureException;
+                } else {
+                    // For other types of PubSubExceptions, just throw a generic
+                    // ServiceDownException but log an error message.
+                    logger.error("Unexpected exception type when a sync publish operation failed: " + failureException);
+                    throw new ServiceDownException("Server ack response to publish request is not successful");
+                }
+            }
+        }
+    }
+
+    public void asyncPublish(ByteString topic, Message msg, Callback<Void> callback, Object context) {
+        if (logger.isDebugEnabled())
+            logger.debug("Calling an async publish for topic: " + topic.toStringUtf8() + ", msg: " + msg);
+        // Check if we already have a Channel connection set up to the server
+        // for the given Topic.
+        PubSubData pubSubData = new PubSubData(topic, msg, null, OperationType.PUBLISH, null, callback, context);
+        if (client.topic2Host.containsKey(topic)) {
+            InetSocketAddress host = client.topic2Host.get(topic);
+            if (host2Channel.containsKey(host)) {
+                // We already have the Channel connection for the server host so
+                // do the publish directly. We will deal with redirect logic
+                // later on if that server is no longer the current host for
+                // the topic.
+                doPublish(pubSubData, host2Channel.get(host));
+            } else {
+                // We have a mapping for the topic to host but don't have a
+                // Channel for that server. This can happen if the Channel
+                // is disconnected for some reason. Do the connect then to
+                // the specified server host to create a new Channel connection.
+                client.doConnect(pubSubData, host);
+            }
+        } else {
+            // Server host for the given topic is not known yet so use the
+            // default server host/port as defined in the configs. This should
+            // point to the server VIP which would redirect to a random server
+            // (which might not be the server hosting the topic).
+            client.doConnect(pubSubData, cfg.getDefaultServerHost());
+        }
+    }
+
+    /**
+     * This is a helper method to write the actual publish message once the
+     * client is connected to the server and a Channel is available.
+     * 
+     * @param pubSubData
+     *            Publish call's data wrapper object
+     * @param channel
+     *            Netty I/O channel for communication between the client and
+     *            server
+     */
+    protected void doPublish(PubSubData pubSubData, Channel channel) {
+        // Create a PubSubRequest
+        PubSubRequest.Builder pubsubRequestBuilder = PubSubRequest.newBuilder();
+        pubsubRequestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE);
+        pubsubRequestBuilder.setType(OperationType.PUBLISH);
+        if (pubSubData.triedServers != null && pubSubData.triedServers.size() > 0) {
+            pubsubRequestBuilder.addAllTriedServers(pubSubData.triedServers);
+        }
+        long txnId = client.globalCounter.incrementAndGet();
+        pubsubRequestBuilder.setTxnId(txnId);
+        pubsubRequestBuilder.setShouldClaim(pubSubData.shouldClaim);
+        pubsubRequestBuilder.setTopic(pubSubData.topic);
+
+        // Now create the PublishRequest
+        PublishRequest.Builder publishRequestBuilder = PublishRequest.newBuilder();
+
+        publishRequestBuilder.setMsg(pubSubData.msg);
+
+        // Set the PublishRequest into the outer PubSubRequest
+        pubsubRequestBuilder.setPublishRequest(publishRequestBuilder);
+
+        // Update the PubSubData with the txnId and the requestWriteTime
+        pubSubData.txnId = txnId;
+        pubSubData.requestWriteTime = System.currentTimeMillis();
+
+        // Before we do the write, store this information into the
+        // ResponseHandler so when the server responds, we know what
+        // appropriate Callback Data to invoke for the given txn ID.
+        HedwigClient.getResponseHandlerFromChannel(channel).txn2PubSubData.put(txnId, pubSubData);
+
+        // Finally, write the Publish request through the Channel.
+        if (logger.isDebugEnabled())
+            logger.debug("Writing a Publish request to host: " + HedwigClient.getHostFromChannel(channel)
+                    + " for pubSubData: " + pubSubData);
+        ChannelFuture future = channel.write(pubsubRequestBuilder.build());
+        future.addListener(new WriteCallback(pubSubData, client));
+    }
+
+    // Synchronized method to store the host2Channel mapping (if it doesn't
+    // exist yet). The host info is retrieved from the remote address tied to
+    // the Channel.
+    protected synchronized void storeHost2ChannelMapping(Channel channel) {
+        InetSocketAddress host = HedwigClient.getHostFromChannel(channel);
+        if (!host2Channel.containsKey(host)) {
+            if (logger.isDebugEnabled())
+                logger.debug("Storing a new Channel mapping for host: " + host);
+            host2Channel.put(host, channel);
+        } else {
+            // If we've reached here, that means we already have a Channel
+            // mapping for the given host. This should ideally not happen
+            // and it means we are creating another Channel to a server host
+            // to publish on when we could have used an existing one. This could
+            // happen due to a race condition if initially multiple concurrent
+            // threads are publishing on the same topic and no Channel exists
+            // currently to the server. We are not synchronizing this initial
+            // creation of Channels to a given host for performance.
+            // Another way redundant Channels can be created is when a new
+            // topic is being published to: we connect to the default
+            // server host, which should be a VIP that redirects to a "real"
+            // server host. Since we don't know beforehand what is the full
+            // set of server hosts, we could be redirected to a server that
+            // we already have a channel connection to from a prior existing
+            // topic. Close these redundant channels as they won't be used.
+            if (logger.isDebugEnabled())
+                logger.debug("Channel mapping to host: " + host + " already exists so no need to store it.");
+            HedwigClient.getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
+            channel.close();
+        }
+    }
+
+    // Public getter for entries in the host2Channel Map.
+    // This is used for classes that need this information but are not in the
+    // same package.
+    public Channel getChannelForHost(InetSocketAddress host) {
+        return host2Channel.get(host);
+    }
+
+}
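
A minimal sketch (not part of the commit) of the synchronous and asynchronous publish paths defined above. The class name PublishSketch, the topic name, and the message body are hypothetical; the Message.newBuilder().setBody(...) call assumes the PubSubProtocol.Message protobuf exposes a body field, and the Callback<Void> method signatures follow the ones used elsewhere in this commit.

import com.google.protobuf.ByteString;
import org.apache.hedwig.client.netty.HedwigClient;
import org.apache.hedwig.client.netty.HedwigPublisher;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.util.Callback;

public class PublishSketch {
    // Publishes one message synchronously and one asynchronously on the same topic.
    public static void run(HedwigClient client) throws Exception {
        HedwigPublisher publisher = client.getPublisher();
        ByteString topic = ByteString.copyFromUtf8("my-topic");
        // Assumption: the Message protobuf has a 'body' field.
        Message msg = Message.newBuilder().setBody(ByteString.copyFromUtf8("hello")).build();

        // Synchronous publish: blocks until the server ack arrives or the
        // PubSubRequestTimeoutTask in HedwigClient fails the request.
        publisher.publish(topic, msg);

        // Asynchronous publish: the callback fires once the request is acked or fails.
        publisher.asyncPublish(topic, msg, new Callback<Void>() {
            @Override
            public void operationFinished(Object ctx, Void resultOfOperation) {
                System.out.println("Publish acked by the server");
            }

            @Override
            public void operationFailed(Object ctx, PubSubException exception) {
                System.err.println("Publish failed: " + exception);
            }
        }, null);
    }
}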

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,585 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
+import org.apache.hedwig.client.handlers.PubSubCallback;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * This is the Hedwig Netty specific implementation of the Subscriber interface.
+ * 
+ */
+public class HedwigSubscriber implements Subscriber {
+
+    private static Logger logger = Logger.getLogger(HedwigSubscriber.class);
+
+    // Concurrent Map to store the cached Channel connections on the client side
+    // to a server host for a given Topic + SubscriberId combination. For each
+    // TopicSubscriber, we want a unique Channel connection to the server for
+    // it. We can also get the ResponseHandler tied to the Channel via the
+    // Channel Pipeline.
+    protected final ConcurrentMap<TopicSubscriber, Channel> topicSubscriber2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
+
+    protected final HedwigClient client;
+    protected final ClientConfiguration cfg;
+
+    public HedwigSubscriber(HedwigClient client) {
+        this.client = client;
+        this.cfg = client.getConfiguration();
+    }
+
+    // Private method that holds the common logic for doing synchronous
+    // Subscribe or Unsubscribe requests. This is for code reuse since these
+    // two flows are very similar. The assumption is that the input
+    // OperationType is either SUBSCRIBE or UNSUBSCRIBE.
+    private void subUnsub(ByteString topic, ByteString subscriberId, OperationType operationType,
+            CreateOrAttach createOrAttach) throws CouldNotConnectException, ClientAlreadySubscribedException,
+            ClientNotSubscribedException, ServiceDownException {
+        if (logger.isDebugEnabled())
+            logger.debug("Calling a sync subUnsub request for topic: " + topic.toStringUtf8() + ", subscriberId: "
+                    + subscriberId.toStringUtf8() + ", operationType: " + operationType + ", createOrAttach: "
+                    + createOrAttach);
+        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, createOrAttach, null, null);
+        synchronized (pubSubData) {
+            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
+            asyncSubUnsub(topic, subscriberId, pubSubCallback, null, operationType, createOrAttach);
+            try {
+                while (!pubSubData.isDone)
+                    pubSubData.wait();
+            } catch (InterruptedException e) {
+                throw new ServiceDownException("Interrupted Exception while waiting for async subUnsub call");
+            }
+            // Check from the PubSubCallback if it was successful or not.
+            if (!pubSubCallback.getIsCallSuccessful()) {
+                // See what the exception was that was thrown when the operation
+                // failed.
+                PubSubException failureException = pubSubCallback.getFailureException();
+                if (failureException == null) {
+                    // This should not happen as the operation failed but a null
+                    // PubSubException was passed. Log an error message but
+                    // throw a generic ServiceDownException.
+                    logger.error("Sync SubUnsub operation failed but no PubSubException was passed!");
+                    throw new ServiceDownException("Server ack response to SubUnsub request is not successful");
+                }
+                // For the expected exceptions that could occur, just rethrow
+                // them.
+                else if (failureException instanceof CouldNotConnectException)
+                    throw (CouldNotConnectException) failureException;
+                else if (failureException instanceof ClientAlreadySubscribedException)
+                    throw (ClientAlreadySubscribedException) failureException;
+                else if (failureException instanceof ClientNotSubscribedException)
+                    throw (ClientNotSubscribedException) failureException;
+                else if (failureException instanceof ServiceDownException)
+                    throw (ServiceDownException) failureException;
+                else {
+                    logger.error("Unexpected PubSubException thrown: " + failureException.toString());
+                    // Throw a generic ServiceDownException but wrap the
+                    // original PubSubException within it.
+                    throw new ServiceDownException(failureException);
+                }
+            }
+        }
+    }
+
+    // Private method that holds the common logic for doing asynchronous
+    // Subscribe or Unsubscribe requests. This is for code reuse since these two
+    // flows are very similar. The assumption is that the input OperationType is
+    // either SUBSCRIBE or UNSUBSCRIBE.
+    private void asyncSubUnsub(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object context,
+            OperationType operationType, CreateOrAttach createOrAttach) {
+        if (logger.isDebugEnabled())
+            logger.debug("Calling an async subUnsub request for topic: " + topic.toStringUtf8() + ", subscriberId: "
+                    + subscriberId.toStringUtf8() + ", operationType: " + operationType + ", createOrAttach: "
+                    + createOrAttach);
+        // Check if we know which server host is the master for the topic we are
+        // subscribing to.
+        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, createOrAttach, callback,
+                context);
+        if (client.topic2Host.containsKey(topic)) {
+            InetSocketAddress host = client.topic2Host.get(topic);
+            if (operationType.equals(OperationType.UNSUBSCRIBE) && client.getPublisher().host2Channel.containsKey(host)) {
+                // For unsubscribes, we can reuse the channel connections to the
+                // server host that are cached for publishes. For publish and
+                // unsubscribe flows, we will thus use the same Channels and
+                // will cache and store them during the ConnectCallback.
+                doSubUnsub(pubSubData, client.getPublisher().host2Channel.get(host));
+            } else {
+                // We know which server host is the master for the topic so
+                // connect to that first. For subscribes, we want a new channel
+                // connection each time for the TopicSubscriber. If the
+                // TopicSubscriber is already connected and subscribed,
+                // we assume the server will respond with an appropriate status
+                // indicating this. For unsubscribes, it is possible that the
+                // client is subscribed to the topic already but does not
+                // have a Channel connection yet to the server host. e.g. Client
+                // goes down and comes back up but client side soft state memory
+                // does not have the netty Channel connection anymore.
+                client.doConnect(pubSubData, host);
+            }
+        } else {
+            // Server host for the given topic is not known yet so use the
+            // default server host/port as defined in the configs. This should
+            // point to the server VIP which would redirect to a random server
+            // (which might not be the server hosting the topic).
+            client.doConnect(pubSubData, cfg.getDefaultServerHost());
+        }
+    }
+
+    public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
+            throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
+            InvalidSubscriberIdException {
+        subscribe(topic, subscriberId, mode, false);
+    }
+
+    protected void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, boolean isHub)
+            throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
+            InvalidSubscriberIdException {
+        // Validate that the format of the subscriberId is valid either as a
+        // local or hub subscriber.
+        if (!isValidSubscriberId(subscriberId, isHub)) {
+            throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8()
+                    + ", isHub: " + isHub);
+        }
+        try {
+            subUnsub(topic, subscriberId, OperationType.SUBSCRIBE, mode);
+        } catch (ClientNotSubscribedException e) {
+            logger.error("Unexpected Exception thrown: " + e.toString());
+            // This exception should never be thrown here. But just in case,
+            // throw a generic ServiceDownException but wrap the original
+            // Exception within it.
+            throw new ServiceDownException(e);
+        }
+    }
+
+    public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
+            Object context) {
+        asyncSubscribe(topic, subscriberId, mode, callback, context, false);
+    }
+
+    protected void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode,
+            Callback<Void> callback, Object context, boolean isHub) {
+        // Validate that the format of the subscriberId is valid either as a
+        // local or hub subscriber.
+        if (!isValidSubscriberId(subscriberId, isHub)) {
+            callback.operationFailed(context, new ServiceDownException(new InvalidSubscriberIdException(
+                    "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
+            return;
+        }
+        asyncSubUnsub(topic, subscriberId, callback, context, OperationType.SUBSCRIBE, mode);
+    }
+
+    public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
+            ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException {
+        unsubscribe(topic, subscriberId, false);
+    }
+
+    protected void unsubscribe(ByteString topic, ByteString subscriberId, boolean isHub)
+            throws CouldNotConnectException, ClientNotSubscribedException, ServiceDownException,
+            InvalidSubscriberIdException {
+        // Validate that the format of the subscriberId is valid either as a
+        // local or hub subscriber.
+        if (!isValidSubscriberId(subscriberId, isHub)) {
+            throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8()
+                    + ", isHub: " + isHub);
+        }
+        // Synchronously close the subscription on the client side. Even
+        // if the unsubscribe request to the server errors out, we won't be
+        // delivering messages for this subscription to the client. The client
+        // can later retry the unsubscribe request to the server so that it is
+        // "fully" unsubscribed from the given topic.
+        closeSubscription(topic, subscriberId);
+        try {
+            subUnsub(topic, subscriberId, OperationType.UNSUBSCRIBE, null);
+        } catch (ClientAlreadySubscribedException e) {
+            logger.error("Unexpected Exception thrown: " + e.toString());
+            // This exception should never be thrown here. But just in case,
+            // throw a generic ServiceDownException but wrap the original
+            // Exception within it.
+            throw new ServiceDownException(e);
+        }
+    }
+
+    public void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId, final Callback<Void> callback,
+            final Object context) {
+        asyncUnsubscribe(topic, subscriberId, callback, context, false);
+    }
+
+    protected void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
+            final Callback<Void> callback, final Object context, boolean isHub) {
+        // Validate that the format of the subscriberId is valid either as a
+        // local or hub subscriber.
+        if (!isValidSubscriberId(subscriberId, isHub)) {
+            callback.operationFailed(context, new ServiceDownException(new InvalidSubscriberIdException(
+                    "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
+            return;
+        }
+        // Asynchronously close the subscription. Once that operation
+        // completes, its callback posts the async unsubscribe request.
+        asyncCloseSubscription(topic, subscriberId, new Callback<Void>() {
+            @Override
+            public void operationFinished(Object ctx, Void resultOfOperation) {
+                asyncSubUnsub(topic, subscriberId, callback, context, OperationType.UNSUBSCRIBE, null);
+            }
+
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) {
+                callback.operationFailed(context, exception);
+            }
+        }, null);
+    }
+
+    // This is a helper method to determine if a subscriberId is valid as either
+    // a hub or local subscriber.
+    private boolean isValidSubscriberId(ByteString subscriberId, boolean isHub) {
+        if ((isHub && !SubscriptionStateUtils.isHubSubscriber(subscriberId))
+                || (!isHub && SubscriptionStateUtils.isHubSubscriber(subscriberId)))
+            return false;
+        else
+            return true;
+    }
+
+    public void consume(ByteString topic, ByteString subscriberId, MessageSeqId messageSeqId)
+            throws ClientNotSubscribedException {
+        if (logger.isDebugEnabled())
+            logger.debug("Calling consume for topic: " + topic.toStringUtf8() + ", subscriberId: "
+                    + subscriberId.toStringUtf8() + ", messageSeqId: " + messageSeqId);
+        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
+        // Check that this topic subscription on the client side exists.
+        if (!topicSubscriber2Channel.containsKey(topicSubscriber)) {
+            throw new ClientNotSubscribedException(
+                    "Cannot send consume message since client is not subscribed to topic: " + topic.toStringUtf8()
+                            + ", subscriberId: " + subscriberId.toStringUtf8());
+        }
+        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, OperationType.CONSUME, null, null, null);
+        // Send the consume message to the server using the same subscribe
+        // channel that the topic subscription uses.
+        doConsume(pubSubData, topicSubscriber2Channel.get(topicSubscriber), messageSeqId);
+    }
+
+    /**
+     * This is a helper method to write the actual subscribe/unsubscribe message
+     * once the client is connected to the server and a Channel is available.
+     * 
+     * @param pubSubData
+     *            Subscribe/Unsubscribe call's data wrapper object. We assume
+     *            that the operationType field is either SUBSCRIBE or
+     *            UNSUBSCRIBE.
+     * @param channel
+     *            Netty I/O channel for communication between the client and
+     *            server
+     */
+    protected void doSubUnsub(PubSubData pubSubData, Channel channel) {
+        // Create a PubSubRequest
+        PubSubRequest.Builder pubsubRequestBuilder = PubSubRequest.newBuilder();
+        pubsubRequestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE);
+        pubsubRequestBuilder.setType(pubSubData.operationType);
+        if (pubSubData.triedServers != null && pubSubData.triedServers.size() > 0) {
+            pubsubRequestBuilder.addAllTriedServers(pubSubData.triedServers);
+        }
+        long txnId = client.globalCounter.incrementAndGet();
+        pubsubRequestBuilder.setTxnId(txnId);
+        pubsubRequestBuilder.setShouldClaim(pubSubData.shouldClaim);
+        pubsubRequestBuilder.setTopic(pubSubData.topic);
+
+        // Create either the Subscribe or Unsubscribe Request
+        if (pubSubData.operationType.equals(OperationType.SUBSCRIBE)) {
+            // Create the SubscribeRequest
+            SubscribeRequest.Builder subscribeRequestBuilder = SubscribeRequest.newBuilder();
+            subscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
+            subscribeRequestBuilder.setCreateOrAttach(pubSubData.createOrAttach);
+            // For now, all subscribes should wait for all cross-regional
+            // subscriptions to be established before returning.
+            subscribeRequestBuilder.setSynchronous(true);
+
+            // Set the SubscribeRequest into the outer PubSubRequest
+            pubsubRequestBuilder.setSubscribeRequest(subscribeRequestBuilder);
+        } else {
+            // Create the UnSubscribeRequest
+            UnsubscribeRequest.Builder unsubscribeRequestBuilder = UnsubscribeRequest.newBuilder();
+            unsubscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
+
+            // Set the UnsubscribeRequest into the outer PubSubRequest
+            pubsubRequestBuilder.setUnsubscribeRequest(unsubscribeRequestBuilder);
+        }
+
+        // Update the PubSubData with the txnId and the requestWriteTime
+        pubSubData.txnId = txnId;
+        pubSubData.requestWriteTime = System.currentTimeMillis();
+
+        // Before we do the write, store this information into the
+        // ResponseHandler so when the server responds, we know what
+        // appropriate Callback Data to invoke for the given txn ID.
+        HedwigClient.getResponseHandlerFromChannel(channel).txn2PubSubData.put(txnId, pubSubData);
+
+        // Finally, write the Subscribe request through the Channel.
+        if (logger.isDebugEnabled())
+            logger.debug("Writing a SubUnsub request to host: " + HedwigClient.getHostFromChannel(channel)
+                    + " for pubSubData: " + pubSubData);
+        ChannelFuture future = channel.write(pubsubRequestBuilder.build());
+        future.addListener(new WriteCallback(pubSubData, client));
+    }
+
+    /**
+     * This is a helper method to write a consume message to the server after a
+     * subscribe Channel connection is made to the server and messages are being
+     * consumed by the client.
+     * 
+     * @param pubSubData
+     *            Consume call's data wrapper object. We assume that the
+     *            operationType field is CONSUME.
+     * @param channel
+     *            Netty I/O channel for communication between the client and
+     *            server
+     * @param messageSeqId
+     *            Message Seq ID for the latest/last message the client has
+     *            consumed.
+     */
+    public void doConsume(final PubSubData pubSubData, final Channel channel, final MessageSeqId messageSeqId) {
+        // Create a PubSubRequest
+        PubSubRequest.Builder pubsubRequestBuilder = PubSubRequest.newBuilder();
+        pubsubRequestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE);
+        pubsubRequestBuilder.setType(OperationType.CONSUME);
+        long txnId = client.globalCounter.incrementAndGet();
+        pubsubRequestBuilder.setTxnId(txnId);
+        pubsubRequestBuilder.setTopic(pubSubData.topic);
+
+        // Create the ConsumeRequest
+        ConsumeRequest.Builder consumeRequestBuilder = ConsumeRequest.newBuilder();
+        consumeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
+        consumeRequestBuilder.setMsgId(messageSeqId);
+
+        // Set the ConsumeRequest into the outer PubSubRequest
+        pubsubRequestBuilder.setConsumeRequest(consumeRequestBuilder);
+
+        // For Consume requests, we will send them from the client in a fire and
+        // forget manner. We are not expecting the server to send back an ack
+        // response so no need to register this in the ResponseHandler. There
+        // are no callbacks to invoke since this isn't a client initiated
+        // action. Instead, just have a future listener that will log an error
+        // message if there was a problem writing the consume request.
+        if (logger.isDebugEnabled())
+            logger.debug("Writing a Consume request to host: " + HedwigClient.getHostFromChannel(channel)
+                    + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
+        ChannelFuture future = channel.write(pubsubRequestBuilder.build());
+        future.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                if (!future.isSuccess()) {
+                    logger.error("Error writing a Consume request to host: " + HedwigClient.getHostFromChannel(channel)
+                            + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);                    
+                }
+            }
+        });
+
+    }
+
+    public boolean hasSubscription(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
+            ServiceDownException {
+        // This type of subscription info should be stored on the server end, not
+        // the client side. Eventually, the server will have the Subscription
+        // Manager part that ties into ZooKeeper to manage this info.
+        // These kinds of APIs are only stubbed out here for now until that
+        // data is available on the server. We will figure out the
+        // correct way to contact the server to get this info then.
+        // The client side just has soft memory state for client subscription
+        // information.
+        return topicSubscriber2Channel.containsKey(new TopicSubscriber(topic, subscriberId));
+    }
+
+    public List<ByteString> getSubscriptionList(ByteString subscriberId) throws CouldNotConnectException,
+            ServiceDownException {
+        // Same as the previous hasSubscription method, this data should reside
+        // on the server end, not the client side.
+        return null;
+    }
+
+    public void startDelivery(final ByteString topic, final ByteString subscriberId, MessageHandler messageHandler)
+            throws ClientNotSubscribedException {
+        if (logger.isDebugEnabled())
+            logger.debug("Starting delivery for topic: " + topic.toStringUtf8() + ", subscriberId: "
+                    + subscriberId.toStringUtf8());
+        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
+        // Make sure this topic subscription exists on the client side. The
+        // assumption is that the client should have in memory the
+        // Channel created for the TopicSubscriber once the server has sent
+        // an ack response to the initial subscribe request.
+        if (!topicSubscriber2Channel.containsKey(topicSubscriber)) {
+            logger.error("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: "
+                    + subscriberId.toStringUtf8());
+            throw new ClientNotSubscribedException("Client is not yet subscribed to topic: " + topic.toStringUtf8()
+                    + ", subscriberId: " + subscriberId.toStringUtf8());
+        }
+
+        // Register the MessageHandler with the subscribe Channel's
+        // Response Handler.
+        Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
+        HedwigClient.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
+                .setMessageHandler(messageHandler);
+        // Now make the TopicSubscriber Channel readable (it is set to not be
+        // readable when the initial subscription is done). Note that this is an
+        // asynchronous call. If this fails (not likely), the futureListener
+        // will just log an error message for now.
+        ChannelFuture future = topicSubscriberChannel.setReadable(true);
+        future.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                if (!future.isSuccess()) {
+                    logger.error("Unable to make subscriber Channel readable in startDelivery call for topic: "
+                            + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
+                }
+            }
+        });
+    }
+
+    public void stopDelivery(final ByteString topic, final ByteString subscriberId) throws ClientNotSubscribedException {
+        if (logger.isDebugEnabled())
+            logger.debug("Stopping delivery for topic: " + topic.toStringUtf8() + ", subscriberId: "
+                    + subscriberId.toStringUtf8());
+        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
+        // Make sure this topic subscription exists on the client side. The
+        // assumption is that the client should have in memory the
+        // Channel created for the TopicSubscriber once the server has sent
+        // an ack response to the initial subscribe request.
+        if (!topicSubscriber2Channel.containsKey(topicSubscriber)) {
+            logger.error("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: "
+                    + subscriberId.toStringUtf8());
+            throw new ClientNotSubscribedException("Client is not yet subscribed to topic: " + topic.toStringUtf8()
+                    + ", subscriberId: " + subscriberId.toStringUtf8());
+        }
+
+        // Unregister the MessageHandler for the subscribe Channel's
+        // Response Handler.
+        Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
+        HedwigClient.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
+                .setMessageHandler(null);
+        // Now make the TopicSubscriber channel not-readable. This will buffer
+        // up messages if any are sent from the server. Note that this is an
+        // asynchronous call. If this fails (not likely), the futureListener
+        // will just log an error message for now.
+        ChannelFuture future = topicSubscriberChannel.setReadable(false);
+        future.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                if (!future.isSuccess()) {
+                    logger.error("Unable to make subscriber Channel not readable in stopDelivery call for topic: "
+                            + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
+                }
+            }
+        });
+    }
+
+    public void closeSubscription(ByteString topic, ByteString subscriberId) throws ServiceDownException {
+        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, null, null, null, null);
+        synchronized (pubSubData) {
+            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
+            asyncCloseSubscription(topic, subscriberId, pubSubCallback, null);
+            try {
+                while (!pubSubData.isDone)
+                    pubSubData.wait();
+            } catch (InterruptedException e) {
+                throw new ServiceDownException("Interrupted Exception while waiting for asyncCloseSubscription call");
+            }
+            // Check from the PubSubCallback if it was successful or not.
+            if (!pubSubCallback.getIsCallSuccessful()) {
+                throw new ServiceDownException("Exception while trying to close the subscription for topic: "
+                        + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
+            }
+        }
+    }
+
+    public void asyncCloseSubscription(final ByteString topic, final ByteString subscriberId,
+            final Callback<Void> callback, final Object context) {
+        if (logger.isDebugEnabled())
+            logger.debug("Closing subscription asynchronously for topic: " + topic.toStringUtf8() + ", subscriberId: "
+                    + subscriberId.toStringUtf8());
+        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
+        if (topicSubscriber2Channel.containsKey(topicSubscriber)) {
+            // Remove all cached references for the TopicSubscriber
+            Channel channel = topicSubscriber2Channel.get(topicSubscriber);
+            topicSubscriber2Channel.remove(topicSubscriber);
+            // Close the subscribe channel asynchronously.
+            HedwigClient.getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
+            ChannelFuture future = channel.close();
+            future.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if (!future.isSuccess()) {
+                        logger.error("Failed to close the subscription channel for topic: " + topic.toStringUtf8()
+                                + ", subscriberId: " + subscriberId.toStringUtf8());
+                        callback.operationFailed(context, new ServiceDownException(
+                                "Failed to close the subscription channel for topic: " + topic.toStringUtf8()
+                                        + ", subscriberId: " + subscriberId.toStringUtf8()));
+                    } else {
+                        callback.operationFinished(context, null);
+                    }
+                }
+            });
+        } else {
+            logger.warn("Trying to close a subscription when we don't have a subscribe channel cached for topic: "
+                    + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
+            callback.operationFinished(context, null);
+        }
+    }
+
+    // Public getters and setters for entries in the topicSubscriber2Channel
+    // map. These are used by classes that need this information but are not
+    // in the same package.
+    public Channel getChannelForTopic(TopicSubscriber topic) {
+        return topicSubscriber2Channel.get(topic);
+    }
+
+    public void setChannelForTopic(TopicSubscriber topic, Channel channel) {
+        topicSubscriber2Channel.put(topic, channel);
+    }
+
+    public void removeChannelForTopic(TopicSubscriber topic) {
+        topicSubscriber2Channel.remove(topic);
+    }
+
+}
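
As a usage note for reviewers, here is a minimal sketch of how the delivery and
close calls added above are intended to be used. It assumes that HedwigClient
can be constructed from a ClientConfiguration and that its getSubscriber()
accessor (used elsewhere in this patch) returns the subscriber class shown
here; the MessageHandler argument is a placeholder, and the initial subscribe
call is assumed to have completed successfully. This is illustrative only and
is not part of the patch.

    // Hypothetical usage of the subscriber delivery API.
    void deliveryExample(MessageHandler myMessageHandler) throws Exception {
        HedwigClient client = new HedwigClient(new ClientConfiguration());
        ByteString topic = ByteString.copyFromUtf8("myTopic");
        ByteString subscriberId = ByteString.copyFromUtf8("mySubscriber");
        // A subscribe request for (topic, subscriberId) must already have
        // been acked by the server before delivery can be started.
        client.getSubscriber().startDelivery(topic, subscriberId, myMessageHandler);
        // ... messages for the topic are now pushed to myMessageHandler ...
        client.getSubscriber().stopDelivery(topic, subscriberId);
        client.getSubscriber().closeSubscription(topic, subscriberId);
    }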

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.exceptions.ServerRedirectLoopException;
+import org.apache.hedwig.client.exceptions.TooManyServerRedirectsException;
+import org.apache.hedwig.client.handlers.PublishResponseHandler;
+import org.apache.hedwig.client.handlers.SubscribeReconnectCallback;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
+import org.apache.hedwig.client.handlers.UnsubscribeResponseHandler;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.exceptions.PubSubException.UncertainStateException;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+@ChannelPipelineCoverage("all")
+public class ResponseHandler extends SimpleChannelHandler {
+
+    private static Logger logger = Logger.getLogger(ResponseHandler.class);
+
+    // Concurrent Map to store for each async PubSub request, the txn ID
+    // and the corresponding PubSub call's data which stores the VoidCallback to
+    // invoke when we receive a PubSub ack response from the server.
+    // This is specific to this instance of the ResponseHandler which is
+    // tied to a specific netty Channel Pipeline.
+    protected final ConcurrentMap<Long, PubSubData> txn2PubSubData = new ConcurrentHashMap<Long, PubSubData>();
+
+    // Boolean indicating if we closed the channel this ResponseHandler is
+    // attached to explicitly or not. If so, we do not need to do the
+    // channel disconnected logic here.
+    public boolean channelClosedExplicitly = false;
+
+    private final HedwigClient client;
+    private final HedwigPublisher pub;
+    private final HedwigSubscriber sub;
+    private final ClientConfiguration cfg;
+
+    private final PublishResponseHandler pubHandler;
+    private final SubscribeResponseHandler subHandler;
+    private final UnsubscribeResponseHandler unsubHandler;
+
+    public ResponseHandler(HedwigClient client) {
+        this.client = client;
+        this.sub = client.getSubscriber();
+        this.pub = client.getPublisher();
+        this.cfg = client.getConfiguration();
+        this.pubHandler = new PublishResponseHandler(this);
+        this.subHandler = new SubscribeResponseHandler(this);
+        this.unsubHandler = new UnsubscribeResponseHandler(this);
+    }
+
+    // Public getters needed for the private members
+    public HedwigClient getClient() {
+        return client;
+    }
+
+    public HedwigSubscriber getSubscriber() {
+        return sub;
+    }
+
+    public ClientConfiguration getConfiguration() {
+        return cfg;
+    }
+
+    public SubscribeResponseHandler getSubscribeResponseHandler() {
+        return subHandler;
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+        // If the Message is not a PubSubResponse, just send it upstream and let
+        // something else handle it.
+        if (!(e.getMessage() instanceof PubSubResponse)) {
+            ctx.sendUpstream(e);
+            return;
+        }
+        // Retrieve the PubSubResponse from the Message that was sent by the
+        // server.
+        PubSubResponse response = (PubSubResponse) e.getMessage();
+        if (logger.isDebugEnabled())
+            logger.debug("Response received from host: " + HedwigClient.getHostFromChannel(ctx.getChannel())
+                    + ", response: " + response);
+
+        // Determine if this PubSubResponse is an ack response for a PubSub
+        // Request or if it is a message being pushed to the client subscriber.
+        if (response.hasMessage()) {
+            // Subscribed messages being pushed to the client so handle/consume
+            // it and return.
+            subHandler.handleSubscribeMessage(response);
+            return;
+        }
+
+        // Response is an ack to a prior PubSubRequest so first retrieve the
+        // PubSub data for this txn.
+        PubSubData pubSubData = txn2PubSubData.get(response.getTxnId());
+        // Validate that the PubSub data for this txn is stored. If not, just
+        // log an error message and return since we don't know how to handle
+        // this.
+        if (pubSubData == null) {
+            logger.error("PubSub Data was not found for PubSubResponse: " + response);
+            return;
+        }
+
+        // Now that we've retrieved the PubSubData for this specific Txn ID, we
+        // can remove it from the Map.
+        txn2PubSubData.remove(response.getTxnId());
+
+        // Store the topic2Host mapping if this wasn't a server redirect. We'll
+        // assume that if the server was able to have an open Channel connection
+        // to the client, and responded with an ack message other than the
+        // NOT_RESPONSIBLE_FOR_TOPIC one, it is the correct topic master.
+        if (!response.getStatusCode().equals(StatusCode.NOT_RESPONSIBLE_FOR_TOPIC)) {
+            client.storeTopic2HostMapping(pubSubData, ctx.getChannel());
+        }
+
+        // Depending on the operation type, call the appropriate handler.
+        switch (pubSubData.operationType) {
+        case PUBLISH:
+            pubHandler.handlePublishResponse(response, pubSubData, ctx.getChannel());
+            break;
+        case SUBSCRIBE:
+            subHandler.handleSubscribeResponse(response, pubSubData, ctx.getChannel());
+            break;
+        case UNSUBSCRIBE:
+            unsubHandler.handleUnsubscribeResponse(response, pubSubData, ctx.getChannel());
+            break;
+        default:
+            // The above are the only expected PubSubResponse messages received
+            // from the server for the various client side requests made.
+            logger.error("Response received from server is for an unhandled operation type, txnId: "
+                    + response.getTxnId() + ", operationType: " + pubSubData.operationType);
+        }
+    }
+
+    /**
+     * Logic to repost a PubSubRequest when the server responds with a redirect
+     * indicating it is not the topic master.
+     * 
+     * @param response
+     *            PubSubResponse from the server for the redirect
+     * @param pubSubData
+     *            PubSubData for the original PubSubRequest made
+     * @param channel
+     *            Channel we used to make the original PubSubRequest
+     * @throws Exception
+     *             Throws an exception if there was an error in doing the
+     *             redirect repost of the PubSubRequest
+     */
+    public void handleRedirectResponse(PubSubResponse response, PubSubData pubSubData, Channel channel)
+            throws Exception {
+        if (logger.isDebugEnabled())
+            logger.debug("Handling a redirect from host: " + HedwigClient.getHostFromChannel(channel) + ", response: "
+                    + response + ", pubSubData: " + pubSubData);
+        // In this case, the PubSub request was done to a server that is not
+        // responsible for the topic. First make sure that we haven't
+        // exceeded the maximum number of server redirects.
+        int curNumServerRedirects = (pubSubData.triedServers == null) ? 0 : pubSubData.triedServers.size();
+        if (curNumServerRedirects >= cfg.getMaximumServerRedirects()) {
+            // We've already exceeded the maximum number of server redirects
+            // so consider this as an error condition for the client.
+            // Invoke the operationFailed callback and just return.
+            if (logger.isDebugEnabled())
+                logger.debug("Exceeded the number of server redirects (" + curNumServerRedirects + ") so error out.");
+            pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
+                    new TooManyServerRedirectsException("Already reached max number of redirects: "
+                            + curNumServerRedirects)));
+            return;
+        }
+
+        // We will redirect and try to connect to the correct server
+        // stored in the StatusMsg of the response. First store the
+        // server that we sent the PubSub request to for the topic.
+        ByteString triedServer = ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(HedwigClient
+                .getHostFromChannel(channel)));
+        if (pubSubData.triedServers == null)
+            pubSubData.triedServers = new LinkedList<ByteString>();
+        pubSubData.shouldClaim = true;
+        pubSubData.triedServers.add(triedServer);
+
+        // Now get the redirected server host (expected format is
+        // Hostname:Port:SSLPort) from the server's response message. If one is
+        // not given for some reason, then redirect to the default server
+        // host/VIP to repost the request.
+        String statusMsg = response.getStatusMsg();
+        InetSocketAddress redirectedHost;
+        if (statusMsg != null && statusMsg.length() > 0) {
+            if (cfg.isSSLEnabled()) {
+                redirectedHost = new HedwigSocketAddress(statusMsg).getSSLSocketAddress();
+            } else {
+                redirectedHost = new HedwigSocketAddress(statusMsg).getSocketAddress();
+            }
+        } else {
+            redirectedHost = cfg.getDefaultServerHost();
+        }
+
+        // Make sure the redirected server is not one we've already attempted
+        // before in this PubSub request.
+        if (pubSubData.triedServers.contains(ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(redirectedHost)))) {
+            logger.error("We've already sent this PubSubRequest before to redirectedHost: " + redirectedHost
+                    + ", pubSubData: " + pubSubData);
+            pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
+                    new ServerRedirectLoopException("Already made the request before to redirected host: "
+                            + redirectedHost)));
+            return;
+        }
+
+        // Check if we already have a Channel open to the redirected server
+        // host.
+        boolean redirectedHostChannelExists = pub.host2Channel.containsKey(redirectedHost);
+        if (pubSubData.operationType.equals(OperationType.SUBSCRIBE) || !redirectedHostChannelExists) {
+            // We don't have an existing channel to the redirected host OR this
+            // is a redirected Subscribe request. For Subscribe requests, we
+            // always want to create a new unique Channel connection to the
+            // topic master server for the TopicSubscriber.
+            client.doConnect(pubSubData, redirectedHost);
+        } else {
+            // For Publish and Unsubscribe requests, we can just post the
+            // request again directly on the existing cached redirected host
+            // channel.
+            if (pubSubData.operationType.equals(OperationType.PUBLISH)) {
+                pub.doPublish(pubSubData, pub.host2Channel.get(redirectedHost));
+            } else if (pubSubData.operationType.equals(OperationType.UNSUBSCRIBE)) {
+                sub.doSubUnsub(pubSubData, pub.host2Channel.get(redirectedHost));
+            }
+        }
+    }
+
+    // Logic to deal with what happens when a Channel to a server host is
+    // disconnected.
+    @Override
+    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        // If this channel was closed explicitly by the client code,
+        // we do not need to do any of this logic. This could happen
+        // for redundant Publish channels, for redirected subscribe channels
+        // that are no longer used, or when we shut down the client and
+        // manually close all of the open channels.
+        // Also don't do any of the disconnect logic if the client has stopped.
+        if (channelClosedExplicitly || client.hasStopped())
+            return;
+
+        // Make sure the host retrieved is not null as there could be some weird
+        // channel disconnect events happening during a client shutdown.
+        // If it is, just return as there shouldn't be anything we need to do.
+        InetSocketAddress host = HedwigClient.getHostFromChannel(ctx.getChannel());
+        logger.warn("Channel was disconnected to host: " + host);
+        if (host == null)
+            return;
+
+        // If this Channel was used for Publish and Unsubscribe flows, just
+        // remove it from the HedwigPublisher's host2Channel map. We will
+        // re-establish a Channel connection to that server when the next
+        // publish/unsubscribe request to a topic that the server owns occurs.
+        PubSubData origSubData = subHandler.getOrigSubData();
+
+        // Now determine what type of operation this channel was used for.
+        if (origSubData == null) {
+            // Only remove the Channel from the mapping if this current
+            // disconnected channel is the same as the cached entry.
+            // Due to race concurrency situations, it is possible to
+            // create multiple channels to the same host for publish
+            // and unsubscribe requests.
+            if (pub.host2Channel.containsKey(host) && pub.host2Channel.get(host).equals(ctx.getChannel())) {
+                if (logger.isDebugEnabled())
+                    logger.debug("Disconnected channel for host: " + host
+                            + " was for Publish/Unsubscribe requests so remove all references to it.");
+                pub.host2Channel.remove(host);
+                client.clearAllTopicsForHost(host);
+            }
+        } else {
+            // Subscribe channel disconnected so first close and clear all
+            // cached Channel data set up for this topic subscription.
+            sub.closeSubscription(origSubData.topic, origSubData.subscriberId);
+            client.clearAllTopicsForHost(host);
+            // Since the connection to the server host that was responsible
+            // for the topic died, we are not sure about the state of that
+            // server. Resend the original subscribe request data to the default
+            // server host/VIP. Also clear out all of the servers we've
+            // contacted or attempted to from this request as we are starting a
+            // "fresh" subscribe request.
+            origSubData.clearServersList();
+            // Set a new type of VoidCallback for this async call. We need this
+            // hook so after the subscribe reconnect has completed, delivery for
+            // that topic subscriber should also be restarted (if it was that
+            // case before the channel disconnect).
+            origSubData.callback = new SubscribeReconnectCallback(origSubData, client, subHandler.getMessageHandler());
+            origSubData.context = null;
+            if (logger.isDebugEnabled())
+                logger.debug("Disconnected subscribe channel so reconnect with origSubData: " + origSubData);
+            client.doConnect(origSubData, cfg.getDefaultServerHost());
+        }
+
+        // Finally, all of the PubSubRequests that are still waiting for an ack
+        // response from the server need to be removed and timed out. Invoke the
+        // operationFailed callbacks on all of them. Use the
+        // UncertainStateException since the server did receive the request but
+        // we're not sure of the state of the request since the ack response was
+        // never received.
+        for (PubSubData pubSubData : txn2PubSubData.values()) {
+            if (logger.isDebugEnabled())
+                logger.debug("Channel disconnected so invoking the operationFailed callback for pubSubData: "
+                        + pubSubData);
+            pubSubData.callback.operationFailed(pubSubData.context, new UncertainStateException(
+                    "Server ack response never received before server connection disconnected!"));
+        }
+        txn2PubSubData.clear();
+    }
+
+    // Logic to deal with what happens when a Channel to a server host is
+    // connected. This is needed if the client is using an SSL port to
+    // communicate with the server. If so, we need to do the SSL handshake here
+    // when the channel is first connected.
+    @Override
+    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        // No need to initiate the SSL handshake if we are closing this channel
+        // explicitly or the client has been stopped.
+        if (cfg.isSSLEnabled() && !channelClosedExplicitly && !client.hasStopped()) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Initiating the SSL handshake");
+            }
+            ctx.getPipeline().get(SslHandler.class).handshake(e.getChannel());
+        }
+    }
+
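+    // Catch-all for unexpected exceptions on the channel: log the cause and
+    // close the channel so the channelDisconnected logic above can clean up
+    // any outstanding requests.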
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+        logger.error("Exception caught on client channel: " + e.getChannel(), e.getCause());
+        e.getChannel().close();
+    }
+
+}
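
For context on where this handler sits, below is a plausible sketch of the
client-side Netty pipeline wiring. The actual pipeline factory is added
elsewhere in this commit, so the handler names, frame sizes, and codec choices
here are illustrative assumptions rather than the patch's real configuration;
a HedwigClient instance named client is assumed to be in scope.

    // Hypothetical client pipeline wiring for the ResponseHandler above.
    ChannelPipeline pipeline = Channels.pipeline();
    // When SSL is enabled, an SslHandler would be added first (the handshake
    // is initiated in channelConnected above).
    // Frame the stream and decode protobuf PubSubResponse messages before
    // they reach the ResponseHandler.
    pipeline.addLast("framedecoder", new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 4, 0, 4));
    pipeline.addLast("frameprepender", new LengthFieldPrepender(4));
    pipeline.addLast("protobufdecoder", new ProtobufDecoder(PubSubResponse.getDefaultInstance()));
    pipeline.addLast("protobufencoder", new ProtobufEncoder());
    pipeline.addLast("responsehandler", new ResponseHandler(client));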

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+public class WriteCallback implements ChannelFutureListener {
+
+    private static Logger logger = Logger.getLogger(WriteCallback.class);
+
+    // Private member variables
+    private PubSubData pubSubData;
+    private final HedwigClient client;
+    private final ClientConfiguration cfg;
+
+    // Constructor
+    public WriteCallback(PubSubData pubSubData, HedwigClient client) {
+        super();
+        this.pubSubData = pubSubData;
+        this.client = client;
+        this.cfg = client.getConfiguration();
+    }
+
+    public void operationComplete(ChannelFuture future) throws Exception {
+        // If the client has stopped, there is no need to proceed
+        // with any callback logic here.
+        if (client.hasStopped())
+            return;
+        
+        // When the write operation to the server is done, we just need to check
+        // if it was successful or not.
+        InetSocketAddress host = HedwigClient.getHostFromChannel(future.getChannel());
+        if (!future.isSuccess()) {
+            logger.error("Error writing on channel to host: " + host);
+            // On a write failure for a PubSubRequest, we also want to remove
+            // the saved txnId to PubSubData in the ResponseHandler. These
+            // requests will not receive an ack response from the server
+            // so there is no point storing that information there anymore.
+            HedwigClient.getResponseHandlerFromChannel(future.getChannel()).txn2PubSubData.remove(pubSubData.txnId);
+
+            // If we were not able to write on the channel to the server host,
+            // the host could have died or something is wrong with the channel
+            // connection where we can connect to the host, but not write to it.
+            ByteString hostString = (host == null) ? null : ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(host));
+            if (pubSubData.writeFailedServers != null && pubSubData.writeFailedServers.contains(hostString)) {
+                // We've already tried to write to this server previously and
+                // failed, so invoke the operationFailed callback.
+                logger.error("Error writing to host more than once so just invoke the operationFailed callback!");
+                pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
+                        "Error while writing message to server: " + hostString));
+            } else {
+                if (logger.isDebugEnabled())
+                    logger.debug("Try to send the PubSubRequest again to the default server host/VIP for pubSubData: "
+                            + pubSubData);
+                // Keep track of this current server that we failed to write to
+                // but retry the request on the default server host/VIP.
+                if (pubSubData.writeFailedServers == null)
+                    pubSubData.writeFailedServers = new LinkedList<ByteString>();
+                pubSubData.writeFailedServers.add(hostString);
+                client.doConnect(pubSubData, cfg.getDefaultServerHost());
+            }
+        } else {
+            // Now that the write to the server is done, we have to wait for it
+            // to respond. The ResponseHandler will take care of the ack
+            // response from the server before we can determine if the async
+            // PubSub call has really completed successfully or not.
+            if (logger.isDebugEnabled())
+                logger.debug("Successfully wrote to host: " + host + " for pubSubData: " + pubSubData);
+        }
+    }
+
+}
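
As a usage note, this listener is meant to be attached to the ChannelFuture
returned when a PubSubRequest is written to the server channel. A minimal,
hypothetical call site is shown below; the real write paths live in the
publisher and subscriber classes of this patch, and the channel, request
builder, pubSubData, and client variables are assumed to be in scope.

    // Hypothetical attachment of the WriteCallback to a request write.
    ChannelFuture writeFuture = channel.write(pubsubRequestBuilder.build());
    writeFuture.addListener(new WriteCallback(pubSubData, client));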