You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:08 UTC

[30/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/task/TopologyContext.java b/jstorm-core/src/main/java/backtype/storm/task/TopologyContext.java
index 8124651..e8390f6 100755
--- a/jstorm-core/src/main/java/backtype/storm/task/TopologyContext.java
+++ b/jstorm-core/src/main/java/backtype/storm/task/TopologyContext.java
@@ -21,86 +21,76 @@ import backtype.storm.generated.GlobalStreamId;
 import backtype.storm.generated.Grouping;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.hooks.ITaskHook;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.ReducedMetric;
-import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.api.*;
 import backtype.storm.state.ISubscribedState;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import com.alibaba.jstorm.cluster.StormClusterState;
 import org.apache.commons.lang.NotImplementedException;
 import org.json.simple.JSONValue;
 
+import java.util.*;
+
 /**
- * A TopologyContext is given to bolts and spouts in their "prepare" and "open"
- * methods, respectively. This object provides information about the component's
+ * A TopologyContext is given to bolts and spouts in their "prepare" and "open" methods, respectively. This object provides information about the component's
  * place within the topology, such as task ids, inputs and outputs, etc.
- *
- * <p>The TopologyContext is also used to declare ISubscribedState objects to
- * synchronize state with StateSpouts this object is subscribed to.</p>
+ * 
+ * <p>
+ * The TopologyContext is also used to declare ISubscribedState objects to synchronize state with StateSpouts this object is subscribed to.
+ * </p>
  */
 public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
     private Integer _taskId;
     private Map<String, Object> _taskData = new HashMap<String, Object>();
     private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
     private Map<String, Object> _executorData;
-    private Map<Integer,Map<Integer, Map<String, IMetric>>> _registeredMetrics;
+    private Map<Integer, Map<Integer, Map<String, IMetric>>> _registeredMetrics;
     private clojure.lang.Atom _openOrPrepareWasCalled;
-
-
-    public TopologyContext(StormTopology topology, Map stormConf,
-            Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
-            Map<String, Map<String, Fields>> componentToStreamToFields,
-            String stormId, String codeDir, String pidDir, Integer taskId,
-            Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources,
-            Map<String, Object> userResources, Map<String, Object> executorData, Map registeredMetrics,
-            clojure.lang.Atom openOrPrepareWasCalled) {
-        super(topology, stormConf, taskToComponent, componentToSortedTasks,
-                componentToStreamToFields, stormId, codeDir, pidDir,
-                workerPort, workerTasks, defaultResources, userResources);
+    private StormClusterState _zkCluster;
+
+    public TopologyContext(StormTopology topology, Map stormConf, Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
+            Map<String, Map<String, Fields>> componentToStreamToFields, String stormId, String codeDir, String pidDir, Integer taskId, Integer workerPort,
+            List<Integer> workerTasks, Map<String, Object> defaultResources, Map<String, Object> userResources, Map<String, Object> executorData,
+            Map registeredMetrics, clojure.lang.Atom openOrPrepareWasCalled, StormClusterState zkCluster) {
+        super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId, codeDir, pidDir, workerPort, workerTasks,
+                defaultResources, userResources);
         _taskId = taskId;
         _executorData = executorData;
         _registeredMetrics = registeredMetrics;
         _openOrPrepareWasCalled = openOrPrepareWasCalled;
+        _zkCluster = zkCluster;
     }
 
     /**
-     * All state from all subscribed state spouts streams will be synced with
-     * the provided object.
-     *
-     * <p>It is recommended that your ISubscribedState object is kept as an instance
-     * variable of this object. The recommended usage of this method is as follows:</p>
-     *
+     * All state from all subscribed state spouts streams will be synced with the provided object.
+     * 
+     * <p>
+     * It is recommended that your ISubscribedState object is kept as an instance variable of this object. The recommended usage of this method is as follows:
+     * </p>
+     * 
      * <p>
      * _myState = context.setAllSubscribedState(new MyState());
      * </p>
+     * 
      * @param obj Provided ISubscribedState implementation
      * @return Returns the ISubscribedState object provided
      */
     public <T extends ISubscribedState> T setAllSubscribedState(T obj) {
-        //check that only subscribed to one component/stream for statespout
-        //setsubscribedstate appropriately
+        // check that only subscribed to one component/stream for statespout
+        // setsubscribedstate appropriately
         throw new NotImplementedException();
     }
 
-
     /**
-     * Synchronizes the default stream from the specified state spout component
-     * id with the provided ISubscribedState object.
-     *
-     * <p>The recommended usage of this method is as follows:</p>
+     * Synchronizes the default stream from the specified state spout component id with the provided ISubscribedState object.
+     * 
+     * <p>
+     * The recommended usage of this method is as follows:
+     * </p>
      * <p>
      * _myState = context.setSubscribedState(componentId, new MyState());
      * </p>
-     *
+     * 
      * @param componentId the id of the StateSpout component to subscribe to
      * @param obj Provided ISubscribedState implementation
      * @return Returns the ISubscribedState object provided
@@ -110,14 +100,15 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     }
 
     /**
-     * Synchronizes the specified stream from the specified state spout component
-     * id with the provided ISubscribedState object.
-     *
-     * <p>The recommended usage of this method is as follows:</p>
+     * Synchronizes the specified stream from the specified state spout component id with the provided ISubscribedState object.
+     * 
+     * <p>
+     * The recommended usage of this method is as follows:
+     * </p>
      * <p>
      * _myState = context.setSubscribedState(componentId, streamId, new MyState());
      * </p>
-     *
+     * 
      * @param componentId the id of the StateSpout component to subscribe to
      * @param streamId the stream to subscribe to
      * @param obj Provided ISubscribedState implementation
@@ -129,7 +120,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
 
     /**
      * Gets the task id of this task.
-     *
+     * 
      * @return the task id
      */
     public int getThisTaskId() {
@@ -137,33 +128,31 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     }
 
     /**
-     * Gets the component id for this task. The component id maps
-     * to a component id specified for a Spout or Bolt in the topology definition.
+     * Gets the component id for this task. The component id maps to a component id specified for a Spout or Bolt in the topology definition.
+     * 
      * @return the component id of this task
      */
     public String getThisComponentId() {
         return getComponentId(_taskId);
     }
 
-	/**
-	 * Gets the declared output fields for the specified stream id for the
-	 * component this task is a part of.
-	 */
-	public Fields getThisOutputFields(String streamId) {
-		return getComponentOutputFields(getThisComponentId(), streamId);
-	}
-
-	/**
-	 * Gets the declared output fields for the specified stream id for the
-	 * component this task is a part of.
-	 */
-	public Map<String, List<String>> getThisOutputFieldsForStreams() {
-		Map<String, List<String>> streamToFields = new HashMap<String, List<String>>();
-		for (String stream : this.getThisStreams()) {
-			streamToFields.put(stream, this.getThisOutputFields(stream).toList());
-		}
-		return streamToFields;
-	}
+    /**
+     * Gets the declared output fields for the specified stream id for the component this task is a part of.
+     */
+    public Fields getThisOutputFields(String streamId) {
+        return getComponentOutputFields(getThisComponentId(), streamId);
+    }
+
+    /**
+     * Gets the declared output fields of every stream declared by the component this task is a part of, keyed by stream id.
+     */
+    public Map<String, List<String>> getThisOutputFieldsForStreams() {
+        Map<String, List<String>> streamToFields = new HashMap<String, List<String>>();
+        for (String stream : this.getThisStreams()) {
+            streamToFields.put(stream, this.getThisOutputFields(stream).toList());
+        }
+        return streamToFields;
+    }
 
     /**
      * Gets the set of streams declared for the component of this task.
@@ -173,15 +162,14 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     }
 
     /**
-     * Gets the index of this task id in getComponentTasks(getThisComponentId()).
-     * An example use case for this method is determining which task
-     * accesses which resource in a distributed resource to ensure an even distribution.
+     * Gets the index of this task id in getComponentTasks(getThisComponentId()). An example use case for this method is determining which task accesses which
+     * resource in a distributed resource to ensure an even distribution.
      */
     public int getThisTaskIndex() {
         List<Integer> tasks = new ArrayList<Integer>(getComponentTasks(getThisComponentId()));
         Collections.sort(tasks);
-        for(int i=0; i<tasks.size(); i++) {
-            if(tasks.get(i) == getThisTaskId()) {
+        for (int i = 0; i < tasks.size(); i++) {
+            if (tasks.get(i) == getThisTaskId()) {
                 return i;
             }
         }
@@ -190,7 +178,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
 
     /**
      * Gets the declared inputs to this component.
-     *
+     * 
      * @return A map from subscribed component/stream to the grouping subscribed with.
      */
     public Map<GlobalStreamId, Grouping> getThisSources() {
@@ -199,7 +187,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
 
     /**
      * Gets information about who is consuming the outputs of this component, and how.
-     *
+     * 
      * @return Map from stream id to component id to the Grouping used.
      */
     public Map<String, Map<String, Grouping>> getThisTargets() {
@@ -231,15 +219,15 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
         return _hooks;
     }
 
-	private static Map<String, Object> groupingToJSONableMap(Grouping grouping) {
-		Map groupingMap = new HashMap<String, Object>();
-		groupingMap.put("type", grouping.getSetField().toString());
-		if (grouping.is_set_fields()) {
-			groupingMap.put("fields", grouping.get_fields());
-		}
-		return groupingMap;
-	}
-    
+    private static Map<String, Object> groupingToJSONableMap(Grouping grouping) {
+        Map groupingMap = new HashMap<String, Object>();
+        groupingMap.put("type", grouping.getSetField().toString());
+        if (grouping.is_set_fields()) {
+            groupingMap.put("fields", grouping.get_fields());
+        }
+        return groupingMap;
+    }
+
     @Override
     public String toJSONString() {
         Map obj = new HashMap();
@@ -253,39 +241,38 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
         // Convert targets to a JSON serializable format
         Map<String, Map> stringTargets = new HashMap<String, Map>();
         for (Map.Entry<String, Map<String, Grouping>> entry : this.getThisTargets().entrySet()) {
-        	Map stringTargetMap = new HashMap<String, Object>();
-        	for (Map.Entry<String, Grouping> innerEntry : entry.getValue().entrySet()) {
-        		stringTargetMap.put(innerEntry.getKey(), groupingToJSONableMap(innerEntry.getValue()));
-        	}
-        	stringTargets.put(entry.getKey(), stringTargetMap);
+            Map stringTargetMap = new HashMap<String, Object>();
+            for (Map.Entry<String, Grouping> innerEntry : entry.getValue().entrySet()) {
+                stringTargetMap.put(innerEntry.getKey(), groupingToJSONableMap(innerEntry.getValue()));
+            }
+            stringTargets.put(entry.getKey(), stringTargetMap);
         }
         obj.put("stream->target->grouping", stringTargets);
         // Convert sources to a JSON serializable format
         Map<String, Map<String, Object>> stringSources = new HashMap<String, Map<String, Object>>();
         for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
-        	GlobalStreamId gid = entry.getKey();
-        	Map<String, Object> stringSourceMap = stringSources.get(gid.get_componentId());
-        	if (stringSourceMap == null) {
-        		stringSourceMap = new HashMap<String, Object>();
-        		stringSources.put(gid.get_componentId(), stringSourceMap);
-        	}
-        	stringSourceMap.put(gid.get_streamId(), groupingToJSONableMap(entry.getValue()));        	
+            GlobalStreamId gid = entry.getKey();
+            Map<String, Object> stringSourceMap = stringSources.get(gid.get_componentId());
+            if (stringSourceMap == null) {
+                stringSourceMap = new HashMap<String, Object>();
+                stringSources.put(gid.get_componentId(), stringSourceMap);
+            }
+            stringSourceMap.put(gid.get_streamId(), groupingToJSONableMap(entry.getValue()));
         }
         obj.put("source->stream->grouping", stringSources);
         return JSONValue.toJSONString(obj);
     }
 
     /*
-     * Register a IMetric instance.
-     * Storm will then call getValueAndReset on the metric every timeBucketSizeInSecs
-     * and the returned value is sent to all metrics consumers.
-     * You must call this during IBolt::prepare or ISpout::open.
+     * Register an IMetric instance. Storm will then call getValueAndReset on the metric every timeBucketSizeInSecs and the returned value is sent to all metrics
+     * consumers. You must call this during IBolt::prepare or ISpout::open.
+     * 
      * @return The IMetric argument unchanged.
      */
     public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
-        if((Boolean)_openOrPrepareWasCalled.deref() == true) {
-            throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " +
-                                       "IBolt::prepare() or ISpout::open() method.");
+        if ((Boolean) _openOrPrepareWasCalled.deref() == true) {
+            throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden "
+                    + "IBolt::prepare() or ISpout::open() method.");
         }
 
         if (metric == null) {
@@ -293,27 +280,27 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
         }
 
         if (timeBucketSizeInSecs <= 0) {
-            throw new IllegalArgumentException("TopologyContext.registerMetric can only be called with timeBucketSizeInSecs " +
-                                               "greater than or equal to 1 second.");
+            throw new IllegalArgumentException("TopologyContext.registerMetric can only be called with timeBucketSizeInSecs "
+                    + "greater than or equal to 1 second.");
         }
 
         if (getRegisteredMetricByName(name) != null) {
-            throw new RuntimeException("The same metric name `" + name + "` was registered twice." );
+            throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
         }
 
         Map m1 = _registeredMetrics;
-        if(!m1.containsKey(timeBucketSizeInSecs)) {
+        if (!m1.containsKey(timeBucketSizeInSecs)) {
             m1.put(timeBucketSizeInSecs, new HashMap());
         }
 
-        Map m2 = (Map)m1.get(timeBucketSizeInSecs);
-        if(!m2.containsKey(_taskId)) {
+        Map m2 = (Map) m1.get(timeBucketSizeInSecs);
+        if (!m2.containsKey(_taskId)) {
             m2.put(_taskId, new HashMap());
         }
 
-        Map m3 = (Map)m2.get(_taskId);
-        if(m3.containsKey(name)) {
-            throw new RuntimeException("The same metric name `" + name + "` was registered twice." );
+        Map m3 = (Map) m2.get(_taskId);
+        if (m3.containsKey(name)) {
+            throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
         } else {
             m3.put(name, metric);
         }
@@ -322,21 +309,18 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     }
 
     /**
-     * Get component's metric from registered metrics by name.
-     * Notice: Normally, one component can only register one metric name once.
-     *         But now registerMetric has a bug(https://issues.apache.org/jira/browse/STORM-254)
-     *         cause the same metric name can register twice.
-     *         So we just return the first metric we meet.
+     * Gets this component's metric from the registered metrics by name. Note: normally a component can register a given metric name only once, but registerMetric
+     * has a bug (https://issues.apache.org/jira/browse/STORM-254) that allows the same metric name to be registered twice, so we just return the first metric we find.
      */
     public IMetric getRegisteredMetricByName(String name) {
         IMetric metric = null;
 
-        for (Map<Integer, Map<String, IMetric>> taskIdToNameToMetric: _registeredMetrics.values()) {
+        for (Map<Integer, Map<String, IMetric>> taskIdToNameToMetric : _registeredMetrics.values()) {
             Map<String, IMetric> nameToMetric = taskIdToNameToMetric.get(_taskId);
             if (nameToMetric != null) {
                 metric = nameToMetric.get(name);
                 if (metric != null) {
-                    //we just return the first metric we meet
+                    // we just return the first metric we meet
                     break;
                 }
             }
@@ -351,10 +335,21 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) {
         return registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs);
     }
+
     /*
      * Convenience method for registering CombinedMetric.
      */
     public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) {
         return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs);
     }
+
+    public StormClusterState getZkCluster() {
+        return _zkCluster;
+    }
+    /*
+     * Task error report callback: reports errorMsg for this task and topology through the StormClusterState held in _zkCluster.
+     */
+    public void reportError(String errorMsg) throws Exception{
+            _zkCluster.report_task_error(getTopologyId(), _taskId, errorMsg, null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/task/WorkerTopologyContext.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/task/WorkerTopologyContext.java b/jstorm-core/src/main/java/backtype/storm/task/WorkerTopologyContext.java
index de407ac..09c8c8c 100755
--- a/jstorm-core/src/main/java/backtype/storm/task/WorkerTopologyContext.java
+++ b/jstorm-core/src/main/java/backtype/storm/task/WorkerTopologyContext.java
@@ -27,34 +27,23 @@ import java.util.concurrent.ExecutorService;
 
 public class WorkerTopologyContext extends GeneralTopologyContext {
     public static final String SHARED_EXECUTOR = "executor";
-    
+
     private Integer _workerPort;
     private List<Integer> _workerTasks;
     private String _codeDir;
     private String _pidDir;
     Map<String, Object> _userResources;
     Map<String, Object> _defaultResources;
-    
-    public WorkerTopologyContext(
-            StormTopology topology,
-            Map stormConf,
-            Map<Integer, String> taskToComponent,
-            Map<String, List<Integer>> componentToSortedTasks,
-            Map<String, Map<String, Fields>> componentToStreamToFields,
-            String stormId,
-            String codeDir,
-            String pidDir,
-            Integer workerPort,
-            List<Integer> workerTasks,
-            Map<String, Object> defaultResources,
-            Map<String, Object> userResources
-            ) {
+
+    public WorkerTopologyContext(StormTopology topology, Map stormConf, Map<Integer, String> taskToComponent,
+            Map<String, List<Integer>> componentToSortedTasks, Map<String, Map<String, Fields>> componentToStreamToFields, String stormId, String codeDir,
+            String pidDir, Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources, Map<String, Object> userResources) {
         super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId);
         _codeDir = codeDir;
         _defaultResources = defaultResources;
         _userResources = userResources;
         try {
-            if(pidDir!=null) {
+            if (pidDir != null) {
                 _pidDir = new File(pidDir).getCanonicalPath();
             } else {
                 _pidDir = null;
@@ -67,13 +56,12 @@ public class WorkerTopologyContext extends GeneralTopologyContext {
     }
 
     /**
-     * Gets all the task ids that are running in this worker process
-     * (including the task for this task).
+     * Gets all the task ids that are running in this worker process (including the task for this task).
      */
     public List<Integer> getThisWorkerTasks() {
         return _workerTasks;
     }
-    
+
     public Integer getThisWorkerPort() {
         return _workerPort;
     }
@@ -81,28 +69,27 @@ public class WorkerTopologyContext extends GeneralTopologyContext {
     public void setThisWorkerTasks(List<Integer> workerTasks) {
         this._workerTasks = workerTasks;
     }
+
     /**
-     * Gets the location of the external resources for this worker on the
-     * local filesystem. These external resources typically include bolts implemented
-     * in other languages, such as Ruby or Python.
+     * Gets the location of the external resources for this worker on the local filesystem. These external resources typically include bolts implemented in
+     * other languages, such as Ruby or Python.
      */
     public String getCodeDir() {
         return _codeDir;
     }
 
     /**
-     * If this task spawns any subprocesses, those subprocesses must immediately
-     * write their PID to this directory on the local filesystem to ensure that
-     * Storm properly destroys that process when the worker is shutdown.
+     * If this task spawns any subprocesses, those subprocesses must immediately write their PID to this directory on the local filesystem to ensure that Storm
+     * properly destroys that process when the worker is shutdown.
      */
     public String getPIDDir() {
         return _pidDir;
     }
-    
+
     public Object getResource(String name) {
         return _userResources.get(name);
     }
-    
+
     public ExecutorService getSharedExecutor() {
         return (ExecutorService) _defaultResources.get(SHARED_EXECUTOR);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/AckFailDelegate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/AckFailDelegate.java b/jstorm-core/src/main/java/backtype/storm/testing/AckFailDelegate.java
index d65c8bd..fbbcbfc 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/AckFailDelegate.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/AckFailDelegate.java
@@ -21,5 +21,6 @@ import java.io.Serializable;
 
 public interface AckFailDelegate extends Serializable {
     public void ack(Object id);
+
     public void fail(Object id);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/AckFailMapTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/AckFailMapTracker.java b/jstorm-core/src/main/java/backtype/storm/testing/AckFailMapTracker.java
index e16afd8..f3feff3 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/AckFailMapTracker.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/AckFailMapTracker.java
@@ -22,31 +22,31 @@ import java.util.HashSet;
 import java.util.Set;
 
 public class AckFailMapTracker implements AckFailDelegate {
-    
+
     String _acked;
     String _failed;
-    
+
     public AckFailMapTracker() {
         _acked = RegisteredGlobalState.registerState(new HashSet());
         _failed = RegisteredGlobalState.registerState(new HashSet());
     }
-    
+
     public boolean isAcked(Object id) {
-        return ((Set)RegisteredGlobalState.getState(_acked)).contains(id);
+        return ((Set) RegisteredGlobalState.getState(_acked)).contains(id);
     }
-    
+
     public boolean isFailed(Object id) {
-        return ((Set)RegisteredGlobalState.getState(_failed)).contains(id);        
+        return ((Set) RegisteredGlobalState.getState(_failed)).contains(id);
     }
 
     @Override
     public void ack(Object id) {
-        ((Set)RegisteredGlobalState.getState(_acked)).add(id);
+        ((Set) RegisteredGlobalState.getState(_acked)).add(id);
     }
 
     @Override
     public void fail(Object id) {
-        ((Set)RegisteredGlobalState.getState(_failed)).add(id);
+        ((Set) RegisteredGlobalState.getState(_failed)).add(id);
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/AckTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/AckTracker.java b/jstorm-core/src/main/java/backtype/storm/testing/AckTracker.java
index ad80475..10973f1 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/AckTracker.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/AckTracker.java
@@ -24,14 +24,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class AckTracker implements AckFailDelegate {
     private static Map<String, AtomicInteger> acks = new ConcurrentHashMap<String, AtomicInteger>();
-    
+
     private String _id;
-    
+
     public AckTracker() {
         _id = UUID.randomUUID().toString();
         acks.put(_id, new AtomicInteger(0));
     }
-    
+
     @Override
     public void ack(Object id) {
         acks.get(_id).incrementAndGet();
@@ -40,13 +40,13 @@ public class AckTracker implements AckFailDelegate {
     @Override
     public void fail(Object id) {
     }
-    
+
     public int getNumAcks() {
         return acks.get(_id).intValue();
     }
-    
+
     public void resetNumAcks() {
         acks.get(_id).set(0);
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/BatchNumberList.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/BatchNumberList.java b/jstorm-core/src/main/java/backtype/storm/testing/BatchNumberList.java
index 26f964a..2565f25 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/BatchNumberList.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/BatchNumberList.java
@@ -37,16 +37,15 @@ public class BatchNumberList extends BaseBatchBolt {
     }
 
     String _wordComponent;
-    
+
     public BatchNumberList(String wordComponent) {
         _wordComponent = wordComponent;
     }
-    
+
     String word = null;
     List<Integer> intSet = new ArrayList<Integer>();
     BatchOutputCollector _collector;
-    
-    
+
     @Override
     public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
         _collector = collector;
@@ -54,7 +53,7 @@ public class BatchNumberList extends BaseBatchBolt {
 
     @Override
     public void execute(Tuple tuple) {
-        if(tuple.getSourceComponent().equals(_wordComponent)) {
+        if (tuple.getSourceComponent().equals(_wordComponent)) {
             this.word = tuple.getString(1);
         } else {
             intSet.add(tuple.getInteger(1));
@@ -63,10 +62,10 @@ public class BatchNumberList extends BaseBatchBolt {
 
     @Override
     public void finishBatch() {
-        if(word!=null) {
+        if (word != null) {
             Collections.sort(intSet);
             _collector.emit(new Values(word, intSet));
         }
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/BatchProcessWord.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/BatchProcessWord.java b/jstorm-core/src/main/java/backtype/storm/testing/BatchProcessWord.java
index 7f3eaf1..819c7c1 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/BatchProcessWord.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/BatchProcessWord.java
@@ -35,5 +35,5 @@ public class BatchProcessWord extends BaseBasicBolt {
     public void execute(Tuple input, BasicOutputCollector collector) {
         collector.emit(new Values(input.getValue(0), input.getString(1).length()));
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/BatchRepeatA.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/BatchRepeatA.java b/jstorm-core/src/main/java/backtype/storm/testing/BatchRepeatA.java
index 107f2ed..f5751d1 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/BatchRepeatA.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/BatchRepeatA.java
@@ -24,21 +24,20 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 
+public class BatchRepeatA extends BaseBasicBolt {
 
-public class BatchRepeatA extends BaseBasicBolt {  
-    
     @Override
     public void execute(Tuple input, BasicOutputCollector collector) {
-       Object id = input.getValue(0);
-       String word = input.getString(1);
-       for(int i=0; i<word.length(); i++) {
-            if(word.charAt(i) == 'a') {
+        Object id = input.getValue(0);
+        String word = input.getString(1);
+        for (int i = 0; i < word.length(); i++) {
+            if (word.charAt(i) == 'a') {
                 collector.emit("multi", new Values(id, word.substring(0, i)));
             }
         }
         collector.emit("single", new Values(id, word));
     }
-    
+
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declareStream("multi", new Fields("id", "word"));

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/BoltTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/BoltTracker.java b/jstorm-core/src/main/java/backtype/storm/testing/BoltTracker.java
index 3fe4e7a..0319ebc 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/BoltTracker.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/BoltTracker.java
@@ -22,7 +22,6 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import java.util.HashMap;
 import java.util.Map;
 
-
 public class BoltTracker extends NonRichBoltTracker implements IRichBolt {
     IRichBolt _richDelegate;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/CompleteTopologyParam.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/CompleteTopologyParam.java b/jstorm-core/src/main/java/backtype/storm/testing/CompleteTopologyParam.java
index f3306cf..8483413 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/CompleteTopologyParam.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/CompleteTopologyParam.java
@@ -23,65 +23,65 @@ import backtype.storm.Config;
  * The param class for the <code>Testing.completeTopology</code>.
  */
 public class CompleteTopologyParam {
-	/**
-	 * The mocked spout sources
-	 */
-	private MockedSources mockedSources;
-	/**
-	 * the config for the topology when it was submitted to the cluster
-	 */
-	private Config stormConf;
-	/**
-	 * whether cleanup the state?
-	 */
-	private Boolean cleanupState;
-	/**
-	 * the topology name you want to submit to the cluster
-	 */
-	private String topologyName;
+    /**
+     * The mocked spout sources
+     */
+    private MockedSources mockedSources;
+    /**
+     * the config for the topology when it was submitted to the cluster
+     */
+    private Config stormConf;
+    /**
+     * whether cleanup the state?
+     */
+    private Boolean cleanupState;
+    /**
+     * the topology name you want to submit to the cluster
+     */
+    private String topologyName;
 
-	/**
-	 * the timeout of topology you want to submit to the cluster
-	 */
-	private Integer timeoutMs;
+    /**
+     * the timeout of topology you want to submit to the cluster
+     */
+    private Integer timeoutMs;
 
-	public MockedSources getMockedSources() {
-		return mockedSources;
-	}
+    public MockedSources getMockedSources() {
+        return mockedSources;
+    }
 
-	public void setMockedSources(MockedSources mockedSources) {
-		this.mockedSources = mockedSources;
-	}
+    public void setMockedSources(MockedSources mockedSources) {
+        this.mockedSources = mockedSources;
+    }
 
-	public Config getStormConf() {
-		return stormConf;
-	}
+    public Config getStormConf() {
+        return stormConf;
+    }
 
-	public void setStormConf(Config stormConf) {
-		this.stormConf = stormConf;
-	}
+    public void setStormConf(Config stormConf) {
+        this.stormConf = stormConf;
+    }
 
-	public Boolean getCleanupState() {
-		return cleanupState;
-	}
+    public Boolean getCleanupState() {
+        return cleanupState;
+    }
 
-	public void setCleanupState(Boolean cleanupState) {
-		this.cleanupState = cleanupState;
-	}
+    public void setCleanupState(Boolean cleanupState) {
+        this.cleanupState = cleanupState;
+    }
 
-	public String getTopologyName() {
-		return topologyName;
-	}
+    public String getTopologyName() {
+        return topologyName;
+    }
 
-	public void setTopologyName(String topologyName) {
-		this.topologyName = topologyName;
-	}
+    public void setTopologyName(String topologyName) {
+        this.topologyName = topologyName;
+    }
 
-	public Integer getTimeoutMs() {
-		return timeoutMs;
-	}
+    public Integer getTimeoutMs() {
+        return timeoutMs;
+    }
 
-	public void setTimeoutMs(Integer timeoutMs) {
-		this.timeoutMs = timeoutMs;
-	}
+    public void setTimeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/CountingBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/CountingBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/CountingBatchBolt.java
index 882801c..3682120 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/CountingBatchBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/CountingBatchBolt.java
@@ -30,7 +30,7 @@ public class CountingBatchBolt extends BaseBatchBolt {
     BatchOutputCollector _collector;
     Object _id;
     int _count = 0;
-    
+
     @Override
     public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
         _collector = collector;
@@ -44,12 +44,12 @@ public class CountingBatchBolt extends BaseBatchBolt {
 
     @Override
     public void finishBatch() {
-        _collector.emit(new Values(_id, _count));        
-    }   
+        _collector.emit(new Values(_id, _count));
+    }
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("tx", "count"));
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/CountingCommitBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/CountingCommitBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/CountingCommitBolt.java
index cb8f7e5..a45f16b 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/CountingCommitBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/CountingCommitBolt.java
@@ -32,7 +32,7 @@ public class CountingCommitBolt extends BaseTransactionalBolt implements ICommit
     BatchOutputCollector _collector;
     TransactionAttempt _id;
     int _count = 0;
-    
+
     @Override
     public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
         _id = id;
@@ -46,8 +46,8 @@ public class CountingCommitBolt extends BaseTransactionalBolt implements ICommit
 
     @Override
     public void finishBatch() {
-        _collector.emit(new Values(_id, _count));        
-    }   
+        _collector.emit(new Values(_id, _count));
+    }
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/FeederSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/FeederSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/FeederSpout.java
index 1ffb594..52ba5b7 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/FeederSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/FeederSpout.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.UUID;
 
-
 public class FeederSpout extends BaseRichSpout {
     private int _id;
     private Fields _outFields;
@@ -44,15 +43,15 @@ public class FeederSpout extends BaseRichSpout {
     public void setAckFailDelegate(AckFailDelegate d) {
         _ackFailDelegate = d;
     }
-    
+
     public void feed(List<Object> tuple) {
         feed(tuple, UUID.randomUUID().toString());
     }
 
     public void feed(List<Object> tuple, Object msgId) {
         InprocMessaging.sendMessage(_id, new Values(tuple, msgId));
-    }    
-    
+    }
+
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
         _collector = collector;
     }
@@ -63,10 +62,10 @@ public class FeederSpout extends BaseRichSpout {
 
     public void nextTuple() {
         List<Object> toEmit = (List<Object>) InprocMessaging.pollMessage(_id);
-        if(toEmit!=null) {
+        if (toEmit != null) {
             List<Object> tuple = (List<Object>) toEmit.get(0);
             Object msgId = toEmit.get(1);
-            
+
             _collector.emit(tuple, msgId);
         } else {
             try {
@@ -78,13 +77,13 @@ public class FeederSpout extends BaseRichSpout {
     }
 
     public void ack(Object msgId) {
-        if(_ackFailDelegate!=null) {
+        if (_ackFailDelegate != null) {
             _ackFailDelegate.ack(msgId);
         }
     }
 
     public void fail(Object msgId) {
-        if(_ackFailDelegate!=null) {
+        if (_ackFailDelegate != null) {
             _ackFailDelegate.fail(msgId);
         }
     }
@@ -96,5 +95,5 @@ public class FeederSpout extends BaseRichSpout {
     @Override
     public Map<String, Object> getComponentConfiguration() {
         return new HashMap<String, Object>();
-    }    
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/FixedTupleSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/FixedTupleSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/FixedTupleSpout.java
index 9527803..01fc3e3 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/FixedTupleSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/FixedTupleSpout.java
@@ -35,17 +35,17 @@ public class FixedTupleSpout implements IRichSpout {
     private static final Map<String, Integer> failed = new HashMap<String, Integer>();
 
     public static int getNumAcked(String stormId) {
-        synchronized(acked) {
+        synchronized (acked) {
             return get(acked, stormId, 0);
         }
     }
 
     public static int getNumFailed(String stormId) {
-        synchronized(failed) {
+        synchronized (failed) {
             return get(failed, stormId, 0);
         }
     }
-    
+
     public static void clear(String stormId) {
         acked.remove(stormId);
         failed.remove(stormId);
@@ -67,16 +67,16 @@ public class FixedTupleSpout implements IRichSpout {
 
     public FixedTupleSpout(List tuples, String fieldName) {
         _id = UUID.randomUUID().toString();
-        synchronized(acked) {
+        synchronized (acked) {
             acked.put(_id, 0);
         }
-        synchronized(failed) {
+        synchronized (failed) {
             failed.put(_id, 0);
         }
         _tuples = new ArrayList<FixedTuple>();
-        for(Object o: tuples) {
+        for (Object o : tuples) {
             FixedTuple ft;
-            if(o instanceof FixedTuple) {
+            if (o instanceof FixedTuple) {
                 ft = (FixedTuple) o;
             } else {
                 ft = new FixedTuple((List) o);
@@ -89,25 +89,25 @@ public class FixedTupleSpout implements IRichSpout {
     public List<FixedTuple> getSourceTuples() {
         return _tuples;
     }
-    
+
     public int getCompleted() {
         int ackedAmt;
         int failedAmt;
-        
-        synchronized(acked) {
+
+        synchronized (acked) {
             ackedAmt = acked.get(_id);
         }
-        synchronized(failed) {
+        synchronized (failed) {
             failedAmt = failed.get(_id);
         }
         return ackedAmt + failedAmt;
     }
-    
+
     public void cleanup() {
-        synchronized(acked) {            
+        synchronized (acked) {
             acked.remove(_id);
-        } 
-        synchronized(failed) {            
+        }
+        synchronized (failed) {
             failed.remove(_id);
         }
     }
@@ -116,15 +116,15 @@ public class FixedTupleSpout implements IRichSpout {
         _context = context;
         List<Integer> tasks = context.getComponentTasks(context.getThisComponentId());
         int startIndex;
-        for(startIndex=0; startIndex<tasks.size(); startIndex++) {
-            if(tasks.get(startIndex)==context.getThisTaskId()) {
+        for (startIndex = 0; startIndex < tasks.size(); startIndex++) {
+            if (tasks.get(startIndex) == context.getThisTaskId()) {
                 break;
             }
         }
         _collector = collector;
         _pending = new HashMap<String, FixedTuple>();
         _serveTuples = new ArrayList<FixedTuple>();
-        for(int i=startIndex; i<_tuples.size(); i+=tasks.size()) {
+        for (int i = startIndex; i < _tuples.size(); i += tasks.size()) {
             _serveTuples.add(_tuples.get(i));
         }
     }
@@ -133,7 +133,7 @@ public class FixedTupleSpout implements IRichSpout {
     }
 
     public void nextTuple() {
-        if(_serveTuples.size()>0) {
+        if (_serveTuples.size() > 0) {
             FixedTuple ft = _serveTuples.remove(0);
             String id = UUID.randomUUID().toString();
             _pending.put(id, ft);
@@ -144,16 +144,16 @@ public class FixedTupleSpout implements IRichSpout {
     }
 
     public void ack(Object msgId) {
-        synchronized(acked) {
+        synchronized (acked) {
             int curr = get(acked, _id, 0);
-            acked.put(_id, curr+1);
+            acked.put(_id, curr + 1);
         }
     }
 
     public void fail(Object msgId) {
-        synchronized(failed) {
+        synchronized (failed) {
             int curr = get(failed, _id, 0);
-            failed.put(_id, curr+1);
+            failed.put(_id, curr + 1);
         }
     }
 
@@ -166,7 +166,7 @@ public class FixedTupleSpout implements IRichSpout {
     }
 
     @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
         if (_fieldName != null) {
             declarer.declare(new Fields(_fieldName));
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/ForwardingMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/ForwardingMetricsConsumer.java b/jstorm-core/src/main/java/backtype/storm/testing/ForwardingMetricsConsumer.java
index 010336e..5cf8830 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/ForwardingMetricsConsumer.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/ForwardingMetricsConsumer.java
@@ -51,45 +51,42 @@ public class ForwardingMetricsConsumer implements IMetricsConsumer {
 
     @Override
     public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
-        String [] parts = ((String)registrationArgument).split(":",2);
+        String[] parts = ((String) registrationArgument).split(":", 2);
         host = parts[0];
         port = Integer.valueOf(parts[1]);
         try {
-          socket = new Socket(host, port);
-          out = socket.getOutputStream();
+            socket = new Socket(host, port);
+            out = socket.getOutputStream();
         } catch (Exception e) {
-          throw new RuntimeException(e);
+            throw new RuntimeException(e);
         }
     }
 
     @Override
     public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
         StringBuilder sb = new StringBuilder();
-        String header = taskInfo.timestamp + "\t" +
-            taskInfo.srcWorkerHost + ":"+ taskInfo.srcWorkerPort + "\t"+
-            taskInfo.srcTaskId + "\t" + taskInfo.srcComponentId + "\t";
+        String header =
+                taskInfo.timestamp + "\t" + taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort + "\t" + taskInfo.srcTaskId + "\t" + taskInfo.srcComponentId
+                        + "\t";
         sb.append(header);
         for (DataPoint p : dataPoints) {
             sb.delete(header.length(), sb.length());
-            sb.append(p.name)
-                .append("\t")
-                .append(p.value)
-                .append("\n");
+            sb.append(p.name).append("\t").append(p.value).append("\n");
             try {
-              out.write(sb.toString().getBytes());
-              out.flush();
+                out.write(sb.toString().getBytes());
+                out.flush();
             } catch (Exception e) {
-              throw new RuntimeException(e);
+                throw new RuntimeException(e);
             }
         }
     }
 
     @Override
-    public void cleanup() { 
-      try {
-        socket.close();
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
+    public void cleanup() {
+        try {
+            socket.close();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/IdentityBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/IdentityBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/IdentityBolt.java
index dcad640..b951d84 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/IdentityBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/IdentityBolt.java
@@ -25,7 +25,7 @@ import backtype.storm.tuple.Tuple;
 
 public class IdentityBolt extends BaseBasicBolt {
     Fields _fields;
-    
+
     public IdentityBolt(Fields fields) {
         _fields = fields;
     }
@@ -38,5 +38,5 @@ public class IdentityBolt extends BaseBasicBolt {
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(_fields);
-    }    
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java
index 1c4d5b3..7e4b32f 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java
@@ -32,7 +32,7 @@ public class KeyedCountingBatchBolt extends BaseBatchBolt {
     BatchOutputCollector _collector;
     Object _id;
     Map<Object, Integer> _counts = new HashMap<Object, Integer>();
-    
+
     @Override
     public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
         _collector = collector;
@@ -48,14 +48,14 @@ public class KeyedCountingBatchBolt extends BaseBatchBolt {
 
     @Override
     public void finishBatch() {
-        for(Object key: _counts.keySet()) {
+        for (Object key : _counts.keySet()) {
             _collector.emit(new Values(_id, key, _counts.get(key)));
         }
-    }   
+    }
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("tx", "key", "count"));
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java
index 887eb4e..67225cb 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java
@@ -33,7 +33,7 @@ public class KeyedSummingBatchBolt extends BaseBatchBolt {
     BatchOutputCollector _collector;
     Object _id;
     Map<Object, Number> _sums = new HashMap<Object, Number>();
-    
+
     @Override
     public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
         _collector = collector;
@@ -43,19 +43,19 @@ public class KeyedSummingBatchBolt extends BaseBatchBolt {
     @Override
     public void execute(Tuple tuple) {
         Object key = tuple.getValue(1);
-        Number curr = Utils.get(_sums, key, 0);        
+        Number curr = Utils.get(_sums, key, 0);
         _sums.put(key, Numbers.add(curr, tuple.getValue(2)));
     }
 
     @Override
     public void finishBatch() {
-        for(Object key: _sums.keySet()) {
+        for (Object key : _sums.keySet()) {
             _collector.emit(new Values(_id, key, _sums.get(key)));
         }
-    }   
+    }
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("tx", "key", "sum"));
-    }    
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java
index 3b492e1..75ad375 100644
--- a/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java
@@ -35,13 +35,13 @@ import java.util.Map;
 
 public class MemoryTransactionalSpout implements IPartitionedTransactionalSpout<MemoryTransactionalSpoutMeta> {
     public static String TX_FIELD = MemoryTransactionalSpout.class.getName() + "/id";
-    
+
     private String _id;
     private String _finishedPartitionsId;
     private int _takeAmt;
     private Fields _outFields;
     private Map<Integer, List<List<Object>>> _initialPartitions;
-    
+
     public MemoryTransactionalSpout(Map<Integer, List<List<Object>>> partitions, Fields outFields, int takeAmt) {
         _id = RegisteredGlobalState.registerState(partitions);
         Map<Integer, Boolean> finished = Collections.synchronizedMap(new HashMap<Integer, Boolean>());
@@ -50,17 +50,17 @@ public class MemoryTransactionalSpout implements IPartitionedTransactionalSpout<
         _outFields = outFields;
         _initialPartitions = partitions;
     }
-    
+
     public boolean isExhaustedTuples() {
         Map<Integer, Boolean> statuses = getFinishedStatuses();
-        for(Integer partition: getQueues().keySet()) {
-            if(!statuses.containsKey(partition) || !getFinishedStatuses().get(partition)) {
+        for (Integer partition : getQueues().keySet()) {
+            if (!statuses.containsKey(partition) || !getFinishedStatuses().get(partition)) {
                 return false;
             }
         }
         return true;
     }
-    
+
     class Coordinator implements IPartitionedTransactionalSpout.Coordinator {
 
         @Override
@@ -71,29 +71,31 @@ public class MemoryTransactionalSpout implements IPartitionedTransactionalSpout<
         @Override
         public boolean isReady() {
             return true;
-        }        
-        
+        }
+
         @Override
         public void close() {
-        }        
+        }
     }
-    
+
     class Emitter implements IPartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> {
-        
+
         Integer _maxSpoutPending;
         Map<Integer, Integer> _emptyPartitions = new HashMap<Integer, Integer>();
-        
+
         public Emitter(Map conf) {
             Object c = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
-            if(c==null) _maxSpoutPending = 1;
-            else _maxSpoutPending = Utils.getInt(c);
+            if (c == null)
+                _maxSpoutPending = 1;
+            else
+                _maxSpoutPending = Utils.getInt(c);
         }
-        
-        
+
         @Override
-        public MemoryTransactionalSpoutMeta emitPartitionBatchNew(TransactionAttempt tx, BatchOutputCollector collector, int partition, MemoryTransactionalSpoutMeta lastPartitionMeta) {
+        public MemoryTransactionalSpoutMeta emitPartitionBatchNew(TransactionAttempt tx, BatchOutputCollector collector, int partition,
+                MemoryTransactionalSpoutMeta lastPartitionMeta) {
             int index;
-            if(lastPartitionMeta==null) {
+            if (lastPartitionMeta == null) {
                 index = 0;
             } else {
                 index = lastPartitionMeta.index + lastPartitionMeta.amt;
@@ -102,40 +104,40 @@ public class MemoryTransactionalSpout implements IPartitionedTransactionalSpout<
             int total = queue.size();
             int left = total - index;
             int toTake = Math.min(left, _takeAmt);
-            
+
             MemoryTransactionalSpoutMeta ret = new MemoryTransactionalSpoutMeta(index, toTake);
             emitPartitionBatch(tx, collector, partition, ret);
-            if(toTake==0) {
+            if (toTake == 0) {
                 // this is a pretty hacky way to determine when all the partitions have been committed
                 // wait until we've emitted max-spout-pending empty partitions for the partition
                 int curr = Utils.get(_emptyPartitions, partition, 0) + 1;
                 _emptyPartitions.put(partition, curr);
-                if(curr > _maxSpoutPending) {
+                if (curr > _maxSpoutPending) {
                     Map<Integer, Boolean> finishedStatuses = getFinishedStatuses();
                     // will be null in remote mode
-                    if(finishedStatuses!=null) {
+                    if (finishedStatuses != null) {
                         finishedStatuses.put(partition, true);
                     }
                 }
             }
-            return ret;   
+            return ret;
         }
 
         @Override
         public void emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, MemoryTransactionalSpoutMeta partitionMeta) {
             List<List<Object>> queue = getQueues().get(partition);
-            for(int i=partitionMeta.index; i < partitionMeta.index + partitionMeta.amt; i++) {
+            for (int i = partitionMeta.index; i < partitionMeta.index + partitionMeta.amt; i++) {
                 List<Object> toEmit = new ArrayList<Object>(queue.get(i));
                 toEmit.add(0, tx);
-                collector.emit(toEmit);                
+                collector.emit(toEmit);
             }
         }
-                
+
         @Override
         public void close() {
-        }        
-    } 
-    
+        }
+    }
+
     @Override
     public IPartitionedTransactionalSpout.Coordinator getCoordinator(Map conf, TopologyContext context) {
         return new Coordinator();
@@ -159,22 +161,24 @@ public class MemoryTransactionalSpout implements IPartitionedTransactionalSpout<
         conf.registerSerialization(MemoryTransactionalSpoutMeta.class);
         return conf;
     }
-    
+
     public void startup() {
         getFinishedStatuses().clear();
     }
-    
+
     public void cleanup() {
         RegisteredGlobalState.clearState(_id);
         RegisteredGlobalState.clearState(_finishedPartitionsId);
     }
-    
-    private Map<Integer, List<List<Object>>> getQueues() {   
+
+    private Map<Integer, List<List<Object>>> getQueues() {
         Map<Integer, List<List<Object>>> ret = (Map<Integer, List<List<Object>>>) RegisteredGlobalState.getState(_id);
-        if(ret!=null) return ret;
-        else return _initialPartitions;
+        if (ret != null)
+            return ret;
+        else
+            return _initialPartitions;
     }
-    
+
     private Map<Integer, Boolean> getFinishedStatuses() {
         return (Map<Integer, Boolean>) RegisteredGlobalState.getState(_finishedPartitionsId);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java b/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java
index 29681fb..a00788d 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java
@@ -20,12 +20,12 @@ package backtype.storm.testing;
 public class MemoryTransactionalSpoutMeta {
     int index;
     int amt;
-    
+
     // for kryo compatibility
     public MemoryTransactionalSpoutMeta() {
-        
+
     }
-    
+
     public MemoryTransactionalSpoutMeta(int index, int amt) {
         this.index = index;
         this.amt = amt;
@@ -34,5 +34,5 @@ public class MemoryTransactionalSpoutMeta {
     @Override
     public String toString() {
         return "index: " + index + "; amt: " + amt;
-    }    
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/MkClusterParam.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/MkClusterParam.java b/jstorm-core/src/main/java/backtype/storm/testing/MkClusterParam.java
index cd677c8..d325377 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/MkClusterParam.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/MkClusterParam.java
@@ -23,35 +23,40 @@ import java.util.Map;
  * The param arg for <code>Testing.withSimulatedTimeCluster</code> and <code>Testing.withTrackedCluster</code>
  */
 public class MkClusterParam {
-	/**
-	 * count of supervisors for the cluster.
-	 */
-	private Integer supervisors;
-	/**
-	 * count of port for each supervisor
-	 */
-	private Integer portsPerSupervisor;
-	/**
-	 * cluster config
-	 */
-	private Map daemonConf;
-	
-	public Integer getSupervisors() {
-		return supervisors;
-	}
-	public void setSupervisors(Integer supervisors) {
-		this.supervisors = supervisors;
-	}
-	public Integer getPortsPerSupervisor() {
-		return portsPerSupervisor;
-	}
-	public void setPortsPerSupervisor(Integer portsPerSupervisor) {
-		this.portsPerSupervisor = portsPerSupervisor;
-	}
-	public Map getDaemonConf() {
-		return daemonConf;
-	}
-	public void setDaemonConf(Map daemonConf) {
-		this.daemonConf = daemonConf;
-	}
+    /**
+     * count of supervisors for the cluster.
+     */
+    private Integer supervisors;
+    /**
+     * count of port for each supervisor
+     */
+    private Integer portsPerSupervisor;
+    /**
+     * cluster config
+     */
+    private Map daemonConf;
+
+    public Integer getSupervisors() {
+        return supervisors;
+    }
+
+    public void setSupervisors(Integer supervisors) {
+        this.supervisors = supervisors;
+    }
+
+    public Integer getPortsPerSupervisor() {
+        return portsPerSupervisor;
+    }
+
+    public void setPortsPerSupervisor(Integer portsPerSupervisor) {
+        this.portsPerSupervisor = portsPerSupervisor;
+    }
+
+    public Map getDaemonConf() {
+        return daemonConf;
+    }
+
+    public void setDaemonConf(Map daemonConf) {
+        this.daemonConf = daemonConf;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/MkTupleParam.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/MkTupleParam.java b/jstorm-core/src/main/java/backtype/storm/testing/MkTupleParam.java
index 34a8c68..a8a4bdf 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/MkTupleParam.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/MkTupleParam.java
@@ -21,31 +21,34 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class MkTupleParam {
-	private String stream;
-	private String component;
-	private List<String> fields;
-	
-	public String getStream() {
-		return stream;
-	}
-	public void setStream(String stream) {
-		this.stream = stream;
-	}
-	
-	public String getComponent() {
-		return component;
-	}
-	public void setComponent(String component) {
-		this.component = component;
-	}
-	
-	public List<String> getFields() {
-		return fields;
-	}
-	public void setFields(String... fields) {
-		this.fields = new ArrayList<String>();
-		for (int i = 0; i < fields.length; i++) {
-			this.fields.add(fields[i]);
-		}
-	}
+    private String stream;
+    private String component;
+    private List<String> fields;
+
+    public String getStream() {
+        return stream;
+    }
+
+    public void setStream(String stream) {
+        this.stream = stream;
+    }
+
+    public String getComponent() {
+        return component;
+    }
+
+    public void setComponent(String component) {
+        this.component = component;
+    }
+
+    public List<String> getFields() {
+        return fields;
+    }
+
+    public void setFields(String... fields) {
+        this.fields = new ArrayList<String>();
+        for (int i = 0; i < fields.length; i++) {
+            this.fields.add(fields[i]);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/MockedSources.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/MockedSources.java b/jstorm-core/src/main/java/backtype/storm/testing/MockedSources.java
index 1fd6b85..48b9ac0 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/MockedSources.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/MockedSources.java
@@ -26,11 +26,11 @@ import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;
 
 public class MockedSources {
-	/**
-	 * mocked spout sources for the [spout, stream] pair.
-	 */
+    /**
+     * mocked spout sources for the [spout, stream] pair.
+     */
     private Map<String, List<FixedTuple>> data = new HashMap<String, List<FixedTuple>>();
-    
+
     /**
      * add mock data for the spout.
      * 
@@ -42,18 +42,18 @@ public class MockedSources {
         if (!data.containsKey(spoutId)) {
             data.put(spoutId, new ArrayList<FixedTuple>());
         }
-        
+
         List<FixedTuple> tuples = data.get(spoutId);
         for (int i = 0; i < valueses.length; i++) {
             FixedTuple tuple = new FixedTuple(streamId, valueses[i]);
             tuples.add(tuple);
         }
     }
-    
+
     public void addMockData(String spoutId, Values... valueses) {
         this.addMockData(spoutId, Utils.DEFAULT_STREAM_ID, valueses);
     }
-    
+
     public Map<String, List<FixedTuple>> getData() {
         return this.data;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/NGrouping.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/NGrouping.java b/jstorm-core/src/main/java/backtype/storm/testing/NGrouping.java
index 785ed92..9e7363c 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/NGrouping.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/NGrouping.java
@@ -27,17 +27,17 @@ import java.util.List;
 public class NGrouping implements CustomStreamGrouping {
     int _n;
     List<Integer> _outTasks;
-    
+
     public NGrouping(int n) {
         _n = n;
     }
-    
+
     @Override
     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
         targetTasks = new ArrayList<Integer>(targetTasks);
         Collections.sort(targetTasks);
         _outTasks = new ArrayList<Integer>();
-        for(int i=0; i<_n; i++) {
+        for (int i = 0; i < _n; i++) {
             _outTasks.add(targetTasks.get(i));
         }
     }
@@ -46,5 +46,5 @@ public class NGrouping implements CustomStreamGrouping {
     public List<Integer> chooseTasks(int taskId, List<Object> values) {
         return _outTasks;
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/NonRichBoltTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/NonRichBoltTracker.java b/jstorm-core/src/main/java/backtype/storm/testing/NonRichBoltTracker.java
index ccbb67f..b489289 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/NonRichBoltTracker.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/NonRichBoltTracker.java
@@ -25,7 +25,6 @@ import backtype.storm.utils.RegisteredGlobalState;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
-
 public class NonRichBoltTracker implements IBolt {
     IBolt _delegate;
     String _trackId;

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java
index 1ff01b9..0c91d2b 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java
@@ -37,34 +37,34 @@ import java.util.Map;
  */
 public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransactionalSpout<MemoryTransactionalSpoutMeta> {
     public static String TX_FIELD = MemoryTransactionalSpout.class.getName() + "/id";
-    
+
     private String _id;
     private String _finishedPartitionsId;
     private String _disabledId;
     private int _takeAmt;
     private Fields _outFields;
-    
+
     public OpaqueMemoryTransactionalSpout(Map<Integer, List<List<Object>>> partitions, Fields outFields, int takeAmt) {
         _id = RegisteredGlobalState.registerState(partitions);
-        
+
         Map<Integer, Boolean> finished = Collections.synchronizedMap(new HashMap<Integer, Boolean>());
         _finishedPartitionsId = RegisteredGlobalState.registerState(finished);
-        
+
         Map<Integer, Boolean> disabled = Collections.synchronizedMap(new HashMap<Integer, Boolean>());
         _disabledId = RegisteredGlobalState.registerState(disabled);
-        
+
         _takeAmt = takeAmt;
         _outFields = outFields;
     }
-    
+
     public void setDisabled(Integer partition, boolean disabled) {
         getDisabledStatuses().put(partition, disabled);
     }
-    
+
     public boolean isExhaustedTuples() {
         Map<Integer, Boolean> statuses = getFinishedStatuses();
-        for(Integer partition: getQueues().keySet()) {
-            if(!statuses.containsKey(partition) || !getFinishedStatuses().get(partition)) {
+        for (Integer partition : getQueues().keySet()) {
+            if (!statuses.containsKey(partition) || !getFinishedStatuses().get(partition)) {
                 return false;
             }
         }
@@ -80,7 +80,7 @@ public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransac
     public IOpaquePartitionedTransactionalSpout.Coordinator getCoordinator(Map conf, TopologyContext context) {
         return new Coordinator();
     }
-    
+
     class Coordinator implements IOpaquePartitionedTransactionalSpout.Coordinator {
         @Override
         public boolean isReady() {
@@ -91,24 +91,26 @@ public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransac
         public void close() {
         }
     }
-    
+
     class Emitter implements IOpaquePartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> {
-        
+
         Integer _maxSpoutPending;
         Map<Integer, Integer> _emptyPartitions = new HashMap<Integer, Integer>();
-        
+
         public Emitter(Map conf) {
             Object c = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
-            if(c==null) _maxSpoutPending = 1;
-            else _maxSpoutPending = Utils.getInt(c);
+            if (c == null)
+                _maxSpoutPending = 1;
+            else
+                _maxSpoutPending = Utils.getInt(c);
         }
-        
-        
+
         @Override
-        public MemoryTransactionalSpoutMeta emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, MemoryTransactionalSpoutMeta lastPartitionMeta) {
-            if(!Boolean.FALSE.equals(getDisabledStatuses().get(partition))) {
+        public MemoryTransactionalSpoutMeta emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition,
+                MemoryTransactionalSpoutMeta lastPartitionMeta) {
+            if (!Boolean.FALSE.equals(getDisabledStatuses().get(partition))) {
                 int index;
-                if(lastPartitionMeta==null) {
+                if (lastPartitionMeta == null) {
                     index = 0;
                 } else {
                     index = lastPartitionMeta.index + lastPartitionMeta.amt;
@@ -119,26 +121,26 @@ public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransac
                 int toTake = Math.min(left, _takeAmt);
 
                 MemoryTransactionalSpoutMeta ret = new MemoryTransactionalSpoutMeta(index, toTake);
-                for(int i=ret.index; i < ret.index + ret.amt; i++) {
+                for (int i = ret.index; i < ret.index + ret.amt; i++) {
                     List<Object> toEmit = new ArrayList<Object>(queue.get(i));
                     toEmit.add(0, tx);
-                    collector.emit(toEmit);                
+                    collector.emit(toEmit);
                 }
-                if(toTake==0) {
+                if (toTake == 0) {
                     // this is a pretty hacky way to determine when all the partitions have been committed
                     // wait until we've emitted max-spout-pending empty partitions for the partition
                     int curr = Utils.get(_emptyPartitions, partition, 0) + 1;
                     _emptyPartitions.put(partition, curr);
-                    if(curr > _maxSpoutPending) {
+                    if (curr > _maxSpoutPending) {
                         getFinishedStatuses().put(partition, true);
                     }
                 }
-                return ret; 
+                return ret;
             } else {
                 return null;
             }
         }
-                
+
         @Override
         public void close() {
         }
@@ -147,7 +149,7 @@ public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransac
         public int numPartitions() {
             return getQueues().size();
         }
-    } 
+    }
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
@@ -162,20 +164,20 @@ public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransac
         conf.registerSerialization(MemoryTransactionalSpoutMeta.class);
         return conf;
     }
-    
+
     public void startup() {
         getFinishedStatuses().clear();
     }
-    
+
     public void cleanup() {
         RegisteredGlobalState.clearState(_id);
         RegisteredGlobalState.clearState(_finishedPartitionsId);
     }
-    
+
     private Map<Integer, List<List<Object>>> getQueues() {
         return (Map<Integer, List<List<Object>>>) RegisteredGlobalState.getState(_id);
     }
-    
+
     private Map<Integer, Boolean> getFinishedStatuses() {
         return (Map<Integer, Boolean>) RegisteredGlobalState.getState(_finishedPartitionsId);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/PrepareBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/PrepareBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/PrepareBatchBolt.java
index 0bd9833..e9e2a9d 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/PrepareBatchBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/PrepareBatchBolt.java
@@ -26,10 +26,9 @@ import backtype.storm.utils.Utils;
 import java.util.ArrayList;
 import java.util.List;
 
-
 public class PrepareBatchBolt extends BaseBasicBolt {
     Fields _outFields;
-    
+
     public PrepareBatchBolt(Fields outFields) {
         _outFields = outFields;
     }
@@ -47,6 +46,5 @@ public class PrepareBatchBolt extends BaseBasicBolt {
         toEmit.addAll(input.getValues());
         collector.emit(toEmit);
     }
-    
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsBolt.java
index 4b85ce8..fd41283 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsBolt.java
@@ -27,23 +27,23 @@ import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.OutputFieldsDeclarer;
 
 public class PythonShellMetricsBolt extends ShellBolt implements IRichBolt {
-	private static final long serialVersionUID = 1999209252187463355L;
-	
-	public PythonShellMetricsBolt(String[] command) {
-		super(command);
-	}
-
-	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-		super.prepare(stormConf, context, collector);
-		
-		CountShellMetric cMetric = new CountShellMetric();
-		context.registerMetric("my-custom-shell-metric", cMetric, 5);
-	}
-	
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-	}
-
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
+    private static final long serialVersionUID = 1999209252187463355L;
+
+    public PythonShellMetricsBolt(String[] command) {
+        super(command);
+    }
+
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        super.prepare(stormConf, context, collector);
+
+        CountShellMetric cMetric = new CountShellMetric();
+        context.registerMetric("my-custom-shell-metric", cMetric, 5);
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    }
+
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsSpout.java
index 3ccf935..8325fba 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsSpout.java
@@ -28,25 +28,25 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 
 public class PythonShellMetricsSpout extends ShellSpout implements IRichSpout {
-	private static final long serialVersionUID = 1999209252187463355L;
-
-	public PythonShellMetricsSpout(String[] command) {
-		super(command);
-	}
-	
-	@Override
-	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-		super.open(conf, context, collector);
-	
-		CountShellMetric cMetric = new CountShellMetric();
-		context.registerMetric("my-custom-shellspout-metric", cMetric, 5);
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("field1"));
-	}
-
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
+    private static final long serialVersionUID = 1999209252187463355L;
+
+    public PythonShellMetricsSpout(String[] command) {
+        super(command);
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        super.open(conf, context, collector);
+
+        CountShellMetric cMetric = new CountShellMetric();
+        context.registerMetric("my-custom-shellspout-metric", cMetric, 5);
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("field1"));
+    }
+
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/SingleUserSimpleTransport.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/SingleUserSimpleTransport.java b/jstorm-core/src/main/java/backtype/storm/testing/SingleUserSimpleTransport.java
index 4d25ac7..cf9dc4d 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/SingleUserSimpleTransport.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/SingleUserSimpleTransport.java
@@ -23,15 +23,19 @@ import javax.security.auth.Subject;
 import java.security.Principal;
 import java.util.HashSet;
 
-
 public class SingleUserSimpleTransport extends SimpleTransportPlugin {
-   @Override
-   protected Subject getDefaultSubject() {
-       HashSet<Principal> principals = new HashSet<Principal>();
-       principals.add(new Principal() {
-          public String getName() { return "user"; }
-          public String toString() { return "user"; }
-       });
-       return new Subject(true, principals, new HashSet<Object>(), new HashSet<Object>());
-   } 
+    @Override
+    protected Subject getDefaultSubject() {
+        HashSet<Principal> principals = new HashSet<Principal>();
+        principals.add(new Principal() {
+            public String getName() {
+                return "user";
+            }
+
+            public String toString() {
+                return "user";
+            }
+        });
+        return new Subject(true, principals, new HashSet<Object>(), new HashSet<Object>());
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/SpoutTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/SpoutTracker.java b/jstorm-core/src/main/java/backtype/storm/testing/SpoutTracker.java
index 75ba2b8..369e661 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/SpoutTracker.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/SpoutTracker.java
@@ -28,13 +28,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
-
 public class SpoutTracker extends BaseRichSpout {
     IRichSpout _delegate;
     SpoutTrackOutputCollector _tracker;
     String _trackId;
 
-
     private class SpoutTrackOutputCollector implements ISpoutOutputCollector {
         public int transferred = 0;
         public int emitted = 0;
@@ -43,11 +41,11 @@ public class SpoutTracker extends BaseRichSpout {
         public SpoutTrackOutputCollector(SpoutOutputCollector collector) {
             _collector = collector;
         }
-        
+
         private void recordSpoutEmit() {
             Map stats = (Map) RegisteredGlobalState.getState(_trackId);
             ((AtomicInteger) stats.get("spout-emitted")).incrementAndGet();
-            
+
         }
 
         public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
@@ -63,11 +61,10 @@ public class SpoutTracker extends BaseRichSpout {
 
         @Override
         public void reportError(Throwable error) {
-        	_collector.reportError(error);
+            _collector.reportError(error);
         }
     }
 
-
     public SpoutTracker(IRichSpout delegate, String trackId) {
         _delegate = delegate;
         _trackId = trackId;
@@ -95,7 +92,7 @@ public class SpoutTracker extends BaseRichSpout {
     public void fail(Object msgId) {
         _delegate.fail(msgId);
         Map stats = (Map) RegisteredGlobalState.getState(_trackId);
-        ((AtomicInteger) stats.get("processed")).incrementAndGet();        
+        ((AtomicInteger) stats.get("processed")).incrementAndGet();
     }
 
     public void declareOutputFields(OutputFieldsDeclarer declarer) {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestAggregatesCounter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestAggregatesCounter.java b/jstorm-core/src/main/java/backtype/storm/testing/TestAggregatesCounter.java
index e8c0a61..76b6874 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestAggregatesCounter.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestAggregatesCounter.java
@@ -29,7 +29,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import static backtype.storm.utils.Utils.tuple;
 
-
 public class TestAggregatesCounter extends BaseRichBolt {
     public static Logger LOG = LoggerFactory.getLogger(TestWordCounter.class);
 
@@ -46,8 +45,8 @@ public class TestAggregatesCounter extends BaseRichBolt {
         int count = (Integer) input.getValues().get(1);
         _counts.put(word, count);
         int globalCount = 0;
-        for(String w: _counts.keySet()) {
-            globalCount+=_counts.get(w);
+        for (String w : _counts.keySet()) {
+            globalCount += _counts.get(w);
         }
         _collector.emit(tuple(globalCount));
         _collector.ack(input);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestConfBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestConfBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/TestConfBolt.java
index 5790fb3..634cbe1 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestConfBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestConfBolt.java
@@ -26,7 +26,6 @@ import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import java.util.Map;
 
-
 public class TestConfBolt extends BaseBasicBolt {
     Map<String, Object> _componentConf;
     Map<String, Object> _conf;
@@ -34,16 +33,16 @@ public class TestConfBolt extends BaseBasicBolt {
     public TestConfBolt() {
         this(null);
     }
-        
+
     public TestConfBolt(Map<String, Object> componentConf) {
         _componentConf = componentConf;
-    }        
+    }
 
     @Override
     public void prepare(Map conf, TopologyContext context) {
         _conf = conf;
-    }    
-    
+    }
+
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("conf", "value"));
@@ -58,5 +57,5 @@ public class TestConfBolt extends BaseBasicBolt {
     @Override
     public Map<String, Object> getComponentConfiguration() {
         return _componentConf;
-    }    
+    }
 }