You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/08/19 23:25:22 UTC

svn commit: r987314 [5/16] - in /hadoop/zookeeper/trunk: ./ src/contrib/hedwig/ src/contrib/hedwig/client/ src/contrib/hedwig/client/src/ src/contrib/hedwig/client/src/main/ src/contrib/hedwig/client/src/main/cpp/ src/contrib/hedwig/client/src/main/cpp...

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.benchmark;
+
+import java.io.File;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.Logger;
+import org.jboss.netty.logging.InternalLoggerFactory;
+import org.jboss.netty.logging.Log4JLoggerFactory;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.netty.HedwigPublisher;
+import org.apache.hedwig.client.netty.HedwigSubscriber;
+
+/**
+ * Command-line driver for the Hedwig client benchmarks. The "mode" system
+ * property selects which benchmark runs ("sub", "recv" or "pub"); the other
+ * parameters are read from system properties inside call().
+ */
+public class HedwigBenchmark implements Callable<Void> {
+    protected static final Logger logger = Logger.getLogger(HedwigBenchmark.class);
+    static final String TOPIC_PREFIX = "topic-";
+
+    private final HedwigClient client;
+    private final HedwigPublisher publisher;
+    private final HedwigSubscriber subscriber;
+
+    // Creates the Hedwig client and fetches its publisher and subscriber.
+    public HedwigBenchmark(ClientConfiguration cfg) {
+        client = new HedwigClient(cfg);
+        publisher = client.getPublisher();
+        subscriber = client.getSubscriber();
+    }
+
+    // True when topicNum modulo numPartitions equals partitionIndex.
+    static boolean amIResponsibleForTopic(int topicNum, int partitionIndex, int numPartitions) {
+        return topicNum % numPartitions == partitionIndex;
+    }
+
+    /**
+     * Reads the benchmark parameters from system properties and runs the
+     * benchmark selected by "mode".
+     * @return always null
+     * @throws IllegalArgumentException if "mode" is not "sub", "recv" or "pub"
+     * @throws Exception if the selected benchmark fails
+     */
+    @Override
+    public Void call() throws Exception {
+        // What program to run: pub, sub (subscription benchmark), recv.
+        final String mode = System.getProperty("mode", "");
+
+        // Number of requests to make (publishes or subscribes).
+        int numTopics = Integer.getInteger("nTopics", 50);
+        int numMessages = Integer.getInteger("nMsgs", 1000);
+        int numRegions = Integer.getInteger("nRegions", 1);
+        int startTopicLabel = Integer.getInteger("startTopicLabel", 0);
+        int partitionIndex = Integer.getInteger("partitionIndex", 0);
+        int numPartitions = Integer.getInteger("nPartitions", 1);
+        int replicaIndex = Integer.getInteger("replicaIndex", 0);
+
+        int rate = Integer.getInteger("rate", 0);
+        int nParallel = Integer.getInteger("npar", 100);
+        int msgSize = Integer.getInteger("msgSize", 1024);
+
+        // Number of warmup subscriptions to make.
+        final int nWarmups = Integer.getInteger("nwarmups", 1000);
+
+        if (mode.equals("sub")) {
+            BenchmarkSubscriber benchmarkSub = new BenchmarkSubscriber(numTopics, 0, 1, startTopicLabel, 0, 1,
+                    subscriber, ByteString.copyFromUtf8("mySub"));
+            benchmarkSub.warmup(nWarmups);
+            benchmarkSub.call();
+        } else if (mode.equals("recv")) {
+            BenchmarkSubscriber benchmarkSub = new BenchmarkSubscriber(numTopics, numMessages, numRegions,
+                    startTopicLabel, partitionIndex, numPartitions, subscriber, ByteString.copyFromUtf8("sub-"
+                            + replicaIndex));
+            benchmarkSub.call();
+        } else if (mode.equals("pub")) {
+            // Offered load in msgs/second.
+            BenchmarkPublisher benchmarkPub = new BenchmarkPublisher(numTopics, numMessages, numRegions,
+                    startTopicLabel, partitionIndex, numPartitions, publisher, subscriber, msgSize, nParallel, rate);
+            benchmarkPub.warmup(nWarmups);
+            benchmarkPub.call();
+        } else {
+            // Unchecked and specific; callers that catch Exception still see it.
+            throw new IllegalArgumentException("unknown mode: " + mode);
+        }
+        return null;
+    }
+
+    // Entry point: optionally loads the configuration file named by args[0],
+    // routes Netty logging through Log4J, runs the benchmark once and exits.
+    public static void main(String[] args) throws Exception {
+        ClientConfiguration cfg = new ClientConfiguration();
+        if (args.length > 0) {
+            String confFile = args[0];
+            try {
+                cfg.loadConf(new File(confFile).toURI().toURL());
+            } catch (ConfigurationException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
+        HedwigBenchmark app = new HedwigBenchmark(cfg);
+        app.call();
+        System.exit(0);
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.conf;
+
+import java.net.InetSocketAddress;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.Logger;
+
+import org.apache.hedwig.conf.AbstractConfiguration;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+public class ClientConfiguration extends AbstractConfiguration {
+    Logger logger = Logger.getLogger(ClientConfiguration.class);
+
+    // Names of the client configuration parameters (protected constants).
+    protected static final String DEFAULT_SERVER_HOST = "default_server_host";
+    protected static final String MAX_MESSAGE_SIZE = "max_message_size";
+    protected static final String MAX_SERVER_REDIRECTS = "max_server_redirects";
+    protected static final String AUTO_SEND_CONSUME_MESSAGE_ENABLED = "auto_send_consume_message_enabled";
+    protected static final String CONSUMED_MESSAGES_BUFFER_SIZE = "consumed_messages_buffer_size";
+    protected static final String MESSAGE_CONSUME_RETRY_WAIT_TIME = "message_consume_retry_wait_time";
+    protected static final String SUBSCRIBE_RECONNECT_RETRY_WAIT_TIME = "subscribe_reconnect_retry_wait_time";
+    protected static final String MAX_OUTSTANDING_MESSAGES = "max_outstanding_messages";
+    protected static final String SERVER_ACK_RESPONSE_TIMEOUT = "server_ack_response_timeout";
+    protected static final String TIMEOUT_THREAD_RUN_INTERVAL = "timeout_thread_run_interval";
+    protected static final String SSL_ENABLED = "ssl_enabled";
+
+    // Cached address; filled in lazily by getDefaultServerHedwigSocketAddress().
+    protected HedwigSocketAddress myDefaultServerAddress = null;
+
+    // Default server host, or the VIP fronting all of the server hubs, as a
+    // HedwigSocketAddress that carries both the regular and the SSL port.
+    // Uses "localhost:4080:9876" when DEFAULT_SERVER_HOST is not configured.
+    protected HedwigSocketAddress getDefaultServerHedwigSocketAddress() {
+        if (myDefaultServerAddress == null) {
+            myDefaultServerAddress = new HedwigSocketAddress(conf.getString(DEFAULT_SERVER_HOST, "localhost:4080:9876"));
+        }
+        return myDefaultServerAddress;
+    }
+
+    // InetSocketAddress of the default server: the SSL address when SSL is
+    // enabled, the regular one otherwise.
+    public InetSocketAddress getDefaultServerHost() {
+        return isSSLEnabled() ? getDefaultServerHedwigSocketAddress().getSSLSocketAddress()
+                : getDefaultServerHedwigSocketAddress().getSocketAddress();
+    }
+
+    // Maximum message size; 2 * 1024 * 1024 unless configured.
+    public int getMaximumMessageSize() {
+        return conf.getInt(MAX_MESSAGE_SIZE, 2 * 1024 * 1024);
+    }
+
+    // Upper bound on the number of server redirects allowed before the
+    // situation is treated as an error condition. It stops infinite redirect
+    // loops in case there is a problem with the hub servers' topic
+    // mastership. Default: 2.
+    public int getMaximumServerRedirects() {
+        return conf.getInt(MAX_SERVER_REDIRECTS, 2);
+    }
+
+    // Whether the client library automatically sends the consume message to
+    // the server, based on the configured amount of messages consumed by the
+    // client app. The client app may override this and send the consume
+    // message manually through the client library, using its own logic and
+    // policy. Default: true.
+    public boolean isAutoSendConsumeMessageEnabled() {
+        return conf.getBoolean(AUTO_SEND_CONSUME_MESSAGE_ENABLED, true);
+    }
+
+    // Number of consumed messages buffered up before the Consume message is
+    // sent to the server, which indicates that all of the messages up to
+    // that point have been successfully consumed by the client.
+    // Default: 5.
+    public int getConsumedMessagesBufferSize() {
+        return conf.getInt(CONSUMED_MESSAGES_BUFFER_SIZE, 5);
+    }
+
+    // Wait time, in milliseconds, before the client app's MessageHandler is
+    // retried for consuming a subscribed message sent to us from the server.
+    // Default: 10000.
+    public long getMessageConsumeRetryWaitTime() {
+        return conf.getLong(MESSAGE_CONSUME_RETRY_WAIT_TIME, 10000);
+    }
+
+    // Wait time, in milliseconds, before the Subscribe Reconnect request is
+    // retried. When the connection to a server drops we try to connect to it
+    // again and keep on trying; should the server(s) be down for a longer
+    // time, this delay throttles how often the subscribe reconnect request
+    // is made.
+    // Default: 10000.
+    public long getSubscribeReconnectRetryWaitTime() {
+        return conf.getLong(SUBSCRIBE_RECONNECT_RETRY_WAIT_TIME, 10000);
+    }
+
+    // Maximum number of outstanding messages the client app can be consuming
+    // at a time for a topic subscription before we throttle things and stop
+    // reading from the Netty Channel. Default: 10.
+    public int getMaximumOutstandingMessages() {
+        return conf.getInt(MAX_OUTSTANDING_MESSAGES, 10);
+    }
+
+    // Time, in milliseconds, after which outstanding PubSubRequests that were
+    // written to the server successfully but have not yet received the ack
+    // response are timed out. Default: 30000.
+    public long getServerAckResponseTimeout() {
+        return conf.getLong(SERVER_ACK_RESPONSE_TIMEOUT, 30000);
+    }
+
+    // Interval, in milliseconds, at which the server ack response timeout
+    // cleaner thread is run. Default: 60000.
+    public long getTimeoutThreadRunInterval() {
+        return conf.getLong(TIMEOUT_THREAD_RUN_INTERVAL, 60000);
+    }
+
+    // Whether communication with the server is done via SSL for encryption.
+    // This is needed for cross-colo hub clients listening to non-local
+    // servers. Default: false.
+    public boolean isSSLEnabled() {
+        return conf.getBoolean(SSL_ENABLED, false);
+    }
+
+    // Checks the configuration properties; currently only the SSL port.
+    public void validate() throws ConfigurationException {
+        final boolean sslPortMissing = isSSLEnabled()
+                && getDefaultServerHedwigSocketAddress().getSSLSocketAddress() == null;
+        if (sslPortMissing) {
+            throw new ConfigurationException("SSL is enabled but a default server SSL port not given!");
+        }
+        // Add other validation checks here
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.data;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+
+/**
+ * Wrapper class holding all of the data points needed to describe Message
+ * Consumption in the Subscribe flow: consuming a message sent from the server
+ * for a given TopicSubscriber. It is used as the Context in the VoidCallback
+ * for the MessageHandlers once they've completed consuming the message.
+ */
+public class MessageConsumeData {
+    // Topic and subscriber the message is consumed for.
+    public final ByteString topic;
+    public final ByteString subscriberId;
+    // This is the Message sent from the server for Subscribes for consumption
+    // by the client.
+    public final Message msg;
+
+    // Stores the three references as given; no validation or copying is done.
+    public MessageConsumeData(final ByteString topic, final ByteString subscriberId, final Message msg) {
+        this.topic = topic;
+        this.subscriberId = subscriberId;
+        this.msg = msg;
+    }
+
+    // Lists the non-null fields as "Label: value" entries. PubSubData.COMMA is
+    // put only between entries, so a null topic leaves no leading separator.
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        if (topic != null)
+            sb.append("Topic: ").append(topic.toStringUtf8());
+        if (subscriberId != null)
+            sb.append(sb.length() > 0 ? PubSubData.COMMA : "").append("SubscriberId: ")
+                    .append(subscriberId.toStringUtf8());
+        if (msg != null)
+            sb.append(sb.length() > 0 ? PubSubData.COMMA : "").append("Message: ").append(msg);
+        return sb.toString();
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/data/PubSubData.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/data/PubSubData.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/data/PubSubData.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/data/PubSubData.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.data;
+
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * Wrapper class to store all of the data points needed to encapsulate all
+ * PubSub type of request operations the client will do. This includes knowing
+ * all of the information needed if we need to redo the publish/subscribe
+ * request in case of a server redirect. This will be used for all sync/async
+ * calls, and for all the known types of request messages to send to the server
+ * hubs: Publish, Subscribe, Unsubscribe, and Consume.
+ */
+public class PubSubData {
+    // Separator placed between the entries of the toString() output.
+    protected static final String COMMA = ", ";
+
+    // Member variables needed during object construction time.
+    public final ByteString topic;
+    public final Message msg;
+    public final ByteString subscriberId;
+    // Enum to indicate what type of operation this PubSub request data object
+    // is for.
+    public final OperationType operationType;
+    // Enum for subscribe requests to indicate if this is a CREATE, ATTACH, or
+    // CREATE_OR_ATTACH subscription request. For non-subscribe requests,
+    // this will be null.
+    public final CreateOrAttach createOrAttach;
+    // These two variables are not final since we might override them
+    // in the case of a Subscribe reconnect.
+    public Callback<Void> callback;
+    public Object context;
+
+    // Member variables used after object has been constructed.
+    // List of all servers we've sent the PubSubRequest to successfully.
+    // This is to keep track of redirected servers that responded back to us.
+    public List<ByteString> triedServers;
+    // Servers we tried to connect or write to without success. The
+    // PubSubRequest is retried, but we quit once we would connect or write
+    // to a server that was already attempted.
+    public List<ByteString> connectFailedServers;
+    public List<ByteString> writeFailedServers;
+    // Boolean to the hub server indicating if it should claim ownership
+    // of the topic the PubSubRequest is for. This is mainly used after
+    // a server redirect. Defaults to false.
+    public boolean shouldClaim = false;
+    // TxnID of the PubSubRequest sent to the hub server. Set only when the
+    // actual request is written (hence not a constructor argument). On a
+    // write failure the WriteCallback uses it to drop the entry from
+    // ResponseHandler.txn2PubSubData, since no ack response will arrive.
+    public long txnId;
+    // Time in milliseconds using the System.currentTimeMillis() call when the
+    // PubSubRequest was written on the netty Channel to the server.
+    public long requestWriteTime;
+    // For synchronous calls, this variable is used to know when the background
+    // async process for it has completed, set in the VoidCallback.
+    public boolean isDone = false;
+
+    // Constructor for all types of PubSub request data to send to the server
+    public PubSubData(final ByteString topic, final Message msg, final ByteString subscriberId,
+            final OperationType operationType, final CreateOrAttach createOrAttach, final Callback<Void> callback,
+            final Object context) {
+        this.topic = topic;
+        this.msg = msg;
+        this.subscriberId = subscriberId;
+        this.operationType = operationType;
+        this.createOrAttach = createOrAttach;
+        this.callback = callback;
+        this.context = context;
+    }
+
+    // Clear all of the stored servers we've contacted or attempted to in this
+    // request.
+    public void clearServersList() {
+        if (triedServers != null)
+            triedServers.clear();
+        if (connectFailedServers != null)
+            connectFailedServers.clear();
+        if (writeFailedServers != null)
+            writeFailedServers.clear();
+    }
+
+    // Lists the set fields as "Label: value" entries with COMMA between them.
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        if (topic != null)
+            field(sb, "Topic: ").append(topic.toStringUtf8());
+        if (msg != null)
+            field(sb, "Message: ").append(msg);
+        if (subscriberId != null)
+            field(sb, "SubscriberId: ").append(subscriberId.toStringUtf8());
+        if (operationType != null)
+            field(sb, "Operation Type: ").append(operationType);
+        if (createOrAttach != null)
+            field(sb, "Create Or Attach: ").append(createOrAttach);
+        appendServers(sb, "Tried Servers: ", triedServers);
+        appendServers(sb, "Connect Failed Servers: ", connectFailedServers);
+        appendServers(sb, "Write Failed Servers: ", writeFailedServers);
+        field(sb, "Should Claim: ").append(shouldClaim);
+        if (txnId != 0)
+            field(sb, "TxnID: ").append(txnId);
+        if (requestWriteTime != 0)
+            field(sb, "Request Write Time: ").append(requestWriteTime);
+        field(sb, "Is Done: ").append(isDone);
+        return sb.toString();
+    }
+
+    // Starts a new entry: COMMA unless sb is still empty, then the label.
+    private static StringBuilder field(StringBuilder sb, String label) {
+        if (sb.length() > 0)
+            sb.append(COMMA);
+        return sb.append(label);
+    }
+
+    // Appends a labelled server list joined by COMMA; skips null or empty lists.
+    private static void appendServers(StringBuilder sb, String label, List<ByteString> servers) {
+        if (servers == null || servers.isEmpty())
+            return;
+        field(sb, label);
+        String sep = "";
+        for (ByteString server : servers) {
+            sb.append(sep).append(server.toStringUtf8());
+            sep = COMMA;
+        }
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/data/TopicSubscriber.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/data/TopicSubscriber.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/data/TopicSubscriber.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/data/TopicSubscriber.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.data;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Wrapper class object for the Topic + SubscriberId combination. Since the
+ * Subscribe flows always use the Topic + SubscriberId as the logical entity,
+ * we'll create a simple class to encapsulate that.
+ */
+public class TopicSubscriber {
+    private final ByteString topic;
+    private final ByteString subscriberId;
+    private final int hashCode;
+
+    public TopicSubscriber(final ByteString topic, final ByteString subscriberId) {
+        this.topic = topic;
+        this.subscriberId = subscriberId;
+        hashCode = new HashCodeBuilder().append(topic).append(subscriberId).toHashCode();
+    }
+
+    // Equal when both topic and subscriberId match. Nulls are compared safely
+    // (null equals only null), so equals() accepts whatever toString() accepts.
+    @Override
+    public boolean equals(final Object o) {
+        if (o == this)
+            return true;
+        if (!(o instanceof TopicSubscriber))
+            return false;
+        final TopicSubscriber obj = (TopicSubscriber) o;
+        return (topic == null ? obj.topic == null : topic.equals(obj.topic))
+                && (subscriberId == null ? obj.subscriberId == null : subscriberId.equals(obj.subscriberId));
+    }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    @Override
+    public String toString() {
+        String text = topic == null ? "" : "Topic: " + topic.toStringUtf8();
+        if (subscriberId != null)
+            text += (text.length() == 0 ? "" : PubSubData.COMMA) + "SubscriberId: " + subscriberId.toStringUtf8();
+        return text;
+    }
+
+    public ByteString getTopic() {
+        return topic;
+    }
+
+    public ByteString getSubscriberId() {
+        return subscriberId;
+    }
+
+}
\ No newline at end of file

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/exceptions/InvalidSubscriberIdException.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/exceptions/InvalidSubscriberIdException.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/exceptions/InvalidSubscriberIdException.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/exceptions/InvalidSubscriberIdException.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.exceptions;
+
+/**
+ * Hedwig client-side exception for subscribe-type operations requested by the
+ * local client. Local and hub subscribers are currently told apart by a
+ * specific subscriberId format.
+ */
+public class InvalidSubscriberIdException extends Exception {
+    private static final long serialVersionUID = 873259807218723523L;
+
+    // Builds the exception from a detail message.
+    public InvalidSubscriberIdException(String message) {
+        super(message);
+    }
+
+    // Builds the exception from a detail message plus the underlying cause.
+    public InvalidSubscriberIdException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/exceptions/ServerRedirectLoopException.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/exceptions/ServerRedirectLoopException.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/exceptions/ServerRedirectLoopException.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/exceptions/ServerRedirectLoopException.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.exceptions;
+
+/**
+ * Hedwig client-side exception raised when a PubSubRequest gets redirected to
+ * a server it has already been sent to before. The client checks for this
+ * condition so that it does not follow a cyclical redirect loop, and reports
+ * it to the caller by throwing this exception.
+ */
+public class ServerRedirectLoopException extends Exception {
+    private static final long serialVersionUID = 98723508723152897L;
+
+    // Builds the exception from a detail message.
+    public ServerRedirectLoopException(String message) {
+        super(message);
+    }
+
+    // Builds the exception from a detail message plus the underlying cause.
+    public ServerRedirectLoopException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/exceptions/TooManyServerRedirectsException.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/exceptions/TooManyServerRedirectsException.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/exceptions/TooManyServerRedirectsException.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/exceptions/TooManyServerRedirectsException.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.exceptions;
+
+/**
+ * Client-side exception raised when a publish/subscribe call has gone
+ * through too many server redirects. Only a configured number of redirects
+ * is allowed while locating the topic master; once that amount is exceeded,
+ * the publish/subscribe call fails with this exception.
+ */
+public class TooManyServerRedirectsException extends Exception {
+
+    private static final long serialVersionUID = 2341192937965635310L;
+
+    /** Creates the exception with a detail message and an underlying cause. */
+    public TooManyServerRedirectsException(String message, Throwable t) {
+        super(message, t);
+    }
+
+    /** Creates the exception with a detail message only. */
+    public TooManyServerRedirectsException(String message) {
+        super(message);
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.handlers;
+
+import java.util.TimerTask;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.client.data.MessageConsumeData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * Callback handed to the client app's MessageHandler, invoked once the app
+ * has finished (or failed) consuming a subscription message delivered by the
+ * server. The only state kept is the HedwigClient; per-message details come
+ * in through the callback context, which must be a MessageConsumeData.
+ */
+public class MessageConsumeCallback implements Callback<Void> {
+
+    private static final Logger logger = Logger.getLogger(MessageConsumeCallback.class);
+
+    private final HedwigClient client;
+
+    public MessageConsumeCallback(HedwigClient client) {
+        this.client = client;
+    }
+
+    /** Timer task that asks for a previously failed message to be consumed again. */
+    class MessageConsumeRetryTask extends TimerTask {
+        private final MessageConsumeData messageConsumeData;
+        private final TopicSubscriber topicSubscriber;
+
+        public MessageConsumeRetryTask(MessageConsumeData messageConsumeData, TopicSubscriber topicSubscriber) {
+            this.messageConsumeData = messageConsumeData;
+            this.topicSubscriber = topicSubscriber;
+        }
+
+        @Override
+        public void run() {
+            // Try to consume the message again on the channel mapped to the topic subscriber.
+            Channel topicSubscriberChannel = client.getSubscriber().getChannelForTopic(topicSubscriber);
+            HedwigClient.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
+                    .asyncMessageConsume(messageConsumeData.msg);
+        }
+    }
+
+    /**
+     * The client app consumed the message (described by ctx): report that to
+     * the SubscribeResponseHandler of the topic subscriber's channel.
+     */
+    public void operationFinished(Object ctx, Void resultOfOperation) {
+        MessageConsumeData messageConsumeData = (MessageConsumeData) ctx;
+        TopicSubscriber topicSubscriber = new TopicSubscriber(messageConsumeData.topic, messageConsumeData.subscriberId);
+        Channel topicSubscriberChannel = client.getSubscriber().getChannelForTopic(topicSubscriber);
+        HedwigClient.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
+                .messageConsumed(messageConsumeData.msg);
+    }
+
+    /**
+     * The client app failed to consume the message (described by ctx): log it
+     * and retry after the configured message consume retry wait time.
+     */
+    public void operationFailed(Object ctx, PubSubException exception) {
+        MessageConsumeData messageConsumeData = (MessageConsumeData) ctx;
+        // Hand the exception to the logger so the failure cause is kept.
+        logger.error("Message was not consumed successfully by client MessageHandler: " + messageConsumeData, exception);
+        // No retry once the client has stopped (same guard as in
+        // SubscribeReconnectCallback); presumably the timer is cancelled by
+        // then and schedule() would throw - confirm against HedwigClient.
+        if (client.hasStopped())
+            return;
+        TopicSubscriber topicSubscriber = new TopicSubscriber(messageConsumeData.topic, messageConsumeData.subscriberId);
+        client.getClientTimer().schedule(new MessageConsumeRetryTask(messageConsumeData, topicSubscriber),
+                client.getConfiguration().getMessageConsumeRetryWaitTime());
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/PubSubCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/PubSubCallback.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/PubSubCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/PubSubCallback.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.handlers;
+
+import org.apache.log4j.Logger;
+
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * Bridges Hedwig's asynchronous client operations to synchronous callers:
+ * once the operation completes, the outcome is recorded here and a thread
+ * waiting on the PubSubData object's monitor is notified.
+ */
+public class PubSubCallback implements Callback<Void> {
+
+    private static Logger logger = Logger.getLogger(PubSubCallback.class);
+
+    // Request being waited on; its monitor is used for the notification.
+    private PubSubData pubSubData;
+    // True once the call succeeded; false before completion or on failure.
+    private boolean isCallSuccessful;
+    // Exception handed to operationFailed, kept so that the synchronous
+    // caller has a handle to it and can rethrow it later.
+    private PubSubException failureException;
+
+    public PubSubCallback(PubSubData pubSubData) {
+        this.pubSubData = pubSubData;
+    }
+
+    // Called when the PubSub call succeeded.
+    public void operationFinished(Object ctx, Void resultOfOperation) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("PubSub call succeeded for pubSubData: " + pubSubData);
+        }
+        complete(true, null);
+    }
+
+    // Called when the PubSub call failed with the given exception.
+    public void operationFailed(Object ctx, PubSubException exception) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("PubSub call failed with exception: " + exception + ", pubSubData: " + pubSubData);
+        }
+        complete(false, exception);
+    }
+
+    // Records the outcome, marks the request as done and wakes up the thread
+    // waiting on pubSubData, all while holding the pubSubData monitor.
+    private void complete(boolean successful, PubSubException failure) {
+        synchronized (pubSubData) {
+            isCallSuccessful = successful;
+            if (!successful) {
+                // Only the failure path stores an exception; the success
+                // path leaves failureException untouched.
+                failureException = failure;
+            }
+            pubSubData.isDone = true;
+            pubSubData.notify();
+        }
+    }
+
+    // Tells whether the PubSub call was successful.
+    public boolean getIsCallSuccessful() {
+        return isCallSuccessful;
+    }
+
+    // Returns the PubSubException that was passed to operationFailed.
+    public PubSubException getFailureException() {
+        return failureException;
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.handlers;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.netty.ResponseHandler;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+
+public class PublishResponseHandler {
+
+    private static Logger logger = Logger.getLogger(PublishResponseHandler.class);
+
+    private final ResponseHandler responseHandler;
+
+    public PublishResponseHandler(ResponseHandler responseHandler) {
+        this.responseHandler = responseHandler;
+    }
+
+    /**
+     * Handles a Publish response from the server by status code: SUCCESS
+     * completes the request's callback, NOT_RESPONSIBLE_FOR_TOPIC is passed to
+     * the ResponseHandler as a redirect, anything else fails the callback.
+     */
+    public void handlePublishResponse(PubSubResponse response, PubSubData pubSubData, Channel channel) throws Exception {
+        if (logger.isDebugEnabled())
+            logger.debug("Handling a Publish response: " + response + ", pubSubData: " + pubSubData + ", host: "
+                    + HedwigClient.getHostFromChannel(channel));
+        switch (response.getStatusCode()) {
+        case SUCCESS:
+            pubSubData.callback.operationFinished(pubSubData.context, null);
+            break;
+        case NOT_RESPONSIBLE_FOR_TOPIC:
+            // Redirect response, so the original Publish request needs reposting.
+            responseHandler.handleRedirectResponse(response, pubSubData, channel);
+            break;
+        case SERVICE_DOWN:
+            failPublish(pubSubData, "Server responded with a SERVICE_DOWN status");
+            break;
+        default:
+            // All other status codes are considered errors (operation failed).
+            logger.error("Unexpected error response from server for PubSubResponse: " + response);
+            failPublish(pubSubData, "Server responded with a status code of: " + response.getStatusCode());
+        }
+    }
+
+    // Invokes the request callback's operationFailed with a ServiceDownException.
+    private void failPublish(PubSubData pubSubData, String reason) {
+        pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(reason));
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.handlers;
+
+import java.util.TimerTask;
+
+import org.apache.log4j.Logger;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.netty.HedwigSubscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * Callback used when a Subscribe channel got disconnected and the client
+ * tries to re-establish the subscription. On success, message delivery is
+ * restarted if a MessageHandler was supplied; on failure, the subscribe
+ * request is retried after a configured wait time unless the client stopped.
+ */
+public class SubscribeReconnectCallback implements Callback<Void> {
+
+    private static final Logger logger = Logger.getLogger(SubscribeReconnectCallback.class);
+
+    // Original subscribe request, replayed on every reconnect attempt.
+    private final PubSubData origSubData;
+    private final HedwigClient client;
+    private final HedwigSubscriber sub;
+    private final ClientConfiguration cfg;
+    // Handler to restart delivery with; null means delivery is not restarted.
+    private final MessageHandler messageHandler;
+
+    public SubscribeReconnectCallback(PubSubData origSubData, HedwigClient client, MessageHandler messageHandler) {
+        this.origSubData = origSubData;
+        this.client = client;
+        this.sub = client.getSubscriber();
+        this.cfg = client.getConfiguration();
+        this.messageHandler = messageHandler;
+    }
+
+    /** Timer task that issues the subscribe reconnect request once more. */
+    class SubscribeReconnectRetryTask extends TimerTask {
+        @Override
+        public void run() {
+            if (logger.isDebugEnabled())
+                logger.debug("Retrying subscribe reconnect request for origSubData: " + origSubData);
+            // Clear out all of the servers we've contacted or attempted to
+            // from this request, then connect via the default server host.
+            origSubData.clearServersList();
+            client.doConnect(origSubData, cfg.getDefaultServerHost());
+        }
+    }
+
+    /**
+     * Reconnect succeeded: restart delivery, but only if a MessageHandler was
+     * supplied (delivery was started when the original channel disconnected).
+     */
+    public void operationFinished(Object ctx, Void resultOfOperation) {
+        if (logger.isDebugEnabled())
+            logger.debug("Subscribe reconnect succeeded for origSubData: " + origSubData);
+        if (messageHandler != null) {
+            try {
+                sub.startDelivery(origSubData.topic, origSubData.subscriberId, messageHandler);
+            } catch (ClientNotSubscribedException e) {
+                // This exception should never be thrown here but just in case,
+                // log an error and just keep retrying the subscribe request.
+                logger.error("Subscribe was successful but error starting delivery for topic: "
+                        + origSubData.topic.toStringUtf8() + ", subscriberId: "
+                        + origSubData.subscriberId.toStringUtf8(), e);
+                retrySubscribeRequest();
+            }
+        }
+    }
+
+    /**
+     * Reconnect failed: a failed topic subscription cannot be flagged to the
+     * application layer, so keep retrying in the background until success.
+     */
+    public void operationFailed(Object ctx, PubSubException exception) {
+        // Pass the exception itself to the logger so that its type and stack
+        // trace are kept; getMessage() alone may be null or uninformative.
+        logger.error("Subscribe reconnect failed with error: " + exception.getMessage(), exception);
+        retrySubscribeRequest();
+    }
+
+    // Schedules another reconnect attempt after the configured wait time.
+    private void retrySubscribeRequest() {
+        // If the client has stopped, there is no need to proceed with any
+        // callback logic here.
+        if (client.hasStopped())
+            return;
+        client.getClientTimer().schedule(new SubscribeReconnectRetryTask(),
+                client.getConfiguration().getSubscribeReconnectRetryWaitTime());
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.handlers;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.data.MessageConsumeData;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.netty.ResponseHandler;
+import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+
+public class SubscribeResponseHandler {
+
+    private static Logger logger = Logger.getLogger(SubscribeResponseHandler.class);
+
+    private final ResponseHandler responseHandler;
+
+    // Member variables used when this ResponseHandler is for a Subscribe
+    // channel. We need to be able to consume messages sent back to us from
+    // the server, and to also recreate the Channel connection if it ever goes
+    // down. For that, we need to store the original PubSubData for the
+    // subscribe request, and also the MessageHandler that was registered when
+    // delivery of messages started for the subscription.
+    private PubSubData origSubData;
+    private Channel subscribeChannel;
+    private MessageHandler messageHandler;
+    // Counter for the number of consumed messages so far to buffer up before we
+    // send the Consume message back to the server along with the last/largest
+    // message seq ID seen so far in that batch.
+    private int numConsumedMessagesInBuffer = 0;
+    private MessageSeqId lastMessageSeqId;
+    // Queue used for subscribes when the MessageHandler hasn't been registered
+    // yet but we've already received subscription messages from the server.
+    // This will be lazily created as needed.
+    private Queue<Message> subscribeMsgQueue;
+    // Set to store all of the outstanding subscribed messages that are pending
+    // to be consumed by the client app's MessageHandler. If this ever grows too
+    // big (e.g. problem at the client end for message consumption), we can
+    // throttle things by temporarily setting the Subscribe Netty Channel
+    // to not be readable. When the Set has shrunk sufficiently, we can turn the
+    // channel back on to read new messages.
+    private Set<Message> outstandingMsgSet;
+
+    public SubscribeResponseHandler(ResponseHandler responseHandler) {
+        this.responseHandler = responseHandler; // gives access to the configuration, client and subscriber
+    }
+
+    // Public getter to retrieve the original PubSubData used for the Subscribe
+    // request.
+    public PubSubData getOrigSubData() {
+        return origSubData; // assigned by handleSubscribeResponse on a SUCCESS response
+    }
+
+    // Main method to handle Subscribe responses from the server that we sent
+    // a Subscribe Request to.
+    public void handleSubscribeResponse(PubSubResponse response, PubSubData pubSubData, Channel channel)
+            throws Exception {
+        // If this was not a successful response to the Subscribe request, we
+        // won't be using the Netty Channel created so just close it.
+        if (!response.getStatusCode().equals(StatusCode.SUCCESS)) {
+            HedwigClient.getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
+            channel.close();
+        }
+
+        if (logger.isDebugEnabled())
+            logger.debug("Handling a Subscribe response: " + response + ", pubSubData: " + pubSubData + ", host: "
+                    + HedwigClient.getHostFromChannel(channel));
+        switch (response.getStatusCode()) {
+        case SUCCESS:
+            // For successful Subscribe requests, store this Channel locally
+            // and set it to not be readable initially.
+            // This way we won't be delivering messages for this topic
+            // subscription until the client explicitly says so.
+            subscribeChannel = channel;
+            subscribeChannel.setReadable(false);
+            // Store the original PubSubData used to create this successful
+            // Subscribe request.
+            origSubData = pubSubData;
+            // Store the mapping for the TopicSubscriber to the Channel.
+            // This is so we can control the starting and stopping of
+            // message deliveries from the server on that Channel. Store
+            // this only on a successful ack response from the server.
+            TopicSubscriber topicSubscriber = new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId);
+            responseHandler.getSubscriber().setChannelForTopic(topicSubscriber, channel);
+            // Lazily create the Set to keep track of outstanding Messages
+            // to be consumed by the client app. At this stage, delivery for
+            // that topic hasn't started yet so creation of this Set should
+            // be thread safe. We'll create the Set with an initial capacity
+            // equal to the configured parameter for the maximum number of
+            // outstanding messages to allow. The load factor will be set to
+            // 1.0f which means we'll only rehash and allocate more space if
+            // we ever exceed the initial capacity. That should be okay
+            // because when that happens, things are slow already and piling
+            // up on the client app side to consume messages.
+            outstandingMsgSet = new HashSet<Message>(
+                    responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f);
+            // Response was success so invoke the callback's operationFinished
+            // method.
+            pubSubData.callback.operationFinished(pubSubData.context, null);
+            break;
+        case CLIENT_ALREADY_SUBSCRIBED:
+            // For Subscribe requests, the server says that the client is
+            // already subscribed to it.
+            pubSubData.callback.operationFailed(pubSubData.context, new ClientAlreadySubscribedException(
+                    "Client is already subscribed for topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: "
+                            + pubSubData.subscriberId.toStringUtf8()));
+            break;
+        case SERVICE_DOWN:
+            // Response was service down failure so just invoke the callback's
+            // operationFailed method.
+            pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
+                    "Server responded with a SERVICE_DOWN status"));
+            break;
+        case NOT_RESPONSIBLE_FOR_TOPIC:
+            // Redirect response so we'll need to repost the original Subscribe
+            // Request
+            responseHandler.handleRedirectResponse(response, pubSubData, channel);
+            break;
+        default:
+            // Consider all other status codes as errors, operation failed
+            // cases.
+            logger.error("Unexpected error response from server for PubSubResponse: " + response);
+            pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
+                    "Server responded with a status code of: " + response.getStatusCode()));
+            break;
+        }
+    }
+
+    // Main method to handle consuming a message for a topic that the client is
+    // subscribed to.
+    public void handleSubscribeMessage(PubSubResponse response) {
+        if (logger.isDebugEnabled())
+            logger.debug("Handling a Subscribe message in response: " + response + ", topic: "
+                    + origSubData.topic.toStringUtf8() + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
+        Message message = response.getMessage();
+        // Consume the message asynchronously, but only if delivery for the
+        // subscription has started, i.e. a MessageHandler has been registered
+        // for the TopicSubscriber.
+        if (messageHandler != null) {
+            asyncMessageConsume(message);
+            return;
+        }
+        // No MessageHandler has been registered yet, so queue the message up
+        // for the Topic Subscription instead.
+        if (logger.isDebugEnabled())
+            logger
+                    .debug("Message has arrived but Subscribe channel does not have a registered MessageHandler yet so queueing up the message: "
+                            + message);
+        // Create the queue lazily and enqueue under one lock: two threads
+        // processing received messages at the same time must neither create
+        // two queue instances nor add to the non thread safe LinkedList
+        // concurrently. Delivery has not started yet, so the overhead should
+        // be okay. NOTE(review): code draining the queue is not visible here;
+        // it should hold this same lock - confirm.
+        synchronized (this) {
+            if (subscribeMsgQueue == null)
+                subscribeMsgQueue = new LinkedList<Message>();
+            subscribeMsgQueue.add(message);
+        }
+    }
+
+    /**
+     * Hands a message that arrived on the subscribe Channel to the registered
+     * MessageHandler so the client app can consume it asynchronously. The
+     * MessageHandler is dereferenced without a null check, so callers must
+     * only invoke this once a handler has been registered.
+     * 
+     * @param message
+     *            Message from the subscribe Channel that should be consumed.
+     */
+    protected void asyncMessageConsume(Message message) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Call the client app's MessageHandler asynchronously to consume the message: " + message
+                    + ", topic: " + origSubData.topic.toStringUtf8() + ", subscriberId: "
+                    + origSubData.subscriberId.toStringUtf8());
+        }
+
+        // The message stays in the outstanding set until the client app
+        // reports that it has been consumed.
+        outstandingMsgSet.add(message);
+
+        // Once the configured ceiling of outstanding messages is reached,
+        // throttle the server by marking the Netty Channel as not readable
+        // (unless that has already been done).
+        boolean limitReached = outstandingMsgSet.size() >= responseHandler.getConfiguration()
+                .getMaximumOutstandingMessages();
+        if (limitReached && subscribeChannel.isReadable()) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Too many outstanding messages (" + outstandingMsgSet.size()
+                        + ") so throttling the subscribe netty Channel");
+            }
+            subscribeChannel.setReadable(false);
+        }
+
+        // The consume data travels with the call as the callback context.
+        MessageConsumeData consumeContext = new MessageConsumeData(origSubData.topic, origSubData.subscriberId,
+                message);
+        messageHandler.consume(origSubData.topic, origSubData.subscriberId, message,
+                responseHandler.getClient().getConsumeCallback(), consumeContext);
+    }
+
+    /**
+     * Invoked once the client app's MessageHandler has asynchronously finished
+     * consuming a subscribed message. The contract with the client app is that
+     * completion callbacks arrive in the same order in which the messages were
+     * handed to the handler (messages #1-5 dispatched means #1-5 called back
+     * here in that order). Several outstanding messages may complete from
+     * different threads, so the method is synchronized.
+     * 
+     * @param message
+     *            Message sent from the server for the topic subscription that
+     *            the client has finished consuming.
+     */
+    protected synchronized void messageConsumed(Message message) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Message has been successfully consumed by the client app for message: " + message
+                    + ", topic: " + origSubData.topic.toStringUtf8() + ", subscriberId: "
+                    + origSubData.subscriberId.toStringUtf8());
+        }
+
+        // When auto-sending of consume messages is disabled, the client app is
+        // responsible for calling the Subscriber consume API itself, so none
+        // of the buffer bookkeeping below applies.
+        boolean autoSendConsume = responseHandler.getConfiguration().isAutoSendConsumeMessageEnabled();
+        if (autoSendConsume) {
+            numConsumedMessagesInBuffer++;
+            lastMessageSeqId = message.getMsgId();
+        }
+
+        // The message is no longer pending consumption.
+        outstandingMsgSet.remove(message);
+
+        // Consume requests are batched: only the number of consumed messages
+        // and the latest message ID of the batch are tracked (messages are
+        // expected in order and without gaps). When the configured batch size
+        // is reached, send one consume request on the Channel created by the
+        // subscribe request and start a new batch.
+        if (autoSendConsume
+                && numConsumedMessagesInBuffer >= responseHandler.getConfiguration().getConsumedMessagesBufferSize()) {
+            if (logger.isDebugEnabled()) {
+                logger
+                        .debug("Consumed message buffer limit reached so send the Consume Request to the server with lastMessageSeqId: "
+                                + lastMessageSeqId);
+            }
+            responseHandler.getSubscriber().doConsume(origSubData, subscribeChannel, lastMessageSeqId);
+            numConsumedMessagesInBuffer = 0;
+            lastMessageSeqId = null;
+        }
+
+        // Nothing left to do unless delivery was throttled earlier and every
+        // outstanding message has now been consumed. For now delivery is only
+        // resumed once the outstanding set is completely empty; this could be
+        // made configurable if needed.
+        if (subscribeChannel.isReadable() || outstandingMsgSet.size() != 0) {
+            return;
+        }
+        if (logger.isDebugEnabled()) {
+            logger
+                    .debug("Message consumption has caught up so okay to turn off throttling of messages on the subscribe channel for topic: "
+                            + origSubData.topic.toStringUtf8()
+                            + ", subscriberId: "
+                            + origSubData.subscriberId.toStringUtf8());
+        }
+        subscribeChannel.setReadable(true);
+    }
+
+    /**
+     * Setter used for Subscribe flows when delivery for the subscription is
+     * started. This registers the MessageHandler needed to consume the
+     * subscribed messages for the topic and, if the handler is not null,
+     * immediately dispatches any messages that were queued up before a
+     * handler was available.
+     * 
+     * @param messageHandler
+     *            MessageHandler to register for this ResponseHandler instance.
+     *            May be null, which is how HedwigSubscriber.stopDelivery
+     *            unregisters the handler; in that case nothing is dispatched.
+     */
+    public void setMessageHandler(MessageHandler messageHandler) {
+        if (logger.isDebugEnabled())
+            logger.debug("Setting the messageHandler for topic: " + origSubData.topic.toStringUtf8()
+                    + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
+        // Install the handler and drain the backlog while holding this
+        // instance's monitor. subscribeMsgQueue is a plain LinkedList, so
+        // iterating and clearing it must not overlap with other code that
+        // locks this instance (messageConsumed() already synchronizes on it),
+        // and the queued messages must be dispatched as one uninterrupted
+        // batch to keep them in arrival order.
+        synchronized (this) {
+            this.messageHandler = messageHandler;
+            // Once the MessageHandler is registered, see if we have any queued
+            // up subscription messages sent to us already from the server. If
+            // so, consume those first. Do this only if the MessageHandler
+            // registered is not null (since that would be the
+            // HedwigSubscriber.stopDelivery call).
+            if (messageHandler != null && subscribeMsgQueue != null && subscribeMsgQueue.size() > 0) {
+                if (logger.isDebugEnabled())
+                    logger.debug("Consuming " + subscribeMsgQueue.size() + " queued up messages for topic: "
+                            + origSubData.topic.toStringUtf8() + ", subscriberId: "
+                            + origSubData.subscriberId.toStringUtf8());
+                for (Message message : subscribeMsgQueue) {
+                    asyncMessageConsume(message);
+                }
+                // All queued messages have been handed to the handler, so the
+                // backlog can be dropped.
+                subscribeMsgQueue.clear();
+            }
+        }
+    }
+
+    /**
+     * Returns the MessageHandler currently registered for this subscribe
+     * channel.
+     * 
+     * @return The MessageHandler used for consuming messages; this is whatever
+     *         was last passed to setMessageHandler, which may be null.
+     */
+    public MessageHandler getMessageHandler() {
+        return this.messageHandler;
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.handlers;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.netty.ResponseHandler;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+
+/**
+ * Processes the server's responses to Unsubscribe requests on behalf of a
+ * {@link ResponseHandler}, translating each response status code into the
+ * matching callback invocation (or a redirect) for the original request.
+ */
+public class UnsubscribeResponseHandler {
+
+    private static final Logger logger = Logger.getLogger(UnsubscribeResponseHandler.class);
+
+    // The ResponseHandler this helper works for; used to reach the
+    // subscriber and the shared redirect handling.
+    private final ResponseHandler responseHandler;
+
+    /**
+     * @param responseHandler
+     *            ResponseHandler that delegates Unsubscribe responses to this
+     *            instance.
+     */
+    public UnsubscribeResponseHandler(ResponseHandler responseHandler) {
+        this.responseHandler = responseHandler;
+    }
+
+    /**
+     * Main method to handle Unsubscribe Response messages from the server.
+     * Exactly one of the request callback's operationFinished /
+     * operationFailed methods is invoked, except for redirects, which are
+     * handed to the ResponseHandler so the original request can be reposted.
+     * 
+     * @param response
+     *            Response received from the server.
+     * @param pubSubData
+     *            Data of the original Unsubscribe request (topic, subscriberId,
+     *            callback and callback context).
+     * @param channel
+     *            Channel the response was received on.
+     * @throws Exception
+     *             propagated from the redirect handling.
+     */
+    public void handleUnsubscribeResponse(PubSubResponse response, PubSubData pubSubData, Channel channel)
+            throws Exception {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Handling an Unsubscribe response: " + response + ", pubSubData: " + pubSubData + ", host: "
+                    + HedwigClient.getHostFromChannel(channel));
+        }
+        switch (response.getStatusCode()) {
+        case SUCCESS:
+            // For successful Unsubscribe requests, we can now safely close the
+            // Subscribe Channel and any cached data for that TopicSubscriber
+            // before reporting success to the caller.
+            responseHandler.getSubscriber().closeSubscription(pubSubData.topic, pubSubData.subscriberId);
+            pubSubData.callback.operationFinished(pubSubData.context, null);
+            break;
+        case CLIENT_NOT_SUBSCRIBED:
+            // The server says that the client was never subscribed to the
+            // topic.
+            pubSubData.callback.operationFailed(pubSubData.context, new ClientNotSubscribedException(
+                    "Client was never subscribed to topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: "
+                            + pubSubData.subscriberId.toStringUtf8()));
+            break;
+        case SERVICE_DOWN:
+            // Service down failure, so just fail the request.
+            pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
+                    "Server responded with a SERVICE_DOWN status"));
+            break;
+        case NOT_RESPONSIBLE_FOR_TOPIC:
+            // Redirect response so we'll need to repost the original
+            // Unsubscribe Request.
+            responseHandler.handleRedirectResponse(response, pubSubData, channel);
+            break;
+        default:
+            // Consider all other status codes as errors; they are reported to
+            // the caller as a ServiceDownException carrying the status code.
+            logger.error("Unexpected error response from server for PubSubResponse: " + response);
+            pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
+                    "Server responded with a status code of: " + response.getStatusCode()));
+            break;
+        }
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
+import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+import org.apache.hedwig.protocol.PubSubProtocol;
+
+/**
+ * Builds the Netty ChannelPipeline used for every client connection: an
+ * optional SSL handler, length-prefixed framing, protobuf encoding/decoding
+ * of PubSub messages and finally the client's ResponseHandler.
+ */
+public class ClientChannelPipelineFactory implements ChannelPipelineFactory {
+
+    // Size in bytes of the length field that prefixes every frame. The decoder
+    // and the prepender must agree on it.
+    private static final int LENGTH_FIELD_BYTES = 4;
+
+    private final HedwigClient client;
+
+    /**
+     * @param client
+     *            HedwigClient whose configuration and (optional) SSL factory
+     *            are used when building pipelines.
+     */
+    public ClientChannelPipelineFactory(HedwigClient client) {
+        this.client = client;
+    }
+
+    /**
+     * Retrieve a new ChannelPipeline from the factory.
+     * 
+     * @return A pipeline with the handlers added in the order: "ssl" (only if
+     *         the client has an SSL factory), "lengthbaseddecoder",
+     *         "lengthprepender", "protobufdecoder", "protobufencoder",
+     *         "responsehandler".
+     */
+    public ChannelPipeline getPipeline() throws Exception {
+        // Create a new ChannelPipeline using the factory method from the
+        // Channels helper class.
+        ChannelPipeline pipeline = Channels.pipeline();
+        if (client.getSslFactory() != null) {
+            pipeline.addLast("ssl", new SslHandler(client.getSslFactory().getEngine()));
+        }
+
+        // Inbound frames: length field at offset 0, no length adjustment, and
+        // the length field itself is stripped before the payload is passed on.
+        pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(client.getConfiguration()
+                .getMaximumMessageSize(), 0, LENGTH_FIELD_BYTES, 0, LENGTH_FIELD_BYTES));
+        // Outbound frames get the same length prefix.
+        pipeline.addLast("lengthprepender", new LengthFieldPrepender(LENGTH_FIELD_BYTES));
+
+        pipeline.addLast("protobufdecoder", new ProtobufDecoder(PubSubProtocol.PubSubResponse.getDefaultInstance()));
+        pipeline.addLast("protobufencoder", new ProtobufEncoder());
+
+        pipeline.addLast("responsehandler", new ResponseHandler(client));
+        return pipeline;
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+/**
+ * ChannelFutureListener invoked when a connect attempt made on behalf of a
+ * PubSub request completes. On failure the request is retried once against
+ * the default server host (or failed if this host was already tried); on
+ * success the original publish/subscribe/unsubscribe request is sent over
+ * the new connection.
+ */
+public class ConnectCallback implements ChannelFutureListener {
+
+    private static final Logger logger = Logger.getLogger(ConnectCallback.class);
+
+    // Request that triggered the connect attempt and the host it targeted.
+    private final PubSubData pubSubData;
+    private final InetSocketAddress host;
+    private final HedwigClient client;
+    private final HedwigPublisher pub;
+    private final HedwigSubscriber sub;
+    private final ClientConfiguration cfg;
+
+    /**
+     * @param pubSubData
+     *            Data of the PubSub request the connection is being made for.
+     * @param host
+     *            Host the connect attempt is targeting.
+     * @param client
+     *            HedwigClient supplying the publisher, subscriber and
+     *            configuration used by this callback.
+     */
+    public ConnectCallback(PubSubData pubSubData, InetSocketAddress host, HedwigClient client) {
+        super();
+        this.pubSubData = pubSubData;
+        this.host = host;
+        this.client = client;
+        this.pub = client.getPublisher();
+        this.sub = client.getSubscriber();
+        this.cfg = client.getConfiguration();
+    }
+
+    /**
+     * Called by Netty when the connect attempt has completed.
+     * 
+     * @param future
+     *            Future of the connect attempt; carries the outcome and the
+     *            connected Channel.
+     */
+    public void operationComplete(ChannelFuture future) throws Exception {
+        // If the client has stopped, there is no need to proceed with any
+        // callback logic here.
+        if (client.hasStopped())
+            return;
+
+        // Check if the connection to the server was done successfully.
+        if (!future.isSuccess()) {
+            handleConnectFailure(future);
+            return;
+        }
+
+        // Now that we have connected successfully to the server, see what type
+        // of PubSub request this was.
+        if (logger.isDebugEnabled())
+            logger.debug("Connection to host: " + host + " was successful for pubSubData: " + pubSubData);
+        if (pubSubData.operationType.equals(OperationType.PUBLISH)) {
+            // Publish Request so store this Channel connection in the
+            // HedwigPublisher Map (if it doesn't exist yet) and then
+            // do the publish on the cached channel mapped to the host.
+            // Note that due to race concurrency situations, it is
+            // possible that the cached channel is not the same one
+            // as the channel established here. If that is the case,
+            // this channel will be closed but we'll always publish on the
+            // cached channel in the HedwigPublisher.host2Channel map.
+            pub.storeHost2ChannelMapping(future.getChannel());
+            pub.doPublish(pubSubData, pub.host2Channel.get(HedwigClient.getHostFromChannel(future.getChannel())));
+        } else if (pubSubData.operationType.equals(OperationType.UNSUBSCRIBE)) {
+            // Unsubscribe Request so store this Channel connection in the
+            // HedwigPublisher Map (if it doesn't exist yet) and then do the
+            // unsubscribe. Unsubscribe requests will share and reuse
+            // the netty Channel connections that Publish requests use.
+            pub.storeHost2ChannelMapping(future.getChannel());
+            sub.doSubUnsub(pubSubData, pub.host2Channel.get(HedwigClient.getHostFromChannel(future.getChannel())));
+        } else {
+            // Subscribe Request. We do not store the Channel connection yet for
+            // Subscribes here. This will be done only when we've found the
+            // right server topic master. That is only determined when we
+            // receive a successful server ack response to the Subscribe
+            // request (handled in ResponseHandler), so the request is simply
+            // sent on the Channel that was just established.
+            sub.doSubUnsub(pubSubData, future.getChannel());
+        }
+    }
+
+    // Handles a failed connect attempt: fails the request if this host has
+    // already been tried for it, otherwise records the host and retries the
+    // request against the default server host.
+    private void handleConnectFailure(ChannelFuture future) {
+        // Log the cause as well so the reason for the failure is not lost.
+        logger.error("Error connecting to host: " + host, future.getCause());
+        // If we were not able to connect to the host, it could be down.
+        ByteString hostString = ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(host));
+        if (pubSubData.connectFailedServers != null && pubSubData.connectFailedServers.contains(hostString)) {
+            // We've already tried to connect to this host before so just
+            // invoke the operationFailed callback.
+            logger.error("Error connecting to host more than once so just invoke the operationFailed callback!");
+            pubSubData.callback.operationFailed(pubSubData.context, new CouldNotConnectException(
+                    "Could not connect to host: " + host));
+            return;
+        }
+
+        if (logger.isDebugEnabled())
+            logger.debug("Try to connect to server: " + host + " again for pubSubData: " + pubSubData);
+        // Keep track of this current server that we failed to connect
+        // to but retry the request on the default server host/VIP.
+        // The topic2Host mapping might need to be updated.
+        if (pubSubData.connectFailedServers == null)
+            pubSubData.connectFailedServers = new LinkedList<ByteString>();
+        pubSubData.connectFailedServers.add(hostString);
+        client.doConnect(pubSubData, cfg.getDefaultServerHost());
+    }
+
+}