You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/07/13 16:34:59 UTC

[1/4] storm git commit: Minor type cleanup things in storm.task.TopologyContext.java

Repository: storm
Updated Branches:
  refs/heads/master f63b1ef53 -> 98cbb34f0


Minor type cleanup things in storm.task.TopologyContext.java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1912eaf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1912eaf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1912eaf

Branch: refs/heads/master
Commit: a1912eafab7f4313c7f45f27748fff28cb37d8f0
Parents: 30df970
Author: Dan Blanchard <da...@parsely.com>
Authored: Mon Jul 6 16:45:29 2015 -0400
Committer: Dan Blanchard <da...@parsely.com>
Committed: Tue Jul 7 10:16:05 2015 -0400

----------------------------------------------------------------------
 .../backtype/storm/task/TopologyContext.java    | 38 ++++++++++----------
 1 file changed, 20 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a1912eaf/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index 8124651..b3c6d22 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -29,6 +29,7 @@ import backtype.storm.metric.api.CombinedMetric;
 import backtype.storm.state.ISubscribedState;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -36,6 +37,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.commons.lang.NotImplementedException;
 import org.json.simple.JSONValue;
 
@@ -61,7 +63,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
             Map<String, Map<String, Fields>> componentToStreamToFields,
             String stormId, String codeDir, String pidDir, Integer taskId,
             Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources,
-            Map<String, Object> userResources, Map<String, Object> executorData, Map registeredMetrics,
+            Map<String, Object> userResources, Map<String, Object> executorData, Map<Integer, Map<Integer, Map<String, IMetric>>> registeredMetrics,
             clojure.lang.Atom openOrPrepareWasCalled) {
         super(topology, stormConf, taskToComponent, componentToSortedTasks,
                 componentToStreamToFields, stormId, codeDir, pidDir,
@@ -154,11 +156,11 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
 	}
 
 	/**
-	 * Gets the declared output fields for the specified stream id for the
+	 * Gets the declared output fields for all streams for the
 	 * component this task is a part of.
 	 */
 	public Map<String, List<String>> getThisOutputFieldsForStreams() {
-		Map<String, List<String>> streamToFields = new HashMap<String, List<String>>();
+		Map<String, List<String>> streamToFields = new HashMap<>();
 		for (String stream : this.getThisStreams()) {
 			streamToFields.put(stream, this.getThisOutputFields(stream).toList());
 		}
@@ -178,7 +180,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
      * accesses which resource in a distributed resource to ensure an even distribution.
      */
     public int getThisTaskIndex() {
-        List<Integer> tasks = new ArrayList<Integer>(getComponentTasks(getThisComponentId()));
+        List<Integer> tasks = new ArrayList<>(getComponentTasks(getThisComponentId()));
         Collections.sort(tasks);
         for(int i=0; i<tasks.size(); i++) {
             if(tasks.get(i) == getThisTaskId()) {
@@ -232,7 +234,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     }
 
 	private static Map<String, Object> groupingToJSONableMap(Grouping grouping) {
-		Map groupingMap = new HashMap<String, Object>();
+		Map<String, Object> groupingMap = new HashMap<>();
 		groupingMap.put("type", grouping.getSetField().toString());
 		if (grouping.is_set_fields()) {
 			groupingMap.put("fields", grouping.get_fields());
@@ -242,18 +244,18 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     
     @Override
     public String toJSONString() {
-        Map obj = new HashMap();
+        Map<String, Object> obj = new HashMap<>();
         obj.put("task->component", this.getTaskToComponent());
         obj.put("taskid", this.getThisTaskId());
         obj.put("componentid", this.getThisComponentId());
-        List<String> streamList = new ArrayList<String>();
+        List<String> streamList = new ArrayList<>();
         streamList.addAll(this.getThisStreams());
         obj.put("streams", streamList);
         obj.put("stream->outputfields", this.getThisOutputFieldsForStreams());
         // Convert targets to a JSON serializable format
-        Map<String, Map> stringTargets = new HashMap<String, Map>();
+        Map<String, Map<String, Object>> stringTargets = new HashMap<>();
         for (Map.Entry<String, Map<String, Grouping>> entry : this.getThisTargets().entrySet()) {
-        	Map stringTargetMap = new HashMap<String, Object>();
+        	Map<String, Object> stringTargetMap = new HashMap<>();
         	for (Map.Entry<String, Grouping> innerEntry : entry.getValue().entrySet()) {
         		stringTargetMap.put(innerEntry.getKey(), groupingToJSONableMap(innerEntry.getValue()));
         	}
@@ -261,12 +263,12 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
         }
         obj.put("stream->target->grouping", stringTargets);
         // Convert sources to a JSON serializable format
-        Map<String, Map<String, Object>> stringSources = new HashMap<String, Map<String, Object>>();
+        Map<String, Map<String, Object>> stringSources = new HashMap<>();
         for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
         	GlobalStreamId gid = entry.getKey();
         	Map<String, Object> stringSourceMap = stringSources.get(gid.get_componentId());
         	if (stringSourceMap == null) {
-        		stringSourceMap = new HashMap<String, Object>();
+        		stringSourceMap = new HashMap<>();
         		stringSources.put(gid.get_componentId(), stringSourceMap);
         	}
         	stringSourceMap.put(gid.get_streamId(), groupingToJSONableMap(entry.getValue()));        	
@@ -301,17 +303,17 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
             throw new RuntimeException("The same metric name `" + name + "` was registered twice." );
         }
 
-        Map m1 = _registeredMetrics;
+        Map<Integer, Map<Integer, Map<String, IMetric>>> m1 = _registeredMetrics;
         if(!m1.containsKey(timeBucketSizeInSecs)) {
-            m1.put(timeBucketSizeInSecs, new HashMap());
+            m1.put(timeBucketSizeInSecs, new HashMap<Integer, Map<String, IMetric>>());
         }
 
-        Map m2 = (Map)m1.get(timeBucketSizeInSecs);
+        Map<Integer, Map<String, IMetric>> m2 = m1.get(timeBucketSizeInSecs);
         if(!m2.containsKey(_taskId)) {
-            m2.put(_taskId, new HashMap());
+            m2.put(_taskId, new HashMap<String, IMetric>());
         }
 
-        Map m3 = (Map)m2.get(_taskId);
+        Map<String, IMetric> m3 = m2.get(_taskId);
         if(m3.containsKey(name)) {
             throw new RuntimeException("The same metric name `" + name + "` was registered twice." );
         } else {
@@ -346,13 +348,13 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     }
 
     /*
-     * Convinience method for registering ReducedMetric.
+     * Convenience method for registering ReducedMetric.
      */
     public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) {
         return registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs);
     }
     /*
-     * Convinience method for registering CombinedMetric.
+     * Convenience method for registering CombinedMetric.
      */
     public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) {
         return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs);


[2/4] storm git commit: Add source->stream->fields mapping to multi-lang handshake.

Posted by ka...@apache.org.
Add source->stream->fields mapping to multi-lang handshake.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/427cef54
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/427cef54
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/427cef54

Branch: refs/heads/master
Commit: 427cef54c74204a257bc97a3e8fb793854603aaf
Parents: a1912ea
Author: Dan Blanchard <da...@parsely.com>
Authored: Tue Jul 7 10:25:58 2015 -0400
Committer: Dan Blanchard <da...@parsely.com>
Committed: Tue Jul 7 10:25:58 2015 -0400

----------------------------------------------------------------------
 docs/documentation/Multilang-protocol.md        |  9 ++++++--
 .../backtype/storm/task/TopologyContext.java    | 23 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/427cef54/docs/documentation/Multilang-protocol.md
----------------------------------------------------------------------
diff --git a/docs/documentation/Multilang-protocol.md b/docs/documentation/Multilang-protocol.md
index 017ad32..2a90059 100644
--- a/docs/documentation/Multilang-protocol.md
+++ b/docs/documentation/Multilang-protocol.md
@@ -66,7 +66,7 @@ The initial handshake is the same for both types of shell components:
             "4": "example-bolt2"
         },
         "taskid": 3,
-        // Everything below this line is only available in Storm 0.11.0+
+        // Everything below this line is only available in Storm 0.10.0+
         "componentid": "example-bolt"
         "stream->target->grouping": {
         	"default": {
@@ -82,6 +82,11 @@ The initial handshake is the same for both types of shell components:
 	    		}
 	    	}
 	    }
+	    "source->stream->fields": {
+	    	"example-spout": {
+	    		"default": ["word"]
+	    	}
+	    }
 	}
 }
 ```
@@ -90,7 +95,7 @@ Your script should create an empty file named with its PID in this directory. e.
 the PID is 1234, so an empty file named 1234 is created in the directory. This
 file lets the supervisor know the PID so it can shutdown the process later on.
 
-As of Storm 0.11.0, the context sent by Storm to shell components has been
+As of Storm 0.10.0, the context sent by Storm to shell components has been
 enhanced substantially to include all aspects of the topology context available
 to JVM components.  One key addition is the ability to determine a shell
 component's source and targets (i.e., inputs and outputs) in the topology via

http://git-wip-us.apache.org/repos/asf/storm/blob/427cef54/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index b3c6d22..cefa207 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -191,6 +191,28 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     }
 
     /**
+     * Gets the declared input fields for this component.
+     *
+     * @return A map from sources to streams to fields.
+     */
+    public Map<String, Map<String, List<String>>> getThisInputFields() {
+    	Map<String, Map<String, List<String>>> outputMap = new HashMap<>();
+        for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
+        	String componentId = entry.getKey().get_componentId();
+        	Set<String> streams = getComponentStreams(componentId);
+        	for (String stream : streams) {
+        		Map<String, List<String>> streamFieldMap = outputMap.get(componentId);
+        		if (streamFieldMap == null) {
+        			streamFieldMap = new HashMap<>();
+        			outputMap.put(componentId, streamFieldMap);
+        		}
+        		streamFieldMap.put(stream, getComponentOutputFields(componentId, stream).toList());
+        	}
+        }
+        return outputMap;
+    }
+
+    /**
      * Gets the declared inputs to this component.
      *
      * @return A map from subscribed component/stream to the grouping subscribed with.
@@ -274,6 +296,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
         	stringSourceMap.put(gid.get_streamId(), groupingToJSONableMap(entry.getValue()));        	
         }
         obj.put("source->stream->grouping", stringSources);
+        obj.put("source->stream->fields", this.getThisInputFields());
         return JSONValue.toJSONString(obj);
     }
 


[4/4] storm git commit: add STORM-928 to CHANGELOG.md

Posted by ka...@apache.org.
add STORM-928 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/98cbb34f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/98cbb34f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/98cbb34f

Branch: refs/heads/master
Commit: 98cbb34f01dd829fcdf3ea931d12c50c7f88cf98
Parents: b1388c6
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Jul 13 23:33:25 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Jul 13 23:33:25 2015 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/98cbb34f/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 306eb1e..1ab08c8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@
  * STORM-843: [storm-redis] Add Javadoc to storm-redis
  * STORM-866: Use storm.log.dir instead of storm.home in log4j2 config
  * STORM-810: PartitionManager in storm-kafka should commit latest offset before close
+ * STORM-928: Add sources->streams->fields map to Multi-Lang Handshake
 
 ## 0.10.0-beta1
  * STORM-873: Flux does not handle diamond topologies


[3/4] storm git commit: Merge branch 'master' of https://github.com/dan-blanchard/incubator-storm

Posted by ka...@apache.org.
Merge branch 'master' of https://github.com/dan-blanchard/incubator-storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b1388c6d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b1388c6d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b1388c6d

Branch: refs/heads/master
Commit: b1388c6dd3a999ccfdb17390fc3033ff4451783e
Parents: f63b1ef 427cef5
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Jul 13 23:07:26 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Jul 13 23:07:26 2015 +0900

----------------------------------------------------------------------
 docs/documentation/Multilang-protocol.md        |  9 ++-
 .../backtype/storm/task/TopologyContext.java    | 61 ++++++++++++++------
 2 files changed, 50 insertions(+), 20 deletions(-)
----------------------------------------------------------------------