Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:05 UTC

[27/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/CRC32OutputStream.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/CRC32OutputStream.java b/jstorm-core/src/main/java/backtype/storm/utils/CRC32OutputStream.java
index 7d5ce73..483f0df 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/CRC32OutputStream.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/CRC32OutputStream.java
@@ -23,11 +23,11 @@ import java.util.zip.CRC32;
 
 public class CRC32OutputStream extends OutputStream {
     private CRC32 hasher;
-    
+
     public CRC32OutputStream() {
         hasher = new CRC32();
     }
-    
+
     public long getValue() {
         return hasher.getValue();
     }
@@ -40,5 +40,5 @@ public class CRC32OutputStream extends OutputStream {
     @Override
     public void write(byte[] bytes, int start, int end) throws IOException {
         hasher.update(bytes, start, end);
-    }    
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ClojureTimerTask.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ClojureTimerTask.java b/jstorm-core/src/main/java/backtype/storm/utils/ClojureTimerTask.java
index ca9b010..677cf60 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/ClojureTimerTask.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/ClojureTimerTask.java
@@ -22,14 +22,14 @@ import java.util.TimerTask;
 
 public class ClojureTimerTask extends TimerTask {
     IFn _afn;
-    
+
     public ClojureTimerTask(IFn afn) {
         super();
         _afn = afn;
     }
-    
+
     @Override
     public void run() {
         _afn.run();
-    }    
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/Container.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/Container.java b/jstorm-core/src/main/java/backtype/storm/utils/Container.java
index d4edcdf..0927e7c 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/Container.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/Container.java
@@ -20,5 +20,5 @@ package backtype.storm.utils;
 import java.io.Serializable;
 
 public class Container implements Serializable {
-  public Object object;
+    public Object object;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/DRPCClient.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/DRPCClient.java b/jstorm-core/src/main/java/backtype/storm/utils/DRPCClient.java
index b2a2a7d..03ede66 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/DRPCClient.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/DRPCClient.java
@@ -46,15 +46,15 @@ public class DRPCClient extends ThriftClient implements DistributedRPC.Iface {
         this.port = port;
         this.client = new DistributedRPC.Client(_protocol);
     }
-        
+
     public String getHost() {
         return host;
     }
-    
+
     public int getPort() {
         return port;
     }
-    
+
     public String execute(String func, String args) throws TException, DRPCExecutionException, AuthorizationException {
         return client.execute(func, args);
     }
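
For orientation, a minimal DRPC call sketch using the methods visible in this hunk (execute, getHost, getPort). The constructor arguments and Utils.readStormConfig() are assumptions drawn from typical Storm usage, not part of this commit:

    import backtype.storm.utils.DRPCClient;
    import backtype.storm.utils.Utils;
    import java.util.Map;

    public class DrpcExample {
        public static void main(String[] args) throws Exception {
            Map conf = Utils.readStormConfig();                        // assumed helper, not in this hunk
            DRPCClient drpc = new DRPCClient(conf, "drpc-host", 3772); // assumed constructor signature
            // execute(func, args) forwards to the Thrift DistributedRPC client
            String result = drpc.execute("exclamation", "hello");      // hypothetical function name/argument
            System.out.println(drpc.getHost() + ":" + drpc.getPort() + " -> " + result);
        }
    }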

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueue.java b/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueue.java
index 94768e6..330a5c6 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueue.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueue.java
@@ -32,13 +32,13 @@ public abstract class DisruptorQueue implements IStatefulObject {
     public static void setUseSleep(boolean useSleep) {
         DisruptorQueueImpl.setUseSleep(useSleep);
     }
-    
+
     private static boolean CAPACITY_LIMITED = false;
-    
+
     public static void setLimited(boolean limited) {
         CAPACITY_LIMITED = limited;
     }
-    
+
     public static DisruptorQueue mkInstance(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
         if (CAPACITY_LIMITED == true) {
             return new DisruptorQueueImpl(queueName, producerType, bufferSize, wait);
@@ -46,35 +46,35 @@ public abstract class DisruptorQueue implements IStatefulObject {
             return new DisruptorWrapBlockingQueue(queueName, producerType, bufferSize, wait);
         }
     }
-    
+
     public abstract String getName();
-    
+
     public abstract void haltWithInterrupt();
-    
+
     public abstract Object poll();
-    
+
     public abstract Object take();
-    
+
     public abstract void consumeBatch(EventHandler<Object> handler);
-    
+
     public abstract void consumeBatchWhenAvailable(EventHandler<Object> handler);
-    
+
     public abstract void publish(Object obj);
-    
+
     public abstract void publish(Object obj, boolean block) throws InsufficientCapacityException;
-    
+
     public abstract void consumerStarted();
-    
+
     public abstract void clear();
-    
+
     public abstract long population();
-    
+
     public abstract long capacity();
-    
+
     public abstract long writePos();
-    
+
     public abstract long readPos();
-    
+
     public abstract float pctFull();
-    
+
 }
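
A rough usage sketch of the abstract API above: mkInstance returns the ring-buffer backed DisruptorQueueImpl when setLimited(true) has been called, otherwise DisruptorWrapBlockingQueue. BlockingWaitStrategy and EventHandler are LMAX Disruptor types assumed for illustration; they are not part of this commit:

    import backtype.storm.utils.DisruptorQueue;
    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.EventHandler;
    import com.lmax.disruptor.dsl.ProducerType;

    public class QueueExample {
        public static void main(String[] args) {
            DisruptorQueue.setLimited(true);    // true -> ring-buffer backed DisruptorQueueImpl
            DisruptorQueue queue = DisruptorQueue.mkInstance(
                    "example", ProducerType.MULTI, 1024, new BlockingWaitStrategy());
            queue.consumerStarted();            // flush the pre-start cache into the queue
            queue.publish("tuple-1");
            queue.consumeBatch(new EventHandler<Object>() {
                @Override
                public void onEvent(Object event, long sequence, boolean endOfBatch) {
                    System.out.println("got " + event);
                }
            });
        }
    }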

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java b/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java
index 58d8313..2941cc9 100644
--- a/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java
@@ -45,30 +45,30 @@ import com.lmax.disruptor.dsl.ProducerType;
 public class DisruptorQueueImpl extends DisruptorQueue {
     private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueueImpl.class);
     static boolean useSleep = true;
-    
+
     public static void setUseSleep(boolean useSleep) {
         AbstractSequencerExt.setWaitSleep(useSleep);
     }
-    
+
     private static final Object FLUSH_CACHE = new Object();
     private static final Object INTERRUPT = new Object();
     private static final String PREFIX = "disruptor-";
-    
+
     private final String _queueName;
     private final RingBuffer<MutableObject> _buffer;
     private final Sequence _consumer;
     private final SequenceBarrier _barrier;
-    
+
     // TODO: consider having a threadlocal cache of this variable to speed up
     // reads?
     volatile boolean consumerStartedFlag = false;
-    
+
     private final HashMap<String, Object> state = new HashMap<String, Object>(4);
     private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();
     private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
     private final Lock readLock = cacheLock.readLock();
     private final Lock writeLock = cacheLock.writeLock();
-    
+
     public DisruptorQueueImpl(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
         this._queueName = PREFIX + queueName;
         _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
@@ -89,19 +89,19 @@ public class DisruptorQueueImpl extends DisruptorQueue {
             }
         }
     }
-    
+
     public String getName() {
         return _queueName;
     }
-    
+
     public void consumeBatch(EventHandler<Object> handler) {
         consumeBatchToCursor(_barrier.getCursor(), handler);
     }
-    
+
     public void haltWithInterrupt() {
         publish(INTERRUPT);
     }
-    
+
     public Object poll() {
         // @@@
         // should use _cache.isEmpty, but it is slow
@@ -109,7 +109,7 @@ public class DisruptorQueueImpl extends DisruptorQueue {
         if (consumerStartedFlag == false) {
             return _cache.poll();
         }
-        
+
         final long nextSequence = _consumer.get() + 1;
         if (nextSequence <= _barrier.getCursor()) {
             MutableObject mo = _buffer.get(nextSequence);
@@ -120,7 +120,7 @@ public class DisruptorQueueImpl extends DisruptorQueue {
         }
         return null;
     }
-    
+
     public Object take() {
         // @@@
         // should use _cache.isEmpty, but it is slow
@@ -128,7 +128,7 @@ public class DisruptorQueueImpl extends DisruptorQueue {
         if (consumerStartedFlag == false) {
             return _cache.poll();
         }
-        
+
         final long nextSequence = _consumer.get() + 1;
         // final long availableSequence;
         try {
@@ -141,7 +141,7 @@ public class DisruptorQueueImpl extends DisruptorQueue {
             // throw new RuntimeException(e);
             return null;
         } catch (TimeoutException e) {
-            //LOG.error(e.getCause(), e);
+            // LOG.error(e.getCause(), e);
             return null;
         }
         MutableObject mo = _buffer.get(nextSequence);
@@ -150,7 +150,7 @@ public class DisruptorQueueImpl extends DisruptorQueue {
         mo.setObject(null);
         return ret;
     }
-    
+
     public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
         try {
             final long nextSequence = _consumer.get() + 1;
@@ -165,11 +165,11 @@ public class DisruptorQueueImpl extends DisruptorQueue {
             LOG.error("InterruptedException " + e.getCause());
             return;
         } catch (TimeoutException e) {
-            //LOG.error(e.getCause(), e);
+            // LOG.error(e.getCause(), e);
             return;
         }
     }
-    
+
     public void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
         for (long curr = _consumer.get() + 1; curr <= cursor; curr++) {
             try {
@@ -202,7 +202,7 @@ public class DisruptorQueueImpl extends DisruptorQueue {
         // TODO: only set this if the consumer cursor has changed?
         _consumer.set(cursor);
     }
-    
+
     /*
      * Caches until consumerStarted is called, upon which the cache is flushed to the consumer
      */
@@ -213,15 +213,15 @@ public class DisruptorQueueImpl extends DisruptorQueue {
             throw new RuntimeException("This code should be unreachable!");
         }
     }
-    
+
     public void tryPublish(Object obj) throws InsufficientCapacityException {
         publish(obj, false);
     }
-    
+
     public void publish(Object obj, boolean block) throws InsufficientCapacityException {
-        
+
         boolean publishNow = consumerStartedFlag;
-        
+
         if (!publishNow) {
             readLock.lock();
             try {
@@ -233,12 +233,12 @@ public class DisruptorQueueImpl extends DisruptorQueue {
                 readLock.unlock();
             }
         }
-        
+
         if (publishNow) {
             publishDirect(obj, block);
         }
     }
-    
+
     protected void publishDirect(Object obj, boolean block) throws InsufficientCapacityException {
         final long id;
         if (block) {
@@ -250,41 +250,41 @@ public class DisruptorQueueImpl extends DisruptorQueue {
         m.setObject(obj);
         _buffer.publish(id);
     }
-    
+
     public void consumerStarted() {
-        
+
         writeLock.lock();
         consumerStartedFlag = true;
-        
+
         writeLock.unlock();
     }
-    
+
     public void clear() {
         while (population() != 0L) {
             poll();
         }
     }
-    
+
     public long population() {
         return (writePos() - readPos());
     }
-    
+
     public long capacity() {
         return _buffer.getBufferSize();
     }
-    
+
     public long writePos() {
         return _buffer.getCursor();
     }
-    
+
     public long readPos() {
         return _consumer.get();
     }
-    
+
     public float pctFull() {
         return (1.0F * population() / capacity());
     }
-    
+
     @Override
     public Object getState() {
         // get readPos then writePos so it's never an under-estimate
@@ -296,7 +296,7 @@ public class DisruptorQueueImpl extends DisruptorQueue {
         state.put("read_pos", rp);
         return state;
     }
-    
+
     public static class ObjectEventFactory implements EventFactory<MutableObject> {
         @Override
         public MutableObject newInstance() {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java b/jstorm-core/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java
index af5618b..5831a97 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java
@@ -36,33 +36,33 @@ import com.lmax.disruptor.dsl.ProducerType;
  */
 public class DisruptorWrapBlockingQueue extends DisruptorQueue {
     private static final Logger LOG = LoggerFactory.getLogger(DisruptorWrapBlockingQueue.class);
-    
+
     private static final long QUEUE_CAPACITY = 512;
     private LinkedBlockingDeque<Object> queue;
-    
+
     private String queueName;
-    
+
     public DisruptorWrapBlockingQueue(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
         this.queueName = queueName;
         queue = new LinkedBlockingDeque<Object>();
     }
-    
+
     public String getName() {
         return queueName;
     }
-    
+
     // poll method
     public void consumeBatch(EventHandler<Object> handler) {
         consumeBatchToCursor(0, handler);
     }
-    
+
     public void haltWithInterrupt() {
     }
-    
+
     public Object poll() {
         return queue.poll();
     }
-    
+
     public Object take() {
         try {
             return queue.take();
@@ -70,7 +70,7 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue {
             return null;
         }
     }
-    
+
     public void drainQueue(Object object, EventHandler<Object> handler) {
         while (object != null) {
             try {
@@ -84,7 +84,7 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue {
             }
         }
     }
-    
+
     public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
         Object object = queue.poll();
         if (object == null) {
@@ -96,16 +96,16 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue {
                 throw new RuntimeException(e);
             }
         }
-        
+
         drainQueue(object, handler);
-        
+
     }
-    
+
     public void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
         Object object = queue.poll();
         drainQueue(object, handler);
     }
-    
+
     /*
      * Caches until consumerStarted is called, upon which the cache is flushed to the consumer
      */
@@ -118,17 +118,17 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue {
             }
             isSuccess = queue.offer(obj);
         }
-        
+
     }
-    
+
     public void tryPublish(Object obj) throws InsufficientCapacityException {
         boolean isSuccess = queue.offer(obj);
         if (isSuccess == false) {
             throw InsufficientCapacityException.INSTANCE;
         }
-        
+
     }
-    
+
     public void publish(Object obj, boolean block) throws InsufficientCapacityException {
         if (block == true) {
             publish(obj);
@@ -136,21 +136,21 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue {
             tryPublish(obj);
         }
     }
-    
+
     public void consumerStarted() {
     }
-    
+
     private void flushCache() {
     }
-    
+
     public void clear() {
         queue.clear();
     }
-    
+
     public long population() {
         return queue.size();
     }
-    
+
     public long capacity() {
         long used = queue.size();
         if (used < QUEUE_CAPACITY) {
@@ -159,15 +159,15 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue {
             return used;
         }
     }
-    
+
     public long writePos() {
         return 0;
     }
-    
+
     public long readPos() {
         return queue.size();
     }
-    
+
     public float pctFull() {
         long used = queue.size();
         if (used < QUEUE_CAPACITY) {
@@ -176,7 +176,7 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue {
             return 1.0f;
         }
     }
-    
+
     @Override
     public Object getState() {
         Map state = new HashMap<String, Object>();
@@ -189,12 +189,12 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue {
         state.put("read_pos", rp);
         return state;
     }
-    
+
     public static class ObjectEventFactory implements EventFactory<MutableObject> {
         @Override
         public MutableObject newInstance() {
             return new MutableObject();
         }
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ExtendedThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ExtendedThreadPoolExecutor.java b/jstorm-core/src/main/java/backtype/storm/utils/ExtendedThreadPoolExecutor.java
index 4614366..e68898e 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/ExtendedThreadPoolExecutor.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/ExtendedThreadPoolExecutor.java
@@ -27,41 +27,44 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-public class ExtendedThreadPoolExecutor extends ThreadPoolExecutor{
+public class ExtendedThreadPoolExecutor extends ThreadPoolExecutor {
 
-  public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
-    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
-  }
-
-  public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
-    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
-  }
+    public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+    }
 
-  public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
-    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
-  }
+    public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
+            ThreadFactory threadFactory) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+    }
 
-  public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
-    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
-  }
+    public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
+            RejectedExecutionHandler handler) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
+    }
 
-  @Override
-  protected void afterExecute(Runnable r, Throwable t) {
-    super.afterExecute(r, t);
-    if (t == null && r instanceof Future<?>) {
-      try {
-        Object result = ((Future<?>) r).get();
-      } catch (CancellationException ce) {
-        t = ce;
-      } catch (ExecutionException ee) {
-        t = ee.getCause();
-      } catch (InterruptedException ie) {
-        // If future got interrupted exception, we want to interrupt parent thread itself.
-        Thread.currentThread().interrupt();
-      }
+    public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
+            ThreadFactory threadFactory, RejectedExecutionHandler handler) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
     }
-    if (t != null) {
-      Utils.handleUncaughtException(t);
+
+    @Override
+    protected void afterExecute(Runnable r, Throwable t) {
+        super.afterExecute(r, t);
+        if (t == null && r instanceof Future<?>) {
+            try {
+                Object result = ((Future<?>) r).get();
+            } catch (CancellationException ce) {
+                t = ce;
+            } catch (ExecutionException ee) {
+                t = ee.getCause();
+            } catch (InterruptedException ie) {
+                // If future got interrupted exception, we want to interrupt parent thread itself.
+                Thread.currentThread().interrupt();
+            }
+        }
+        if (t != null) {
+            Utils.handleUncaughtException(t);
+        }
     }
-  }
 }
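
The class above exists so that exceptions thrown inside submitted tasks do not stay hidden in the wrapping FutureTask: afterExecute unwraps the failure and hands it to Utils.handleUncaughtException. A minimal sketch:

    import backtype.storm.utils.ExtendedThreadPoolExecutor;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class PoolExample {
        public static void main(String[] args) {
            ExtendedThreadPoolExecutor pool = new ExtendedThreadPoolExecutor(
                    2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    // With a plain ThreadPoolExecutor this failure would sit silently in the Future;
                    // here afterExecute extracts the cause and passes it to Utils.handleUncaughtException.
                    throw new IllegalStateException("task failed");
                }
            });
            pool.shutdown();
        }
    }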

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/IndifferentAccessMap.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/IndifferentAccessMap.java b/jstorm-core/src/main/java/backtype/storm/utils/IndifferentAccessMap.java
index c0190cc..2bc6e7d 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/IndifferentAccessMap.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/IndifferentAccessMap.java
@@ -17,7 +17,6 @@
  */
 package backtype.storm.utils;
 
-
 import clojure.lang.ILookup;
 import clojure.lang.ISeq;
 import clojure.lang.AFn;
@@ -65,16 +64,17 @@ public class IndifferentAccessMap extends AFn implements ILookup, IPersistentMap
 
     @Override
     public Object valAt(Object o) {
-        if(o instanceof Keyword) {
+        if (o instanceof Keyword) {
             return valAt(((Keyword) o).getName());
         }
         return getMap().valAt(o);
     }
-    
+
     @Override
     public Object valAt(Object o, Object def) {
         Object ret = valAt(o);
-        if(ret==null) ret = def;
+        if (ret == null)
+            ret = def;
         return ret;
     }
 
@@ -92,30 +92,35 @@ public class IndifferentAccessMap extends AFn implements ILookup, IPersistentMap
     /* IPersistentMap */
     /* Naive implementation, but it might be good enough */
     public IPersistentMap assoc(Object k, Object v) {
-        if(k instanceof Keyword) return assoc(((Keyword) k).getName(), v);
-        
+        if (k instanceof Keyword)
+            return assoc(((Keyword) k).getName(), v);
+
         return new IndifferentAccessMap(getMap().assoc(k, v));
     }
 
     public IPersistentMap assocEx(Object k, Object v) {
-        if(k instanceof Keyword) return assocEx(((Keyword) k).getName(), v);
+        if (k instanceof Keyword)
+            return assocEx(((Keyword) k).getName(), v);
 
         return new IndifferentAccessMap(getMap().assocEx(k, v));
     }
 
     public IPersistentMap without(Object k) {
-        if(k instanceof Keyword) return without(((Keyword) k).getName());
+        if (k instanceof Keyword)
+            return without(((Keyword) k).getName());
 
         return new IndifferentAccessMap(getMap().without(k));
     }
 
     public boolean containsKey(Object k) {
-        if(k instanceof Keyword) return containsKey(((Keyword) k).getName());
+        if (k instanceof Keyword)
+            return containsKey(((Keyword) k).getName());
         return getMap().containsKey(k);
     }
 
     public IMapEntry entryAt(Object k) {
-        if(k instanceof Keyword) return entryAt(((Keyword) k).getName());
+        if (k instanceof Keyword)
+            return entryAt(((Keyword) k).getName());
 
         return getMap().entryAt(k);
     }
@@ -160,17 +165,20 @@ public class IndifferentAccessMap extends AFn implements ILookup, IPersistentMap
     public Collection values() {
         return ((Map) getMap()).values();
     }
-    
+
     /* Not implemented */
     public void clear() {
         throw new UnsupportedOperationException();
     }
+
     public Object put(Object k, Object v) {
         throw new UnsupportedOperationException();
     }
+
     public void putAll(Map m) {
         throw new UnsupportedOperationException();
     }
+
     public Object remove(Object k) {
         throw new UnsupportedOperationException();
     }
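
The point of the class above is lookup that is indifferent to Clojure keywords vs. strings: a Keyword key is unwrapped to its name before delegating to the wrapped map. A small sketch; the PersistentHashMap/Keyword helpers are standard clojure.lang calls assumed here for illustration:

    import backtype.storm.utils.IndifferentAccessMap;
    import clojure.lang.Keyword;
    import clojure.lang.PersistentHashMap;

    public class LookupExample {
        public static void main(String[] args) {
            IndifferentAccessMap conf =
                    new IndifferentAccessMap(PersistentHashMap.create("topology.workers", 4));
            // Both lookups hit the same entry: the keyword resolves to its string name first.
            Object byString  = conf.valAt("topology.workers");
            Object byKeyword = conf.valAt(Keyword.intern("topology.workers"));
            System.out.println(byString + " == " + byKeyword);
        }
    }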

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/InprocMessaging.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/InprocMessaging.java b/jstorm-core/src/main/java/backtype/storm/utils/InprocMessaging.java
index b20c775..03c8e4b 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/InprocMessaging.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/InprocMessaging.java
@@ -25,32 +25,32 @@ public class InprocMessaging {
     private static Map<Integer, LinkedBlockingQueue<Object>> _queues = new HashMap<Integer, LinkedBlockingQueue<Object>>();
     private static final Object _lock = new Object();
     private static int port = 1;
-    
+
     public static int acquireNewPort() {
         int ret;
-        synchronized(_lock) {
+        synchronized (_lock) {
             ret = port;
             port++;
         }
         return ret;
     }
-    
+
     public static void sendMessage(int port, Object msg) {
         getQueue(port).add(msg);
     }
-    
+
     public static Object takeMessage(int port) throws InterruptedException {
         return getQueue(port).take();
     }
 
     public static Object pollMessage(int port) {
-        return  getQueue(port).poll();
-    }    
-    
+        return getQueue(port).poll();
+    }
+
     private static LinkedBlockingQueue<Object> getQueue(int port) {
-        synchronized(_lock) {
-            if(!_queues.containsKey(port)) {
-              _queues.put(port, new LinkedBlockingQueue<Object>());   
+        synchronized (_lock) {
+            if (!_queues.containsKey(port)) {
+                _queues.put(port, new LinkedBlockingQueue<Object>());
             }
             return _queues.get(port);
         }
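
InprocMessaging above is a static registry of in-process queues keyed by a fake "port". A minimal exchange using only the methods in the hunk:

    import backtype.storm.utils.InprocMessaging;

    public class InprocExample {
        public static void main(String[] args) throws InterruptedException {
            int port = InprocMessaging.acquireNewPort();           // hands out 1, 2, 3, ...
            InprocMessaging.sendMessage(port, "hello");
            Object blocking = InprocMessaging.takeMessage(port);   // blocks until a message exists
            Object maybe = InprocMessaging.pollMessage(port);      // null here, queue already drained
            System.out.println(blocking + " / " + maybe);
        }
    }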

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/KeyedRoundRobinQueue.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/KeyedRoundRobinQueue.java b/jstorm-core/src/main/java/backtype/storm/utils/KeyedRoundRobinQueue.java
index 3cb455d..661c045 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/KeyedRoundRobinQueue.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/KeyedRoundRobinQueue.java
@@ -33,9 +33,9 @@ public class KeyedRoundRobinQueue<V> {
     private int _currIndex = 0;
 
     public void add(Object key, V val) {
-        synchronized(_lock) {
+        synchronized (_lock) {
             Queue<V> queue = _queues.get(key);
-            if(queue==null) {
+            if (queue == null) {
                 queue = new LinkedList<V>();
                 _queues.put(key, queue);
                 _keyOrder.add(key);
@@ -47,14 +47,14 @@ public class KeyedRoundRobinQueue<V> {
 
     public V take() throws InterruptedException {
         _size.acquire();
-        synchronized(_lock) {
+        synchronized (_lock) {
             Object key = _keyOrder.get(_currIndex);
             Queue<V> queue = _queues.get(key);
             V ret = queue.remove();
-            if(queue.isEmpty()) {
+            if (queue.isEmpty()) {
                 _keyOrder.remove(_currIndex);
                 _queues.remove(key);
-                if(_keyOrder.size()==0) {
+                if (_keyOrder.size() == 0) {
                     _currIndex = 0;
                 } else {
                     _currIndex = _currIndex % _keyOrder.size();
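
The class above interleaves values across keys rather than draining one key at a time. A sketch of the expected behavior (the index-advance branch is not fully shown in this hunk, so the exact ordering is inferred):

    import backtype.storm.utils.KeyedRoundRobinQueue;

    public class RoundRobinExample {
        public static void main(String[] args) throws InterruptedException {
            KeyedRoundRobinQueue<String> q = new KeyedRoundRobinQueue<String>();
            q.add("spout-A", "a1");
            q.add("spout-A", "a2");
            q.add("spout-B", "b1");
            // take() rotates across keys, so the order is a1, b1, a2 (not a1, a2, b1).
            System.out.println(q.take() + " " + q.take() + " " + q.take());
        }
    }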

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ListDelegate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ListDelegate.java b/jstorm-core/src/main/java/backtype/storm/utils/ListDelegate.java
index 1e091f0..25e6878 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/ListDelegate.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/ListDelegate.java
@@ -25,11 +25,11 @@ import java.util.ListIterator;
 
 public class ListDelegate implements List<Object> {
     private List<Object> _delegate;
-    
+
     public ListDelegate() {
-    	_delegate = new ArrayList<Object>();
+        _delegate = new ArrayList<Object>();
     }
-    
+
     public void setDelegate(List<Object> delegate) {
         _delegate = delegate;
     }
@@ -37,7 +37,7 @@ public class ListDelegate implements List<Object> {
     public List<Object> getDelegate() {
         return _delegate;
     }
-    
+
     @Override
     public int size() {
         return _delegate.size();
@@ -152,5 +152,5 @@ public class ListDelegate implements List<Object> {
     public List<Object> subList(int i, int i1) {
         return _delegate.subList(i, i1);
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/LocalState.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/LocalState.java b/jstorm-core/src/main/java/backtype/storm/utils/LocalState.java
index 0d8292f..843efb4 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/LocalState.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/LocalState.java
@@ -25,12 +25,9 @@ import java.util.HashMap;
 import java.io.IOException;
 
 /**
- * A simple, durable, atomic K/V database. *Very inefficient*, should only be
- * used for occasional reads/writes. Every read/write hits disk.
+ * A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes. Every read/write hits disk.
  * 
- * @@@ 
- * Right now, This class hasn't upgrade to storm's LocalState
- * It is need define every type in thrift, it is too complicated to do
+ * @@@ Right now, This class hasn't upgrade to storm's LocalState It is need define every type in thrift, it is too complicated to do
  */
 public class LocalState {
     private VersionedStore _vs;
@@ -46,8 +43,7 @@ public class LocalState {
             if (latestPath == null)
                 return new HashMap<Object, Object>();
             try {
-                return (Map<Object, Object>) Utils.javaDeserialize(FileUtils
-                        .readFileToByteArray(new File(latestPath)));
+                return (Map<Object, Object>) Utils.javaDeserialize(FileUtils.readFileToByteArray(new File(latestPath)));
             } catch (IOException e) {
                 attempts++;
                 if (attempts >= 10) {
@@ -65,8 +61,7 @@ public class LocalState {
         put(key, val, true);
     }
 
-    public synchronized void put(Object key, Object val, boolean cleanup)
-            throws IOException {
+    public synchronized void put(Object key, Object val, boolean cleanup) throws IOException {
         Map<Object, Object> curr = snapshot();
         curr.put(key, val);
         persist(curr, cleanup);
@@ -76,8 +71,7 @@ public class LocalState {
         remove(key, true);
     }
 
-    public synchronized void remove(Object key, boolean cleanup)
-            throws IOException {
+    public synchronized void remove(Object key, boolean cleanup) throws IOException {
         Map<Object, Object> curr = snapshot();
         curr.remove(key);
         persist(curr, cleanup);
@@ -87,8 +81,7 @@ public class LocalState {
         _vs.cleanup(keepVersions);
     }
 
-    private void persist(Map<Object, Object> val, boolean cleanup)
-            throws IOException {
+    private void persist(Map<Object, Object> val, boolean cleanup) throws IOException {
         byte[] toWrite = Utils.serialize(val);
         String newPath = _vs.createVersion();
         FileUtils.writeByteArrayToFile(new File(newPath), toWrite);
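
LocalState above is a tiny versioned on-disk map; every operation re-reads and rewrites the full snapshot. A usage sketch (a constructor taking a backing directory is assumed from the VersionedStore field; it is not shown in this hunk):

    import backtype.storm.utils.LocalState;
    import java.io.IOException;
    import java.util.Map;

    public class LocalStateExample {
        public static void main(String[] args) throws IOException {
            LocalState state = new LocalState("/tmp/jstorm-local-state");  // assumed constructor
            state.put("worker-heartbeat", Long.valueOf(System.currentTimeMillis())); // every write hits disk
            Map<Object, Object> snapshot = state.snapshot();               // full copy read back from disk
            System.out.println(snapshot);
            state.remove("worker-heartbeat");
        }
    }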

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/Monitor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/Monitor.java b/jstorm-core/src/main/java/backtype/storm/utils/Monitor.java
index eb57e99..b725084 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/Monitor.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/Monitor.java
@@ -17,16 +17,17 @@
  */
 package backtype.storm.utils;
 
-import backtype.storm.generated.*;
+import backtype.storm.generated.ClusterSummary;
+import backtype.storm.generated.Nimbus;
+import backtype.storm.generated.TopologySummary;
 
 import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
 
 /**
  * Deprecated in JStorm
+ * 
  * @author zhongyan.feng
- *
+ * 
  */
 @Deprecated
 public class Monitor {
@@ -106,17 +107,17 @@ public class Monitor {
     /**
      * @@@ Don't be compatible with Storm
      * 
-     * Here skip the logic
+     *     Here skip the logic
      * @param client
      * @param topology
      * @return
      * @throws Exception
      */
-    private HashSet<String> getComponents(Nimbus.Client client, String topology) throws Exception{
+    private HashSet<String> getComponents(Nimbus.Client client, String topology) throws Exception {
         HashSet<String> components = new HashSet<String>();
         ClusterSummary clusterSummary = client.getClusterInfo();
         TopologySummary topologySummary = null;
-        for (TopologySummary ts: clusterSummary.get_topologies()) {
+        for (TopologySummary ts : clusterSummary.get_topologies()) {
             if (topology.equals(ts.get_name())) {
                 topologySummary = ts;
                 break;
@@ -126,12 +127,12 @@ public class Monitor {
             throw new IllegalArgumentException("topology: " + topology + " not found");
         } else {
             String id = topologySummary.get_id();
-//            GetInfoOptions getInfoOpts = new GetInfoOptions();
-//            getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE);
-//            TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts);
-//            for (ExecutorSummary es: info.get_executors()) {
-//                components.add(es.get_component_id());
-//            }
+            // GetInfoOptions getInfoOpts = new GetInfoOptions();
+            // getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE);
+            // TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts);
+            // for (ExecutorSummary es: info.get_executors()) {
+            // components.add(es.get_component_id());
+            // }
         }
         return components;
     }
@@ -161,7 +162,7 @@ public class Monitor {
             throw new IllegalArgumentException("stream name must be something");
         }
 
-        if ( !WATCH_TRANSFERRED.equals(_watch) && !WATCH_EMITTED.equals(_watch)) {
+        if (!WATCH_TRANSFERRED.equals(_watch) && !WATCH_EMITTED.equals(_watch)) {
             throw new IllegalArgumentException("watch item must either be transferred or emitted");
         }
         System.out.println("topology\tcomponent\tparallelism\tstream\ttime-diff ms\t" + _watch + "\tthroughput (Kt/s)");
@@ -189,7 +190,7 @@ public class Monitor {
         boolean streamFound = false;
         ClusterSummary clusterSummary = client.getClusterInfo();
         TopologySummary topologySummary = null;
-        for (TopologySummary ts: clusterSummary.get_topologies()) {
+        for (TopologySummary ts : clusterSummary.get_topologies()) {
             if (_topology.equals(ts.get_name())) {
                 topologySummary = ts;
                 break;
@@ -198,30 +199,30 @@ public class Monitor {
         if (topologySummary == null) {
             throw new IllegalArgumentException("topology: " + _topology + " not found");
         } else {
-//            String id = topologySummary.get_id();
-//            GetInfoOptions getInfoOpts = new GetInfoOptions();
-//            getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE);
-//            TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts);
-//            for (ExecutorSummary es: info.get_executors()) {
-//                if (_component.equals(es.get_component_id())) {
-//                    componentParallelism ++;
-//                    ExecutorStats stats = es.get_stats();
-//                    if (stats != null) {
-//                        Map<String,Map<String,Long>> statted =
-//                                WATCH_EMITTED.equals(_watch) ? stats.get_emitted() : stats.get_transferred();
-//                        if ( statted != null) {
-//                            Map<String, Long> e2 = statted.get(":all-time");
-//                            if (e2 != null) {
-//                                Long stream = e2.get(_stream);
-//                                if (stream != null){
-//                                    streamFound = true;
-//                                    totalStatted += stream;
-//                                }
-//                            }
-//                        }
-//                    }
-//                }
-//            }
+            // String id = topologySummary.get_id();
+            // GetInfoOptions getInfoOpts = new GetInfoOptions();
+            // getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE);
+            // TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts);
+            // for (ExecutorSummary es: info.get_executors()) {
+            // if (_component.equals(es.get_component_id())) {
+            // componentParallelism ++;
+            // ExecutorStats stats = es.get_stats();
+            // if (stats != null) {
+            // Map<String,Map<String,Long>> statted =
+            // WATCH_EMITTED.equals(_watch) ? stats.get_emitted() : stats.get_transferred();
+            // if ( statted != null) {
+            // Map<String, Long> e2 = statted.get(":all-time");
+            // if (e2 != null) {
+            // Long stream = e2.get(_stream);
+            // if (stream != null){
+            // streamFound = true;
+            // totalStatted += stream;
+            // }
+            // }
+            // }
+            // }
+            // }
+            // }
         }
 
         if (componentParallelism <= 0) {
@@ -242,8 +243,9 @@ public class Monitor {
         long stattedDelta = totalStatted - state.getLastStatted();
         state.setLastTime(now);
         state.setLastStatted(totalStatted);
-        double throughput = (stattedDelta == 0 || timeDelta == 0) ? 0.0 : ((double)stattedDelta/(double)timeDelta);
-        System.out.println(_topology+"\t"+_component+"\t"+componentParallelism+"\t"+_stream+"\t"+timeDelta+"\t"+stattedDelta+"\t"+throughput);
+        double throughput = (stattedDelta == 0 || timeDelta == 0) ? 0.0 : ((double) stattedDelta / (double) timeDelta);
+        System.out.println(_topology + "\t" + _component + "\t" + componentParallelism + "\t" + _stream + "\t" + timeDelta + "\t" + stattedDelta + "\t"
+                + throughput);
     }
 
     public void set_interval(int _interval) {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/MutableInt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/MutableInt.java b/jstorm-core/src/main/java/backtype/storm/utils/MutableInt.java
index 326ade0..aca3a24 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/MutableInt.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/MutableInt.java
@@ -23,21 +23,21 @@ public class MutableInt {
     public MutableInt(int val) {
         this.val = val;
     }
-    
+
     public void set(int val) {
         this.val = val;
     }
-    
+
     public int get() {
         return val;
     }
-    
+
     public int increment() {
         return increment(1);
     }
-    
+
     public int increment(int amt) {
-        val+=amt;
+        val += amt;
         return val;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/MutableLong.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/MutableLong.java b/jstorm-core/src/main/java/backtype/storm/utils/MutableLong.java
index a744c1c..2f4034e 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/MutableLong.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/MutableLong.java
@@ -23,21 +23,21 @@ public class MutableLong {
     public MutableLong(long val) {
         this.val = val;
     }
-    
+
     public void set(long val) {
         this.val = val;
     }
-    
+
     public long get() {
         return val;
     }
-    
+
     public long increment() {
         return increment(1);
     }
-    
+
     public long increment(long amt) {
-        val+=amt;
+        val += amt;
         return val;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/MutableObject.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/MutableObject.java b/jstorm-core/src/main/java/backtype/storm/utils/MutableObject.java
index d5cb7db..d0f928c 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/MutableObject.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/MutableObject.java
@@ -19,19 +19,19 @@ package backtype.storm.utils;
 
 public class MutableObject {
     Object o = null;
-    
+
     public MutableObject() {
-        
+
     }
 
     public MutableObject(Object o) {
         this.o = o;
     }
-    
+
     public void setObject(Object o) {
         this.o = o;
     }
-    
+
     public Object getObject() {
         return o;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/NimbusClient.java b/jstorm-core/src/main/java/backtype/storm/utils/NimbusClient.java
index 5829b67..ac76439 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/NimbusClient.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/NimbusClient.java
@@ -31,24 +31,24 @@ import backtype.storm.security.auth.ThriftConnectionType;
 
 public class NimbusClient extends ThriftClient {
     private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class);
-    
+
     private Nimbus.Client _client;
     private static String clientVersion = Utils.getVersion();
-    
+
     @SuppressWarnings("unchecked")
     public static NimbusClient getConfiguredClient(Map conf) {
         return getConfiguredClient(conf, null);
     }
-    
+
     @SuppressWarnings("unchecked")
     public static NimbusClient getConfiguredClient(Map conf, Integer timeout) {
         return getConfiguredClientAs(conf, timeout, null);
     }
-    
+
     public static NimbusClient getConfiguredClientAs(Map conf, String asUser) {
         return getConfiguredClientAs(conf, null, asUser);
     }
-    
+
     public static void checkVersion(NimbusClient client) {
         String serverVersion;
         try {
@@ -56,24 +56,24 @@ public class NimbusClient extends ThriftClient {
         } catch (TException e) {
             // TODO Auto-generated catch block
             LOG.warn("Failed to get nimbus version ");
-            return ;
+            return;
         }
         if (!clientVersion.equals(serverVersion)) {
             LOG.warn("Your client version:  " + clientVersion + " but nimbus version: " + serverVersion);
         }
     }
-    
+
     public static NimbusClient getConfiguredClientAs(Map conf, Integer timeout, String asUser) {
         try {
-            if(conf.containsKey(Config.STORM_DO_AS_USER)) {
-                if(asUser != null && !asUser.isEmpty()) {
-                    LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence."
-                            , asUser, conf.get(Config.STORM_DO_AS_USER));
+            if (conf.containsKey(Config.STORM_DO_AS_USER)) {
+                if (asUser != null && !asUser.isEmpty()) {
+                    LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence.", asUser,
+                            conf.get(Config.STORM_DO_AS_USER));
                 }
                 asUser = (String) conf.get(Config.STORM_DO_AS_USER);
             }
-            
-            NimbusClient client = new NimbusClient(conf, null, null, timeout, asUser);  
+
+            NimbusClient client = new NimbusClient(conf, null, null, timeout, asUser);
             checkVersion(client);
             return client;
         } catch (Exception ex) {
@@ -84,24 +84,24 @@ public class NimbusClient extends ThriftClient {
     public NimbusClient(Map conf, String host, int port) throws TTransportException {
         this(conf, host, port, null);
     }
-    
+
     public NimbusClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
         super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, null);
         _client = new Nimbus.Client(_protocol);
     }
-    
+
     public NimbusClient(Map conf, String host, Integer port, Integer timeout, String asUser) throws TTransportException {
         super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, asUser);
         _client = new Nimbus.Client(_protocol);
     }
-    
+
     public NimbusClient(Map conf, String host) throws TTransportException {
         super(conf, ThriftConnectionType.NIMBUS, host, null, null, null);
         _client = new Nimbus.Client(_protocol);
     }
-    
+
     public Nimbus.Client getClient() {
         return _client;
     }
-    
+
 }
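
A short sketch of how the client above is typically obtained and used. Utils.readStormConfig() and the inherited close() are assumptions from the surrounding codebase; getConfiguredClient also runs the checkVersion warning shown in this hunk, and getClusterInfo appears in the Monitor diff earlier in this patch:

    import backtype.storm.generated.ClusterSummary;
    import backtype.storm.utils.NimbusClient;
    import backtype.storm.utils.Utils;
    import java.util.Map;

    public class NimbusExample {
        public static void main(String[] args) throws Exception {
            Map conf = Utils.readStormConfig();                            // assumed helper, not in this hunk
            NimbusClient nimbus = NimbusClient.getConfiguredClient(conf);  // logs a warning on version mismatch
            try {
                ClusterSummary summary = nimbus.getClient().getClusterInfo();
                System.out.println(summary.get_topologies_size() + " topologies running");
            } finally {
                nimbus.close();                                            // inherited from ThriftClient (assumed)
            }
        }
    }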

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/RegisteredGlobalState.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/RegisteredGlobalState.java b/jstorm-core/src/main/java/backtype/storm/utils/RegisteredGlobalState.java
index 48053fc..fbaf03b 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/RegisteredGlobalState.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/RegisteredGlobalState.java
@@ -21,44 +21,42 @@ import java.util.HashMap;
 import java.util.UUID;
 
 /**
- * This class is used as part of testing Storm. It is used to keep track of "global metrics"
- * in an atomic way. For example, it is used for doing fine-grained detection of when a 
- * local Storm cluster is idle by tracking the number of transferred tuples vs the number of processed
- * tuples.
+ * This class is used as part of testing Storm. It is used to keep track of "global metrics" in an atomic way. For example, it is used for doing fine-grained
+ * detection of when a local Storm cluster is idle by tracking the number of transferred tuples vs the number of processed tuples.
  */
 public class RegisteredGlobalState {
     private static HashMap<String, Object> _states = new HashMap<String, Object>();
     private static final Object _lock = new Object();
-    
+
     public static Object globalLock() {
         return _lock;
     }
-    
+
     public static String registerState(Object init) {
-        synchronized(_lock) {
+        synchronized (_lock) {
             String id = UUID.randomUUID().toString();
             _states.put(id, init);
             return id;
         }
     }
-    
+
     public static void setState(String id, Object init) {
-        synchronized(_lock) {
+        synchronized (_lock) {
             _states.put(id, init);
         }
     }
-    
+
     public static Object getState(String id) {
-        synchronized(_lock) {
+        synchronized (_lock) {
             Object ret = _states.get(id);
-            //System.out.println("State: " + ret.toString());
+            // System.out.println("State: " + ret.toString());
             return ret;
-        }        
+        }
     }
-    
+
     public static void clearState(String id) {
-        synchronized(_lock) {
+        synchronized (_lock) {
             _states.remove(id);
-        }        
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/RotatingMap.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/RotatingMap.java b/jstorm-core/src/main/java/backtype/storm/utils/RotatingMap.java
index 2ed0e33..db62e5c 100644
--- a/jstorm-core/src/main/java/backtype/storm/utils/RotatingMap.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/RotatingMap.java
@@ -24,18 +24,17 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 /**
- * Expires keys that have not been updated in the configured number of seconds.
- * The algorithm used will take between expirationSecs and
- * expirationSecs * (1 + 1 / (numBuckets-1)) to actually expire the message.
- *
+ * Expires keys that have not been updated in the configured number of seconds. The algorithm used will take between expirationSecs and expirationSecs * (1 + 1
+ * / (numBuckets-1)) to actually expire the message.
+ * 
  * get, put, remove, containsKey, and size take O(numBuckets) time to run.
- *
- * The advantage of this design is that the expiration thread only locks the object
- * for O(1) time, meaning the object is essentially always available for gets/puts.
+ * 
+ * The advantage of this design is that the expiration thread only locks the object for O(1) time, meaning the object is essentially always available for
+ * gets/puts.
  */
 @Deprecated
 public class RotatingMap<K, V> {
-    //this default ensures things expire at most 50% past the expiration time
+    // this default ensures things expire at most 50% past the expiration time
     private static final int DEFAULT_NUM_BUCKETS = 3;
 
     public static interface ExpiredCallback<K, V> {
@@ -45,13 +44,13 @@ public class RotatingMap<K, V> {
     private LinkedList<HashMap<K, V>> _buckets;
 
     private ExpiredCallback _callback;
-    
+
     public RotatingMap(int numBuckets, ExpiredCallback<K, V> callback) {
-        if(numBuckets<2) {
+        if (numBuckets < 2) {
             throw new IllegalArgumentException("numBuckets must be >= 2");
         }
         _buckets = new LinkedList<HashMap<K, V>>();
-        for(int i=0; i<numBuckets; i++) {
+        for (int i = 0; i < numBuckets; i++) {
             _buckets.add(new HashMap<K, V>());
         }
 
@@ -64,13 +63,13 @@ public class RotatingMap<K, V> {
 
     public RotatingMap(int numBuckets) {
         this(numBuckets, null);
-    }   
-    
+    }
+
     public Map<K, V> rotate() {
         Map<K, V> dead = _buckets.removeLast();
         _buckets.addFirst(new HashMap<K, V>());
-        if(_callback!=null) {
-            for(Entry<K, V> entry: dead.entrySet()) {
+        if (_callback != null) {
+            for (Entry<K, V> entry : dead.entrySet()) {
                 _callback.expire(entry.getKey(), entry.getValue());
             }
         }
@@ -78,8 +77,8 @@ public class RotatingMap<K, V> {
     }
 
     public boolean containsKey(K key) {
-        for(HashMap<K, V> bucket: _buckets) {
-            if(bucket.containsKey(key)) {
+        for (HashMap<K, V> bucket : _buckets) {
+            if (bucket.containsKey(key)) {
                 return true;
             }
         }
@@ -87,8 +86,8 @@ public class RotatingMap<K, V> {
     }
 
     public V get(K key) {
-        for(HashMap<K, V> bucket: _buckets) {
-            if(bucket.containsKey(key)) {
+        for (HashMap<K, V> bucket : _buckets) {
+            if (bucket.containsKey(key)) {
                 return bucket.get(key);
             }
         }
@@ -99,16 +98,15 @@ public class RotatingMap<K, V> {
         Iterator<HashMap<K, V>> it = _buckets.iterator();
         HashMap<K, V> bucket = it.next();
         bucket.put(key, value);
-        while(it.hasNext()) {
+        while (it.hasNext()) {
             bucket = it.next();
             bucket.remove(key);
         }
     }
-    
-    
+
     public Object remove(K key) {
-        for(HashMap<K, V> bucket: _buckets) {
-            if(bucket.containsKey(key)) {
+        for (HashMap<K, V> bucket : _buckets) {
+            if (bucket.containsKey(key)) {
                 return bucket.remove(key);
             }
         }
@@ -117,9 +115,9 @@ public class RotatingMap<K, V> {
 
     public int size() {
         int size = 0;
-        for(HashMap<K, V> bucket: _buckets) {
-            size+=bucket.size();
+        for (HashMap<K, V> bucket : _buckets) {
+            size += bucket.size();
         }
         return size;
-    }    
+    }
 }
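
RotatingMap above expires entries by bucket rotation; an external timer is expected to call rotate() roughly every expirationSecs / (numBuckets - 1) seconds. Sketch:

    import backtype.storm.utils.RotatingMap;

    public class RotatingMapExample {
        public static void main(String[] args) {
            RotatingMap<String, Long> pending = new RotatingMap<String, Long>(3,
                    new RotatingMap.ExpiredCallback<String, Long>() {
                        public void expire(String key, Long startMs) {
                            System.out.println("timed out: " + key);
                        }
                    });
            pending.put("msg-1", System.currentTimeMillis());
            pending.rotate();   // normally driven by a timer thread
            pending.rotate();
            pending.rotate();   // third rotation pushes msg-1 out of the last bucket -> callback fires
        }
    }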

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ServiceRegistry.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ServiceRegistry.java b/jstorm-core/src/main/java/backtype/storm/utils/ServiceRegistry.java
index 724bc3e..92dc2f7 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/ServiceRegistry.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/ServiceRegistry.java
@@ -24,24 +24,24 @@ import java.util.UUID;
 public class ServiceRegistry {
     private static HashMap<String, Object> _services = new HashMap<String, Object>();
     private static final Object _lock = new Object();
-    
+
     public static String registerService(Object service) {
-        synchronized(_lock) {
+        synchronized (_lock) {
             String id = UUID.randomUUID().toString();
             _services.put(id, service);
             return id;
         }
     }
-    
+
     public static Object getService(String id) {
-        synchronized(_lock) {
+        synchronized (_lock) {
             return _services.get(id);
-        }        
+        }
     }
-    
+
     public static void unregisterService(String id) {
-        synchronized(_lock) {
+        synchronized (_lock) {
             _services.remove(id);
-        }        
+        }
     }
 }
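
ServiceRegistry above is a process-wide map from generated UUID handles to live objects. Minimal sketch:

    import backtype.storm.utils.ServiceRegistry;

    public class RegistryExample {
        public static void main(String[] args) {
            Object service = new StringBuilder("local service");
            String id = ServiceRegistry.registerService(service);   // returns a UUID handle
            Object lookedUp = ServiceRegistry.getService(id);
            System.out.println(service == lookedUp);                // true: same instance
            ServiceRegistry.unregisterService(id);
        }
    }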

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ShellProcess.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ShellProcess.java b/jstorm-core/src/main/java/backtype/storm/utils/ShellProcess.java
index 78f47d6..69af852 100644
--- a/jstorm-core/src/main/java/backtype/storm/utils/ShellProcess.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/ShellProcess.java
@@ -37,10 +37,10 @@ import org.slf4j.LoggerFactory;
 public class ShellProcess implements Serializable {
     public static Logger LOG = LoggerFactory.getLogger(ShellProcess.class);
     public static Logger ShellLogger;
-    private Process      _subprocess;
-    private InputStream  processErrorStream;
-    private String[]     command;
-    public ISerializer   serializer;
+    private Process _subprocess;
+    private InputStream processErrorStream;
+    private String[] command;
+    public ISerializer serializer;
     public Number pid;
     public String componentName;
 
@@ -63,9 +63,7 @@ public class ShellProcess implements Serializable {
             serializer.initialize(_subprocess.getOutputStream(), _subprocess.getInputStream());
             this.pid = serializer.connect(conf, context);
         } catch (IOException e) {
-            throw new RuntimeException(
-                    "Error when launching multilang subprocess\n"
-                            + getErrorsString(), e);
+            throw new RuntimeException("Error when launching multilang subprocess\n" + getErrorsString(), e);
         } catch (NoOutputException e) {
             throw new RuntimeException(e + getErrorsString() + "\n");
         }
@@ -73,18 +71,18 @@ public class ShellProcess implements Serializable {
     }
 
     private ISerializer getSerializer(Map conf) {
-        //get factory class name
-        String serializer_className = (String)conf.get(Config.TOPOLOGY_MULTILANG_SERIALIZER);
+        // get factory class name
+        String serializer_className = (String) conf.get(Config.TOPOLOGY_MULTILANG_SERIALIZER);
         LOG.info("Storm multilang serializer: " + serializer_className);
 
         ISerializer serializer = null;
         try {
-            //create a factory class
+            // create a factory class
             Class klass = Class.forName(serializer_className);
-            //obtain a serializer object
+            // obtain a serializer object
             Object obj = klass.newInstance();
-            serializer = (ISerializer)obj;
-        } catch(Exception e) {
+            serializer = (ISerializer) obj;
+        } catch (Exception e) {
             throw new RuntimeException("Failed to construct multilang serializer from serializer " + serializer_className, e);
         }
         return serializer;
@@ -152,7 +150,7 @@ public class ShellProcess implements Serializable {
     }
 
     /**
-     *
+     * 
      * @return pid, if the process has been launched, null otherwise.
      */
     public Number getPid() {
@@ -160,7 +158,7 @@ public class ShellProcess implements Serializable {
     }
 
     /**
-     *
+     * 
      * @return the name of component.
      */
     public String getComponentName() {
@@ -168,13 +166,13 @@ public class ShellProcess implements Serializable {
     }
 
     /**
-     *
+     * 
      * @return exit code of the process if process is terminated, -1 if process is not started or terminated.
      */
     public int getExitCode() {
         try {
             return this._subprocess != null ? this._subprocess.exitValue() : -1;
-        } catch(IllegalThreadStateException e) {
+        } catch (IllegalThreadStateException e) {
             return -1;
         }
     }
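
The getSerializer() change above keeps the reflective lookup of the configured serializer class. Below is a standalone sketch of that pattern only; the plain string key stands in for Config.TOPOLOGY_MULTILANG_SERIALIZER and java.util.ArrayList stands in for a real ISerializer implementation, both purely illustrative:

    import java.util.HashMap;
    import java.util.Map;

    public class PluggableSerializerSketch {
        // Read a class name from the config, reflectively instantiate it, and
        // fail fast with a descriptive error, mirroring getSerializer() above.
        public static Object resolve(Map conf) {
            String className = (String) conf.get("topology.multilang.serializer");
            try {
                Class<?> klass = Class.forName(className);
                return klass.newInstance();
            } catch (Exception e) {
                throw new RuntimeException("Failed to construct multilang serializer from " + className, e);
            }
        }

        public static void main(String[] args) {
            Map<String, Object> conf = new HashMap<String, Object>();
            conf.put("topology.multilang.serializer", "java.util.ArrayList"); // any no-arg class works for the sketch
            System.out.println(resolve(conf).getClass().getName());
        }
    }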

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ShellUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ShellUtils.java b/jstorm-core/src/main/java/backtype/storm/utils/ShellUtils.java
index 1065ff9..261cbb7 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/ShellUtils.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/ShellUtils.java
@@ -31,19 +31,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-
 abstract public class ShellUtils {
     public static Logger LOG = LoggerFactory.getLogger(ShellUtils.class);
 
     // OSType detection
     public enum OSType {
-        OS_TYPE_LINUX,
-        OS_TYPE_WIN,
-        OS_TYPE_SOLARIS,
-        OS_TYPE_MAC,
-        OS_TYPE_FREEBSD,
-        OS_TYPE_OTHER
+        OS_TYPE_LINUX, OS_TYPE_WIN, OS_TYPE_SOLARIS, OS_TYPE_MAC, OS_TYPE_FREEBSD, OS_TYPE_OTHER
     }
 
     public static final OSType osType = getOSType();
@@ -69,29 +62,27 @@ abstract public class ShellUtils {
     // Helper static vars for each platform
     public static final boolean WINDOWS = (osType == OSType.OS_TYPE_WIN);
     public static final boolean SOLARIS = (osType == OSType.OS_TYPE_SOLARIS);
-    public static final boolean MAC     = (osType == OSType.OS_TYPE_MAC);
+    public static final boolean MAC = (osType == OSType.OS_TYPE_MAC);
     public static final boolean FREEBSD = (osType == OSType.OS_TYPE_FREEBSD);
-    public static final boolean LINUX   = (osType == OSType.OS_TYPE_LINUX);
-    public static final boolean OTHER   = (osType == OSType.OS_TYPE_OTHER);
-
+    public static final boolean LINUX = (osType == OSType.OS_TYPE_LINUX);
+    public static final boolean OTHER = (osType == OSType.OS_TYPE_OTHER);
 
     /** Token separator regex used to parse Shell tool outputs */
-    public static final String TOKEN_SEPARATOR_REGEX
-        = WINDOWS ? "[|\n\r]" : "[ \t\n\r\f]";
+    public static final String TOKEN_SEPARATOR_REGEX = WINDOWS ? "[|\n\r]" : "[ \t\n\r\f]";
 
-    private long    interval;   // refresh interval in msec
-    private long    lastTime;   // last time the command was performed
+    private long interval; // refresh interval in msec
+    private long lastTime; // last time the command was performed
     final private boolean redirectErrorStream; // merge stdout and stderr
     private Map<String, String> environment; // env for the command execution
     private File dir;
     private Process process; // sub process used to execute the command
     private int exitCode;
-    /**Time after which the executing script would be timedout*/
+    /** Time after which the executing script would be timedout */
     protected long timeOutInterval = 0L;
-    /** If or not script timed out*/
+    /** If or not script timed out */
     private AtomicBoolean timedOut;
 
-    /**If or not script finished executing*/
+    /** If or not script finished executing */
     private volatile AtomicBoolean completed;
 
     public ShellUtils() {
@@ -103,23 +94,26 @@ abstract public class ShellUtils {
     }
 
     /**
-     * @param interval the minimum duration to wait before re-executing the
-     *        command.
+     * @param interval the minimum duration to wait before re-executing the command.
      */
     public ShellUtils(long interval, boolean redirectErrorStream) {
         this.interval = interval;
-        this.lastTime = (interval<0) ? 0 : -interval;
+        this.lastTime = (interval < 0) ? 0 : -interval;
         this.redirectErrorStream = redirectErrorStream;
     }
 
-    /** set the environment for the command
+    /**
+     * set the environment for the command
+     * 
      * @param env Mapping of environment variables
      */
     protected void setEnvironment(Map<String, String> env) {
         this.environment = env;
     }
 
-    /** set the working directory
+    /**
+     * set the working directory
+     * 
      * @param dir The directory where the command would be executed
      */
     protected void setWorkingDirectory(File dir) {
@@ -128,23 +122,18 @@ abstract public class ShellUtils {
 
     /** a Unix command to get the current user's groups list */
     public static String[] getGroupsCommand() {
-        return (WINDOWS)? new String[]{"cmd", "/c", "groups"}
-        : new String[]{"bash", "-c", "groups"};
+        return (WINDOWS) ? new String[] { "cmd", "/c", "groups" } : new String[] { "bash", "-c", "groups" };
     }
 
     /**
-     * a Unix command to get a given user's groups list.
-     * If the OS is not WINDOWS, the command will get the user's primary group
-     * first and finally get the groups list which includes the primary group.
-     * i.e. the user's primary group will be included twice.
+     * a Unix command to get a given user's groups list. If the OS is not WINDOWS, the command will get the user's primary group first and finally get the
+     * groups list which includes the primary group. i.e. the user's primary group will be included twice.
      */
     public static String[] getGroupsForUserCommand(final String user) {
-        //'groups username' command return is non-consistent across different unixes
-        return new String [] {"bash", "-c", "id -gn " + user
-                         + "&& id -Gn " + user};
+        // 'groups username' command return is non-consistent across different unixes
+        return new String[] { "bash", "-c", "id -gn " + user + "&& id -Gn " + user };
     }
 
-
     /** check to see if a command needs to be executed and execute if needed */
     protected void run() throws IOException {
         if (lastTime + interval > System.currentTimeMillis())
@@ -174,51 +163,48 @@ abstract public class ShellUtils {
         if (timeOutInterval > 0) {
             timeOutTimer = new Timer("Shell command timeout");
             timeoutTimerTask = new ShellTimeoutTimerTask(this);
-            //One time scheduling.
+            // One time scheduling.
             timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
         }
-        final BufferedReader errReader =
-            new BufferedReader(new InputStreamReader(process
-                                                     .getErrorStream()));
-        BufferedReader inReader =
-            new BufferedReader(new InputStreamReader(process
-                                                     .getInputStream()));
+        final BufferedReader errReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
+        BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
         final StringBuffer errMsg = new StringBuffer();
 
         // read error and input streams as this would free up the buffers
         // free the error stream buffer
         Thread errThread = new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        String line = errReader.readLine();
-                        while((line != null) && !isInterrupted()) {
-                            errMsg.append(line);
-                            errMsg.append(System.getProperty("line.separator"));
-                            line = errReader.readLine();
-                        }
-                    } catch(IOException ioe) {
-                        LOG.warn("Error reading the error stream", ioe);
+            @Override
+            public void run() {
+                try {
+                    String line = errReader.readLine();
+                    while ((line != null) && !isInterrupted()) {
+                        errMsg.append(line);
+                        errMsg.append(System.getProperty("line.separator"));
+                        line = errReader.readLine();
                     }
+                } catch (IOException ioe) {
+                    LOG.warn("Error reading the error stream", ioe);
                 }
-            };
+            }
+        };
         try {
             errThread.start();
-        } catch (IllegalStateException ise) { }
+        } catch (IllegalStateException ise) {
+        }
         try {
             parseExecResult(inReader); // parse the output
             // clear the input stream buffer
             String line = inReader.readLine();
-            while(line != null) {
+            while (line != null) {
                 line = inReader.readLine();
             }
             // wait for the process to finish and check the exit code
-            exitCode  = process.waitFor();
+            exitCode = process.waitFor();
             // make sure that the error thread exits
             joinThread(errThread);
             completed.set(true);
-            //the timeout thread handling
-            //taken care in finally block
+            // the timeout thread handling
+            // taken care in finally block
             if (exitCode != 0) {
                 throw new ExitCodeException(exitCode, errMsg.toString());
             }
@@ -233,10 +219,10 @@ abstract public class ShellUtils {
                 // JDK 7 tries to automatically drain the input streams for us
                 // when the process exits, but since close is not synchronized,
                 // it creates a race if we close the stream first and the same
-                // fd is recycled.  the stream draining thread will attempt to
-                // drain that fd!!  it may block, OOM, or cause bizarre behavior
+                // fd is recycled. the stream draining thread will attempt to
+                // drain that fd!! it may block, OOM, or cause bizarre behavior
                 // see: https://bugs.openjdk.java.net/browse/JDK-8024521
-                //      issue is fixed in build 7u60
+                // issue is fixed in build 7u60
                 InputStream stdout = process.getInputStream();
                 synchronized (stdout) {
                     inReader.close();
@@ -278,10 +264,11 @@ abstract public class ShellUtils {
     protected abstract String[] getExecString();
 
     /** Parse the execution result */
-    protected abstract void parseExecResult(BufferedReader lines)
-        throws IOException;
+    protected abstract void parseExecResult(BufferedReader lines) throws IOException;
 
-    /** get the current sub-process executing the given command
+    /**
+     * get the current sub-process executing the given command
+     * 
      * @return process executing the command
      */
     public Process getProcess() {
@@ -306,18 +293,15 @@ abstract public class ShellUtils {
 
     /**
      * A simple shell command executor.
-     *
-     * <code>ShellCommandExecutor</code>should be used in cases where the output
-     * of the command needs no explicit parsing and where the command, working
-     * directory and the environment remains unchanged. The output of the command
-     * is stored as-is and is expected to be small.
+     * 
+     * <code>ShellCommandExecutor</code>should be used in cases where the output of the command needs no explicit parsing and where the command, working
+     * directory and the environment remains unchanged. The output of the command is stored as-is and is expected to be small.
      */
     public static class ShellCommandExecutor extends ShellUtils {
 
         private String[] command;
         private StringBuffer output;
 
-
         public ShellCommandExecutor(String[] execString) {
             this(execString, null);
         }
@@ -326,27 +310,22 @@ abstract public class ShellUtils {
             this(execString, dir, null);
         }
 
-        public ShellCommandExecutor(String[] execString, File dir,
-                                    Map<String, String> env) {
-            this(execString, dir, env , 0L);
+        public ShellCommandExecutor(String[] execString, File dir, Map<String, String> env) {
+            this(execString, dir, env, 0L);
         }
 
         /**
          * Create a new instance of the ShellCommandExecutor to execute a command.
-         *
+         * 
          * @param execString The command to execute with arguments
-         * @param dir If not-null, specifies the directory which should be set
-         *            as the current working directory for the command.
-         *            If null, the current working directory is not modified.
-         * @param env If not-null, environment of the command will include the
-         *            key-value pairs specified in the map. If null, the current
-         *            environment is not modified.
-         * @param timeout Specifies the time in milliseconds, after which the
-         *                command will be killed and the status marked as timedout.
-         *                If 0, the command will not be timed out.
+         * @param dir If not-null, specifies the directory which should be set as the current working directory for the command. If null, the current working
+         *            directory is not modified.
+         * @param env If not-null, environment of the command will include the key-value pairs specified in the map. If null, the current environment is not
+         *            modified.
+         * @param timeout Specifies the time in milliseconds, after which the command will be killed and the status marked as timedout. If 0, the command will
+         *            not be timed out.
          */
-        public ShellCommandExecutor(String[] execString, File dir,
-                                    Map<String, String> env, long timeout) {
+        public ShellCommandExecutor(String[] execString, File dir, Map<String, String> env, long timeout) {
             command = execString.clone();
             if (dir != null) {
                 setWorkingDirectory(dir);
@@ -357,7 +336,6 @@ abstract public class ShellUtils {
             timeOutInterval = timeout;
         }
 
-
         /** Execute the shell command. */
         public void execute() throws IOException {
             this.run();
@@ -373,21 +351,19 @@ abstract public class ShellUtils {
             output = new StringBuffer();
             char[] buf = new char[512];
             int nRead;
-            while ( (nRead = lines.read(buf, 0, buf.length)) > 0 ) {
+            while ((nRead = lines.read(buf, 0, buf.length)) > 0) {
                 output.append(buf, 0, nRead);
             }
         }
 
-        /** Get the output of the shell command.*/
+        /** Get the output of the shell command. */
         public String getOutput() {
             return (output == null) ? "" : output.toString();
         }
 
         /**
-         * Returns the commands of this instance.
-         * Arguments with spaces in are presented with quotes round; other
-         * arguments are presented raw
-         *
+         * Returns the commands of this instance. Arguments with spaces in are presented with quotes round; other arguments are presented raw
+         * 
          * @return a string representation of the object.
          */
         @Override
@@ -407,9 +383,8 @@ abstract public class ShellUtils {
     }
 
     /**
-     * To check if the passed script to shell command executor timed out or
-     * not.
-     *
+     * To check if the passed script to shell command executor timed out or not.
+     * 
      * @return if the script timed out.
      */
     public boolean isTimedOut() {
@@ -418,52 +393,45 @@ abstract public class ShellUtils {
 
     /**
      * Set if the command has timed out.
-     *
+     * 
      */
     private void setTimedOut() {
         this.timedOut.set(true);
     }
 
-
     /**
-     * Static method to execute a shell command.
-     * Covers most of the simple cases without requiring the user to implement
-     * the <code>Shell</code> interface.
+     * Static method to execute a shell command. Covers most of the simple cases without requiring the user to implement the <code>Shell</code> interface.
+     * 
      * @param cmd shell command to execute.
      * @return the output of the executed command.
      */
-    public static String execCommand(String ... cmd) throws IOException {
+    public static String execCommand(String... cmd) throws IOException {
         return execCommand(null, cmd, 0L);
     }
 
     /**
-     * Static method to execute a shell command.
-     * Covers most of the simple cases without requiring the user to implement
-     * the <code>Shell</code> interface.
+     * Static method to execute a shell command. Covers most of the simple cases without requiring the user to implement the <code>Shell</code> interface.
+     * 
      * @param env the map of environment key=value
      * @param cmd shell command to execute.
      * @param timeout time in milliseconds after which script should be marked timeout
      * @return the output of the executed command.o
      */
 
-    public static String execCommand(Map<String, String> env, String[] cmd,
-                                     long timeout) throws IOException {
-        ShellCommandExecutor exec = new ShellCommandExecutor(cmd, null, env,
-                                                             timeout);
+    public static String execCommand(Map<String, String> env, String[] cmd, long timeout) throws IOException {
+        ShellCommandExecutor exec = new ShellCommandExecutor(cmd, null, env, timeout);
         exec.execute();
         return exec.getOutput();
     }
 
     /**
-     * Static method to execute a shell command.
-     * Covers most of the simple cases without requiring the user to implement
-     * the <code>Shell</code> interface.
+     * Static method to execute a shell command. Covers most of the simple cases without requiring the user to implement the <code>Shell</code> interface.
+     * 
      * @param env the map of environment key=value
      * @param cmd shell command to execute.
      * @return the output of the executed command.
      */
-    public static String execCommand(Map<String,String> env, String ... cmd)
-        throws IOException {
+    public static String execCommand(Map<String, String> env, String... cmd) throws IOException {
         return execCommand(env, cmd, 0L);
     }
 
@@ -484,9 +452,9 @@ abstract public class ShellUtils {
             try {
                 p.exitValue();
             } catch (Exception e) {
-                //Process has not terminated.
-                //So check if it has completed
-                //if not just destroy it.
+                // Process has not terminated.
+                // So check if it has completed
+                // if not just destroy it.
                 if (p != null && !shell.completed.get()) {
                     shell.setTimedOut();
                     p.destroy();
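
A short usage sketch of the ShellUtils pieces touched above: the varargs execCommand helper and the four-argument ShellCommandExecutor constructor with a timeout. It assumes a Unix-like environment where date and echo are on the PATH:

    import backtype.storm.utils.ShellUtils;
    import backtype.storm.utils.ShellUtils.ShellCommandExecutor;

    import java.io.IOException;

    public class ShellUtilsExample {
        public static void main(String[] args) throws IOException {
            // One-shot helper: runs the command and returns its stdout as a string.
            System.out.println(ShellUtils.execCommand("date").trim());

            // Executor form with a 5 second timeout, default working directory
            // and environment (both null, as the constructor allows).
            ShellCommandExecutor exec =
                    new ShellCommandExecutor(new String[] { "echo", "hello" }, null, null, 5000L);
            exec.execute();
            System.out.println(exec.getOutput().trim()); // "hello"
            System.out.println(exec.isTimedOut());       // false for a fast command
        }
    }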

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java b/jstorm-core/src/main/java/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java
index 4aa5556..dd57832 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java
@@ -31,12 +31,9 @@ public class StormBoundedExponentialBackoffRetry extends BoundedExponentialBacko
     private final int linearBaseSleepMs;
 
     /**
-     * The class provides generic exponential-linear backoff retry strategy for
-     * storm. It calculates threshold for exponentially increasing sleeptime
-     * for retries. Beyond this threshold, the sleeptime increase is linear.
-     * Also adds jitter for exponential/linear retry.
-     * It guarantees currSleepTimeMs >= prevSleepTimeMs and 
-     * baseSleepTimeMs <= currSleepTimeMs <= maxSleepTimeMs
+     * The class provides generic exponential-linear backoff retry strategy for storm. It calculates threshold for exponentially increasing sleeptime for
+     * retries. Beyond this threshold, the sleeptime increase is linear. Also adds jitter for exponential/linear retry. It guarantees currSleepTimeMs >=
+     * prevSleepTimeMs and baseSleepTimeMs <= currSleepTimeMs <= maxSleepTimeMs
      */
 
     public StormBoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxSleepTimeMs, int maxRetries) {
@@ -44,17 +41,15 @@ public class StormBoundedExponentialBackoffRetry extends BoundedExponentialBacko
         expRetriesThreshold = 1;
         while ((1 << (expRetriesThreshold + 1)) < ((maxSleepTimeMs - baseSleepTimeMs) / 2))
             expRetriesThreshold++;
-        LOG.info("The baseSleepTimeMs [" + baseSleepTimeMs + "] the maxSleepTimeMs [" + maxSleepTimeMs + "] " +
-                "the maxRetries [" + maxRetries + "]");
+        LOG.info("The baseSleepTimeMs [" + baseSleepTimeMs + "] the maxSleepTimeMs [" + maxSleepTimeMs + "] " + "the maxRetries [" + maxRetries + "]");
         if (baseSleepTimeMs > maxSleepTimeMs) {
-            LOG.warn("Misconfiguration: the baseSleepTimeMs [" + baseSleepTimeMs + "] can't be greater than " +
-                    "the maxSleepTimeMs [" + maxSleepTimeMs + "].");
+            LOG.warn("Misconfiguration: the baseSleepTimeMs [" + baseSleepTimeMs + "] can't be greater than " + "the maxSleepTimeMs [" + maxSleepTimeMs + "].");
         }
-        if( maxRetries > 0 && maxRetries > expRetriesThreshold ) {
+        if (maxRetries > 0 && maxRetries > expRetriesThreshold) {
             this.stepSize = Math.max(1, (maxSleepTimeMs - (1 << expRetriesThreshold)) / (maxRetries - expRetriesThreshold));
         } else {
             this.stepSize = 1;
-	}
+        }
         this.linearBaseSleepMs = super.getBaseSleepTimeMs() + (1 << expRetriesThreshold);
     }
 
@@ -67,8 +62,7 @@ public class StormBoundedExponentialBackoffRetry extends BoundedExponentialBacko
             return sleepTimeMs;
         } else {
             int stepJitter = random.nextInt(stepSize);
-            return Math.min(super.getMaxSleepTimeMs(), (linearBaseSleepMs +
-                    (stepSize * (retryCount - expRetriesThreshold)) + stepJitter));
+            return Math.min(super.getMaxSleepTimeMs(), (linearBaseSleepMs + (stepSize * (retryCount - expRetriesThreshold)) + stepJitter));
         }
     }
 }
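
A sketch of how the reformatted backoff policy behaves. Only the constructor signature is visible in these hunks; calling getSleepTimeMs(retryCount, elapsedMs) directly, as done here, is an assumption:

    import backtype.storm.utils.StormBoundedExponentialBackoffRetry;

    public class BackoffRetryExample {
        public static void main(String[] args) {
            // base 100 ms, cap 5000 ms, 10 retries
            StormBoundedExponentialBackoffRetry retry =
                    new StormBoundedExponentialBackoffRetry(100, 5000, 10);

            for (int attempt = 0; attempt < 10; attempt++) {
                // Sleep grows exponentially up to the threshold, then linearly,
                // with jitter, and never exceeds the configured maximum.
                System.out.println("retry " + attempt + " -> " + retry.getSleepTimeMs(attempt, 0L) + " ms");
            }
        }
    }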

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/TestUtils.java b/jstorm-core/src/main/java/backtype/storm/utils/TestUtils.java
index 276559c..f905ae4 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/TestUtils.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/TestUtils.java
@@ -25,9 +25,7 @@ import java.util.Map;
 
 public class TestUtils extends Utils {
 
-    public static void testSetupBuilder(CuratorFrameworkFactory.Builder
-            builder, String zkStr, Map conf, ZookeeperAuthInfo auth)
-    {
+    public static void testSetupBuilder(CuratorFrameworkFactory.Builder builder, String zkStr, Map conf, ZookeeperAuthInfo auth) {
         setupBuilder(builder, zkStr, conf, auth);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ThreadResourceManager.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ThreadResourceManager.java b/jstorm-core/src/main/java/backtype/storm/utils/ThreadResourceManager.java
index e3ab03f..c43ff06 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/ThreadResourceManager.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/ThreadResourceManager.java
@@ -23,14 +23,14 @@ public class ThreadResourceManager<T> {
     public static interface ResourceFactory<X> {
         X makeResource();
     }
-    
+
     ResourceFactory<T> _factory;
     ConcurrentLinkedQueue<T> _resources = new ConcurrentLinkedQueue<T>();
-    
+
     public ThreadResourceManager(ResourceFactory<T> factory) {
         _factory = factory;
     }
-    
+
     public T acquire() {
         T ret = _resources.poll();
         if (ret == null) {
@@ -38,7 +38,7 @@ public class ThreadResourceManager<T> {
         }
         return ret;
     }
-    
+
     public void release(T resource) {
         _resources.add(resource);
     }
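
The entire ThreadResourceManager API is visible in this hunk, so a short usage sketch follows. Pooling StringBuilder is only an illustration; note that the manager does not reset a resource's state on release:

    import backtype.storm.utils.ThreadResourceManager;

    public class ThreadResourceManagerExample {
        public static void main(String[] args) {
            // acquire() hands back a pooled instance if one is queued, otherwise
            // asks the factory for a fresh one; release() returns it to the pool.
            ThreadResourceManager<StringBuilder> pool =
                    new ThreadResourceManager<StringBuilder>(
                            new ThreadResourceManager.ResourceFactory<StringBuilder>() {
                                public StringBuilder makeResource() {
                                    return new StringBuilder();
                                }
                            });

            StringBuilder sb = pool.acquire(); // queue is empty, so the factory builds one
            sb.append("hello");
            pool.release(sb);                  // hand it back for reuse (state is not cleared)

            StringBuilder again = pool.acquire();
            System.out.println(again == sb);   // true: the pooled instance was reused
        }
    }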

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ThriftTopologyUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ThriftTopologyUtils.java b/jstorm-core/src/main/java/backtype/storm/utils/ThriftTopologyUtils.java
index 47a48c7..c872721 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/ThriftTopologyUtils.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/ThriftTopologyUtils.java
@@ -17,17 +17,13 @@
  */
 package backtype.storm.utils;
 
+import backtype.storm.generated.*;
+
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.SpoutSpec;
-import backtype.storm.generated.StateSpoutSpec;
-import backtype.storm.generated.StormTopology;
-
 public class ThriftTopologyUtils {
     public static Set<String> getComponentIds(StormTopology topology) {
         Set<String> ret = new HashSet<String>();
@@ -37,7 +33,7 @@ public class ThriftTopologyUtils {
         }
         return ret;
     }
-    
+
     public static Map<String, Object> getComponents(StormTopology topology) {
         Map<String, Object> ret = new HashMap<String, Object>();
         for (StormTopology._Fields f : StormTopology.metaDataMap.keySet()) {
@@ -46,7 +42,7 @@ public class ThriftTopologyUtils {
         }
         return ret;
     }
-    
+
     public static ComponentCommon getComponentCommon(StormTopology topology, String componentId) {
         for (StormTopology._Fields f : StormTopology.metaDataMap.keySet()) {
             Map<String, Object> componentMap = (Map<String, Object>) topology.getFieldValue(f);