You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2011/09/05 19:39:03 UTC

svn commit: r1165369 [8/9] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/clie...

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java Mon Sep  5 17:38:57 2011
@@ -20,7 +20,7 @@ package org.apache.hedwig.server.persist
 public interface PersistenceManagerWithRangeScan extends PersistenceManager {
     /**
      * Executes the given range scan request
-     * 
+     *
      * @param request
      */
     public void scanMessages(RangeScanRequest request);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java Mon Sep  5 17:38:57 2011
@@ -30,7 +30,7 @@ import com.google.protobuf.ByteString;
  * {@link ScanCallback} used should be prepared to deal with more or less
  * messages scanned. If an error occurs during scanning, the
  * {@link ScanCallback} is notified of the error.
- * 
+ *
  */
 public class RangeScanRequest {
     ByteString topic;
@@ -41,7 +41,7 @@ public class RangeScanRequest {
     Object ctx;
 
     public RangeScanRequest(ByteString topic, long startSeqId, int messageLimit, long sizeLimit, ScanCallback callback,
-            Object ctx) {
+                            Object ctx) {
         this.topic = topic;
         this.startSeqId = startSeqId;
         this.messageLimit = messageLimit;

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java Mon Sep  5 17:38:57 2011
@@ -107,7 +107,7 @@ public class ReadAheadCache implements P
 
     /**
      * Constructor. Starts the cache maintainer thread
-     * 
+     *
      * @param realPersistenceManager
      */
     public ReadAheadCache(PersistenceManagerWithRangeScan realPersistenceManager, ServerConfiguration cfg) {
@@ -139,10 +139,10 @@ public class ReadAheadCache implements P
      * ========================================================================
      * Other methods of {@link PersistenceManager} that the cache needs to take
      * some action on.
-     * 
+     *
      * 1. Persist: We pass it through to the real persistence manager but insert
      * our callback on the return path
-     * 
+     *
      */
     public void persistMessage(PersistRequest request) {
         // make a new PersistRequest object so that we can insert our own
@@ -158,7 +158,7 @@ public class ReadAheadCache implements P
      * The callback that we insert on the persist request return path. The
      * callback simply forms a {@link PersistResponse} object and inserts it in
      * the request queue to be handled serially by the cache maintainer thread.
-     * 
+     *
      */
     public class PersistCallback implements Callback<Long> {
 
@@ -188,7 +188,7 @@ public class ReadAheadCache implements P
             // Original message that was persisted didn't have the local seq-id.
             // Lets add that in
             Message messageWithLocalSeqId = MessageIdUtils.mergeLocalSeqId(originalRequest.getMessage(),
-                    resultOfOperation);
+                                            resultOfOperation);
 
             // Now enqueue a request to add this newly persisted message to our
             // cache
@@ -204,20 +204,20 @@ public class ReadAheadCache implements P
      * callbacks. Its just simpler to quit and restart afresh. Moreover, this
      * should not happen as the request queue for the cache maintainer is
      * unbounded.
-     * 
+     *
      * @param obj
      */
     protected void enqueueWithoutFailure(CacheRequest obj) {
         if (!requestQueue.offer(obj)) {
             throw new UnexpectedError("Could not enqueue object: " + obj.toString()
-                    + " to cache request queue. Exiting.");
+                                      + " to cache request queue. Exiting.");
 
         }
     }
 
     /**
      * Another method from {@link PersistenceManager}.
-     * 
+     *
      * 2. Scan - Since the scan needs to touch the cache, we will just enqueue
      * the scan request and let the cache maintainer thread handle it.
      */
@@ -228,7 +228,7 @@ public class ReadAheadCache implements P
 
     /**
      * Another method from {@link PersistenceManager}.
-     * 
+     *
      * 3. Enqueue the request so that the cache maintainer thread can delete all
      * message-ids older than the one specified
      */
@@ -238,7 +238,7 @@ public class ReadAheadCache implements P
 
     /**
      * Another method from {@link PersistenceManager}.
-     * 
+     *
      * Since this is a cache layer on top of an underlying persistence manager,
      * we can just call the consumedUntil method there. The messages older than
      * the latest one passed here won't be accessed anymore so they should just
@@ -252,7 +252,7 @@ public class ReadAheadCache implements P
     /**
      * ========================================================================
      * BEGINNING OF CODE FOR THE CACHE MAINTAINER THREAD
-     * 
+     *
      * 1. The run method. It simply dequeues from the request queue, checks the
      * type of object and acts accordingly
      */
@@ -284,7 +284,7 @@ public class ReadAheadCache implements P
      * outstanding. In that case, we look a little ahead (by readAheadCount/2)
      * and issue a range read of readAheadCount/2 messages. The idea is to
      * ensure that the next readAheadCount messages are always available.
-     * 
+     *
      * @return the range scan that should be issued for read ahead
      */
     protected RangeScanRequest doReadAhead(ScanRequest request) {
@@ -313,7 +313,7 @@ public class ReadAheadCache implements P
     /**
      * This method just checks if the provided seq-id already exists in the
      * cache. If not, a range read of the specified amount is issued.
-     * 
+     *
      * @param topic
      * @param seqId
      * @param readAheadCount
@@ -369,7 +369,7 @@ public class ReadAheadCache implements P
 
         /**
          * Constructor
-         * 
+         *
          * @param installedStubs
          *            The list of stubs that were installed for this range scan
          * @param topic
@@ -407,8 +407,8 @@ public class ReadAheadCache implements P
             // should remove them, so that whoever is waiting on them can retry.
             // This shouldn't be happening usually
             logger.warn("Unexpected message seq-id: " + message.getMsgId().getLocalComponent() + " on topic: "
-                    + topic.toStringUtf8() + " from readahead scan, was expecting seq-id: " + expectedKey.seqId
-                    + " topic: " + expectedKey.topic.toStringUtf8() + " installedStubs: " + installedStubs);
+                        + topic.toStringUtf8() + " from readahead scan, was expecting seq-id: " + expectedKey.seqId
+                        + " topic: " + expectedKey.topic.toStringUtf8() + " installedStubs: " + installedStubs);
             enqueueDeleteOfRemainingStubs(noSuchSeqIdExceptionInstance);
 
         }
@@ -457,14 +457,14 @@ public class ReadAheadCache implements P
      * For adding the message to the cache, we do some bookeeping such as the
      * total size of cache, order in which entries were added etc. If the size
      * of the cache has exceeded our budget, old entries are collected.
-     * 
+     *
      * @param cacheKey
      * @param message
      */
     protected void addMessageToCache(CacheKey cacheKey, Message message, long currTime) {
         if (logger.isDebugEnabled()) {
             logger.debug("Adding msg (topic: " + cacheKey.getTopic().toStringUtf8() + ", seq-id: "
-                    + message.getMsgId().getLocalComponent() + ") to readahead cache");
+                         + message.getMsgId().getLocalComponent() + ") to readahead cache");
         }
 
         CacheValue cacheValue;
@@ -482,7 +482,7 @@ public class ReadAheadCache implements P
 
         // maintain the index of seq-id
         MapMethods.addToMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(), cacheKey.getSeqId(),
-                TreeSetLongFactory.instance);
+                                 TreeSetLongFactory.instance);
 
         // finally add the message to the cache
         cacheValue.setMessageAndInvokeCallbacks(message, currTime);
@@ -492,7 +492,7 @@ public class ReadAheadCache implements P
     }
 
     protected void removeMessageFromCache(CacheKey cacheKey, Exception exception, boolean maintainTimeIndex,
-            boolean maintainSeqIdIndex) {
+                                          boolean maintainSeqIdIndex) {
         CacheValue cacheValue = cache.remove(cacheKey);
 
         if (cacheValue == null) {
@@ -537,13 +537,13 @@ public class ReadAheadCache implements P
 
                 if (logger.isDebugEnabled()) {
                     logger.debug("Removing topic: " + cacheKey.getTopic() + "seq-id: " + cacheKey.getSeqId()
-                            + " from cache because its the oldest");
+                                 + " from cache because its the oldest");
                 }
                 removeMessageFromCache(cacheKey, readAheadExceptionInstance, //
-                        // maintainTimeIndex=
-                        false,
-                        // maintainSeqIdIndex=
-                        true);
+                                       // maintainTimeIndex=
+                                       false,
+                                       // maintainSeqIdIndex=
+                                       true);
             }
 
             timeIndexOfAddition.remove(earliestTime);
@@ -554,7 +554,7 @@ public class ReadAheadCache implements P
     /**
      * ========================================================================
      * The rest is just simple wrapper classes.
-     * 
+     *
      */
 
     protected class ExceptionOnCacheKey implements CacheRequest {
@@ -575,10 +575,10 @@ public class ReadAheadCache implements P
          */
         public void performRequest() {
             removeMessageFromCache(cacheKey, exception,
-            // maintainTimeIndex=
-                    true,
-                    // maintainSeqIdIndex=
-                    true);
+                                   // maintainTimeIndex=
+                                   true,
+                                   // maintainSeqIdIndex=
+                                   true);
         }
 
     }
@@ -638,15 +638,15 @@ public class ReadAheadCache implements P
 
                 if (logger.isDebugEnabled()) {
                     logger.debug("Removing seq-id: " + cacheKey.getSeqId() + " topic: "
-                            + cacheKey.getTopic().toStringUtf8()
-                            + " from cache because every subscriber has moved past");
+                                 + cacheKey.getTopic().toStringUtf8()
+                                 + " from cache because every subscriber has moved past");
                 }
 
                 removeMessageFromCache(cacheKey, readAheadExceptionInstance, //
-                        // maintainTimeIndex=
-                        true,
-                        // maintainSeqIdIndex=
-                        false);
+                                       // maintainTimeIndex=
+                                       true,
+                                       // maintainSeqIdIndex=
+                                       false);
                 iter.remove();
             }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java Mon Sep  5 17:38:57 2011
@@ -30,7 +30,7 @@ public interface ScanCallback {
      * as part of a scan. The message just read is handed to this listener which
      * can then take the desired action on it. The return value from the method
      * indicates whether the scan should continue or not.
-     * 
+     *
      * @param ctx
      *            The context for the callback
      * @param message
@@ -41,8 +41,8 @@ public interface ScanCallback {
 
     /**
      * This method is called when the scan finishes
-     * 
-     * 
+     *
+     *
      * @param ctx
      * @param reason
      */
@@ -52,7 +52,7 @@ public interface ScanCallback {
     /**
      * This method is called when the operation failed due to some reason. The
      * reason for failure is passed in.
-     * 
+     *
      * @param ctx
      *            The context for the callback
      * @param exception

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java Mon Sep  5 17:38:57 2011
@@ -30,7 +30,7 @@ import org.apache.hedwig.protocol.PubSub
  * its redundant.
  * {@link ScanCallback#scanFailed(Object, org.apache.hedwig.exceptions.PubSubException)}
  * method is called in case of error.
- * 
+ *
  */
 public class ScanRequest {
     ByteString topic;

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java Mon Sep  5 17:38:57 2011
@@ -72,7 +72,7 @@ public class ChannelTracker implements C
 
         if (topicSub2Channel.containsKey(topicSubscriber)) {
             TopicBusyException pse = new PubSubException.TopicBusyException(
-                    "subscription for this topic, subscriberId is already being served on a different channel");
+                "subscription for this topic, subscriberId is already being served on a different channel");
             throw pse;
         }
 
@@ -107,12 +107,12 @@ public class ChannelTracker implements C
 
         if (subscribedChannel == null) {
             throw new PubSubException.ClientNotSubscribedException(
-                    "Can't start delivery since client is not subscribed");
+                "Can't start delivery since client is not subscribed");
         }
 
         if (subscribedChannel != channel) {
             throw new PubSubException.TopicBusyException(
-                    "Can't start delivery since client is subscribed on a different channel");
+                "Can't start delivery since client is subscribed on a different channel");
         }
 
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java Mon Sep  5 17:38:57 2011
@@ -48,7 +48,7 @@ public class HedwigProxy {
 
     HedwigClient client;
     ServerSocketChannelFactory serverSocketChannelFactory;
-    ChannelGroup allChannels; 
+    ChannelGroup allChannels;
     Map<OperationType, Handler> handlers;
     ProxyConfiguration cfg;
 
@@ -69,7 +69,7 @@ public class HedwigProxy {
             @Override
             public void run() {
                 client = new HedwigClient(cfg);
-                
+
                 serverSocketChannelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
                         Executors.newCachedThreadPool());
                 initializeHandlers();
@@ -125,7 +125,7 @@ public class HedwigProxy {
 
     // the following method only exists for unit-testing purposes, should go
     // away once we make start delivery totally server-side
-    public Handler getStartDeliveryHandler(){
+    public Handler getStartDeliveryHandler() {
         return handlers.get(OperationType.START_DELIVERY);
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java Mon Sep  5 17:38:57 2011
@@ -19,18 +19,18 @@ package org.apache.hedwig.server.proxy;
 
 import org.apache.hedwig.client.conf.ClientConfiguration;
 
-public class ProxyConfiguration extends ClientConfiguration{
+public class ProxyConfiguration extends ClientConfiguration {
 
     protected static String PROXY_PORT = "proxy_port";
     protected static String MAX_MESSAGE_SIZE = "max_message_size";
-    
-    public int getProxyPort(){
+
+    public int getProxyPort() {
         return conf.getInt(PROXY_PORT, 9099);
     }
-    
+
     @Override
     public int getMaximumMessageSize() {
         return conf.getInt(MAX_MESSAGE_SIZE, 1258291); /* 1.2M */
     }
-    
+
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java Mon Sep  5 17:38:57 2011
@@ -71,22 +71,22 @@ public class ProxyStartDeliveryHandler i
             // }
 
             final Channel subscribedChannel = tracker.getChannel(topic, subscriberId);
-            
+
             if (subscribedChannel == null) {
                 channel.write(PubSubResponseUtils.getResponseForException(
-                        new PubSubException.ClientNotSubscribedException("no subscription to start delivery on"),
-                        request.getTxnId()));
+                                  new PubSubException.ClientNotSubscribedException("no subscription to start delivery on"),
+                                  request.getTxnId()));
                 return;
             }
-            
+
             MessageHandler handler = new MessageHandler() {
                 @Override
                 public void consume(ByteString topic, ByteString subscriberId, Message msg,
-                        final Callback<Void> callback, final Object context) {
+                final Callback<Void> callback, final Object context) {
 
                     PubSubResponse response = PubSubResponse.newBuilder().setProtocolVersion(
-                            ProtocolVersion.VERSION_ONE).setStatusCode(StatusCode.SUCCESS).setTxnId(0).setMessage(msg)
-                            .setTopic(topic).setSubscriberId(subscriberId).build();
+                                                  ProtocolVersion.VERSION_ONE).setStatusCode(StatusCode.SUCCESS).setTxnId(0).setMessage(msg)
+                                              .setTopic(topic).setSubscriberId(subscriberId).build();
 
                     ChannelFuture future = subscribedChannel.write(response);
 
@@ -119,7 +119,7 @@ public class ProxyStartDeliveryHandler i
                 logger.fatal("Unexpected: No subscription when attempting to start delivery", e);
                 throw new RuntimeException(e);
             }
-            
+
 
 
         }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java Mon Sep  5 17:38:57 2011
@@ -60,7 +60,7 @@ public class ProxySubscribeHandler imple
         final TopicSubscriber topicSubscriber = new TopicSubscriber(request.getTopic(), subRequest.getSubscriberId());
 
         subscriber.asyncSubscribe(topicSubscriber.getTopic(), subRequest.getSubscriberId(), subRequest
-                .getCreateOrAttach(), new Callback<Void>() {
+        .getCreateOrAttach(), new Callback<Void>() {
             @Override
             public void operationFailed(Object ctx, PubSubException exception) {
                 channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java Mon Sep  5 17:38:57 2011
@@ -38,7 +38,7 @@ public class HedwigHubClientFactory {
     /**
      * Manufacture a hub client whose default server to connect to is the input
      * HedwigSocketAddress hub.
-     * 
+     *
      * @param hub
      *            The hub in another region to connect to.
      */

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java Mon Sep  5 17:38:57 2011
@@ -45,25 +45,25 @@ public class HedwigHubSubscriber extends
     @Override
     public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
             throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
-            InvalidSubscriberIdException {
+        InvalidSubscriberIdException {
         subscribe(topic, subscriberId, mode, true);
     }
 
     @Override
     public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
-            Object context) {
+                               Object context) {
         asyncSubscribe(topic, subscriberId, mode, callback, context, true);
     }
 
     @Override
     public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
-            ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException {
+        ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException {
         unsubscribe(topic, subscriberId, true);
     }
 
     @Override
     public void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId, final Callback<Void> callback,
-            final Object context) {
+                                 final Object context) {
         asyncUnsubscribe(topic, subscriberId, callback, context, true);
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java Mon Sep  5 17:38:57 2011
@@ -52,7 +52,7 @@ public class RegionManager implements Su
     private final TopicOpQueuer queue;
 
     public RegionManager(final PersistenceManager pm, final ServerConfiguration cfg, final ZooKeeper zk,
-            ScheduledExecutorService scheduler, HedwigHubClientFactory hubClientFactory) {
+                         ScheduledExecutorService scheduler, HedwigHubClientFactory hubClientFactory) {
         this.pm = pm;
         mySubId = ByteString.copyFromUtf8(SubscriptionStateUtils.HUB_SUBSCRIBER_PREFIX + cfg.getMyRegion());
         queue = new TopicOpQueuer(scheduler);
@@ -70,7 +70,7 @@ public class RegionManager implements Su
             @Override
             public void run() {
                 Callback<Void> postCb = synchronous ? cb : CallbackUtils.logger(LOGGER, Level.DEBUG, Level.ERROR,
-                        "all cross-region subscriptions succeeded", "at least one cross-region subscription failed");
+                                        "all cross-region subscriptions succeeded", "at least one cross-region subscription failed");
                 final Callback<Void> mcb = CallbackUtils.multiCallback(clients.size(), postCb, ctx);
                 for (final HedwigHubClient client : clients) {
                     final HedwigSubscriber sub = client.getSubscriber();
@@ -83,23 +83,23 @@ public class RegionManager implements Su
                                 sub.startDelivery(topic, mySubId, new MessageHandler() {
                                     @Override
                                     public void consume(final ByteString topic, ByteString subscriberId, Message msg,
-                                            final Callback<Void> callback, final Object context) {
+                                    final Callback<Void> callback, final Object context) {
                                         // When messages are first published
                                         // locally, the PublishHandler sets the
                                         // source region in the Message.
                                         if (msg.hasSrcRegion()) {
                                             Message.newBuilder(msg).setMsgId(
-                                                    MessageSeqId.newBuilder(msg.getMsgId()).addRemoteComponents(
-                                                            RegionSpecificSeqId.newBuilder().setRegion(
-                                                                    msg.getSrcRegion()).setSeqId(
-                                                                    msg.getMsgId().getLocalComponent())));
+                                                MessageSeqId.newBuilder(msg.getMsgId()).addRemoteComponents(
+                                                    RegionSpecificSeqId.newBuilder().setRegion(
+                                                        msg.getSrcRegion()).setSeqId(
+                                                        msg.getMsgId().getLocalComponent())));
                                         }
                                         pm.persistMessage(new PersistRequest(topic, msg, new Callback<Long>() {
                                             @Override
                                             public void operationFinished(Object ctx, Long resultOfOperation) {
                                                 if (LOGGER.isDebugEnabled())
                                                     LOGGER.debug("cross-region recv-fwd succeeded for topic "
-                                                            + topic.toStringUtf8());
+                                                                 + topic.toStringUtf8());
                                                 callback.operationFinished(context, null);
                                             }
 
@@ -107,7 +107,7 @@ public class RegionManager implements Su
                                             public void operationFailed(Object ctx, PubSubException exception) {
                                                 if (LOGGER.isDebugEnabled())
                                                     LOGGER.error("cross-region recv-fwd failed for topic "
-                                                            + topic.toStringUtf8(), exception);
+                                                                 + topic.toStringUtf8(), exception);
                                                 callback.operationFailed(context, exception);
                                             }
                                         }, null));
@@ -115,12 +115,12 @@ public class RegionManager implements Su
                                 });
                                 if (LOGGER.isDebugEnabled())
                                     LOGGER.debug("cross-region start-delivery succeeded for topic "
-                                            + topic.toStringUtf8());
+                                                 + topic.toStringUtf8());
                                 mcb.operationFinished(ctx, null);
                             } catch (PubSubException ex) {
                                 if (LOGGER.isDebugEnabled())
                                     LOGGER.error(
-                                            "cross-region start-delivery failed for topic " + topic.toStringUtf8(), ex);
+                                        "cross-region start-delivery failed for topic " + topic.toStringUtf8(), ex);
                                 mcb.operationFailed(ctx, ex);
                             }
                         }
@@ -129,7 +129,7 @@ public class RegionManager implements Su
                         public void operationFailed(Object ctx, PubSubException exception) {
                             if (LOGGER.isDebugEnabled())
                                 LOGGER.error("cross-region subscribe failed for topic " + topic.toStringUtf8(),
-                                        exception);
+                                             exception);
                             mcb.operationFailed(ctx, exception);
                         }
                     }, null);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Mon Sep  5 17:38:57 2011
@@ -66,7 +66,7 @@ public abstract class AbstractSubscripti
     private final ConcurrentHashMap<ByteString, Long> topic2MinConsumedMessagesMap = new ConcurrentHashMap<ByteString, Long>();
 
     public AbstractSubscriptionManager(ServerConfiguration cfg, TopicManager tm, PersistenceManager pm,
-            ScheduledExecutorService scheduler) {
+                                       ScheduledExecutorService scheduler) {
         this.cfg = cfg;
         queuer = new TopicOpQueuer(scheduler);
         tm.addTopicOwnershipChangeListener(this);
@@ -141,7 +141,7 @@ public abstract class AbstractSubscripti
 
                 @Override
                 public void operationFinished(final Object ctx,
-                        final Map<ByteString, InMemorySubscriptionState> resultOfOperation) {
+                final Map<ByteString, InMemorySubscriptionState> resultOfOperation) {
                     // We've just inherited a bunch of subscriber for this
                     // topic, some of which may be local. If they are, then we
                     // need to (1) notify listeners of this and (2) record the
@@ -163,7 +163,7 @@ public abstract class AbstractSubscripti
                         @Override
                         public void operationFailed(Object ctx, PubSubException exception) {
                             logger.error("Subscription manager failed to acquired topic " + topic.toStringUtf8(),
-                                    exception);
+                                         exception);
                             cb.operationFailed(ctx, null);
                         }
 
@@ -201,7 +201,7 @@ public abstract class AbstractSubscripti
      * Figure out who is subscribed. Do nothing if already acquired. If there's
      * an error reading the subscribers' sequence IDs, then the topic is not
      * acquired.
-     * 
+     *
      * @param topic
      * @param callback
      * @param ctx
@@ -236,7 +236,7 @@ public abstract class AbstractSubscripti
         MessageSeqId consumeSeqId;
 
         public SubscribeOp(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
-                Callback<MessageSeqId> callback, Object ctx) {
+                           Callback<MessageSeqId> callback, Object ctx) {
             queuer.super(topic, callback, ctx);
             this.subRequest = subRequest;
             this.consumeSeqId = consumeSeqId;
@@ -259,8 +259,8 @@ public abstract class AbstractSubscripti
 
                 if (createOrAttach.equals(CreateOrAttach.CREATE)) {
                     String msg = "Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
-                            + " requested creating a subscription but it is already subscribed with state: "
-                            + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState());
+                                 + " requested creating a subscription but it is already subscribed with state: "
+                                 + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState());
                     logger.debug(msg);
                     cb.operationFailed(ctx, new PubSubException.ClientAlreadySubscribedException(msg));
                     return;
@@ -269,8 +269,8 @@ public abstract class AbstractSubscripti
                 // otherwise just attach
                 if (logger.isDebugEnabled()) {
                     logger.debug("Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
-                            + " attaching to subscription with state: "
-                            + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState()));
+                                 + " attaching to subscription with state: "
+                                 + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState()));
                 }
 
                 cb.operationFinished(ctx, subscriptionState.getLastConsumeSeqId());
@@ -280,7 +280,7 @@ public abstract class AbstractSubscripti
             // we don't have a mapping for this subscriber
             if (createOrAttach.equals(CreateOrAttach.ATTACH)) {
                 String msg = "Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
-                        + " requested attaching to an existing subscription but it is not subscribed";
+                             + " requested attaching to an existing subscription but it is not subscribed";
                 logger.debug(msg);
                 cb.operationFailed(ctx, new PubSubException.ClientNotSubscribedException(msg));
                 return;
@@ -301,7 +301,7 @@ public abstract class AbstractSubscripti
                         @Override
                         public void operationFailed(Object ctx, PubSubException exception) {
                             logger.error("subscription for subscriber " + subscriberId.toStringUtf8() + " to topic "
-                                    + topic.toStringUtf8() + " failed due to failed listener callback", exception);
+                                         + topic.toStringUtf8() + " failed due to failed listener callback", exception);
                             cb.operationFailed(ctx, exception);
                         }
 
@@ -325,7 +325,7 @@ public abstract class AbstractSubscripti
 
     @Override
     public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
-            Callback<MessageSeqId> callback, Object ctx) {
+                                      Callback<MessageSeqId> callback, Object ctx) {
         queuer.pushAndMaybeRun(topic, new SubscribeOp(topic, subRequest, consumeSeqId, callback, ctx));
     }
 
@@ -334,7 +334,7 @@ public abstract class AbstractSubscripti
         MessageSeqId consumeSeqId;
 
         public ConsumeOp(ByteString topic, ByteString subscriberId, MessageSeqId consumeSeqId, Callback<Void> callback,
-                Object ctx) {
+                         Object ctx) {
             queuer.super(topic, callback, ctx);
             this.subscriberId = subscriberId;
             this.consumeSeqId = consumeSeqId;
@@ -359,10 +359,10 @@ public abstract class AbstractSubscripti
             } else {
                 if (logger.isDebugEnabled()) {
                     logger.debug("Only advanced consume pointer in memory, will persist later, topic: "
-                            + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
-                            + " persistentState: " + SubscriptionStateUtils.toString(subState.getSubscriptionState())
-                            + " in-memory consume-id: "
-                            + MessageIdUtils.msgIdToReadableString(subState.getLastConsumeSeqId()));
+                                 + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
+                                 + " persistentState: " + SubscriptionStateUtils.toString(subState.getSubscriptionState())
+                                 + " in-memory consume-id: "
+                                 + MessageIdUtils.msgIdToReadableString(subState.getLastConsumeSeqId()));
                 }
                 cb.operationFinished(ctx, null);
             }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java Mon Sep  5 17:38:57 2011
@@ -36,21 +36,21 @@ public class InMemorySubscriptionManager
 
     @Override
     protected void createSubscriptionState(ByteString topic, ByteString subscriberId, SubscriptionState state,
-            Callback<Void> callback, Object ctx) {
+                                           Callback<Void> callback, Object ctx) {
         // nothing to do, in-memory info is already recorded by base class
         callback.operationFinished(ctx, null);
     }
 
     @Override
     protected void deleteSubscriptionState(ByteString topic, ByteString subscriberId, Callback<Void> callback,
-            Object ctx) {
+                                           Object ctx) {
         // nothing to do, in-memory info is already deleted by base class
         callback.operationFinished(ctx, null);
     }
 
     @Override
     protected void updateSubscriptionState(ByteString topic, ByteString subscriberId, SubscriptionState state,
-            Callback<Void> callback, Object ctx) {
+                                           Callback<Void> callback, Object ctx) {
         // nothing to do, in-memory info is already updated by base class
         callback.operationFinished(ctx, null);
     }
@@ -62,7 +62,7 @@ public class InMemorySubscriptionManager
 
     @Override
     protected void readSubscriptions(ByteString topic,
-            Callback<Map<ByteString, InMemorySubscriptionState>> cb, Object ctx) {
+                                     Callback<Map<ByteString, InMemorySubscriptionState>> cb, Object ctx) {
         // Since we don't lose in-memory information on lostTopic, we can just
         // return that back
         Map<ByteString, InMemorySubscriptionState> topicSubs = top2sub2seq.get(topic);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java Mon Sep  5 17:38:57 2011
@@ -42,7 +42,7 @@ public class InMemorySubscriptionState {
     }
 
     /**
-     * 
+     *
      * @param lastConsumeSeqId
      * @param consumeInterval
      *            The amount of laziness we want in persisting the consume

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MessageFilter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MessageFilter.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MessageFilter.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MessageFilter.java Mon Sep  5 17:38:57 2011
@@ -23,7 +23,7 @@ public interface MessageFilter {
 
     /**
      * Tests whether a particular message passes the filter or not
-     * 
+     *
      * @param message
      * @return
      */

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java Mon Sep  5 17:38:57 2011
@@ -22,7 +22,7 @@ import org.apache.hedwig.util.Callback;
 
 /**
  * For listening to events that are issued by a SubscriptionManager.
- * 
+ *
  */
 public interface SubscriptionEventListener {
 
@@ -30,7 +30,7 @@ public interface SubscriptionEventListen
      * Called by the subscription manager when it previously had zero local
      * subscribers for a topic and is currently accepting its first local
      * subscriber.
-     * 
+     *
      * @param topic
      *            The topic of interest.
      * @param synchronous
@@ -48,10 +48,10 @@ public interface SubscriptionEventListen
      * Called by the SubscriptionManager when it previously had non-zero local
      * subscribers for a topic and is currently dropping its last local
      * subscriber. This is fully asynchronous so there is no callback.
-     * 
+     *
      * @param topic
      *            The topic of interest.
      */
     public void onLastLocalUnsubscribe(ByteString topic);
-    
+
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java Mon Sep  5 17:38:57 2011
@@ -28,15 +28,15 @@ import org.apache.hedwig.util.Callback;
 public interface SubscriptionManager {
 
     /**
-     * 
+     *
      * Register a new subscription for the given subscriber for the given topic.
      * This method should reliably persist the existence of the subscription in
      * a way that it can't be lost. If the subscription already exists,
      * depending on the create or attach flag in the subscribe request, an
      * exception may be returned.
-     * 
+     *
      * This is an asynchronous method.
-     * 
+     *
      * @param topic
      * @param subRequest
      * @param consumeSeqId
@@ -48,14 +48,14 @@ public interface SubscriptionManager {
      * @param ctx
      */
     public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
-            Callback<MessageSeqId> callback, Object ctx);
+                                      Callback<MessageSeqId> callback, Object ctx);
 
     /**
      * Set the consume position of a given subscriber on a given topic. Note
      * that this method need not persist the consume position immediately but
      * can be lazy and persist it later asynchronously, if that is more
      * efficient.
-     * 
+     *
      * @param topic
      * @param subscriberId
      * @param consumeSeqId
@@ -65,7 +65,7 @@ public interface SubscriptionManager {
 
     /**
      * Delete a particular subscription
-     * 
+     *
      * @param topic
      * @param subscriberId
      */
@@ -100,5 +100,5 @@ public interface SubscriptionManager {
      * or removed.
      */
     public void addListener(SubscriptionEventListener listener);
-    
+
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/ZkSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/ZkSubscriptionManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/ZkSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/ZkSubscriptionManager.java Mon Sep  5 17:38:57 2011
@@ -50,7 +50,7 @@ public class ZkSubscriptionManager exten
     protected final static Logger logger = Logger.getLogger(ZkSubscriptionManager.class);
 
     public ZkSubscriptionManager(ZooKeeper zk, TopicManager topicMgr, PersistenceManager pm, ServerConfiguration cfg,
-            ScheduledExecutorService scheduler) {
+                                 ScheduledExecutorService scheduler) {
         super(cfg, topicMgr, pm, scheduler);
         this.zk = zk;
     }
@@ -61,12 +61,12 @@ public class ZkSubscriptionManager exten
 
     private String topicSubscriberPath(ByteString topic, ByteString subscriber) {
         return topicSubscribersPath(new StringBuilder(), topic).append("/").append(subscriber.toStringUtf8())
-                .toString();
+               .toString();
     }
 
     @Override
     protected void readSubscriptions(final ByteString topic,
-            final Callback<Map<ByteString, InMemorySubscriptionState>> cb, final Object ctx) {
+                                     final Callback<Map<ByteString, InMemorySubscriptionState>> cb, final Object ctx) {
 
         String topicSubscribersPath = topicSubscribersPath(new StringBuilder(), topic).toString();
         zk.getChildren(topicSubscribersPath, false, new SafeAsyncZKCallback.ChildrenCallback() {
@@ -75,7 +75,7 @@ public class ZkSubscriptionManager exten
 
                 if (rc != Code.OK.intValue() && rc != Code.NONODE.intValue()) {
                     KeeperException e = ZkUtils.logErrorAndCreateZKException("Could not read subscribers for topic "
-                            + topic.toStringUtf8(), path, rc);
+                                        + topic.toStringUtf8(), path, rc);
                     cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
                     return;
                 }
@@ -104,8 +104,8 @@ public class ZkSubscriptionManager exten
 
                             if (rc != Code.OK.intValue()) {
                                 KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                                        "Could not read subscription data for topic: " + topic.toStringUtf8()
-                                                + ", subscriberId: " + subscriberId.toStringUtf8(), path, rc);
+                                                        "Could not read subscription data for topic: " + topic.toStringUtf8()
+                                                        + ", subscriberId: " + subscriberId.toStringUtf8(), path, rc);
                                 reportFailure(new PubSubException.ServiceDownException(e));
                                 return;
                             }
@@ -120,7 +120,7 @@ public class ZkSubscriptionManager exten
                                 state = SubscriptionState.parseFrom(data);
                             } catch (InvalidProtocolBufferException ex) {
                                 String msg = "Failed to deserialize state for topic: " + topic.toStringUtf8()
-                                        + " subscriberId: " + subscriberId.toStringUtf8();
+                                             + " subscriberId: " + subscriberId.toStringUtf8();
                                 logger.error(msg, ex);
                                 reportFailure(new PubSubException.UnexpectedConditionException(msg));
                                 return;
@@ -128,8 +128,8 @@ public class ZkSubscriptionManager exten
 
                             if (logger.isDebugEnabled()) {
                                 logger.debug("Found subscription while acquiring topic: " + topic.toStringUtf8()
-                                        + " subscriberId: " + child + "state: "
-                                        + SubscriptionStateUtils.toString(state));
+                                             + " subscriberId: " + child + "state: "
+                                             + SubscriptionStateUtils.toString(state));
                             }
 
                             topicSubs.put(subscriberId, new InMemorySubscriptionState(state));
@@ -151,65 +151,65 @@ public class ZkSubscriptionManager exten
 
     @Override
     protected void createSubscriptionState(final ByteString topic, final ByteString subscriberId,
-            final SubscriptionState state, final Callback<Void> callback, final Object ctx) {
+                                           final SubscriptionState state, final Callback<Void> callback, final Object ctx) {
         ZkUtils.createFullPathOptimistic(zk, topicSubscriberPath(topic, subscriberId), state.toByteArray(),
-                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {
+        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {
 
-                    @Override
-                    public void safeProcessResult(int rc, String path, Object ctx, String name) {
-                        if (rc == Code.OK.intValue()) {
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("Successfully recorded subscription for topic: " + topic.toStringUtf8()
-                                        + " subscriberId: " + subscriberId.toStringUtf8() + " state: "
-                                        + SubscriptionStateUtils.toString(state));
-                            }
-                            callback.operationFinished(ctx, null);
-                        } else {
-                            KeeperException ke = ZkUtils.logErrorAndCreateZKException(
-                                    "Could not record new subscription for topic: " + topic.toStringUtf8()
-                                            + " subscriberId: " + subscriberId.toStringUtf8(), path, rc);
-                            callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
-                        }
+            @Override
+            public void safeProcessResult(int rc, String path, Object ctx, String name) {
+                if (rc == Code.OK.intValue()) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Successfully recorded subscription for topic: " + topic.toStringUtf8()
+                                     + " subscriberId: " + subscriberId.toStringUtf8() + " state: "
+                                     + SubscriptionStateUtils.toString(state));
                     }
-                }, ctx);
+                    callback.operationFinished(ctx, null);
+                } else {
+                    KeeperException ke = ZkUtils.logErrorAndCreateZKException(
+                                             "Could not record new subscription for topic: " + topic.toStringUtf8()
+                                             + " subscriberId: " + subscriberId.toStringUtf8(), path, rc);
+                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+                }
+            }
+        }, ctx);
     }
 
     @Override
     protected void updateSubscriptionState(final ByteString topic, final ByteString subscriberId,
-            final SubscriptionState state, final Callback<Void> callback, final Object ctx) {
+                                           final SubscriptionState state, final Callback<Void> callback, final Object ctx) {
         zk.setData(topicSubscriberPath(topic, subscriberId), state.toByteArray(), -1,
-                new SafeAsyncZKCallback.StatCallback() {
-                    @Override
-                    public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
-                        if (rc != Code.OK.intValue()) {
-                            KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
-                                    + " subscriberId: " + subscriberId.toStringUtf8()
-                                    + " could not set subscription state: " + SubscriptionStateUtils.toString(state),
-                                    path, rc);
-                            callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                        } else {
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("Successfully updated subscription for topic: " + topic.toStringUtf8()
-                                        + " subscriberId: " + subscriberId.toStringUtf8() + " state: "
-                                        + SubscriptionStateUtils.toString(state));
-                            }
-
-                            callback.operationFinished(ctx, null);
-                        }
+        new SafeAsyncZKCallback.StatCallback() {
+            @Override
+            public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
+                if (rc != Code.OK.intValue()) {
+                    KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
+                                        + " subscriberId: " + subscriberId.toStringUtf8()
+                                        + " could not set subscription state: " + SubscriptionStateUtils.toString(state),
+                                        path, rc);
+                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+                } else {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Successfully updated subscription for topic: " + topic.toStringUtf8()
+                                     + " subscriberId: " + subscriberId.toStringUtf8() + " state: "
+                                     + SubscriptionStateUtils.toString(state));
                     }
-                }, ctx);
+
+                    callback.operationFinished(ctx, null);
+                }
+            }
+        }, ctx);
     }
 
     @Override
     protected void deleteSubscriptionState(final ByteString topic, final ByteString subscriberId,
-            final Callback<Void> callback, final Object ctx) {
+                                           final Callback<Void> callback, final Object ctx) {
         zk.delete(topicSubscriberPath(topic, subscriberId), -1, new SafeAsyncZKCallback.VoidCallback() {
             @Override
             public void safeProcessResult(int rc, String path, Object ctx) {
                 if (rc == Code.OK.intValue()) {
                     if (logger.isDebugEnabled()) {
                         logger.debug("Successfully deleted subscription for topic: " + topic.toStringUtf8()
-                                + " subscriberId: " + subscriberId.toStringUtf8());
+                                     + " subscriberId: " + subscriberId.toStringUtf8());
                     }
 
                     callback.operationFinished(ctx, null);
@@ -217,7 +217,7 @@ public class ZkSubscriptionManager exten
                 }
 
                 KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
-                        + " subscriberId: " + subscriberId.toStringUtf8() + " failed to delete subscription", path, rc);
+                                    + " subscriberId: " + subscriberId.toStringUtf8() + " failed to delete subscription", path, rc);
                 callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
             }
         }, ctx);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java Mon Sep  5 17:38:57 2011
@@ -60,8 +60,8 @@ public abstract class AbstractTopicManag
     private class GetOwnerOp extends TopicOpQueuer.AsynchronousOp<HedwigSocketAddress> {
         public boolean shouldClaim;
 
-        public GetOwnerOp(final ByteString topic, boolean shouldClaim, 
-                final Callback<HedwigSocketAddress> cb, Object ctx) {
+        public GetOwnerOp(final ByteString topic, boolean shouldClaim,
+                          final Callback<HedwigSocketAddress> cb, Object ctx) {
             queuer.super(topic, cb, ctx);
             this.shouldClaim = shouldClaim;
         }
@@ -119,7 +119,7 @@ public abstract class AbstractTopicManag
                                 @Override
                                 public void operationFailed(Object ctx, PubSubException exception) {
                                     logger.error("failure that should never happen when periodically releasing topic "
-                                            + topic, exception);
+                                                 + topic, exception);
                                 }
 
                                 @Override
@@ -157,7 +157,7 @@ public abstract class AbstractTopicManag
 
     @Override
     public final void getOwner(ByteString topic, boolean shouldClaim,
-            Callback<HedwigSocketAddress> cb, Object ctx) {
+                               Callback<HedwigSocketAddress> cb, Object ctx) {
         queuer.pushAndMaybeRun(topic, new GetOwnerOp(topic, shouldClaim, cb, ctx));
     }
 
@@ -173,10 +173,10 @@ public abstract class AbstractTopicManag
      * choosing this hub as the owner, the {@code
      * AbstractTopicManager#notifyListenersAndAddToOwnedTopics(ByteString,
      * OperationCallback, Object)} method must be called.
-     * 
+     *
      */
     protected abstract void realGetOwner(ByteString topic, boolean shouldClaim,
-            Callback<HedwigSocketAddress> cb, Object ctx);
+                                         Callback<HedwigSocketAddress> cb, Object ctx);
 
     /**
      * The method should do any cleanup necessary to indicate to other hubs that

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java Mon Sep  5 17:38:57 2011
@@ -30,13 +30,13 @@ import org.apache.hedwig.util.HedwigSock
  * claim responsibilities for the topics that were at the failed host. On
  * claiming responsibility for a topic, a host should call its
  * {@link TopicOwnershipChangeListener}.
- * 
+ *
  */
 
 public interface TopicManager {
     /**
      * Get the name of the host responsible for the given topic.
-     * 
+     *
      * @param topic
      *            The topic whose owner to get.
      * @param cb
@@ -45,8 +45,8 @@ public interface TopicManager {
      * @throws ServiceDownException
      *             If there is an error looking up the information
      */
-    public void getOwner(ByteString topic, boolean shouldClaim, 
-            Callback<HedwigSocketAddress> cb, Object ctx);
+    public void getOwner(ByteString topic, boolean shouldClaim,
+                         Callback<HedwigSocketAddress> cb, Object ctx);
 
     /**
      * Whenever the topic manager finds out that the set of topics owned by this
@@ -59,14 +59,14 @@ public interface TopicManager {
      * immediately to such notifications, and with no blocking (because multiple
      * listeners might need to be informed and they are all informed by the same
      * thread).
-     * 
+     *
      * @param listener
      */
     public void addTopicOwnershipChangeListener(TopicOwnershipChangeListener listener);
 
     /**
      * Give up ownership of a topic. If I don't own it, do nothing.
-     * 
+     *
      * @throws ServiceDownException
      *             If there is an error in claiming responsibility for the topic
      */

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java Mon Sep  5 17:38:57 2011
@@ -33,8 +33,8 @@ public class TrivialOwnAllTopicManager e
     }
 
     @Override
-    protected void realGetOwner(ByteString topic, boolean shouldClaim, 
-            Callback<HedwigSocketAddress> cb, Object ctx) {
+    protected void realGetOwner(ByteString topic, boolean shouldClaim,
+                                Callback<HedwigSocketAddress> cb, Object ctx) {
 
         if (topics.contains(topic)) {
             cb.operationFinished(ctx, addr);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java Mon Sep  5 17:38:57 2011
@@ -47,13 +47,13 @@ import org.apache.hedwig.zookeeper.SafeA
 
 /**
  * Topics are operated on in parallel as they are independent.
- * 
+ *
  */
 public class ZkTopicManager extends AbstractTopicManager implements TopicManager {
 
     static Logger logger = Logger.getLogger(ZkTopicManager.class);
     Random rand = new Random();
-    
+
     /**
      * Persistent storage for topic metadata.
      */
@@ -75,7 +75,7 @@ public class ZkTopicManager extends Abst
 
     /**
      * Create a new topic manager. Pass in an active ZooKeeper client object.
-     * 
+     *
      * @param zk
      */
     public ZkTopicManager(final ZooKeeper zk, final ServerConfiguration cfg, ScheduledExecutorService scheduler)
@@ -93,18 +93,18 @@ public class ZkTopicManager extends Abst
                         logger.warn("ZK client has been disconnected to the ZK server!");
                         isSuspended = true;
                     } else if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
-			if (isSuspended) {
-	                    logger.info("ZK client has been reconnected to the ZK server!");
-			}
-			isSuspended = false;
+                        if (isSuspended) {
+                            logger.info("ZK client has been reconnected to the ZK server!");
+                        }
+                        isSuspended = false;
                     }
-		}
-		// Check for expired connection.
+                }
+                // Check for expired connection.
                 if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
                     logger.error("ZK client connection to the ZK server has expired!");
                     System.exit(1);
-                }             
-	    }
+                }
+            }
         });
         final SynchronousQueue<Either<Void, PubSubException>> queue = new SynchronousQueue<Either<Void, PubSubException>>();
 
@@ -132,41 +132,41 @@ public class ZkTopicManager extends Abst
     void registerWithZookeeper(final Callback<Void> callback, Object ctx) {
 
         ZkUtils.createFullPathOptimistic(zk, ephemeralNodePath, getCurrentLoadData(), Ids.OPEN_ACL_UNSAFE,
-                CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() {
+        CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() {
+
+            @Override
+            public void safeProcessResult(int rc, String path, Object ctx, String name) {
+                if (rc == Code.OK.intValue()) {
+                    callback.operationFinished(ctx, null);
+                    return;
+                }
+                if (rc != Code.NODEEXISTS.intValue()) {
+                    KeeperException ke = ZkUtils.logErrorAndCreateZKException(
+                                             "Could not create ephemeral node to register hub", ephemeralNodePath, rc);
+                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+                    return;
+                }
+
+                logger.info("Found stale ephemeral node while registering hub with ZK, deleting it");
 
+                // Node exists, lets try to delete it and retry
+                zk.delete(ephemeralNodePath, -1, new SafeAsyncZKCallback.VoidCallback() {
                     @Override
-                    public void safeProcessResult(int rc, String path, Object ctx, String name) {
-                        if (rc == Code.OK.intValue()) {
-                            callback.operationFinished(ctx, null);
-                            return;
-                        }
-                        if (rc != Code.NODEEXISTS.intValue()) {
-                            KeeperException ke = ZkUtils.logErrorAndCreateZKException(
-                                    "Could not create ephemeral node to register hub", ephemeralNodePath, rc);
-                            callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+                    public void safeProcessResult(int rc, String path, Object ctx) {
+                        if (rc == Code.OK.intValue() || rc == Code.NONODE.intValue()) {
+                            registerWithZookeeper(callback, ctx);
                             return;
                         }
-
-                        logger.info("Found stale ephemeral node while registering hub with ZK, deleting it");
-
-                        // Node exists, lets try to delete it and retry
-                        zk.delete(ephemeralNodePath, -1, new SafeAsyncZKCallback.VoidCallback() {
-                            @Override
-                            public void safeProcessResult(int rc, String path, Object ctx) {
-                                if (rc == Code.OK.intValue() || rc == Code.NONODE.intValue()) {
-                                    registerWithZookeeper(callback, ctx);
-                                    return;
-                                }
-                                KeeperException ke = ZkUtils.logErrorAndCreateZKException(
-                                        "Could not delete stale ephemeral node to register hub", ephemeralNodePath, rc);
-                                callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
-                                return;
-
-                            }
-                        }, ctx);
+                        KeeperException ke = ZkUtils.logErrorAndCreateZKException(
+                                                 "Could not delete stale ephemeral node to register hub", ephemeralNodePath, rc);
+                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+                        return;
 
                     }
-                }, null);
+                }, ctx);
+
+            }
+        }, null);
     }
 
     String hubPath(ByteString topic) {
@@ -175,12 +175,12 @@ public class ZkTopicManager extends Abst
 
     @Override
     protected void realGetOwner(final ByteString topic, final boolean shouldClaim,
-            final Callback<HedwigSocketAddress> cb, final Object ctx) {
+                                final Callback<HedwigSocketAddress> cb, final Object ctx) {
         // If operations are suspended due to a ZK client disconnect, just error
         // out this call and return.
         if (isSuspended) {
             cb.operationFailed(ctx, new PubSubException.ServiceDownException(
-                    "ZKTopicManager service is temporarily suspended!"));
+                                   "ZKTopicManager service is temporarily suspended!"));
             return;
         }
 
@@ -217,7 +217,7 @@ public class ZkTopicManager extends Abst
                 public void safeProcessResult(int rc, String path, Object ctx, List<String> children) {
                     if (rc != Code.OK.intValue()) {
                         KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                                "Could not get list of available hubs", path, rc);
+                                                "Could not get list of available hubs", path, rc);
                         cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
                         return;
                     }
@@ -238,8 +238,8 @@ public class ZkTopicManager extends Abst
                         if (rc == KeeperException.Code.OK.intValue()) {
                             try {
                                 int load = Integer.parseInt(new String(data));
-                                if (logger.isDebugEnabled()){
-                                	logger.debug("Found server: " + ctx + " with load: " + load);
+                                if (logger.isDebugEnabled()) {
+                                    logger.debug("Found server: " + ctx + " with load: " + load);
                                 }
                                 if (load < minLoad  || (load == minLoad && rand.nextBoolean())) {
                                     minLoad = load;
@@ -256,7 +256,7 @@ public class ZkTopicManager extends Abst
                         if (numResponses == children.size()) {
                             if (leastLoaded == null) {
                                 cb.operationFailed(ZkGetOwnerOp.this.ctx, new PubSubException.ServiceDownException(
-                                        "No hub available"));
+                                                       "No hub available"));
                                 return;
                             }
                             HedwigSocketAddress owner = new HedwigSocketAddress(leastLoaded);
@@ -273,7 +273,7 @@ public class ZkTopicManager extends Abst
 
             for (String child : children) {
                 zk.getData(cfg.getZkHostsPrefix(new StringBuilder()).append("/").append(child).toString(), false,
-                        dataCallback, child);
+                           dataCallback, child);
             }
         }
 
@@ -296,7 +296,7 @@ public class ZkTopicManager extends Abst
 
                     if (rc != Code.OK.intValue()) {
                         KeeperException e = ZkUtils.logErrorAndCreateZKException("Could not read ownership for topic: "
-                                + topic.toStringUtf8(), path, rc);
+                                            + topic.toStringUtf8(), path, rc);
                         cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
                         return;
                     }
@@ -324,7 +324,7 @@ public class ZkTopicManager extends Abst
                                 claimOrChoose();
                             } else {
                                 KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                                        "Could not delete self node for topic: " + topic.toStringUtf8(), path, rc);
+                                                        "Could not delete self node for topic: " + topic.toStringUtf8(), path, rc);
                                 cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
                             }
                         }
@@ -339,26 +339,26 @@ public class ZkTopicManager extends Abst
             }
 
             ZkUtils.createFullPathOptimistic(zk, hubPath, addr.toString().getBytes(), Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() {
+            CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() {
 
-                        @Override
-                        public void safeProcessResult(int rc, String path, Object ctx, String name) {
-                            if (rc == Code.OK.intValue()) {
-                                if (logger.isDebugEnabled()) {
-                                    logger.debug("claimed topic: " + topic.toStringUtf8());
-                                }
-                                notifyListenersAndAddToOwnedTopics(topic, cb, ctx);
-                                updateLoadInformation();
-                            } else if (rc == Code.NODEEXISTS.intValue()) {
-                                read();
-                            } else {
-                                KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                                        "Failed to create ephemeral node to claim ownership of topic: "
-                                                + topic.toStringUtf8(), path, rc);
-                                cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                            }
+                @Override
+                public void safeProcessResult(int rc, String path, Object ctx, String name) {
+                    if (rc == Code.OK.intValue()) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("claimed topic: " + topic.toStringUtf8());
                         }
-                    }, ctx);
+                        notifyListenersAndAddToOwnedTopics(topic, cb, ctx);
+                        updateLoadInformation();
+                    } else if (rc == Code.NODEEXISTS.intValue()) {
+                        read();
+                    } else {
+                        KeeperException e = ZkUtils.logErrorAndCreateZKException(
+                                                "Failed to create ephemeral node to claim ownership of topic: "
+                                                + topic.toStringUtf8(), path, rc);
+                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+                    }
+                }
+            }, ctx);
         }
 
     }
@@ -370,10 +370,10 @@ public class ZkTopicManager extends Abst
     }
 
     void updateLoadInformation() {
-    	byte[] currentLoad = getCurrentLoadData();
-    	if (logger.isDebugEnabled()){
-    		logger.debug("Reporting load of " + new String(currentLoad));
-    	}
+        byte[] currentLoad = getCurrentLoadData();
+        if (logger.isDebugEnabled()) {
+            logger.debug("Reporting load of " + new String(currentLoad));
+        }
         zk.setData(ephemeralNodePath, currentLoad, -1, loadReportingStatCallback, null);
     }
 
@@ -393,7 +393,7 @@ public class ZkTopicManager extends Abst
 
                 if (rc != Code.OK.intValue()) {
                     KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                            "Failed to delete self-ownership node for topic: " + topic.toStringUtf8(), path, rc);
+                                            "Failed to delete self-ownership node for topic: " + topic.toStringUtf8(), path, rc);
                     cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
                     return;
                 }
@@ -401,7 +401,7 @@ public class ZkTopicManager extends Abst
                 HedwigSocketAddress owner = new HedwigSocketAddress(new String(data));
                 if (!owner.equals(addr)) {
                     logger.warn("Wanted to delete self-node for topic: " + topic.toStringUtf8() + " but node for "
-                            + owner + " found, leaving untouched");
+                                + owner + " found, leaving untouched");
                     // Not our node, someone else's, leave it alone
                     cb.operationFinished(ctx, null);
                     return;
@@ -412,8 +412,8 @@ public class ZkTopicManager extends Abst
                     public void safeProcessResult(int rc, String path, Object ctx) {
                         if (rc != Code.OK.intValue() && rc != Code.NONODE.intValue()) {
                             KeeperException e = ZkUtils
-                                    .logErrorAndCreateZKException("Failed to delete self-ownership node for topic: "
-                                            + topic.toStringUtf8(), path, rc);
+                                                .logErrorAndCreateZKException("Failed to delete self-ownership node for topic: "
+                                                        + topic.toStringUtf8(), path, rc);
                             cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
                             return;
                         }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsynBKCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsynBKCallback.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsynBKCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsynBKCallback.java Mon Sep  5 17:38:57 2011
@@ -24,81 +24,81 @@ import org.apache.bookkeeper.client.Ledg
 import org.apache.bookkeeper.client.LedgerHandle;
 
 
-public class SafeAsynBKCallback extends SafeAsyncCallback{
+public class SafeAsynBKCallback extends SafeAsyncCallback {
 
     public static abstract class OpenCallback implements AsyncCallback.OpenCallback {
         @Override
         public void openComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
-            try{
+            try {
                 safeOpenComplete(rc, ledgerHandle, ctx);
-            }catch(Throwable t){
+            } catch(Throwable t) {
                 invokeUncaughtExceptionHandler(t);
             }
         }
-        
+
         public abstract void safeOpenComplete(int rc, LedgerHandle ledgerHandle, Object ctx);
 
     }
-    
+
     public static abstract class CloseCallback implements AsyncCallback.CloseCallback {
         @Override
-        public void closeComplete(int rc, LedgerHandle ledgerHandle, Object ctx){
-            try{
+        public void closeComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
+            try {
                 safeCloseComplete(rc, ledgerHandle, ctx);
-            }catch(Throwable t){
+            } catch(Throwable t) {
                 invokeUncaughtExceptionHandler(t);
             }
         }
-        
+
         public abstract void safeCloseComplete(int rc, LedgerHandle ledgerHandle, Object ctx) ;
     }
-    
+
     public static abstract class ReadCallback implements AsyncCallback.ReadCallback {
-        
+
         @Override
         public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
-            try{
+            try {
                 safeReadComplete(rc, lh, seq, ctx);
-            }catch(Throwable t){
+            } catch(Throwable t) {
                 invokeUncaughtExceptionHandler(t);
             }
-            
+
         }
-        
+
         public abstract void safeReadComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx);
     }
-    
+
     public static abstract class CreateCallback implements AsyncCallback.CreateCallback {
-        
+
         @Override
         public void createComplete(int rc, LedgerHandle lh, Object ctx) {
-            try{
+            try {
                 safeCreateComplete(rc, lh, ctx);
-            }catch(Throwable t){
+            } catch(Throwable t) {
                 invokeUncaughtExceptionHandler(t);
             }
-            
+
         }
-        
+
         public abstract void safeCreateComplete(int rc, LedgerHandle lh, Object ctx);
-        
-        
+
+
     }
-    
+
     public static abstract class AddCallback implements AsyncCallback.AddCallback {
-        
+
         @Override
         public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
-            try{
+            try {
                 safeAddComplete(rc, lh, entryId, ctx);
-            }catch(Throwable t){
+            } catch(Throwable t) {
                 invokeUncaughtExceptionHandler(t);
             }
         }
-        
+
         public abstract void safeAddComplete(int rc, LedgerHandle lh, long entryId, Object ctx);
-        
+
     }
-    
+
 }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncZKCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncZKCallback.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncZKCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncZKCallback.java Mon Sep  5 17:38:57 2011
@@ -23,7 +23,7 @@ import org.apache.zookeeper.AsyncCallbac
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 
-public class SafeAsyncZKCallback extends SafeAsyncCallback{
+public class SafeAsyncZKCallback extends SafeAsyncCallback {
     public static abstract class StatCallback implements AsyncCallback.StatCallback {
         public void processResult(int rc, String path, Object ctx, Stat stat) {
             try {