You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:08 UTC
[30/51] [partial] storm git commit: Update JStorm to latest release 2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/task/TopologyContext.java b/jstorm-core/src/main/java/backtype/storm/task/TopologyContext.java
index 8124651..e8390f6 100755
--- a/jstorm-core/src/main/java/backtype/storm/task/TopologyContext.java
+++ b/jstorm-core/src/main/java/backtype/storm/task/TopologyContext.java
@@ -21,86 +21,76 @@ import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import backtype.storm.hooks.ITaskHook;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.ReducedMetric;
-import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.api.*;
import backtype.storm.state.ISubscribedState;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import com.alibaba.jstorm.cluster.StormClusterState;
import org.apache.commons.lang.NotImplementedException;
import org.json.simple.JSONValue;
+import java.util.*;
+
/**
- * A TopologyContext is given to bolts and spouts in their "prepare" and "open"
- * methods, respectively. This object provides information about the component's
+ * A TopologyContext is given to bolts and spouts in their "prepare" and "open" methods, respectively. This object provides information about the component's
* place within the topology, such as task ids, inputs and outputs, etc.
- *
- * <p>The TopologyContext is also used to declare ISubscribedState objects to
- * synchronize state with StateSpouts this object is subscribed to.</p>
+ *
+ * <p>
+ * The TopologyContext is also used to declare ISubscribedState objects to synchronize state with StateSpouts this object is subscribed to.
+ * </p>
*/
public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
private Integer _taskId;
private Map<String, Object> _taskData = new HashMap<String, Object>();
private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
private Map<String, Object> _executorData;
- private Map<Integer,Map<Integer, Map<String, IMetric>>> _registeredMetrics;
+ private Map<Integer, Map<Integer, Map<String, IMetric>>> _registeredMetrics;
private clojure.lang.Atom _openOrPrepareWasCalled;
-
-
- public TopologyContext(StormTopology topology, Map stormConf,
- Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
- Map<String, Map<String, Fields>> componentToStreamToFields,
- String stormId, String codeDir, String pidDir, Integer taskId,
- Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources,
- Map<String, Object> userResources, Map<String, Object> executorData, Map registeredMetrics,
- clojure.lang.Atom openOrPrepareWasCalled) {
- super(topology, stormConf, taskToComponent, componentToSortedTasks,
- componentToStreamToFields, stormId, codeDir, pidDir,
- workerPort, workerTasks, defaultResources, userResources);
+ private StormClusterState _zkCluster;
+
+ public TopologyContext(StormTopology topology, Map stormConf, Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
+ Map<String, Map<String, Fields>> componentToStreamToFields, String stormId, String codeDir, String pidDir, Integer taskId, Integer workerPort,
+ List<Integer> workerTasks, Map<String, Object> defaultResources, Map<String, Object> userResources, Map<String, Object> executorData,
+ Map registeredMetrics, clojure.lang.Atom openOrPrepareWasCalled, StormClusterState zkCluster) {
+ super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId, codeDir, pidDir, workerPort, workerTasks,
+ defaultResources, userResources);
_taskId = taskId;
_executorData = executorData;
_registeredMetrics = registeredMetrics;
_openOrPrepareWasCalled = openOrPrepareWasCalled;
+ _zkCluster = zkCluster;
}
/**
- * All state from all subscribed state spouts streams will be synced with
- * the provided object.
- *
- * <p>It is recommended that your ISubscribedState object is kept as an instance
- * variable of this object. The recommended usage of this method is as follows:</p>
- *
+ * All state from all subscribed state spouts streams will be synced with the provided object.
+ *
+ * <p>
+ * It is recommended that your ISubscribedState object is kept as an instance variable of this object. The recommended usage of this method is as follows:
+ * </p>
+ *
* <p>
* _myState = context.setAllSubscribedState(new MyState());
* </p>
+ *
* @param obj Provided ISubscribedState implementation
* @return Returns the ISubscribedState object provided
*/
public <T extends ISubscribedState> T setAllSubscribedState(T obj) {
- //check that only subscribed to one component/stream for statespout
- //setsubscribedstate appropriately
+ // check that only subscribed to one component/stream for statespout
+ // setsubscribedstate appropriately
throw new NotImplementedException();
}
-
/**
- * Synchronizes the default stream from the specified state spout component
- * id with the provided ISubscribedState object.
- *
- * <p>The recommended usage of this method is as follows:</p>
+ * Synchronizes the default stream from the specified state spout component id with the provided ISubscribedState object.
+ *
+ * <p>
+ * The recommended usage of this method is as follows:
+ * </p>
* <p>
* _myState = context.setSubscribedState(componentId, new MyState());
* </p>
- *
+ *
* @param componentId the id of the StateSpout component to subscribe to
* @param obj Provided ISubscribedState implementation
* @return Returns the ISubscribedState object provided
@@ -110,14 +100,15 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
}
/**
- * Synchronizes the specified stream from the specified state spout component
- * id with the provided ISubscribedState object.
- *
- * <p>The recommended usage of this method is as follows:</p>
+ * Synchronizes the specified stream from the specified state spout component id with the provided ISubscribedState object.
+ *
+ * <p>
+ * The recommended usage of this method is as follows:
+ * </p>
* <p>
* _myState = context.setSubscribedState(componentId, streamId, new MyState());
* </p>
- *
+ *
* @param componentId the id of the StateSpout component to subscribe to
* @param streamId the stream to subscribe to
* @param obj Provided ISubscribedState implementation
@@ -129,7 +120,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
/**
* Gets the task id of this task.
- *
+ *
* @return the task id
*/
public int getThisTaskId() {
@@ -137,33 +128,31 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
}
/**
- * Gets the component id for this task. The component id maps
- * to a component id specified for a Spout or Bolt in the topology definition.
+ * Gets the component id for this task. The component id maps to a component id specified for a Spout or Bolt in the topology definition.
+ *
* @return
*/
public String getThisComponentId() {
return getComponentId(_taskId);
}
- /**
- * Gets the declared output fields for the specified stream id for the
- * component this task is a part of.
- */
- public Fields getThisOutputFields(String streamId) {
- return getComponentOutputFields(getThisComponentId(), streamId);
- }
-
- /**
- * Gets the declared output fields for the specified stream id for the
- * component this task is a part of.
- */
- public Map<String, List<String>> getThisOutputFieldsForStreams() {
- Map<String, List<String>> streamToFields = new HashMap<String, List<String>>();
- for (String stream : this.getThisStreams()) {
- streamToFields.put(stream, this.getThisOutputFields(stream).toList());
- }
- return streamToFields;
- }
+ /**
+ * Gets the declared output fields for the specified stream id for the component this task is a part of.
+ */
+ public Fields getThisOutputFields(String streamId) {
+ return getComponentOutputFields(getThisComponentId(), streamId);
+ }
+
+ /**
+ * Gets the declared output fields for the specified stream id for the component this task is a part of.
+ */
+ public Map<String, List<String>> getThisOutputFieldsForStreams() {
+ Map<String, List<String>> streamToFields = new HashMap<String, List<String>>();
+ for (String stream : this.getThisStreams()) {
+ streamToFields.put(stream, this.getThisOutputFields(stream).toList());
+ }
+ return streamToFields;
+ }
/**
* Gets the set of streams declared for the component of this task.
@@ -173,15 +162,14 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
}
/**
- * Gets the index of this task id in getComponentTasks(getThisComponentId()).
- * An example use case for this method is determining which task
- * accesses which resource in a distributed resource to ensure an even distribution.
+ * Gets the index of this task id in getComponentTasks(getThisComponentId()). An example use case for this method is determining which task accesses which
+ * resource in a distributed resource to ensure an even distribution.
*/
public int getThisTaskIndex() {
List<Integer> tasks = new ArrayList<Integer>(getComponentTasks(getThisComponentId()));
Collections.sort(tasks);
- for(int i=0; i<tasks.size(); i++) {
- if(tasks.get(i) == getThisTaskId()) {
+ for (int i = 0; i < tasks.size(); i++) {
+ if (tasks.get(i) == getThisTaskId()) {
return i;
}
}
@@ -190,7 +178,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
/**
* Gets the declared inputs to this component.
- *
+ *
* @return A map from subscribed component/stream to the grouping subscribed with.
*/
public Map<GlobalStreamId, Grouping> getThisSources() {
@@ -199,7 +187,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
/**
* Gets information about who is consuming the outputs of this component, and how.
- *
+ *
* @return Map from stream id to component id to the Grouping used.
*/
public Map<String, Map<String, Grouping>> getThisTargets() {
@@ -231,15 +219,15 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
return _hooks;
}
- private static Map<String, Object> groupingToJSONableMap(Grouping grouping) {
- Map groupingMap = new HashMap<String, Object>();
- groupingMap.put("type", grouping.getSetField().toString());
- if (grouping.is_set_fields()) {
- groupingMap.put("fields", grouping.get_fields());
- }
- return groupingMap;
- }
-
+ private static Map<String, Object> groupingToJSONableMap(Grouping grouping) {
+ Map groupingMap = new HashMap<String, Object>();
+ groupingMap.put("type", grouping.getSetField().toString());
+ if (grouping.is_set_fields()) {
+ groupingMap.put("fields", grouping.get_fields());
+ }
+ return groupingMap;
+ }
+
@Override
public String toJSONString() {
Map obj = new HashMap();
@@ -253,39 +241,38 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
// Convert targets to a JSON serializable format
Map<String, Map> stringTargets = new HashMap<String, Map>();
for (Map.Entry<String, Map<String, Grouping>> entry : this.getThisTargets().entrySet()) {
- Map stringTargetMap = new HashMap<String, Object>();
- for (Map.Entry<String, Grouping> innerEntry : entry.getValue().entrySet()) {
- stringTargetMap.put(innerEntry.getKey(), groupingToJSONableMap(innerEntry.getValue()));
- }
- stringTargets.put(entry.getKey(), stringTargetMap);
+ Map stringTargetMap = new HashMap<String, Object>();
+ for (Map.Entry<String, Grouping> innerEntry : entry.getValue().entrySet()) {
+ stringTargetMap.put(innerEntry.getKey(), groupingToJSONableMap(innerEntry.getValue()));
+ }
+ stringTargets.put(entry.getKey(), stringTargetMap);
}
obj.put("stream->target->grouping", stringTargets);
// Convert sources to a JSON serializable format
Map<String, Map<String, Object>> stringSources = new HashMap<String, Map<String, Object>>();
for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
- GlobalStreamId gid = entry.getKey();
- Map<String, Object> stringSourceMap = stringSources.get(gid.get_componentId());
- if (stringSourceMap == null) {
- stringSourceMap = new HashMap<String, Object>();
- stringSources.put(gid.get_componentId(), stringSourceMap);
- }
- stringSourceMap.put(gid.get_streamId(), groupingToJSONableMap(entry.getValue()));
+ GlobalStreamId gid = entry.getKey();
+ Map<String, Object> stringSourceMap = stringSources.get(gid.get_componentId());
+ if (stringSourceMap == null) {
+ stringSourceMap = new HashMap<String, Object>();
+ stringSources.put(gid.get_componentId(), stringSourceMap);
+ }
+ stringSourceMap.put(gid.get_streamId(), groupingToJSONableMap(entry.getValue()));
}
obj.put("source->stream->grouping", stringSources);
return JSONValue.toJSONString(obj);
}
/*
- * Register a IMetric instance.
- * Storm will then call getValueAndReset on the metric every timeBucketSizeInSecs
- * and the returned value is sent to all metrics consumers.
- * You must call this during IBolt::prepare or ISpout::open.
+ * Register an IMetric instance. Storm will then call getValueAndReset on the metric every timeBucketSizeInSecs and the returned value is sent to all metrics
+ * consumers. You must call this during IBolt::prepare or ISpout::open.
+ *
* @return The IMetric argument unchanged.
*/
public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
- if((Boolean)_openOrPrepareWasCalled.deref() == true) {
- throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " +
- "IBolt::prepare() or ISpout::open() method.");
+ if ((Boolean) _openOrPrepareWasCalled.deref() == true) {
+ throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden "
+ + "IBolt::prepare() or ISpout::open() method.");
}
if (metric == null) {
@@ -293,27 +280,27 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
}
if (timeBucketSizeInSecs <= 0) {
- throw new IllegalArgumentException("TopologyContext.registerMetric can only be called with timeBucketSizeInSecs " +
- "greater than or equal to 1 second.");
+ throw new IllegalArgumentException("TopologyContext.registerMetric can only be called with timeBucketSizeInSecs "
+ + "greater than or equal to 1 second.");
}
if (getRegisteredMetricByName(name) != null) {
- throw new RuntimeException("The same metric name `" + name + "` was registered twice." );
+ throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
}
Map m1 = _registeredMetrics;
- if(!m1.containsKey(timeBucketSizeInSecs)) {
+ if (!m1.containsKey(timeBucketSizeInSecs)) {
m1.put(timeBucketSizeInSecs, new HashMap());
}
- Map m2 = (Map)m1.get(timeBucketSizeInSecs);
- if(!m2.containsKey(_taskId)) {
+ Map m2 = (Map) m1.get(timeBucketSizeInSecs);
+ if (!m2.containsKey(_taskId)) {
m2.put(_taskId, new HashMap());
}
- Map m3 = (Map)m2.get(_taskId);
- if(m3.containsKey(name)) {
- throw new RuntimeException("The same metric name `" + name + "` was registered twice." );
+ Map m3 = (Map) m2.get(_taskId);
+ if (m3.containsKey(name)) {
+ throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
} else {
m3.put(name, metric);
}
@@ -322,21 +309,18 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
}
/**
- * Get component's metric from registered metrics by name.
- * Notice: Normally, one component can only register one metric name once.
- * But now registerMetric has a bug(https://issues.apache.org/jira/browse/STORM-254)
- * cause the same metric name can register twice.
- * So we just return the first metric we meet.
+ * Get component's metric from registered metrics by name. Notice: Normally, one component can only register one metric name once. However, registerMetric
+ * has a bug (https://issues.apache.org/jira/browse/STORM-254) that allows the same metric name to be registered twice, so we just return the first metric we meet.
*/
public IMetric getRegisteredMetricByName(String name) {
IMetric metric = null;
- for (Map<Integer, Map<String, IMetric>> taskIdToNameToMetric: _registeredMetrics.values()) {
+ for (Map<Integer, Map<String, IMetric>> taskIdToNameToMetric : _registeredMetrics.values()) {
Map<String, IMetric> nameToMetric = taskIdToNameToMetric.get(_taskId);
if (nameToMetric != null) {
metric = nameToMetric.get(name);
if (metric != null) {
- //we just return the first metric we meet
+ // we just return the first metric we meet
break;
}
}
@@ -351,10 +335,21 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) {
return registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs);
}
+
/*
* Convinience method for registering CombinedMetric.
*/
public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) {
return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs);
}
+
+ public StormClusterState getZkCluster() {
+ return _zkCluster;
+ }
+ /**
+ * Task error report callback: reports the given error message for this task via StormClusterState#report_task_error.
+ */
+ public void reportError(String errorMsg) throws Exception{
+ _zkCluster.report_task_error(getTopologyId(), _taskId, errorMsg, null);
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/task/WorkerTopologyContext.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/task/WorkerTopologyContext.java b/jstorm-core/src/main/java/backtype/storm/task/WorkerTopologyContext.java
index de407ac..09c8c8c 100755
--- a/jstorm-core/src/main/java/backtype/storm/task/WorkerTopologyContext.java
+++ b/jstorm-core/src/main/java/backtype/storm/task/WorkerTopologyContext.java
@@ -27,34 +27,23 @@ import java.util.concurrent.ExecutorService;
public class WorkerTopologyContext extends GeneralTopologyContext {
public static final String SHARED_EXECUTOR = "executor";
-
+
private Integer _workerPort;
private List<Integer> _workerTasks;
private String _codeDir;
private String _pidDir;
Map<String, Object> _userResources;
Map<String, Object> _defaultResources;
-
- public WorkerTopologyContext(
- StormTopology topology,
- Map stormConf,
- Map<Integer, String> taskToComponent,
- Map<String, List<Integer>> componentToSortedTasks,
- Map<String, Map<String, Fields>> componentToStreamToFields,
- String stormId,
- String codeDir,
- String pidDir,
- Integer workerPort,
- List<Integer> workerTasks,
- Map<String, Object> defaultResources,
- Map<String, Object> userResources
- ) {
+
+ public WorkerTopologyContext(StormTopology topology, Map stormConf, Map<Integer, String> taskToComponent,
+ Map<String, List<Integer>> componentToSortedTasks, Map<String, Map<String, Fields>> componentToStreamToFields, String stormId, String codeDir,
+ String pidDir, Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources, Map<String, Object> userResources) {
super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId);
_codeDir = codeDir;
_defaultResources = defaultResources;
_userResources = userResources;
try {
- if(pidDir!=null) {
+ if (pidDir != null) {
_pidDir = new File(pidDir).getCanonicalPath();
} else {
_pidDir = null;
@@ -67,13 +56,12 @@ public class WorkerTopologyContext extends GeneralTopologyContext {
}
/**
- * Gets all the task ids that are running in this worker process
- * (including the task for this task).
+ * Gets all the task ids that are running in this worker process (including the task for this task).
*/
public List<Integer> getThisWorkerTasks() {
return _workerTasks;
}
-
+
public Integer getThisWorkerPort() {
return _workerPort;
}
@@ -81,28 +69,27 @@ public class WorkerTopologyContext extends GeneralTopologyContext {
public void setThisWorkerTasks(List<Integer> workerTasks) {
this._workerTasks = workerTasks;
}
+
/**
- * Gets the location of the external resources for this worker on the
- * local filesystem. These external resources typically include bolts implemented
- * in other languages, such as Ruby or Python.
+ * Gets the location of the external resources for this worker on the local filesystem. These external resources typically include bolts implemented in
+ * other languages, such as Ruby or Python.
*/
public String getCodeDir() {
return _codeDir;
}
/**
- * If this task spawns any subprocesses, those subprocesses must immediately
- * write their PID to this directory on the local filesystem to ensure that
- * Storm properly destroys that process when the worker is shutdown.
+ * If this task spawns any subprocesses, those subprocesses must immediately write their PID to this directory on the local filesystem to ensure that Storm
+ * properly destroys that process when the worker is shutdown.
*/
public String getPIDDir() {
return _pidDir;
}
-
+
public Object getResource(String name) {
return _userResources.get(name);
}
-
+
public ExecutorService getSharedExecutor() {
return (ExecutorService) _defaultResources.get(SHARED_EXECUTOR);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/AckFailDelegate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/AckFailDelegate.java b/jstorm-core/src/main/java/backtype/storm/testing/AckFailDelegate.java
index d65c8bd..fbbcbfc 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/AckFailDelegate.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/AckFailDelegate.java
@@ -21,5 +21,6 @@ import java.io.Serializable;
public interface AckFailDelegate extends Serializable {
public void ack(Object id);
+
public void fail(Object id);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/AckFailMapTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/AckFailMapTracker.java b/jstorm-core/src/main/java/backtype/storm/testing/AckFailMapTracker.java
index e16afd8..f3feff3 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/AckFailMapTracker.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/AckFailMapTracker.java
@@ -22,31 +22,31 @@ import java.util.HashSet;
import java.util.Set;
public class AckFailMapTracker implements AckFailDelegate {
-
+
String _acked;
String _failed;
-
+
public AckFailMapTracker() {
_acked = RegisteredGlobalState.registerState(new HashSet());
_failed = RegisteredGlobalState.registerState(new HashSet());
}
-
+
public boolean isAcked(Object id) {
- return ((Set)RegisteredGlobalState.getState(_acked)).contains(id);
+ return ((Set) RegisteredGlobalState.getState(_acked)).contains(id);
}
-
+
public boolean isFailed(Object id) {
- return ((Set)RegisteredGlobalState.getState(_failed)).contains(id);
+ return ((Set) RegisteredGlobalState.getState(_failed)).contains(id);
}
@Override
public void ack(Object id) {
- ((Set)RegisteredGlobalState.getState(_acked)).add(id);
+ ((Set) RegisteredGlobalState.getState(_acked)).add(id);
}
@Override
public void fail(Object id) {
- ((Set)RegisteredGlobalState.getState(_failed)).add(id);
+ ((Set) RegisteredGlobalState.getState(_failed)).add(id);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/AckTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/AckTracker.java b/jstorm-core/src/main/java/backtype/storm/testing/AckTracker.java
index ad80475..10973f1 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/AckTracker.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/AckTracker.java
@@ -24,14 +24,14 @@ import java.util.concurrent.atomic.AtomicInteger;
public class AckTracker implements AckFailDelegate {
private static Map<String, AtomicInteger> acks = new ConcurrentHashMap<String, AtomicInteger>();
-
+
private String _id;
-
+
public AckTracker() {
_id = UUID.randomUUID().toString();
acks.put(_id, new AtomicInteger(0));
}
-
+
@Override
public void ack(Object id) {
acks.get(_id).incrementAndGet();
@@ -40,13 +40,13 @@ public class AckTracker implements AckFailDelegate {
@Override
public void fail(Object id) {
}
-
+
public int getNumAcks() {
return acks.get(_id).intValue();
}
-
+
public void resetNumAcks() {
acks.get(_id).set(0);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/BatchNumberList.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/BatchNumberList.java b/jstorm-core/src/main/java/backtype/storm/testing/BatchNumberList.java
index 26f964a..2565f25 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/BatchNumberList.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/BatchNumberList.java
@@ -37,16 +37,15 @@ public class BatchNumberList extends BaseBatchBolt {
}
String _wordComponent;
-
+
public BatchNumberList(String wordComponent) {
_wordComponent = wordComponent;
}
-
+
String word = null;
List<Integer> intSet = new ArrayList<Integer>();
BatchOutputCollector _collector;
-
-
+
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
_collector = collector;
@@ -54,7 +53,7 @@ public class BatchNumberList extends BaseBatchBolt {
@Override
public void execute(Tuple tuple) {
- if(tuple.getSourceComponent().equals(_wordComponent)) {
+ if (tuple.getSourceComponent().equals(_wordComponent)) {
this.word = tuple.getString(1);
} else {
intSet.add(tuple.getInteger(1));
@@ -63,10 +62,10 @@ public class BatchNumberList extends BaseBatchBolt {
@Override
public void finishBatch() {
- if(word!=null) {
+ if (word != null) {
Collections.sort(intSet);
_collector.emit(new Values(word, intSet));
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/BatchProcessWord.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/BatchProcessWord.java b/jstorm-core/src/main/java/backtype/storm/testing/BatchProcessWord.java
index 7f3eaf1..819c7c1 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/BatchProcessWord.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/BatchProcessWord.java
@@ -35,5 +35,5 @@ public class BatchProcessWord extends BaseBasicBolt {
public void execute(Tuple input, BasicOutputCollector collector) {
collector.emit(new Values(input.getValue(0), input.getString(1).length()));
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/BatchRepeatA.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/BatchRepeatA.java b/jstorm-core/src/main/java/backtype/storm/testing/BatchRepeatA.java
index 107f2ed..f5751d1 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/BatchRepeatA.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/BatchRepeatA.java
@@ -24,21 +24,20 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
+public class BatchRepeatA extends BaseBasicBolt {
-public class BatchRepeatA extends BaseBasicBolt {
-
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
- Object id = input.getValue(0);
- String word = input.getString(1);
- for(int i=0; i<word.length(); i++) {
- if(word.charAt(i) == 'a') {
+ Object id = input.getValue(0);
+ String word = input.getString(1);
+ for (int i = 0; i < word.length(); i++) {
+ if (word.charAt(i) == 'a') {
collector.emit("multi", new Values(id, word.substring(0, i)));
}
}
collector.emit("single", new Values(id, word));
}
-
+
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("multi", new Fields("id", "word"));
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/BoltTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/BoltTracker.java b/jstorm-core/src/main/java/backtype/storm/testing/BoltTracker.java
index 3fe4e7a..0319ebc 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/BoltTracker.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/BoltTracker.java
@@ -22,7 +22,6 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import java.util.HashMap;
import java.util.Map;
-
public class BoltTracker extends NonRichBoltTracker implements IRichBolt {
IRichBolt _richDelegate;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/CompleteTopologyParam.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/CompleteTopologyParam.java b/jstorm-core/src/main/java/backtype/storm/testing/CompleteTopologyParam.java
index f3306cf..8483413 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/CompleteTopologyParam.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/CompleteTopologyParam.java
@@ -23,65 +23,65 @@ import backtype.storm.Config;
* The param class for the <code>Testing.completeTopology</code>.
*/
public class CompleteTopologyParam {
- /**
- * The mocked spout sources
- */
- private MockedSources mockedSources;
- /**
- * the config for the topology when it was submitted to the cluster
- */
- private Config stormConf;
- /**
- * whether cleanup the state?
- */
- private Boolean cleanupState;
- /**
- * the topology name you want to submit to the cluster
- */
- private String topologyName;
+ /**
+ * The mocked spout sources
+ */
+ private MockedSources mockedSources;
+ /**
+ * the config for the topology when it was submitted to the cluster
+ */
+ private Config stormConf;
+ /**
+ * whether cleanup the state?
+ */
+ private Boolean cleanupState;
+ /**
+ * the topology name you want to submit to the cluster
+ */
+ private String topologyName;
- /**
- * the timeout of topology you want to submit to the cluster
- */
- private Integer timeoutMs;
+ /**
+ * the timeout of topology you want to submit to the cluster
+ */
+ private Integer timeoutMs;
- public MockedSources getMockedSources() {
- return mockedSources;
- }
+ public MockedSources getMockedSources() {
+ return mockedSources;
+ }
- public void setMockedSources(MockedSources mockedSources) {
- this.mockedSources = mockedSources;
- }
+ public void setMockedSources(MockedSources mockedSources) {
+ this.mockedSources = mockedSources;
+ }
- public Config getStormConf() {
- return stormConf;
- }
+ public Config getStormConf() {
+ return stormConf;
+ }
- public void setStormConf(Config stormConf) {
- this.stormConf = stormConf;
- }
+ public void setStormConf(Config stormConf) {
+ this.stormConf = stormConf;
+ }
- public Boolean getCleanupState() {
- return cleanupState;
- }
+ public Boolean getCleanupState() {
+ return cleanupState;
+ }
- public void setCleanupState(Boolean cleanupState) {
- this.cleanupState = cleanupState;
- }
+ public void setCleanupState(Boolean cleanupState) {
+ this.cleanupState = cleanupState;
+ }
- public String getTopologyName() {
- return topologyName;
- }
+ public String getTopologyName() {
+ return topologyName;
+ }
- public void setTopologyName(String topologyName) {
- this.topologyName = topologyName;
- }
+ public void setTopologyName(String topologyName) {
+ this.topologyName = topologyName;
+ }
- public Integer getTimeoutMs() {
- return timeoutMs;
- }
+ public Integer getTimeoutMs() {
+ return timeoutMs;
+ }
- public void setTimeoutMs(Integer timeoutMs) {
- this.timeoutMs = timeoutMs;
- }
+ public void setTimeoutMs(Integer timeoutMs) {
+ this.timeoutMs = timeoutMs;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/CountingBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/CountingBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/CountingBatchBolt.java
index 882801c..3682120 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/CountingBatchBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/CountingBatchBolt.java
@@ -30,7 +30,7 @@ public class CountingBatchBolt extends BaseBatchBolt {
BatchOutputCollector _collector;
Object _id;
int _count = 0;
-
+
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
_collector = collector;
@@ -44,12 +44,12 @@ public class CountingBatchBolt extends BaseBatchBolt {
@Override
public void finishBatch() {
- _collector.emit(new Values(_id, _count));
- }
+ _collector.emit(new Values(_id, _count));
+ }
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tx", "count"));
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/CountingCommitBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/CountingCommitBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/CountingCommitBolt.java
index cb8f7e5..a45f16b 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/CountingCommitBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/CountingCommitBolt.java
@@ -32,7 +32,7 @@ public class CountingCommitBolt extends BaseTransactionalBolt implements ICommit
BatchOutputCollector _collector;
TransactionAttempt _id;
int _count = 0;
-
+
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
_id = id;
@@ -46,8 +46,8 @@ public class CountingCommitBolt extends BaseTransactionalBolt implements ICommit
@Override
public void finishBatch() {
- _collector.emit(new Values(_id, _count));
- }
+ _collector.emit(new Values(_id, _count));
+ }
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/FeederSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/FeederSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/FeederSpout.java
index 1ffb594..52ba5b7 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/FeederSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/FeederSpout.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.UUID;
-
public class FeederSpout extends BaseRichSpout {
private int _id;
private Fields _outFields;
@@ -44,15 +43,15 @@ public class FeederSpout extends BaseRichSpout {
public void setAckFailDelegate(AckFailDelegate d) {
_ackFailDelegate = d;
}
-
+
public void feed(List<Object> tuple) {
feed(tuple, UUID.randomUUID().toString());
}
public void feed(List<Object> tuple, Object msgId) {
InprocMessaging.sendMessage(_id, new Values(tuple, msgId));
- }
-
+ }
+
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
}
@@ -63,10 +62,10 @@ public class FeederSpout extends BaseRichSpout {
public void nextTuple() {
List<Object> toEmit = (List<Object>) InprocMessaging.pollMessage(_id);
- if(toEmit!=null) {
+ if (toEmit != null) {
List<Object> tuple = (List<Object>) toEmit.get(0);
Object msgId = toEmit.get(1);
-
+
_collector.emit(tuple, msgId);
} else {
try {
@@ -78,13 +77,13 @@ public class FeederSpout extends BaseRichSpout {
}
public void ack(Object msgId) {
- if(_ackFailDelegate!=null) {
+ if (_ackFailDelegate != null) {
_ackFailDelegate.ack(msgId);
}
}
public void fail(Object msgId) {
- if(_ackFailDelegate!=null) {
+ if (_ackFailDelegate != null) {
_ackFailDelegate.fail(msgId);
}
}
@@ -96,5 +95,5 @@ public class FeederSpout extends BaseRichSpout {
@Override
public Map<String, Object> getComponentConfiguration() {
return new HashMap<String, Object>();
- }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/FixedTupleSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/FixedTupleSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/FixedTupleSpout.java
index 9527803..01fc3e3 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/FixedTupleSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/FixedTupleSpout.java
@@ -35,17 +35,17 @@ public class FixedTupleSpout implements IRichSpout {
private static final Map<String, Integer> failed = new HashMap<String, Integer>();
public static int getNumAcked(String stormId) {
- synchronized(acked) {
+ synchronized (acked) {
return get(acked, stormId, 0);
}
}
public static int getNumFailed(String stormId) {
- synchronized(failed) {
+ synchronized (failed) {
return get(failed, stormId, 0);
}
}
-
+
public static void clear(String stormId) {
acked.remove(stormId);
failed.remove(stormId);
@@ -67,16 +67,16 @@ public class FixedTupleSpout implements IRichSpout {
public FixedTupleSpout(List tuples, String fieldName) {
_id = UUID.randomUUID().toString();
- synchronized(acked) {
+ synchronized (acked) {
acked.put(_id, 0);
}
- synchronized(failed) {
+ synchronized (failed) {
failed.put(_id, 0);
}
_tuples = new ArrayList<FixedTuple>();
- for(Object o: tuples) {
+ for (Object o : tuples) {
FixedTuple ft;
- if(o instanceof FixedTuple) {
+ if (o instanceof FixedTuple) {
ft = (FixedTuple) o;
} else {
ft = new FixedTuple((List) o);
@@ -89,25 +89,25 @@ public class FixedTupleSpout implements IRichSpout {
public List<FixedTuple> getSourceTuples() {
return _tuples;
}
-
+
public int getCompleted() {
int ackedAmt;
int failedAmt;
-
- synchronized(acked) {
+
+ synchronized (acked) {
ackedAmt = acked.get(_id);
}
- synchronized(failed) {
+ synchronized (failed) {
failedAmt = failed.get(_id);
}
return ackedAmt + failedAmt;
}
-
+
public void cleanup() {
- synchronized(acked) {
+ synchronized (acked) {
acked.remove(_id);
- }
- synchronized(failed) {
+ }
+ synchronized (failed) {
failed.remove(_id);
}
}
@@ -116,15 +116,15 @@ public class FixedTupleSpout implements IRichSpout {
_context = context;
List<Integer> tasks = context.getComponentTasks(context.getThisComponentId());
int startIndex;
- for(startIndex=0; startIndex<tasks.size(); startIndex++) {
- if(tasks.get(startIndex)==context.getThisTaskId()) {
+ for (startIndex = 0; startIndex < tasks.size(); startIndex++) {
+ if (tasks.get(startIndex) == context.getThisTaskId()) {
break;
}
}
_collector = collector;
_pending = new HashMap<String, FixedTuple>();
_serveTuples = new ArrayList<FixedTuple>();
- for(int i=startIndex; i<_tuples.size(); i+=tasks.size()) {
+ for (int i = startIndex; i < _tuples.size(); i += tasks.size()) {
_serveTuples.add(_tuples.get(i));
}
}
@@ -133,7 +133,7 @@ public class FixedTupleSpout implements IRichSpout {
}
public void nextTuple() {
- if(_serveTuples.size()>0) {
+ if (_serveTuples.size() > 0) {
FixedTuple ft = _serveTuples.remove(0);
String id = UUID.randomUUID().toString();
_pending.put(id, ft);
@@ -144,16 +144,16 @@ public class FixedTupleSpout implements IRichSpout {
}
public void ack(Object msgId) {
- synchronized(acked) {
+ synchronized (acked) {
int curr = get(acked, _id, 0);
- acked.put(_id, curr+1);
+ acked.put(_id, curr + 1);
}
}
public void fail(Object msgId) {
- synchronized(failed) {
+ synchronized (failed) {
int curr = get(failed, _id, 0);
- failed.put(_id, curr+1);
+ failed.put(_id, curr + 1);
}
}
@@ -166,7 +166,7 @@ public class FixedTupleSpout implements IRichSpout {
}
@Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
if (_fieldName != null) {
declarer.declare(new Fields(_fieldName));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/ForwardingMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/ForwardingMetricsConsumer.java b/jstorm-core/src/main/java/backtype/storm/testing/ForwardingMetricsConsumer.java
index 010336e..5cf8830 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/ForwardingMetricsConsumer.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/ForwardingMetricsConsumer.java
@@ -51,45 +51,42 @@ public class ForwardingMetricsConsumer implements IMetricsConsumer {
@Override
public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
- String [] parts = ((String)registrationArgument).split(":",2);
+ String[] parts = ((String) registrationArgument).split(":", 2);
host = parts[0];
port = Integer.valueOf(parts[1]);
try {
- socket = new Socket(host, port);
- out = socket.getOutputStream();
+ socket = new Socket(host, port);
+ out = socket.getOutputStream();
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RuntimeException(e);
}
}
@Override
public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
StringBuilder sb = new StringBuilder();
- String header = taskInfo.timestamp + "\t" +
- taskInfo.srcWorkerHost + ":"+ taskInfo.srcWorkerPort + "\t"+
- taskInfo.srcTaskId + "\t" + taskInfo.srcComponentId + "\t";
+ String header =
+ taskInfo.timestamp + "\t" + taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort + "\t" + taskInfo.srcTaskId + "\t" + taskInfo.srcComponentId
+ + "\t";
sb.append(header);
for (DataPoint p : dataPoints) {
sb.delete(header.length(), sb.length());
- sb.append(p.name)
- .append("\t")
- .append(p.value)
- .append("\n");
+ sb.append(p.name).append("\t").append(p.value).append("\n");
try {
- out.write(sb.toString().getBytes());
- out.flush();
+ out.write(sb.toString().getBytes());
+ out.flush();
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RuntimeException(e);
}
}
}
@Override
- public void cleanup() {
- try {
- socket.close();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ public void cleanup() {
+ try {
+ socket.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/IdentityBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/IdentityBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/IdentityBolt.java
index dcad640..b951d84 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/IdentityBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/IdentityBolt.java
@@ -25,7 +25,7 @@ import backtype.storm.tuple.Tuple;
public class IdentityBolt extends BaseBasicBolt {
Fields _fields;
-
+
public IdentityBolt(Fields fields) {
_fields = fields;
}
@@ -38,5 +38,5 @@ public class IdentityBolt extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(_fields);
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java
index 1c4d5b3..7e4b32f 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java
@@ -32,7 +32,7 @@ public class KeyedCountingBatchBolt extends BaseBatchBolt {
BatchOutputCollector _collector;
Object _id;
Map<Object, Integer> _counts = new HashMap<Object, Integer>();
-
+
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
_collector = collector;
@@ -48,14 +48,14 @@ public class KeyedCountingBatchBolt extends BaseBatchBolt {
@Override
public void finishBatch() {
- for(Object key: _counts.keySet()) {
+ for (Object key : _counts.keySet()) {
_collector.emit(new Values(_id, key, _counts.get(key)));
}
- }
+ }
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tx", "key", "count"));
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java
index 887eb4e..67225cb 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java
@@ -33,7 +33,7 @@ public class KeyedSummingBatchBolt extends BaseBatchBolt {
BatchOutputCollector _collector;
Object _id;
Map<Object, Number> _sums = new HashMap<Object, Number>();
-
+
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
_collector = collector;
@@ -43,19 +43,19 @@ public class KeyedSummingBatchBolt extends BaseBatchBolt {
@Override
public void execute(Tuple tuple) {
Object key = tuple.getValue(1);
- Number curr = Utils.get(_sums, key, 0);
+ Number curr = Utils.get(_sums, key, 0);
_sums.put(key, Numbers.add(curr, tuple.getValue(2)));
}
@Override
public void finishBatch() {
- for(Object key: _sums.keySet()) {
+ for (Object key : _sums.keySet()) {
_collector.emit(new Values(_id, key, _sums.get(key)));
}
- }
+ }
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tx", "key", "sum"));
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java
index 3b492e1..75ad375 100644
--- a/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java
@@ -35,13 +35,13 @@ import java.util.Map;
public class MemoryTransactionalSpout implements IPartitionedTransactionalSpout<MemoryTransactionalSpoutMeta> {
public static String TX_FIELD = MemoryTransactionalSpout.class.getName() + "/id";
-
+
private String _id;
private String _finishedPartitionsId;
private int _takeAmt;
private Fields _outFields;
private Map<Integer, List<List<Object>>> _initialPartitions;
-
+
public MemoryTransactionalSpout(Map<Integer, List<List<Object>>> partitions, Fields outFields, int takeAmt) {
_id = RegisteredGlobalState.registerState(partitions);
Map<Integer, Boolean> finished = Collections.synchronizedMap(new HashMap<Integer, Boolean>());
@@ -50,17 +50,17 @@ public class MemoryTransactionalSpout implements IPartitionedTransactionalSpout<
_outFields = outFields;
_initialPartitions = partitions;
}
-
+
public boolean isExhaustedTuples() {
Map<Integer, Boolean> statuses = getFinishedStatuses();
- for(Integer partition: getQueues().keySet()) {
- if(!statuses.containsKey(partition) || !getFinishedStatuses().get(partition)) {
+ for (Integer partition : getQueues().keySet()) {
+ if (!statuses.containsKey(partition) || !getFinishedStatuses().get(partition)) {
return false;
}
}
return true;
}
-
+
class Coordinator implements IPartitionedTransactionalSpout.Coordinator {
@Override
@@ -71,29 +71,31 @@ public class MemoryTransactionalSpout implements IPartitionedTransactionalSpout<
@Override
public boolean isReady() {
return true;
- }
-
+ }
+
@Override
public void close() {
- }
+ }
}
-
+
class Emitter implements IPartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> {
-
+
Integer _maxSpoutPending;
Map<Integer, Integer> _emptyPartitions = new HashMap<Integer, Integer>();
-
+
public Emitter(Map conf) {
Object c = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
- if(c==null) _maxSpoutPending = 1;
- else _maxSpoutPending = Utils.getInt(c);
+ if (c == null)
+ _maxSpoutPending = 1;
+ else
+ _maxSpoutPending = Utils.getInt(c);
}
-
-
+
@Override
- public MemoryTransactionalSpoutMeta emitPartitionBatchNew(TransactionAttempt tx, BatchOutputCollector collector, int partition, MemoryTransactionalSpoutMeta lastPartitionMeta) {
+ public MemoryTransactionalSpoutMeta emitPartitionBatchNew(TransactionAttempt tx, BatchOutputCollector collector, int partition,
+ MemoryTransactionalSpoutMeta lastPartitionMeta) {
int index;
- if(lastPartitionMeta==null) {
+ if (lastPartitionMeta == null) {
index = 0;
} else {
index = lastPartitionMeta.index + lastPartitionMeta.amt;
@@ -102,40 +104,40 @@ public class MemoryTransactionalSpout implements IPartitionedTransactionalSpout<
int total = queue.size();
int left = total - index;
int toTake = Math.min(left, _takeAmt);
-
+
MemoryTransactionalSpoutMeta ret = new MemoryTransactionalSpoutMeta(index, toTake);
emitPartitionBatch(tx, collector, partition, ret);
- if(toTake==0) {
+ if (toTake == 0) {
// this is a pretty hacky way to determine when all the partitions have been committed
// wait until we've emitted max-spout-pending empty partitions for the partition
int curr = Utils.get(_emptyPartitions, partition, 0) + 1;
_emptyPartitions.put(partition, curr);
- if(curr > _maxSpoutPending) {
+ if (curr > _maxSpoutPending) {
Map<Integer, Boolean> finishedStatuses = getFinishedStatuses();
// will be null in remote mode
- if(finishedStatuses!=null) {
+ if (finishedStatuses != null) {
finishedStatuses.put(partition, true);
}
}
}
- return ret;
+ return ret;
}
@Override
public void emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, MemoryTransactionalSpoutMeta partitionMeta) {
List<List<Object>> queue = getQueues().get(partition);
- for(int i=partitionMeta.index; i < partitionMeta.index + partitionMeta.amt; i++) {
+ for (int i = partitionMeta.index; i < partitionMeta.index + partitionMeta.amt; i++) {
List<Object> toEmit = new ArrayList<Object>(queue.get(i));
toEmit.add(0, tx);
- collector.emit(toEmit);
+ collector.emit(toEmit);
}
}
-
+
@Override
public void close() {
- }
- }
-
+ }
+ }
+
@Override
public IPartitionedTransactionalSpout.Coordinator getCoordinator(Map conf, TopologyContext context) {
return new Coordinator();
@@ -159,22 +161,24 @@ public class MemoryTransactionalSpout implements IPartitionedTransactionalSpout<
conf.registerSerialization(MemoryTransactionalSpoutMeta.class);
return conf;
}
-
+
public void startup() {
getFinishedStatuses().clear();
}
-
+
public void cleanup() {
RegisteredGlobalState.clearState(_id);
RegisteredGlobalState.clearState(_finishedPartitionsId);
}
-
- private Map<Integer, List<List<Object>>> getQueues() {
+
+ private Map<Integer, List<List<Object>>> getQueues() {
Map<Integer, List<List<Object>>> ret = (Map<Integer, List<List<Object>>>) RegisteredGlobalState.getState(_id);
- if(ret!=null) return ret;
- else return _initialPartitions;
+ if (ret != null)
+ return ret;
+ else
+ return _initialPartitions;
}
-
+
private Map<Integer, Boolean> getFinishedStatuses() {
return (Map<Integer, Boolean>) RegisteredGlobalState.getState(_finishedPartitionsId);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java b/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java
index 29681fb..a00788d 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java
@@ -20,12 +20,12 @@ package backtype.storm.testing;
public class MemoryTransactionalSpoutMeta {
int index;
int amt;
-
+
// for kryo compatibility
public MemoryTransactionalSpoutMeta() {
-
+
}
-
+
public MemoryTransactionalSpoutMeta(int index, int amt) {
this.index = index;
this.amt = amt;
@@ -34,5 +34,5 @@ public class MemoryTransactionalSpoutMeta {
@Override
public String toString() {
return "index: " + index + "; amt: " + amt;
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/MkClusterParam.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/MkClusterParam.java b/jstorm-core/src/main/java/backtype/storm/testing/MkClusterParam.java
index cd677c8..d325377 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/MkClusterParam.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/MkClusterParam.java
@@ -23,35 +23,40 @@ import java.util.Map;
* The param arg for <code>Testing.withSimulatedTimeCluster</code> and <code>Testing.withTrackedCluster</code>
*/
public class MkClusterParam {
- /**
- * count of supervisors for the cluster.
- */
- private Integer supervisors;
- /**
- * count of port for each supervisor
- */
- private Integer portsPerSupervisor;
- /**
- * cluster config
- */
- private Map daemonConf;
-
- public Integer getSupervisors() {
- return supervisors;
- }
- public void setSupervisors(Integer supervisors) {
- this.supervisors = supervisors;
- }
- public Integer getPortsPerSupervisor() {
- return portsPerSupervisor;
- }
- public void setPortsPerSupervisor(Integer portsPerSupervisor) {
- this.portsPerSupervisor = portsPerSupervisor;
- }
- public Map getDaemonConf() {
- return daemonConf;
- }
- public void setDaemonConf(Map daemonConf) {
- this.daemonConf = daemonConf;
- }
+ /**
+ * count of supervisors for the cluster.
+ */
+ private Integer supervisors;
+ /**
+ * count of port for each supervisor
+ */
+ private Integer portsPerSupervisor;
+ /**
+ * cluster config
+ */
+ private Map daemonConf;
+
+ public Integer getSupervisors() {
+ return supervisors;
+ }
+
+ public void setSupervisors(Integer supervisors) {
+ this.supervisors = supervisors;
+ }
+
+ public Integer getPortsPerSupervisor() {
+ return portsPerSupervisor;
+ }
+
+ public void setPortsPerSupervisor(Integer portsPerSupervisor) {
+ this.portsPerSupervisor = portsPerSupervisor;
+ }
+
+ public Map getDaemonConf() {
+ return daemonConf;
+ }
+
+ public void setDaemonConf(Map daemonConf) {
+ this.daemonConf = daemonConf;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/MkTupleParam.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/MkTupleParam.java b/jstorm-core/src/main/java/backtype/storm/testing/MkTupleParam.java
index 34a8c68..a8a4bdf 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/MkTupleParam.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/MkTupleParam.java
@@ -21,31 +21,34 @@ import java.util.ArrayList;
import java.util.List;
public class MkTupleParam {
- private String stream;
- private String component;
- private List<String> fields;
-
- public String getStream() {
- return stream;
- }
- public void setStream(String stream) {
- this.stream = stream;
- }
-
- public String getComponent() {
- return component;
- }
- public void setComponent(String component) {
- this.component = component;
- }
-
- public List<String> getFields() {
- return fields;
- }
- public void setFields(String... fields) {
- this.fields = new ArrayList<String>();
- for (int i = 0; i < fields.length; i++) {
- this.fields.add(fields[i]);
- }
- }
+ private String stream;
+ private String component;
+ private List<String> fields;
+
+ public String getStream() {
+ return stream;
+ }
+
+ public void setStream(String stream) {
+ this.stream = stream;
+ }
+
+ public String getComponent() {
+ return component;
+ }
+
+ public void setComponent(String component) {
+ this.component = component;
+ }
+
+ public List<String> getFields() {
+ return fields;
+ }
+
+ public void setFields(String... fields) {
+ this.fields = new ArrayList<String>();
+ for (int i = 0; i < fields.length; i++) {
+ this.fields.add(fields[i]);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/MockedSources.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/MockedSources.java b/jstorm-core/src/main/java/backtype/storm/testing/MockedSources.java
index 1fd6b85..48b9ac0 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/MockedSources.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/MockedSources.java
@@ -26,11 +26,11 @@ import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class MockedSources {
- /**
- * mocked spout sources for the [spout, stream] pair.
- */
+ /**
+ * mocked spout sources for the [spout, stream] pair.
+ */
private Map<String, List<FixedTuple>> data = new HashMap<String, List<FixedTuple>>();
-
+
/**
* add mock data for the spout.
*
@@ -42,18 +42,18 @@ public class MockedSources {
if (!data.containsKey(spoutId)) {
data.put(spoutId, new ArrayList<FixedTuple>());
}
-
+
List<FixedTuple> tuples = data.get(spoutId);
for (int i = 0; i < valueses.length; i++) {
FixedTuple tuple = new FixedTuple(streamId, valueses[i]);
tuples.add(tuple);
}
}
-
+
public void addMockData(String spoutId, Values... valueses) {
this.addMockData(spoutId, Utils.DEFAULT_STREAM_ID, valueses);
}
-
+
public Map<String, List<FixedTuple>> getData() {
return this.data;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/NGrouping.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/NGrouping.java b/jstorm-core/src/main/java/backtype/storm/testing/NGrouping.java
index 785ed92..9e7363c 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/NGrouping.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/NGrouping.java
@@ -27,17 +27,17 @@ import java.util.List;
public class NGrouping implements CustomStreamGrouping {
int _n;
List<Integer> _outTasks;
-
+
public NGrouping(int n) {
_n = n;
}
-
+
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
targetTasks = new ArrayList<Integer>(targetTasks);
Collections.sort(targetTasks);
_outTasks = new ArrayList<Integer>();
- for(int i=0; i<_n; i++) {
+ for (int i = 0; i < _n; i++) {
_outTasks.add(targetTasks.get(i));
}
}
@@ -46,5 +46,5 @@ public class NGrouping implements CustomStreamGrouping {
public List<Integer> chooseTasks(int taskId, List<Object> values) {
return _outTasks;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/NonRichBoltTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/NonRichBoltTracker.java b/jstorm-core/src/main/java/backtype/storm/testing/NonRichBoltTracker.java
index ccbb67f..b489289 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/NonRichBoltTracker.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/NonRichBoltTracker.java
@@ -25,7 +25,6 @@ import backtype.storm.utils.RegisteredGlobalState;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-
public class NonRichBoltTracker implements IBolt {
IBolt _delegate;
String _trackId;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java
index 1ff01b9..0c91d2b 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java
@@ -37,34 +37,34 @@ import java.util.Map;
*/
public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransactionalSpout<MemoryTransactionalSpoutMeta> {
public static String TX_FIELD = MemoryTransactionalSpout.class.getName() + "/id";
-
+
private String _id;
private String _finishedPartitionsId;
private String _disabledId;
private int _takeAmt;
private Fields _outFields;
-
+
public OpaqueMemoryTransactionalSpout(Map<Integer, List<List<Object>>> partitions, Fields outFields, int takeAmt) {
_id = RegisteredGlobalState.registerState(partitions);
-
+
Map<Integer, Boolean> finished = Collections.synchronizedMap(new HashMap<Integer, Boolean>());
_finishedPartitionsId = RegisteredGlobalState.registerState(finished);
-
+
Map<Integer, Boolean> disabled = Collections.synchronizedMap(new HashMap<Integer, Boolean>());
_disabledId = RegisteredGlobalState.registerState(disabled);
-
+
_takeAmt = takeAmt;
_outFields = outFields;
}
-
+
public void setDisabled(Integer partition, boolean disabled) {
getDisabledStatuses().put(partition, disabled);
}
-
+
public boolean isExhaustedTuples() {
Map<Integer, Boolean> statuses = getFinishedStatuses();
- for(Integer partition: getQueues().keySet()) {
- if(!statuses.containsKey(partition) || !getFinishedStatuses().get(partition)) {
+ for (Integer partition : getQueues().keySet()) {
+ if (!statuses.containsKey(partition) || !getFinishedStatuses().get(partition)) {
return false;
}
}
@@ -80,7 +80,7 @@ public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransac
public IOpaquePartitionedTransactionalSpout.Coordinator getCoordinator(Map conf, TopologyContext context) {
return new Coordinator();
}
-
+
class Coordinator implements IOpaquePartitionedTransactionalSpout.Coordinator {
@Override
public boolean isReady() {
@@ -91,24 +91,26 @@ public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransac
public void close() {
}
}
-
+
class Emitter implements IOpaquePartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> {
-
+
Integer _maxSpoutPending;
Map<Integer, Integer> _emptyPartitions = new HashMap<Integer, Integer>();
-
+
public Emitter(Map conf) {
Object c = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
- if(c==null) _maxSpoutPending = 1;
- else _maxSpoutPending = Utils.getInt(c);
+ if (c == null)
+ _maxSpoutPending = 1;
+ else
+ _maxSpoutPending = Utils.getInt(c);
}
-
-
+
@Override
- public MemoryTransactionalSpoutMeta emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, MemoryTransactionalSpoutMeta lastPartitionMeta) {
- if(!Boolean.FALSE.equals(getDisabledStatuses().get(partition))) {
+ public MemoryTransactionalSpoutMeta emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition,
+ MemoryTransactionalSpoutMeta lastPartitionMeta) {
+ if (!Boolean.FALSE.equals(getDisabledStatuses().get(partition))) {
int index;
- if(lastPartitionMeta==null) {
+ if (lastPartitionMeta == null) {
index = 0;
} else {
index = lastPartitionMeta.index + lastPartitionMeta.amt;
@@ -119,26 +121,26 @@ public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransac
int toTake = Math.min(left, _takeAmt);
MemoryTransactionalSpoutMeta ret = new MemoryTransactionalSpoutMeta(index, toTake);
- for(int i=ret.index; i < ret.index + ret.amt; i++) {
+ for (int i = ret.index; i < ret.index + ret.amt; i++) {
List<Object> toEmit = new ArrayList<Object>(queue.get(i));
toEmit.add(0, tx);
- collector.emit(toEmit);
+ collector.emit(toEmit);
}
- if(toTake==0) {
+ if (toTake == 0) {
// this is a pretty hacky way to determine when all the partitions have been committed
// wait until we've emitted max-spout-pending empty partitions for the partition
int curr = Utils.get(_emptyPartitions, partition, 0) + 1;
_emptyPartitions.put(partition, curr);
- if(curr > _maxSpoutPending) {
+ if (curr > _maxSpoutPending) {
getFinishedStatuses().put(partition, true);
}
}
- return ret;
+ return ret;
} else {
return null;
}
}
-
+
@Override
public void close() {
}
@@ -147,7 +149,7 @@ public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransac
public int numPartitions() {
return getQueues().size();
}
- }
+ }
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
@@ -162,20 +164,20 @@ public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransac
conf.registerSerialization(MemoryTransactionalSpoutMeta.class);
return conf;
}
-
+
public void startup() {
getFinishedStatuses().clear();
}
-
+
public void cleanup() {
RegisteredGlobalState.clearState(_id);
RegisteredGlobalState.clearState(_finishedPartitionsId);
}
-
+
private Map<Integer, List<List<Object>>> getQueues() {
return (Map<Integer, List<List<Object>>>) RegisteredGlobalState.getState(_id);
}
-
+
private Map<Integer, Boolean> getFinishedStatuses() {
return (Map<Integer, Boolean>) RegisteredGlobalState.getState(_finishedPartitionsId);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/PrepareBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/PrepareBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/PrepareBatchBolt.java
index 0bd9833..e9e2a9d 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/PrepareBatchBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/PrepareBatchBolt.java
@@ -26,10 +26,9 @@ import backtype.storm.utils.Utils;
import java.util.ArrayList;
import java.util.List;
-
public class PrepareBatchBolt extends BaseBasicBolt {
Fields _outFields;
-
+
public PrepareBatchBolt(Fields outFields) {
_outFields = outFields;
}
@@ -47,6 +46,5 @@ public class PrepareBatchBolt extends BaseBasicBolt {
toEmit.addAll(input.getValues());
collector.emit(toEmit);
}
-
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsBolt.java
index 4b85ce8..fd41283 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsBolt.java
@@ -27,23 +27,23 @@ import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
public class PythonShellMetricsBolt extends ShellBolt implements IRichBolt {
- private static final long serialVersionUID = 1999209252187463355L;
-
- public PythonShellMetricsBolt(String[] command) {
- super(command);
- }
-
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- super.prepare(stormConf, context, collector);
-
- CountShellMetric cMetric = new CountShellMetric();
- context.registerMetric("my-custom-shell-metric", cMetric, 5);
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
-
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
+ private static final long serialVersionUID = 1999209252187463355L;
+
+ public PythonShellMetricsBolt(String[] command) {
+ super(command);
+ }
+
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ super.prepare(stormConf, context, collector);
+
+ CountShellMetric cMetric = new CountShellMetric();
+ context.registerMetric("my-custom-shell-metric", cMetric, 5);
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsSpout.java
index 3ccf935..8325fba 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsSpout.java
@@ -28,25 +28,25 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
public class PythonShellMetricsSpout extends ShellSpout implements IRichSpout {
- private static final long serialVersionUID = 1999209252187463355L;
-
- public PythonShellMetricsSpout(String[] command) {
- super(command);
- }
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- super.open(conf, context, collector);
-
- CountShellMetric cMetric = new CountShellMetric();
- context.registerMetric("my-custom-shellspout-metric", cMetric, 5);
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("field1"));
- }
-
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
+ private static final long serialVersionUID = 1999209252187463355L;
+
+ public PythonShellMetricsSpout(String[] command) {
+ super(command);
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ super.open(conf, context, collector);
+
+ CountShellMetric cMetric = new CountShellMetric();
+ context.registerMetric("my-custom-shellspout-metric", cMetric, 5);
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("field1"));
+ }
+
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/SingleUserSimpleTransport.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/SingleUserSimpleTransport.java b/jstorm-core/src/main/java/backtype/storm/testing/SingleUserSimpleTransport.java
index 4d25ac7..cf9dc4d 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/SingleUserSimpleTransport.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/SingleUserSimpleTransport.java
@@ -23,15 +23,19 @@ import javax.security.auth.Subject;
import java.security.Principal;
import java.util.HashSet;
-
public class SingleUserSimpleTransport extends SimpleTransportPlugin {
- @Override
- protected Subject getDefaultSubject() {
- HashSet<Principal> principals = new HashSet<Principal>();
- principals.add(new Principal() {
- public String getName() { return "user"; }
- public String toString() { return "user"; }
- });
- return new Subject(true, principals, new HashSet<Object>(), new HashSet<Object>());
- }
+ @Override
+ protected Subject getDefaultSubject() {
+ HashSet<Principal> principals = new HashSet<Principal>();
+ principals.add(new Principal() {
+ public String getName() {
+ return "user";
+ }
+
+ public String toString() {
+ return "user";
+ }
+ });
+ return new Subject(true, principals, new HashSet<Object>(), new HashSet<Object>());
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/SpoutTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/SpoutTracker.java b/jstorm-core/src/main/java/backtype/storm/testing/SpoutTracker.java
index 75ba2b8..369e661 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/SpoutTracker.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/SpoutTracker.java
@@ -28,13 +28,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-
public class SpoutTracker extends BaseRichSpout {
IRichSpout _delegate;
SpoutTrackOutputCollector _tracker;
String _trackId;
-
private class SpoutTrackOutputCollector implements ISpoutOutputCollector {
public int transferred = 0;
public int emitted = 0;
@@ -43,11 +41,11 @@ public class SpoutTracker extends BaseRichSpout {
public SpoutTrackOutputCollector(SpoutOutputCollector collector) {
_collector = collector;
}
-
+
private void recordSpoutEmit() {
Map stats = (Map) RegisteredGlobalState.getState(_trackId);
((AtomicInteger) stats.get("spout-emitted")).incrementAndGet();
-
+
}
public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
@@ -63,11 +61,10 @@ public class SpoutTracker extends BaseRichSpout {
@Override
public void reportError(Throwable error) {
- _collector.reportError(error);
+ _collector.reportError(error);
}
}
-
public SpoutTracker(IRichSpout delegate, String trackId) {
_delegate = delegate;
_trackId = trackId;
@@ -95,7 +92,7 @@ public class SpoutTracker extends BaseRichSpout {
public void fail(Object msgId) {
_delegate.fail(msgId);
Map stats = (Map) RegisteredGlobalState.getState(_trackId);
- ((AtomicInteger) stats.get("processed")).incrementAndGet();
+ ((AtomicInteger) stats.get("processed")).incrementAndGet();
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestAggregatesCounter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestAggregatesCounter.java b/jstorm-core/src/main/java/backtype/storm/testing/TestAggregatesCounter.java
index e8c0a61..76b6874 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestAggregatesCounter.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestAggregatesCounter.java
@@ -29,7 +29,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static backtype.storm.utils.Utils.tuple;
-
public class TestAggregatesCounter extends BaseRichBolt {
public static Logger LOG = LoggerFactory.getLogger(TestWordCounter.class);
@@ -46,8 +45,8 @@ public class TestAggregatesCounter extends BaseRichBolt {
int count = (Integer) input.getValues().get(1);
_counts.put(word, count);
int globalCount = 0;
- for(String w: _counts.keySet()) {
- globalCount+=_counts.get(w);
+ for (String w : _counts.keySet()) {
+ globalCount += _counts.get(w);
}
_collector.emit(tuple(globalCount));
_collector.ack(input);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestConfBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestConfBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/TestConfBolt.java
index 5790fb3..634cbe1 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestConfBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestConfBolt.java
@@ -26,7 +26,6 @@ import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
-
public class TestConfBolt extends BaseBasicBolt {
Map<String, Object> _componentConf;
Map<String, Object> _conf;
@@ -34,16 +33,16 @@ public class TestConfBolt extends BaseBasicBolt {
public TestConfBolt() {
this(null);
}
-
+
public TestConfBolt(Map<String, Object> componentConf) {
_componentConf = componentConf;
- }
+ }
@Override
public void prepare(Map conf, TopologyContext context) {
_conf = conf;
- }
-
+ }
+
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("conf", "value"));
@@ -58,5 +57,5 @@ public class TestConfBolt extends BaseBasicBolt {
@Override
public Map<String, Object> getComponentConfiguration() {
return _componentConf;
- }
+ }
}