You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2011/09/05 19:39:03 UTC

svn commit: r1165369 [5/9] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/clie...

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java Mon Sep  5 17:38:57 2011
@@ -36,7 +36,7 @@ import org.apache.hedwig.util.Callback;
  * the topic is completed, we need to restart delivery for that topic if that
  * was the case before the original channel got disconnected. This async
  * callback will be the hook for this.
- * 
+ *
  */
 public class SubscribeReconnectCallback implements Callback<Void> {
 
@@ -83,8 +83,8 @@ public class SubscribeReconnectCallback 
                 // This exception should never be thrown here but just in case,
                 // log an error and just keep retrying the subscribe request.
                 logger.error("Subscribe was successful but error starting delivery for topic: "
-                        + origSubData.topic.toStringUtf8() + ", subscriberId: "
-                        + origSubData.subscriberId.toStringUtf8(), e);
+                             + origSubData.topic.toStringUtf8() + ", subscriberId: "
+                             + origSubData.subscriberId.toStringUtf8(), e);
                 retrySubscribeRequest();
             }
         }
@@ -108,6 +108,6 @@ public class SubscribeReconnectCallback 
         // Retry the subscribe request but only after waiting for a
         // preconfigured amount of time.
         client.getClientTimer().schedule(new SubscribeReconnectRetryTask(),
-                client.getConfiguration().getSubscribeReconnectRetryWaitTime());
+                                         client.getConfiguration().getSubscribeReconnectRetryWaitTime());
     }
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Mon Sep  5 17:38:57 2011
@@ -93,7 +93,7 @@ public class SubscribeResponseHandler {
 
         if (logger.isDebugEnabled())
             logger.debug("Handling a Subscribe response: " + response + ", pubSubData: " + pubSubData + ", host: "
-                    + HedwigClient.getHostFromChannel(channel));
+                         + HedwigClient.getHostFromChannel(channel));
         switch (response.getStatusCode()) {
         case SUCCESS:
             // For successful Subscribe requests, store this Channel locally
@@ -122,7 +122,7 @@ public class SubscribeResponseHandler {
             // because when that happens, things are slow already and piling
             // up on the client app side to consume messages.
             outstandingMsgSet = new HashSet<Message>(
-                    responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f);
+                responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f);
             // Response was success so invoke the callback's operationFinished
             // method.
             pubSubData.callback.operationFinished(pubSubData.context, null);
@@ -131,14 +131,14 @@ public class SubscribeResponseHandler {
             // For Subscribe requests, the server says that the client is
             // already subscribed to it.
             pubSubData.callback.operationFailed(pubSubData.context, new ClientAlreadySubscribedException(
-                    "Client is already subscribed for topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: "
-                            + pubSubData.subscriberId.toStringUtf8()));
+                                                    "Client is already subscribed for topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: "
+                                                    + pubSubData.subscriberId.toStringUtf8()));
             break;
         case SERVICE_DOWN:
             // Response was service down failure so just invoke the callback's
             // operationFailed method.
             pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
-                    "Server responded with a SERVICE_DOWN status"));
+                                                    "Server responded with a SERVICE_DOWN status"));
             break;
         case NOT_RESPONSIBLE_FOR_TOPIC:
             // Redirect response so we'll need to repost the original Subscribe
@@ -150,7 +150,7 @@ public class SubscribeResponseHandler {
             // cases.
             logger.error("Unexpected error response from server for PubSubResponse: " + response);
             pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
-                    "Server responded with a status code of: " + response.getStatusCode()));
+                                                    "Server responded with a status code of: " + response.getStatusCode()));
             break;
         }
     }
@@ -160,7 +160,7 @@ public class SubscribeResponseHandler {
     public void handleSubscribeMessage(PubSubResponse response) {
         if (logger.isDebugEnabled())
             logger.debug("Handling a Subscribe message in response: " + response + ", topic: "
-                    + origSubData.topic.toStringUtf8() + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
+                         + origSubData.topic.toStringUtf8() + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
         Message message = response.getMessage();
 
         synchronized (this) {
@@ -182,8 +182,8 @@ public class SubscribeResponseHandler {
                     subscribeMsgQueue = new LinkedList<Message>();
                 if (logger.isDebugEnabled())
                     logger
-                        .debug("Message has arrived but Subscribe channel does not have a registered MessageHandler yet so queueing up the message: "
-                                + message);
+                    .debug("Message has arrived but Subscribe channel does not have a registered MessageHandler yet so queueing up the message: "
+                           + message);
                 subscribeMsgQueue.add(message);
             }
         }
@@ -193,15 +193,15 @@ public class SubscribeResponseHandler {
      * Method called when a message arrives for a subscribe Channel and we want
      * to consume it asynchronously via the registered MessageHandler (should
      * not be null when called here).
-     * 
+     *
      * @param message
      *            Message from Subscribe Channel we want to consume.
      */
     protected void asyncMessageConsume(Message message) {
         if (logger.isDebugEnabled())
             logger.debug("Call the client app's MessageHandler asynchronously to consume the message: " + message
-                    + ", topic: " + origSubData.topic.toStringUtf8() + ", subscriberId: "
-                    + origSubData.subscriberId.toStringUtf8());
+                         + ", topic: " + origSubData.topic.toStringUtf8() + ", subscriberId: "
+                         + origSubData.subscriberId.toStringUtf8());
         // Add this "pending to be consumed" message to the outstandingMsgSet.
         outstandingMsgSet.add(message);
         // Check if we've exceeded the max size for the outstanding message set.
@@ -211,13 +211,13 @@ public class SubscribeResponseHandler {
             // Channel to not be readable.
             if (logger.isDebugEnabled())
                 logger.debug("Too many outstanding messages (" + outstandingMsgSet.size()
-                        + ") so throttling the subscribe netty Channel");
+                             + ") so throttling the subscribe netty Channel");
             subscribeChannel.setReadable(false);
         }
         MessageConsumeData messageConsumeData = new MessageConsumeData(origSubData.topic, origSubData.subscriberId,
                 message);
         messageHandler.consume(origSubData.topic, origSubData.subscriberId, message, responseHandler.getClient()
-                .getConsumeCallback(), messageConsumeData);
+                               .getConsumeCallback(), messageConsumeData);
     }
 
     /**
@@ -230,7 +230,7 @@ public class SubscribeResponseHandler {
      * same order. To make this thread safe, since multiple outstanding messages
      * could be consumed by the client app and then called back to here, make
      * this method synchronized.
-     * 
+     *
      * @param message
      *            Message sent from server for topic subscription that has been
      *            consumed by the client.
@@ -238,8 +238,8 @@ public class SubscribeResponseHandler {
     protected synchronized void messageConsumed(Message message) {
         if (logger.isDebugEnabled())
             logger.debug("Message has been successfully consumed by the client app for message: " + message
-                    + ", topic: " + origSubData.topic.toStringUtf8() + ", subscriberId: "
-                    + origSubData.subscriberId.toStringUtf8());
+                         + ", topic: " + origSubData.topic.toStringUtf8() + ", subscriberId: "
+                         + origSubData.subscriberId.toStringUtf8());
         // Update the consumed messages buffer variables
         if (responseHandler.getConfiguration().isAutoSendConsumeMessageEnabled()) {
             // Update these variables only if we are auto-sending consume
@@ -265,8 +265,8 @@ public class SubscribeResponseHandler {
             // subscribe request for the TopicSubscriber.
             if (logger.isDebugEnabled())
                 logger
-                        .debug("Consumed message buffer limit reached so send the Consume Request to the server with lastMessageSeqId: "
-                                + lastMessageSeqId);
+                .debug("Consumed message buffer limit reached so send the Consume Request to the server with lastMessageSeqId: "
+                       + lastMessageSeqId);
             responseHandler.getSubscriber().doConsume(origSubData, subscribeChannel, lastMessageSeqId);
             numConsumedMessagesInBuffer = 0;
             lastMessageSeqId = null;
@@ -279,10 +279,10 @@ public class SubscribeResponseHandler {
         if (!subscribeChannel.isReadable() && outstandingMsgSet.size() == 0) {
             if (logger.isDebugEnabled())
                 logger
-                        .debug("Message consumption has caught up so okay to turn off throttling of messages on the subscribe channel for topic: "
-                                + origSubData.topic.toStringUtf8()
-                                + ", subscriberId: "
-                                + origSubData.subscriberId.toStringUtf8());
+                .debug("Message consumption has caught up so okay to turn off throttling of messages on the subscribe channel for topic: "
+                       + origSubData.topic.toStringUtf8()
+                       + ", subscriberId: "
+                       + origSubData.subscriberId.toStringUtf8());
             subscribeChannel.setReadable(true);
         }
     }
@@ -291,14 +291,14 @@ public class SubscribeResponseHandler {
      * Setter used for Subscribe flows when delivery for the subscription is
      * started. This is used to register the MessageHandler needed to consumer
      * the subscribed messages for the topic.
-     * 
+     *
      * @param messageHandler
      *            MessageHandler to register for this ResponseHandler instance.
      */
     public void setMessageHandler(MessageHandler messageHandler) {
         if (logger.isDebugEnabled())
             logger.debug("Setting the messageHandler for topic: " + origSubData.topic.toStringUtf8()
-                    + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
+                         + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
         synchronized (this) {
             this.messageHandler = messageHandler;
             // Once the MessageHandler is registered, see if we have any queued up
@@ -309,8 +309,8 @@ public class SubscribeResponseHandler {
             if (messageHandler != null && subscribeMsgQueue != null && subscribeMsgQueue.size() > 0) {
                 if (logger.isDebugEnabled())
                     logger.debug("Consuming " + subscribeMsgQueue.size() + " queued up messages for topic: "
-                            + origSubData.topic.toStringUtf8() + ", subscriberId: "
-                            + origSubData.subscriberId.toStringUtf8());
+                                 + origSubData.topic.toStringUtf8() + ", subscriberId: "
+                                 + origSubData.subscriberId.toStringUtf8());
                 for (Message message : subscribeMsgQueue) {
                     asyncMessageConsume(message);
                 }
@@ -323,7 +323,7 @@ public class SubscribeResponseHandler {
 
     /**
      * Getter for the MessageHandler that is set for this subscribe channel.
-     * 
+     *
      * @return The MessageHandler for consuming messages
      */
     public MessageHandler getMessageHandler() {

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java Mon Sep  5 17:38:57 2011
@@ -42,7 +42,7 @@ public class UnsubscribeResponseHandler 
             throws Exception {
         if (logger.isDebugEnabled())
             logger.debug("Handling an Unsubscribe response: " + response + ", pubSubData: " + pubSubData + ", host: "
-                    + HedwigClient.getHostFromChannel(channel));
+                         + HedwigClient.getHostFromChannel(channel));
         switch (response.getStatusCode()) {
         case SUCCESS:
             // For successful Unsubscribe requests, we can now safely close the
@@ -56,14 +56,14 @@ public class UnsubscribeResponseHandler 
             // For Unsubscribe requests, the server says that the client was
             // never subscribed to the topic.
             pubSubData.callback.operationFailed(pubSubData.context, new ClientNotSubscribedException(
-                    "Client was never subscribed to topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: "
-                            + pubSubData.subscriberId.toStringUtf8()));
+                                                    "Client was never subscribed to topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: "
+                                                    + pubSubData.subscriberId.toStringUtf8()));
             break;
         case SERVICE_DOWN:
             // Response was service down failure so just invoke the callback's
             // operationFailed method.
             pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
-                    "Server responded with a SERVICE_DOWN status"));
+                                                    "Server responded with a SERVICE_DOWN status"));
             break;
         case NOT_RESPONSIBLE_FOR_TOPIC:
             // Redirect response so we'll need to repost the original
@@ -75,7 +75,7 @@ public class UnsubscribeResponseHandler 
             // cases.
             logger.error("Unexpected error response from server for PubSubResponse: " + response);
             pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
-                    "Server responded with a status code of: " + response.getStatusCode()));
+                                                    "Server responded with a status code of: " + response.getStatusCode()));
             break;
         }
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java Mon Sep  5 17:38:57 2011
@@ -40,12 +40,12 @@ public class ClientChannelPipelineFactor
     public ChannelPipeline getPipeline() throws Exception {
         // Create a new ChannelPipline using the factory method from the
         // Channels helper class.
-        ChannelPipeline pipeline = Channels.pipeline();        
+        ChannelPipeline pipeline = Channels.pipeline();
         if (client.getSslFactory() != null) {
             pipeline.addLast("ssl", new SslHandler(client.getSslFactory().getEngine()));
-        }        
+        }
         pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(client.getConfiguration()
-                .getMaximumMessageSize(), 0, 4, 0, 4));
+                         .getMaximumMessageSize(), 0, 4, 0, 4));
         pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
 
         pipeline.addLast("protobufdecoder", new ProtobufDecoder(PubSubProtocol.PubSubResponse.getDefaultInstance()));

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java Mon Sep  5 17:38:57 2011
@@ -70,7 +70,7 @@ public class ConnectCallback implements 
                 // invoke the operationFailed callback.
                 logger.error("Error connecting to host more than once so just invoke the operationFailed callback!");
                 pubSubData.callback.operationFailed(pubSubData.context, new CouldNotConnectException(
-                        "Could not connect to host: " + host));
+                                                        "Could not connect to host: " + host));
             } else {
                 if (logger.isDebugEnabled())
                     logger.debug("Try to connect to server: " + host + " again for pubSubData: " + pubSubData);

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClient.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClient.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClient.java Mon Sep  5 17:38:57 2011
@@ -44,7 +44,7 @@ import org.apache.hedwig.exceptions.PubS
 /**
  * This is a top level Hedwig Client class that encapsulates the common
  * functionality needed for both Publish and Subscribe operations.
- * 
+ *
  */
 public class HedwigClient {
 
@@ -182,7 +182,7 @@ public class HedwigClient {
         }
 
         private void checkPubSubDataToTimeOut(PubSubData pubSubData, ResponseHandler responseHandler, long curTime,
-                long timeoutInterval) {
+                                              long timeoutInterval) {
             if (curTime > pubSubData.requestWriteTime + timeoutInterval) {
                 // Current PubSubRequest has timed out so remove it from the
                 // ResponseHandler's map and invoke the VoidCallback's
@@ -190,7 +190,7 @@ public class HedwigClient {
                 logger.error("Current PubSubRequest has timed out for pubSubData: " + pubSubData);
                 responseHandler.txn2PubSubData.remove(pubSubData.txnId);
                 pubSubData.callback.operationFailed(pubSubData.context, new UncertainStateException(
-                        "Server ack response never received so PubSubRequest has timed out!"));
+                                                        "Server ack response never received so PubSubRequest has timed out!"));
             }
         }
     }
@@ -236,7 +236,7 @@ public class HedwigClient {
      * to the one that is responsible for the topic. Once the connect to the
      * server is done, we will perform the corresponding PubSub write on that
      * channel.
-     * 
+     *
      * @param pubSubData
      *            PubSub call's data wrapper object
      * @param serverHost
@@ -261,7 +261,7 @@ public class HedwigClient {
      * Helper method to store the topic2Host mapping in the HedwigClient cache
      * map. This method is assumed to be called when we've done a successful
      * connection to the correct server topic master.
-     * 
+     *
      * @param pubSubData
      *            PubSub wrapper data
      * @param channel
@@ -282,7 +282,7 @@ public class HedwigClient {
         // Store the relevant mappings for this topic and host combination.
         if (logger.isDebugEnabled())
             logger.debug("Storing info for topic: " + pubSubData.topic.toStringUtf8() + ", old host: "
-                    + topic2Host.get(pubSubData.topic) + ", new host: " + host);
+                         + topic2Host.get(pubSubData.topic) + ", new host: " + host);
         topic2Host.put(pubSubData.topic, host);
         if (host2Topics.containsKey(host)) {
             host2Topics.get(host).add(pubSubData.topic);
@@ -297,7 +297,7 @@ public class HedwigClient {
      * Helper static method to get the String Hostname:Port from a netty
      * Channel. Assumption is that the netty Channel was originally created with
      * an InetSocketAddress. This is true with the Hedwig netty implementation.
-     * 
+     *
      * @param channel
      *            Netty channel to extract the hostname and port from.
      * @return String representation of the Hostname:Port from the Netty Channel
@@ -310,7 +310,7 @@ public class HedwigClient {
      * Helper static method to get the ResponseHandler instance from a Channel
      * via the ChannelPipeline it is associated with. The assumption is that the
      * last ChannelHandler tied to the ChannelPipeline is the ResponseHandler.
-     * 
+     *
      * @param channel
      *            Channel we are retrieving the ResponseHandler instance for
      * @return ResponseHandler Instance tied to the Channel's Pipeline

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java Mon Sep  5 17:38:57 2011
@@ -42,7 +42,7 @@ import org.apache.hedwig.util.Callback;
 
 /**
  * This is the Hedwig Netty specific implementation of the Publisher interface.
- * 
+ *
  */
 public class HedwigPublisher implements Publisher {
 
@@ -143,7 +143,7 @@ public class HedwigPublisher implements 
     /**
      * This is a helper method to write the actual publish message once the
      * client is connected to the server and a Channel is available.
-     * 
+     *
      * @param pubSubData
      *            Publish call's data wrapper object
      * @param channel
@@ -183,7 +183,7 @@ public class HedwigPublisher implements 
         // Finally, write the Publish request through the Channel.
         if (logger.isDebugEnabled())
             logger.debug("Writing a Publish request to host: " + HedwigClient.getHostFromChannel(channel)
-                    + " for pubSubData: " + pubSubData);
+                         + " for pubSubData: " + pubSubData);
         ChannelFuture future = channel.write(pubsubRequestBuilder.build());
         future.addListener(new WriteCallback(pubSubData, client));
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Mon Sep  5 17:38:57 2011
@@ -53,7 +53,7 @@ import org.apache.hedwig.util.Callback;
 
 /**
  * This is the Hedwig Netty specific implementation of the Subscriber interface.
- * 
+ *
  */
 public class HedwigSubscriber implements Subscriber {
 
@@ -79,12 +79,12 @@ public class HedwigSubscriber implements
     // two flows are very similar. The assumption is that the input
     // OperationType is either SUBSCRIBE or UNSUBSCRIBE.
     private void subUnsub(ByteString topic, ByteString subscriberId, OperationType operationType,
-            CreateOrAttach createOrAttach) throws CouldNotConnectException, ClientAlreadySubscribedException,
-            ClientNotSubscribedException, ServiceDownException {
+                          CreateOrAttach createOrAttach) throws CouldNotConnectException, ClientAlreadySubscribedException,
+        ClientNotSubscribedException, ServiceDownException {
         if (logger.isDebugEnabled())
             logger.debug("Calling a sync subUnsub request for topic: " + topic.toStringUtf8() + ", subscriberId: "
-                    + subscriberId.toStringUtf8() + ", operationType: " + operationType + ", createOrAttach: "
-                    + createOrAttach);
+                         + subscriberId.toStringUtf8() + ", operationType: " + operationType + ", createOrAttach: "
+                         + createOrAttach);
         PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, createOrAttach, null, null);
         synchronized (pubSubData) {
             PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
@@ -132,15 +132,15 @@ public class HedwigSubscriber implements
     // flows are very similar. The assumption is that the input OperationType is
     // either SUBSCRIBE or UNSUBSCRIBE.
     private void asyncSubUnsub(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object context,
-            OperationType operationType, CreateOrAttach createOrAttach) {
+                               OperationType operationType, CreateOrAttach createOrAttach) {
         if (logger.isDebugEnabled())
             logger.debug("Calling an async subUnsub request for topic: " + topic.toStringUtf8() + ", subscriberId: "
-                    + subscriberId.toStringUtf8() + ", operationType: " + operationType + ", createOrAttach: "
-                    + createOrAttach);
+                         + subscriberId.toStringUtf8() + ", operationType: " + operationType + ", createOrAttach: "
+                         + createOrAttach);
         // Check if we know which server host is the master for the topic we are
         // subscribing to.
         PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, createOrAttach, callback,
-                context);
+                                               context);
         if (client.topic2Host.containsKey(topic)) {
             InetSocketAddress host = client.topic2Host.get(topic);
             if (operationType.equals(OperationType.UNSUBSCRIBE) && client.getPublisher().host2Channel.containsKey(host)) {
@@ -173,18 +173,18 @@ public class HedwigSubscriber implements
 
     public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
             throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
-            InvalidSubscriberIdException {
+        InvalidSubscriberIdException {
         subscribe(topic, subscriberId, mode, false);
     }
 
     protected void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, boolean isHub)
             throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
-            InvalidSubscriberIdException {
+        InvalidSubscriberIdException {
         // Validate that the format of the subscriberId is valid either as a
         // local or hub subscriber.
         if (!isValidSubscriberId(subscriberId, isHub)) {
             throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8()
-                    + ", isHub: " + isHub);
+                                                   + ", isHub: " + isHub);
         }
         try {
             subUnsub(topic, subscriberId, OperationType.SUBSCRIBE, mode);
@@ -198,35 +198,35 @@ public class HedwigSubscriber implements
     }
 
     public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
-            Object context) {
+                               Object context) {
         asyncSubscribe(topic, subscriberId, mode, callback, context, false);
     }
 
     protected void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode,
-            Callback<Void> callback, Object context, boolean isHub) {
+                                  Callback<Void> callback, Object context, boolean isHub) {
         // Validate that the format of the subscriberId is valid either as a
         // local or hub subscriber.
         if (!isValidSubscriberId(subscriberId, isHub)) {
             callback.operationFailed(context, new ServiceDownException(new InvalidSubscriberIdException(
-                    "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
+                                         "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
             return;
         }
         asyncSubUnsub(topic, subscriberId, callback, context, OperationType.SUBSCRIBE, mode);
     }
 
     public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
-            ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException {
+        ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException {
         unsubscribe(topic, subscriberId, false);
     }
 
     protected void unsubscribe(ByteString topic, ByteString subscriberId, boolean isHub)
             throws CouldNotConnectException, ClientNotSubscribedException, ServiceDownException,
-            InvalidSubscriberIdException {
+        InvalidSubscriberIdException {
         // Validate that the format of the subscriberId is valid either as a
         // local or hub subscriber.
         if (!isValidSubscriberId(subscriberId, isHub)) {
             throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8()
-                    + ", isHub: " + isHub);
+                                                   + ", isHub: " + isHub);
         }
         // Synchronously close the subscription on the client side. Even
         // if the unsubscribe request to the server errors out, we won't be
@@ -246,17 +246,17 @@ public class HedwigSubscriber implements
     }
 
     public void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId, final Callback<Void> callback,
-            final Object context) {
+                                 final Object context) {
         asyncUnsubscribe(topic, subscriberId, callback, context, false);
     }
 
     protected void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
-            final Callback<Void> callback, final Object context, boolean isHub) {
+                                    final Callback<Void> callback, final Object context, boolean isHub) {
         // Validate that the format of the subscriberId is valid either as a
         // local or hub subscriber.
         if (!isValidSubscriberId(subscriberId, isHub)) {
             callback.operationFailed(context, new ServiceDownException(new InvalidSubscriberIdException(
-                    "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
+                                         "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
             return;
         }
         // Asynchronously close the subscription. On the callback to that
@@ -288,13 +288,13 @@ public class HedwigSubscriber implements
             throws ClientNotSubscribedException {
         if (logger.isDebugEnabled())
             logger.debug("Calling consume for topic: " + topic.toStringUtf8() + ", subscriberId: "
-                    + subscriberId.toStringUtf8() + ", messageSeqId: " + messageSeqId);
+                         + subscriberId.toStringUtf8() + ", messageSeqId: " + messageSeqId);
         TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
         // Check that this topic subscription on the client side exists.
         if (!topicSubscriber2Channel.containsKey(topicSubscriber)) {
             throw new ClientNotSubscribedException(
-                    "Cannot send consume message since client is not subscribed to topic: " + topic.toStringUtf8()
-                            + ", subscriberId: " + subscriberId.toStringUtf8());
+                "Cannot send consume message since client is not subscribed to topic: " + topic.toStringUtf8()
+                + ", subscriberId: " + subscriberId.toStringUtf8());
         }
         PubSubData pubSubData = new PubSubData(topic, null, subscriberId, OperationType.CONSUME, null, null, null);
         // Send the consume message to the server using the same subscribe
@@ -305,7 +305,7 @@ public class HedwigSubscriber implements
     /**
      * This is a helper method to write the actual subscribe/unsubscribe message
      * once the client is connected to the server and a Channel is available.
-     * 
+     *
      * @param pubSubData
      *            Subscribe/Unsubscribe call's data wrapper object. We assume
      *            that the operationType field is either SUBSCRIBE or
@@ -360,7 +360,7 @@ public class HedwigSubscriber implements
         // Finally, write the Subscribe request through the Channel.
         if (logger.isDebugEnabled())
             logger.debug("Writing a SubUnsub request to host: " + HedwigClient.getHostFromChannel(channel)
-                    + " for pubSubData: " + pubSubData);
+                         + " for pubSubData: " + pubSubData);
         ChannelFuture future = channel.write(pubsubRequestBuilder.build());
         future.addListener(new WriteCallback(pubSubData, client));
     }
@@ -369,7 +369,7 @@ public class HedwigSubscriber implements
      * This is a helper method to write a consume message to the server after a
      * subscribe Channel connection is made to the server and messages are being
      * consumed by the client.
-     * 
+     *
      * @param pubSubData
      *            Consume call's data wrapper object. We assume that the
      *            operationType field is CONSUME.
@@ -405,14 +405,14 @@ public class HedwigSubscriber implements
         // message if there was a problem writing the consume request.
         if (logger.isDebugEnabled())
             logger.debug("Writing a Consume request to host: " + HedwigClient.getHostFromChannel(channel)
-                    + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
+                         + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
         ChannelFuture future = channel.write(pubsubRequestBuilder.build());
         future.addListener(new ChannelFutureListener() {
             @Override
             public void operationComplete(ChannelFuture future) throws Exception {
                 if (!future.isSuccess()) {
                     logger.error("Error writing a Consume request to host: " + HedwigClient.getHostFromChannel(channel)
-                            + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);                    
+                                 + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
                 }
             }
         });
@@ -420,7 +420,7 @@ public class HedwigSubscriber implements
     }
 
     public boolean hasSubscription(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
-            ServiceDownException {
+        ServiceDownException {
         // The subscription type of info should be stored on the server end, not
         // the client side. Eventually, the server will have the Subscription
         // Manager part that ties into Zookeeper to manage this info.
@@ -433,7 +433,7 @@ public class HedwigSubscriber implements
     }
 
     public List<ByteString> getSubscriptionList(ByteString subscriberId) throws CouldNotConnectException,
-            ServiceDownException {
+        ServiceDownException {
         // Same as the previous hasSubscription method, this data should reside
         // on the server end, not the client side.
         return null;
@@ -443,7 +443,7 @@ public class HedwigSubscriber implements
             throws ClientNotSubscribedException {
         if (logger.isDebugEnabled())
             logger.debug("Starting delivery for topic: " + topic.toStringUtf8() + ", subscriberId: "
-                    + subscriberId.toStringUtf8());
+                         + subscriberId.toStringUtf8());
         TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
         // Make sure we know about this topic subscription on the client side
         // exists. The assumption is that the client should have in memory the
@@ -451,16 +451,16 @@ public class HedwigSubscriber implements
         // an ack response to the initial subscribe request.
         if (!topicSubscriber2Channel.containsKey(topicSubscriber)) {
             logger.error("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: "
-                    + subscriberId.toStringUtf8());
+                         + subscriberId.toStringUtf8());
             throw new ClientNotSubscribedException("Client is not yet subscribed to topic: " + topic.toStringUtf8()
-                    + ", subscriberId: " + subscriberId.toStringUtf8());
+                                                   + ", subscriberId: " + subscriberId.toStringUtf8());
         }
 
         // Register the MessageHandler with the subscribe Channel's
         // Response Handler.
         Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
         HedwigClient.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
-                .setMessageHandler(messageHandler);
+        .setMessageHandler(messageHandler);
         // Now make the TopicSubscriber Channel readable (it is set to not be
         // readable when the initial subscription is done). Note that this is an
         // asynchronous call. If this fails (not likely), the futureListener
@@ -471,7 +471,7 @@ public class HedwigSubscriber implements
             public void operationComplete(ChannelFuture future) throws Exception {
                 if (!future.isSuccess()) {
                     logger.error("Unable to make subscriber Channel readable in startDelivery call for topic: "
-                            + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
+                                 + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
                 }
             }
         });
@@ -480,7 +480,7 @@ public class HedwigSubscriber implements
     public void stopDelivery(final ByteString topic, final ByteString subscriberId) throws ClientNotSubscribedException {
         if (logger.isDebugEnabled())
             logger.debug("Stopping delivery for topic: " + topic.toStringUtf8() + ", subscriberId: "
-                    + subscriberId.toStringUtf8());
+                         + subscriberId.toStringUtf8());
         TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
         // Make sure we know that this topic subscription on the client side
         // exists. The assumption is that the client should have in memory the
@@ -488,16 +488,16 @@ public class HedwigSubscriber implements
         // an ack response to the initial subscribe request.
         if (!topicSubscriber2Channel.containsKey(topicSubscriber)) {
             logger.error("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: "
-                    + subscriberId.toStringUtf8());
+                         + subscriberId.toStringUtf8());
             throw new ClientNotSubscribedException("Client is not yet subscribed to topic: " + topic.toStringUtf8()
-                    + ", subscriberId: " + subscriberId.toStringUtf8());
+                                                   + ", subscriberId: " + subscriberId.toStringUtf8());
         }
 
         // Unregister the MessageHandler for the subscribe Channel's
         // Response Handler.
         Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
         HedwigClient.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
-                .setMessageHandler(null);
+        .setMessageHandler(null);
         // Now make the TopicSubscriber channel not-readable. This will buffer
         // up messages if any are sent from the server. Note that this is an
         // asynchronous call. If this fails (not likely), the futureListener
@@ -508,7 +508,7 @@ public class HedwigSubscriber implements
             public void operationComplete(ChannelFuture future) throws Exception {
                 if (!future.isSuccess()) {
                     logger.error("Unable to make subscriber Channel not readable in stopDelivery call for topic: "
-                            + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
+                                 + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
                 }
             }
         });
@@ -528,16 +528,16 @@ public class HedwigSubscriber implements
             // Check from the PubSubCallback if it was successful or not.
             if (!pubSubCallback.getIsCallSuccessful()) {
                 throw new ServiceDownException("Exception while trying to close the subscription for topic: "
-                        + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
+                                               + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
             }
         }
     }
 
     public void asyncCloseSubscription(final ByteString topic, final ByteString subscriberId,
-            final Callback<Void> callback, final Object context) {
+                                       final Callback<Void> callback, final Object context) {
         if (logger.isDebugEnabled())
             logger.debug("Closing subscription asynchronously for topic: " + topic.toStringUtf8() + ", subscriberId: "
-                    + subscriberId.toStringUtf8());
+                         + subscriberId.toStringUtf8());
         TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
         if (topicSubscriber2Channel.containsKey(topicSubscriber)) {
             // Remove all cached references for the TopicSubscriber
@@ -551,10 +551,10 @@ public class HedwigSubscriber implements
                 public void operationComplete(ChannelFuture future) throws Exception {
                     if (!future.isSuccess()) {
                         logger.error("Failed to close the subscription channel for topic: " + topic.toStringUtf8()
-                                + ", subscriberId: " + subscriberId.toStringUtf8());
+                                     + ", subscriberId: " + subscriberId.toStringUtf8());
                         callback.operationFailed(context, new ServiceDownException(
-                                "Failed to close the subscription channel for topic: " + topic.toStringUtf8()
-                                        + ", subscriberId: " + subscriberId.toStringUtf8()));
+                                                     "Failed to close the subscription channel for topic: " + topic.toStringUtf8()
+                                                     + ", subscriberId: " + subscriberId.toStringUtf8()));
                     } else {
                         callback.operationFinished(context, null);
                     }
@@ -562,7 +562,7 @@ public class HedwigSubscriber implements
             });
         } else {
             logger.warn("Trying to close a subscription when we don't have a subscribe channel cached for topic: "
-                    + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
+                        + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
             callback.operationFinished(context, null);
         }
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java Mon Sep  5 17:38:57 2011
@@ -113,7 +113,7 @@ public class ResponseHandler extends Sim
         PubSubResponse response = (PubSubResponse) e.getMessage();
         if (logger.isDebugEnabled())
             logger.debug("Response received from host: " + HedwigClient.getHostFromChannel(ctx.getChannel())
-                    + ", response: " + response);
+                         + ", response: " + response);
 
         // Determine if this PubSubResponse is an ack response for a PubSub
         // Request or if it is a message being pushed to the client subscriber.
@@ -127,7 +127,7 @@ public class ResponseHandler extends Sim
         // Response is an ack to a prior PubSubRequest so first retrieve the
         // PubSub data for this txn.
         PubSubData pubSubData = txn2PubSubData.containsKey(response.getTxnId()) ? txn2PubSubData.get(response
-                .getTxnId()) : null;
+                                .getTxnId()) : null;
         // Validate that the PubSub data for this txn is stored. If not, just
         // log an error message and return since we don't know how to handle
         // this.
@@ -163,14 +163,14 @@ public class ResponseHandler extends Sim
             // The above are the only expected PubSubResponse messages received
             // from the server for the various client side requests made.
             logger.error("Response received from server is for an unhandled operation type, txnId: "
-                    + response.getTxnId() + ", operationType: " + pubSubData.operationType);
+                         + response.getTxnId() + ", operationType: " + pubSubData.operationType);
         }
     }
 
     /**
      * Logic to repost a PubSubRequest when the server responds with a redirect
      * indicating they are not the topic master.
-     * 
+     *
      * @param response
      *            PubSubResponse from the server for the redirect
      * @param pubSubData
@@ -185,7 +185,7 @@ public class ResponseHandler extends Sim
             throws Exception {
         if (logger.isDebugEnabled())
             logger.debug("Handling a redirect from host: " + HedwigClient.getHostFromChannel(channel) + ", response: "
-                    + response + ", pubSubData: " + pubSubData);
+                         + response + ", pubSubData: " + pubSubData);
         // In this case, the PubSub request was done to a server that is not
         // responsible for the topic. First make sure that we haven't
         // exceeded the maximum number of server redirects.
@@ -197,8 +197,8 @@ public class ResponseHandler extends Sim
             if (logger.isDebugEnabled())
                 logger.debug("Exceeded the number of server redirects (" + curNumServerRedirects + ") so error out.");
             pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
-                    new TooManyServerRedirectsException("Already reached max number of redirects: "
-                            + curNumServerRedirects)));
+                                                    new TooManyServerRedirectsException("Already reached max number of redirects: "
+                                                            + curNumServerRedirects)));
             return;
         }
 
@@ -206,7 +206,7 @@ public class ResponseHandler extends Sim
         // stored in the StatusMsg of the response. First store the
         // server that we sent the PubSub request to for the topic.
         ByteString triedServer = ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(HedwigClient
-                .getHostFromChannel(channel)));
+                                 .getHostFromChannel(channel)));
         if (pubSubData.triedServers == null)
             pubSubData.triedServers = new LinkedList<ByteString>();
         pubSubData.shouldClaim = true;
@@ -232,10 +232,10 @@ public class ResponseHandler extends Sim
         // already before in this PubSub request.
         if (pubSubData.triedServers.contains(ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(redirectedHost)))) {
             logger.error("We've already sent this PubSubRequest before to redirectedHost: " + redirectedHost
-                    + ", pubSubData: " + pubSubData);
+                         + ", pubSubData: " + pubSubData);
             pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
-                    new ServerRedirectLoopException("Already made the request before to redirected host: "
-                            + redirectedHost)));
+                                                    new ServerRedirectLoopException("Already made the request before to redirected host: "
+                                                            + redirectedHost)));
             return;
         }
 
@@ -297,7 +297,7 @@ public class ResponseHandler extends Sim
             if (pub.host2Channel.containsKey(host) && pub.host2Channel.get(host).equals(ctx.getChannel())) {
                 if (logger.isDebugEnabled())
                     logger.debug("Disconnected channel for host: " + host
-                            + " was for Publish/Unsubscribe requests so remove all references to it.");
+                                 + " was for Publish/Unsubscribe requests so remove all references to it.");
                 pub.host2Channel.remove(host);
                 client.clearAllTopicsForHost(host);
             }
@@ -333,9 +333,9 @@ public class ResponseHandler extends Sim
         for (PubSubData pubSubData : txn2PubSubData.values()) {
             if (logger.isDebugEnabled())
                 logger.debug("Channel disconnected so invoking the operationFailed callback for pubSubData: "
-                        + pubSubData);
+                             + pubSubData);
             pubSubData.callback.operationFailed(pubSubData.context, new UncertainStateException(
-                    "Server ack response never received before server connection disconnected!"));
+                                                    "Server ack response never received before server connection disconnected!"));
         }
         txn2PubSubData.clear();
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java Mon Sep  5 17:38:57 2011
@@ -52,7 +52,7 @@ public class WriteCallback implements Ch
         // with any callback logic here.
         if (client.hasStopped())
             return;
-        
+
         // When the write operation to the server is done, we just need to check
         // if it was successful or not.
         InetSocketAddress host = HedwigClient.getHostFromChannel(future.getChannel());
@@ -73,11 +73,11 @@ public class WriteCallback implements Ch
                 // failed, so invoke the operationFailed callback.
                 logger.error("Error writing to host more than once so just invoke the operationFailed callback!");
                 pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
-                        "Error while writing message to server: " + hostString));
+                                                        "Error while writing message to server: " + hostString));
             } else {
                 if (logger.isDebugEnabled())
                     logger.debug("Try to send the PubSubRequest again to the default server host/VIP for pubSubData: "
-                            + pubSubData);
+                                 + pubSubData);
                 // Keep track of this current server that we failed to write to
                 // but retry the request on the default server host/VIP.
                 if (pubSubData.writeFailedServers == null)

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/ssl/SslContextFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/ssl/SslContextFactory.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/ssl/SslContextFactory.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/ssl/SslContextFactory.java Mon Sep  5 17:38:57 2011
@@ -43,23 +43,24 @@ public abstract class SslContextFactory 
 
     protected TrustManager[] getTrustManagers() {
         return new TrustManager[] { new X509TrustManager() {
-            // Always trust, even if invalid.
+                // Always trust, even if invalid.
 
-            @Override
-            public X509Certificate[] getAcceptedIssuers() {
-                return new X509Certificate[0];
-            }
+                @Override
+                public X509Certificate[] getAcceptedIssuers() {
+                    return new X509Certificate[0];
+                }
 
-            @Override
-            public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
-                // Always trust.
-            }
+                @Override
+                public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+                    // Always trust.
+                }
 
-            @Override
-            public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
-                // Always trust.
+                @Override
+                public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+                    // Always trust.
+                }
             }
-        } };
+        };
     }
 
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java Mon Sep  5 17:38:57 2011
@@ -34,7 +34,7 @@ public abstract class AbstractConfigurat
     /**
      * You can load configurations in precedence order. The first one takes
      * precedence over any loaded later.
-     * 
+     *
      * @param confURL
      */
     public void loadConf(URL confURL) throws ConfigurationException {

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/Callback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/Callback.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/Callback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/Callback.java Mon Sep  5 17:38:57 2011
@@ -21,13 +21,13 @@ import org.apache.hedwig.exceptions.PubS
 
 /**
  * This class is used for callbacks for asynchronous operations
- * 
+ *
  */
 public interface Callback<T> {
 
     /**
      * This method is called when the asynchronous operation finishes
-     * 
+     *
      * @param ctx
      * @param resultOfOperation
      */
@@ -36,7 +36,7 @@ public interface Callback<T> {
     /**
      * This method is called when the operation failed due to some reason. The
      * reason for failure is passed in.
-     * 
+     *
      * @param ctx
      *            The context for the callback
      * @param exception
@@ -44,4 +44,4 @@ public interface Callback<T> {
      */
     public abstract void operationFailed(Object ctx, PubSubException exception);
 
-}
\ No newline at end of file
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/CallbackUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/CallbackUtils.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/CallbackUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/CallbackUtils.java Mon Sep  5 17:38:57 2011
@@ -31,10 +31,10 @@ public class CallbackUtils {
     /**
      * A callback that waits for all of a number of events to fire. If any fail,
      * then fail the final callback with a composite exception.
-     * 
+     *
      * TODO: change this to use any Exception and make CompositeException
      * generic, not a PubSubException.
-     * 
+     *
      * @param expected
      *            Number of expected callbacks.
      * @param cb
@@ -135,7 +135,7 @@ public class CallbackUtils {
      * Logs what happened before continuing the callback chain.
      */
     public static <T> Callback<T> logger(final Logger logger, final Level successLevel, final Level failureLevel, final Object successMsg,
-            final Object failureMsg, final Callback<T> cont) {
+                                         final Object failureMsg, final Callback<T> cont) {
         return new Callback<T>() {
 
             @Override

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/test/java/org/apache/hedwig/client/AppTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/test/java/org/apache/hedwig/client/AppTest.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/test/java/org/apache/hedwig/client/AppTest.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/test/java/org/apache/hedwig/client/AppTest.java Mon Sep  5 17:38:57 2011
@@ -27,7 +27,7 @@ import junit.framework.TestSuite;
 public class AppTest extends TestCase {
     /**
      * Create the test case
-     * 
+     *
      * @param testName
      *            name of the test case
      */

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/test/java/org/apache/hedwig/util/TestHedwigSocketAddress.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/test/java/org/apache/hedwig/util/TestHedwigSocketAddress.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/test/java/org/apache/hedwig/util/TestHedwigSocketAddress.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/test/java/org/apache/hedwig/util/TestHedwigSocketAddress.java Mon Sep  5 17:38:57 2011
@@ -31,7 +31,7 @@ public class TestHedwigSocketAddress ext
     private int sslPort = 9876;
     private int invalidPort = -9999;
     private String COLON = ":";
-    
+
     @Test
     public void testCreateWithSSLPort() throws Exception {
         HedwigSocketAddress addr = new HedwigSocketAddress(hostname, port, sslPort);
@@ -51,7 +51,7 @@ public class TestHedwigSocketAddress ext
         HedwigSocketAddress addr = new HedwigSocketAddress(hostname+COLON+port+COLON+sslPort);
         assertTrue(addr.getSocketAddress().equals(new InetSocketAddress(hostname, port)));
         assertTrue(addr.getSSLSocketAddress().equals(new InetSocketAddress(hostname, sslPort)));
-    }    
+    }
 
     @Test
     public void testCreateFromStringWithNoSSLPort() throws Exception {
@@ -59,7 +59,7 @@ public class TestHedwigSocketAddress ext
         assertTrue(addr.getSocketAddress().equals(new InetSocketAddress(hostname, port)));
         assertTrue(addr.getSSLSocketAddress() == null);
     }
-    
+
     @Test
     public void testCreateWithInvalidRegularPort() throws Exception {
         boolean success = false;
@@ -70,7 +70,7 @@ public class TestHedwigSocketAddress ext
             success = true;
         }
         assertTrue(success);
-    }    
+    }
 
     @Test
     public void testCreateWithInvalidSSLPort() throws Exception {
@@ -82,7 +82,7 @@ public class TestHedwigSocketAddress ext
             success = true;
         }
         assertTrue(success);
-    }    
+    }
 
     @Test
     public void testToStringConversion() throws Exception {
@@ -98,7 +98,7 @@ public class TestHedwigSocketAddress ext
         HedwigSocketAddress sslAddr = new HedwigSocketAddress(hostname, port, sslPort);
         assertTrue(sslAddr.isSSLEnabled());
         HedwigSocketAddress addr = new HedwigSocketAddress(hostname, port);
-        assertFalse(addr.isSSLEnabled());               
+        assertFalse(addr.isSSLEnabled());
     }
-    
+
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java Mon Sep  5 17:38:57 2011
@@ -135,7 +135,7 @@ public abstract class PubSubException ex
             super(StatusCode.UNEXPECTED_CONDITION, msg);
         }
     }
-    
+
     // The composite exception (for concurrent operations).
     public static class CompositeException extends PubSubException {
         private final Collection<PubSubException> exceptions;