You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/15 20:56:27 UTC

[01/12] storm git commit: Send more topology context to Multi-Lang components via initial handshake.

Repository: storm
Updated Branches:
  refs/heads/0.10.x-branch 22fe79712 -> 208ddbeff


Send more topology context to Multi-Lang components via initial handshake.

This commit adds the following keys to the "context" JSON dictionary sent to
multi-lang tasks as part of their handshake with ShellBolt/ShellSpout:

-  componentid: the component ID for this task, so you don't have to look it up
                in "task->component" by "taskid"
-  streams: a list of all of the streams for this task
-  stream->outputfields:  a mapping from stream names to output fields for that
                          stream
-  stream->target->grouping: a mapping from stream names to the targets for this
                             task to the kind of grouping they use.
-  sources->grouping:  a mapping from the component IDs of the sources to their
                       grouping.

Note on groupings:  groupings are either represented as a string (e.g., "
                    SHUFFLE") or a list of strings for field groupings.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5fad1b1b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5fad1b1b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5fad1b1b

Branch: refs/heads/0.10.x-branch
Commit: 5fad1b1bb206a03a45d253711db46c8ee7258a74
Parents: 22fe797
Author: Dan Blanchard <da...@parsely.com>
Authored: Thu Apr 16 16:15:32 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 14:03:44 2015 -0400

----------------------------------------------------------------------
 .../backtype/storm/task/TopologyContext.java    | 84 +++++++++++++++-----
 1 file changed, 62 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5fad1b1b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index 4cac3b6..4855f4e 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -55,7 +55,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     private Map<Integer,Map<Integer, Map<String, IMetric>>> _registeredMetrics;
     private clojure.lang.Atom _openOrPrepareWasCalled;
 
-    
+
     public TopologyContext(StormTopology topology, Map stormConf,
             Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
             Map<String, Map<String, Fields>> componentToStreamToFields,
@@ -75,7 +75,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     /**
      * All state from all subscribed state spouts streams will be synced with
      * the provided object.
-     * 
+     *
      * <p>It is recommended that your ISubscribedState object is kept as an instance
      * variable of this object. The recommended usage of this method is as follows:</p>
      *
@@ -129,7 +129,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
 
     /**
      * Gets the task id of this task.
-     * 
+     *
      * @return the task id
      */
     public int getThisTaskId() {
@@ -154,6 +154,18 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     }
 
     /**
+     * Gets the declared output fields for the specified stream id for the component
+     * this task is a part of.
+     */
+    public Map<String, List<String>> getThisOutputFieldsForStreams() {
+    	Map<String, List<String>> streamToFields = new HashMap<String, List<String>>();
+    	for (String stream : this.getThisStreams()) {
+    		streamToFields.put(stream, this.getThisOutputFields(stream).toList());
+    	}
+    	return streamToFields;
+    }
+
+    /**
      * Gets the set of streams declared for the component of this task.
      */
     public Set<String> getThisStreams() {
@@ -175,10 +187,10 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
         }
         throw new RuntimeException("Fatal: could not find this task id in this component");
     }
-    
+
     /**
      * Gets the declared inputs to this component.
-     * 
+     *
      * @return A map from subscribed component/stream to the grouping subscribed with.
      */
     public Map<GlobalStreamId, Grouping> getThisSources() {
@@ -193,11 +205,11 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     public Map<String, Map<String, Grouping>> getThisTargets() {
         return getTargets(getThisComponentId());
     }
-    
+
     public void setTaskData(String name, Object data) {
         _taskData.put(name, data);
     }
-    
+
     public Object getTaskData(String name) {
         return _taskData.get(name);
     }
@@ -205,32 +217,60 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     public void setExecutorData(String name, Object data) {
         _executorData.put(name, data);
     }
-    
+
     public Object getExecutorData(String name) {
         return _executorData.get(name);
-    }    
-    
+    }
+
     public void addTaskHook(ITaskHook hook) {
         hook.prepare(_stormConf, this);
         _hooks.add(hook);
     }
-    
+
     public Collection<ITaskHook> getHooks() {
         return _hooks;
     }
+
+    public Object groupingToJSONableObject(Grouping grouping) {
+    	if (grouping.is_set_fields()) {
+    		return grouping.get_fields();
+    	} else {
+    		return grouping.getSetField().toString();
+    	}
+    }
     
     @Override
     public String toJSONString() {
         Map obj = new HashMap();
         obj.put("task->component", this.getTaskToComponent());
         obj.put("taskid", this.getThisTaskId());
-        // TODO: jsonify StormTopology
-        // at the minimum should send source info
+        obj.put("componentid", this.getThisComponentId());
+        List<String> streamList = new ArrayList<String>();
+        streamList.addAll(this.getThisStreams());
+        obj.put("streams", streamList);
+        obj.put("stream->outputfields", this.getThisOutputFieldsForStreams());
+        // Convert targets to a JSON serializable format
+        Map<String, Map> stringTargets = new HashMap<String, Map>();
+        for (Map.Entry<String, Map<String, Grouping>> entry : this.getThisTargets().entrySet()) {
+        	Map stringTargetMap = new HashMap<String, Object>();
+        	for (Map.Entry<String, Grouping> innerEntry : entry.getValue().entrySet()) {
+        		stringTargetMap.put(innerEntry.getKey(), groupingToJSONableObject(innerEntry.getValue()));
+        	}
+        	stringTargets.put(entry.getKey(), stringTargetMap);
+        }
+        obj.put("stream->target->grouping", stringTargets);
+        // Convert sources to a JSON serializable format
+        Map<String, String> stringSources = new HashMap<String, String>();
+        for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
+        	Map stringSourceMap = new HashMap<String, Object>();
+        	stringSourceMap.put(entry.getKey().toString(), groupingToJSONableObject(entry.getValue()));
+        }
+        obj.put("sources->grouping", stringSources);
         return JSONValue.toJSONString(obj);
     }
 
     /*
-     * Register a IMetric instance. 
+     * Register a IMetric instance.
      * Storm will then call getValueAndReset on the metric every timeBucketSizeInSecs
      * and the returned value is sent to all metrics consumers.
      * You must call this during IBolt::prepare or ISpout::open.
@@ -238,7 +278,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
      */
     public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
         if((Boolean)_openOrPrepareWasCalled.deref() == true) {
-            throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " + 
+            throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " +
                                        "IBolt::prepare() or ISpout::open() method.");
         }
 
@@ -250,7 +290,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
             throw new IllegalArgumentException("TopologyContext.registerMetric can only be called with timeBucketSizeInSecs " +
                                                "greater than or equal to 1 second.");
         }
-        
+
         if (getRegisteredMetricByName(name) != null) {
             throw new RuntimeException("The same metric name `" + name + "` was registered twice." );
         }
@@ -278,7 +318,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     /**
      * Get component's metric from registered metrics by name.
      * Notice: Normally, one component can only register one metric name once.
-     *         But now registerMetric has a bug(https://issues.apache.org/jira/browse/STORM-254) 
+     *         But now registerMetric has a bug(https://issues.apache.org/jira/browse/STORM-254)
      *         cause the same metric name can register twice.
      *         So we just return the first metric we meet.
      */
@@ -291,14 +331,14 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
                 metric = nameToMetric.get(name);
                 if (metric != null) {
                     //we just return the first metric we meet
-                    break;  
+                    break;
                 }
             }
-        } 
-        
+        }
+
         return metric;
-    }   
- 
+    }
+
     /*
      * Convinience method for registering ReducedMetric.
      */


[07/12] storm git commit: STORM-807: quote args correctly

Posted by pt...@apache.org.
STORM-807: quote args correctly


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3844fcec
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3844fcec
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3844fcec

Branch: refs/heads/0.10.x-branch
Commit: 3844fcec314f9884c4c0866923c8049985cc498c
Parents: 1276691
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri May 1 09:39:45 2015 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 14:18:36 2015 -0400

----------------------------------------------------------------------
 bin/storm | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3844fcec/bin/storm
----------------------------------------------------------------------
diff --git a/bin/storm b/bin/storm
index 2adb249..809a83a 100755
--- a/bin/storm
+++ b/bin/storm
@@ -73,4 +73,4 @@ if [ -f "${STORM_CONF_DIR}/storm-env.sh" ]; then
   . "${STORM_CONF_DIR}/storm-env.sh"
 fi
 
-exec $PYTHON ${STORM_BIN_DIR}/storm.py $@
+exec "$PYTHON" "${STORM_BIN_DIR}/storm.py" "$@"


[10/12] storm git commit: STORM-615. Add REST API to upload topology.

Posted by pt...@apache.org.
STORM-615. Add REST API to upload topology.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d3eeb744
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d3eeb744
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d3eeb744

Branch: refs/heads/0.10.x-branch
Commit: d3eeb74435e9838e51b08ef6db143f35fa573b86
Parents: a8d25fb
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Tue Mar 31 17:24:47 2015 -0700
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 14:25:38 2015 -0400

----------------------------------------------------------------------
 STORM-UI-REST-API.md | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d3eeb744/STORM-UI-REST-API.md
----------------------------------------------------------------------
diff --git a/STORM-UI-REST-API.md b/STORM-UI-REST-API.md
index b94186a..6d1b8bb 100644
--- a/STORM-UI-REST-API.md
+++ b/STORM-UI-REST-API.md
@@ -613,7 +613,6 @@ error response:
 
 uploads a topology.
 
-Caution: This api doesn't work in security mode.
 
 |Parameter |Value   |Description  |
 |----------|--------|-------------|


[02/12] storm git commit: Change sources->grouping to source->stream->grouping in JSON context.

Posted by pt...@apache.org.
Change sources->grouping to source->stream->grouping in JSON context.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6961be7d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6961be7d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6961be7d

Branch: refs/heads/0.10.x-branch
Commit: 6961be7d18f6c0a772a5fbd69f18c7ff51523179
Parents: 5fad1b1
Author: Dan Blanchard <da...@parsely.com>
Authored: Fri Apr 17 11:00:21 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 14:03:54 2015 -0400

----------------------------------------------------------------------
 .../src/jvm/backtype/storm/task/TopologyContext.java   | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6961be7d/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index 4855f4e..c35d9db 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -260,12 +260,17 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
         }
         obj.put("stream->target->grouping", stringTargets);
         // Convert sources to a JSON serializable format
-        Map<String, String> stringSources = new HashMap<String, String>();
+        Map<String, Map<String, Object>> stringSources = new HashMap<String, Map<String, Object>>();
         for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
-        	Map stringSourceMap = new HashMap<String, Object>();
-        	stringSourceMap.put(entry.getKey().toString(), groupingToJSONableObject(entry.getValue()));
+        	GlobalStreamId gid = entry.getKey();
+        	Map<String, Object> stringSourceMap = stringSources.get(gid.get_componentId());
+        	if (stringSourceMap == null) {
+        		stringSourceMap = new HashMap<String, Object>();
+        		stringSources.put(gid.get_componentId(), stringSourceMap);
+        	}
+        	stringSourceMap.put(gid.get_streamId(), groupingToJSONableObject(entry.getValue()));        	
         }
-        obj.put("sources->grouping", stringSources);
+        obj.put("source->stream->grouping", stringSources);
         return JSONValue.toJSONString(obj);
     }
 


[09/12] storm git commit: STORM-615. Add REST API to upload topology.

Posted by pt...@apache.org.
STORM-615. Add REST API to upload topology.

Conflicts:
	STORM-UI-REST-API.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a8d25fbb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a8d25fbb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a8d25fbb

Branch: refs/heads/0.10.x-branch
Commit: a8d25fbb803e3e8e0828f07dbfa8585e9ac98f4e
Parents: 15c3006
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Thu Mar 12 13:58:31 2015 -0700
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 14:25:28 2015 -0400

----------------------------------------------------------------------
 STORM-UI-REST-API.md                          | 66 ++++++++++++++++++++++
 storm-core/src/clj/backtype/storm/ui/core.clj | 58 ++++++++++++++++++-
 2 files changed, 122 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a8d25fbb/STORM-UI-REST-API.md
----------------------------------------------------------------------
diff --git a/STORM-UI-REST-API.md b/STORM-UI-REST-API.md
index 2836105..b94186a 100644
--- a/STORM-UI-REST-API.md
+++ b/STORM-UI-REST-API.md
@@ -573,8 +573,74 @@ Sample response:
 }
 ```
 
+### /api/v1/token (GET)
+
+Returns a anti forgery token to use in POST calls
+
+Response fields:
+
+|Field  |Value |Description|
+|antiForgeryToken| String | CSRF token|
+
+Sample response:
+
+```json
+{
+    "antiForgeryToken": "Dygf1UHQF7qL0syKLTKEGSX5y0rZhhQTxS2f/WWwI2PhN1zmRdh8MQ1KTd5CXRmjMVmAJ43eklqYmvD5"
+}
+```
+
 ## POST Operations
 
+### Cross site request forgery (CSRF) prevention in POST requests
+
+In order to prevent CSRF vulnerability, the REST API uses a CSRF token. This is primarily done for the UI, however we
+do not have alternative APIs/paths for UI and non-UI clients.
+
+The token is generated during the `/api/v1/topology/:id` (GET) request. The JSON response for this GET request contains
+a field called "antiForgeryToken". All the post requests below must include a header "x-csrf-token" with the value of
+"antiForgeryToken" from the GET response. In absence of this header with the right token value you will get following
+error response:
+
+```
+{
+    "error" : "Forbidden action.",
+    "errorMessage" : "missing CSRF token."
+}
+```
+
+### /api/v1/uploadTopology (POST)
+
+uploads a topology.
+
+Caution: This api doesn't work in security mode.
+
+|Parameter |Value   |Description  |
+|----------|--------|-------------|
+|topologyConfig |String (required)| topology json config  |
+|topologyJar |String (required)| topology jar file |
+
+Sample topologyConfig json:
+```json
+{"topologyMainClass": "storm.starter.WordCountTopology", "topologyMainClassArgs": ["wordcount1"]}
+```
+
+Examples:
+
+```no-highlight
+curl  -i -b ~/cookiejar.txt -c ~/cookiejar.txt -X POST  
+-H 'x-csrf-token: ycit8Wi89ZdAOo9KKaka/Pvd0vnx8TZzP8xSDDSw8J8bTfyn4jz38VN4Xcb7CF6xigRzDLaGVHbrSj80'  
+-F topologyConfig='{"topologyMainClass": "storm.starter.WordCountTopology", "topologyMainClassArgs": ["wordcount1"]}' 
+-F topologyJar=@examples/storm-starter/storm-starter-topologies-0.10.0-SNAPSHOT.jar 
+http://localhost:8080/api/v1/uploadTopology
+```
+
+Sample Response:
+
+```json
+{"status":"success"}
+```
+
 ### /api/v1/topology/:id/activate (POST)
 
 Activates a topology.

http://git-wip-us.apache.org/repos/asf/storm/blob/a8d25fbb/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 38597ed..8ba92de 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -16,7 +16,9 @@
 
 (ns backtype.storm.ui.core
   (:use compojure.core)
-  (:use ring.middleware.reload)
+  (:use [clojure.java.shell :only [sh]])
+  (:use ring.middleware.reload
+        ring.middleware.multipart-params)
   (:use [ring.middleware.json :only [wrap-json-params]])
   (:use [hiccup core page-helpers])
   (:use [backtype.storm config util log])
@@ -502,6 +504,39 @@
               (hashmap-to-persistent bolts))
        spout-comp-summs bolt-comp-summs window id))))
 
+(defn validate-tplg-submit-params [params]
+  (let [tplg-jar-file (params :topologyJar)
+        tplg-config (if (not-nil? (params :topologyConfig)) (from-json (params :topologyConfig)))]
+    (cond
+     (nil? tplg-jar-file) {:valid false :error "missing topology jar file"}
+     (nil? tplg-config) {:valid false :error "missing topology config"}
+     (nil? (tplg-config "topologyMainClass")) {:valid false :error "topologyMainClass missing in topologyConfig"}
+     :else {:valid true})))
+
+(defn run-tplg-submit-cmd [tplg-jar-file tplg-config user]
+  (let [tplg-main-class (if (not-nil? tplg-config) (trim (tplg-config "topologyMainClass")))
+        tplg-main-class-args (if (not-nil? tplg-config) (clojure.string/join " " (tplg-config "topologyMainClassArgs")))
+        tplg-jvm-opts (if (not-nil? tplg-config) (clojure.string/join " " (tplg-config "topologyJvmOpts")))
+        storm-home (System/getProperty "storm.home")
+        storm-conf-dir (str storm-home file-path-separator "conf")
+        storm-log-dir (if (not-nil? (*STORM-CONF* "storm.log.dir")) (*STORM-CONF* "storm.log.dir")
+                          (str storm-home file-path-separator "logs"))
+        storm-libs (str storm-home file-path-separator "lib" file-path-separator "*")
+        java-cmd (str (System/getProperty "java.home") file-path-separator "bin" file-path-separator "java")
+        storm-cmd (str storm-home file-path-separator "bin" file-path-separator "storm")
+        tplg-cmd-response (sh storm-cmd "jar" tplg-jar-file
+                              tplg-main-class
+                              tplg-main-class-args
+                              (if (not= user "unknown") (str "-c storm.doAsUser=" user) ""))]
+    (log-message "tplg-cmd-response " tplg-cmd-response)
+    (cond
+     (= (tplg-cmd-response :exit) 0) {"status" "success"}
+     (and (not= (tplg-cmd-response :exit) 0)
+          (not-nil? (re-find #"already exists on cluster" (tplg-cmd-response :err)))) {"status" "failed" "error" "Topology with the same name exists in cluster"}
+          (not= (tplg-cmd-response :exit) 0) {"status" "failed" "error" (clojure.string/trim-newline (tplg-cmd-response :err))}
+          :else {"status" "success" "response" "topology deployed"}
+          )))
+
 (defn cluster-configuration []
   (with-nimbus nimbus
     (.getNimbusConf ^Nimbus$Client nimbus)))
@@ -979,7 +1014,25 @@
         (.killTopologyWithOpts nimbus name options)
         (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
     (json-response (topology-op-response id "kill") (m "callback")))
-
+  (POST "/api/v1/uploadTopology" [:as {:keys [cookies servlet-request]} id & params]
+        (assert-authorized-user servlet-request "submitTopology")
+        (let [valid-tplg (validate-tplg-submit-params params)
+              valid (valid-tplg :valid)
+              context (ReqContext/context)]
+          (if http-creds-handler (.populateContext http-creds-handler context servlet-request))
+          (if (= valid true)
+            (let [tplg-file-data (params :topologyJar)
+                  tplg-temp-file (tplg-file-data :tempfile)
+                  tplg-file-name (tplg-file-data :filename)
+                  tplg-jar-file (clojure.string/join [(.getParent tplg-temp-file) file-path-separator tplg-file-name])
+                  tplg-config (if (not-nil? (params :topologyConfig)) (from-json (params :topologyConfig)))
+                  principal (if (.isImpersonating context) (.realPrincipal context) (.principal context))
+                  user (if principal (.getName principal) "unknown")]
+              (.renameTo tplg-temp-file (File. tplg-jar-file))
+              (let [ret (run-tplg-submit-cmd tplg-jar-file tplg-config user)]
+                (json-response ret (params "callback"))))
+            (json-response {"status" "failed" "error" (valid-tplg :error)} (params "callback"))
+            )))
   (GET "/" [:as {cookies :cookies}]
        (resp/redirect "/index.html"))
   (route/resources "/")
@@ -1005,6 +1058,7 @@
 (def app
   (handler/site (-> main-routes
                     (wrap-json-params)
+                    (wrap-multipart-params)
                     (wrap-reload '[backtype.storm.ui.core])
                     catch-errors)))
 


[04/12] storm git commit: Add documentation about new topology context info available as of STORM-789.

Posted by pt...@apache.org.
Add documentation about new topology context info available as of STORM-789.

Also cleaned up some awkward language describing heartbeats.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4255ab5d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4255ab5d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4255ab5d

Branch: refs/heads/0.10.x-branch
Commit: 4255ab5d1eb24b7884b7305e57edcabe6f2e3cbb
Parents: c47d7d4
Author: Dan Blanchard <da...@parsely.com>
Authored: Mon Apr 20 16:33:51 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 14:04:19 2015 -0400

----------------------------------------------------------------------
 docs/documentation/Multilang-protocol.md | 63 ++++++++++++++++++++-------
 1 file changed, 47 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4255ab5d/docs/documentation/Multilang-protocol.md
----------------------------------------------------------------------
diff --git a/docs/documentation/Multilang-protocol.md b/docs/documentation/Multilang-protocol.md
index 43afd32..017ad32 100644
--- a/docs/documentation/Multilang-protocol.md
+++ b/docs/documentation/Multilang-protocol.md
@@ -49,7 +49,7 @@ STDIN and STDOUT.
 
 The initial handshake is the same for both types of shell components:
 
-* STDIN: Setup info. This is a JSON object with the Storm configuration, Topology context, and a PID directory, like this:
+* STDIN: Setup info. This is a JSON object with the Storm configuration, a PID directory, and a topology context, like this:
 
 ```
 {
@@ -57,15 +57,32 @@ The initial handshake is the same for both types of shell components:
         "topology.message.timeout.secs": 3,
         // etc
     },
+    "pidDir": "...",
     "context": {
         "task->component": {
             "1": "example-spout",
             "2": "__acker",
-            "3": "example-bolt"
+            "3": "example-bolt1",
+            "4": "example-bolt2"
         },
-        "taskid": 3
-    },
-    "pidDir": "..."
+        "taskid": 3,
+        // Everything below this line is only available in Storm 0.11.0+
+        "componentid": "example-bolt"
+        "stream->target->grouping": {
+        	"default": {
+        		"example-bolt2": {
+        			"type": "SHUFFLE"}}},
+        "streams": ["default"],
+ 		"stream->outputfields": {"default": ["word"]},
+	    "source->stream->grouping": {
+	    	"example-spout": {
+	    		"default": {
+	    			"type": "FIELDS",
+	    			"fields": ["word"]
+	    		}
+	    	}
+	    }
+	}
 }
 ```
 
@@ -73,6 +90,15 @@ Your script should create an empty file named with its PID in this directory. e.
 the PID is 1234, so an empty file named 1234 is created in the directory. This
 file lets the supervisor know the PID so it can shutdown the process later on.
 
+As of Storm 0.11.0, the context sent by Storm to shell components has been
+enhanced substantially to include all aspects of the topology context available
+to JVM components.  One key addition is the ability to determine a shell
+component's source and targets (i.e., inputs and outputs) in the topology via
+the `stream->target->grouping` and `source->stream->grouping` dictionaries.  At
+the innermost level of these nested dictionaries, groupings are represented as
+a dictionary that minimally has a `type` key, but can also have a `fields` key
+to specify which fields are involved in a `FIELDS` grouping.
+
 * STDOUT: Your PID, in a JSON object, like `{"pid": 1234}`. The shell component will log the PID to its log.
 
 What happens next depends on the type of component:
@@ -222,30 +248,35 @@ A "log" will log a message in the worker log. It looks like:
 * Note that, as of version 0.7.1, there is no longer any need for a
   shell bolt to 'sync'.
 
-### Handling Heartbeat (0.9.3 and later)
+### Handling Heartbeats (0.9.3 and later)
 
-ShellSpout/ShellBolt has been introduced from [STORM-513](https://issues.apache.org/jira/browse/STORM-513) to prevent hanging/zombie subprocess.
+As of Storm 0.9.3, heartbeats have been between ShellSpout/ShellBolt and their
+multi-lang subprocesses to detect hanging/zombie subprocesses.  Any libraries
+for interfacing with Storm via multi-lang must take the following actions
+regarding hearbeats:
 
-* Spout
+#### Spout
 
-Shell spouts are synchronous, and subprocess always send 'sync' at the end of next() so you don't need to take care of.
-One thing you have to take care of is, don't let subprocess sleep too much from next(), especially keep it less to worker timeout.
+Shell spouts are synchronous, so subprocesses always send `sync` commands at the
+end of `next()`,  so you should not have to do much to support heartbeats for
+spouts.  That said, you must not let subprocesses sleep more than the worker
+timeout during `next()`.
 
-* Bolt
+#### Bolt
 
-Shell bolts are asynchronous, so ShellBolt will send heartbeat tuple periodically.
-Heartbeat tuple looks like:
+Shell bolts are asynchronous, so a ShellBolt will send heartbeat tuples to its
+subprocess periodically.  Heartbeat tuple looks like:
 
 ```
 {
 	"id": "-6955786537413359385",
 	"comp": "1",
-	// heartbeat tuple
 	"stream": "__heartbeat",
-	// it's system task id
+	// this shell bolt's system task id
 	"task": -1,
 	"tuple": []
 }
 ```
 
-When subprocess receives heartbeat tuple, it should send 'sync' to ShellBolt.
+When subprocess receives heartbeat tuple, it must send a `sync` command back to
+ShellBolt.


[11/12] storm git commit: remove CSRF doc brought in by cherry-pick

Posted by pt...@apache.org.
remove CSRF doc brought in by cherry-pick


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/29a32ff5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/29a32ff5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/29a32ff5

Branch: refs/heads/0.10.x-branch
Commit: 29a32ff50519ad1c0cb232be7890f2b635cadfad
Parents: d3eeb74
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 15 14:32:18 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 14:32:18 2015 -0400

----------------------------------------------------------------------
 STORM-UI-REST-API.md | 35 -----------------------------------
 1 file changed, 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/29a32ff5/STORM-UI-REST-API.md
----------------------------------------------------------------------
diff --git a/STORM-UI-REST-API.md b/STORM-UI-REST-API.md
index 6d1b8bb..e663136 100644
--- a/STORM-UI-REST-API.md
+++ b/STORM-UI-REST-API.md
@@ -573,42 +573,8 @@ Sample response:
 }
 ```
 
-### /api/v1/token (GET)
-
-Returns a anti forgery token to use in POST calls
-
-Response fields:
-
-|Field  |Value |Description|
-|antiForgeryToken| String | CSRF token|
-
-Sample response:
-
-```json
-{
-    "antiForgeryToken": "Dygf1UHQF7qL0syKLTKEGSX5y0rZhhQTxS2f/WWwI2PhN1zmRdh8MQ1KTd5CXRmjMVmAJ43eklqYmvD5"
-}
-```
-
 ## POST Operations
 
-### Cross site request forgery (CSRF) prevention in POST requests
-
-In order to prevent CSRF vulnerability, the REST API uses a CSRF token. This is primarily done for the UI, however we
-do not have alternative APIs/paths for UI and non-UI clients.
-
-The token is generated during the `/api/v1/topology/:id` (GET) request. The JSON response for this GET request contains
-a field called "antiForgeryToken". All the post requests below must include a header "x-csrf-token" with the value of
-"antiForgeryToken" from the GET response. In absence of this header with the right token value you will get following
-error response:
-
-```
-{
-    "error" : "Forbidden action.",
-    "errorMessage" : "missing CSRF token."
-}
-```
-
 ### /api/v1/uploadTopology (POST)
 
 uploads a topology.
@@ -628,7 +594,6 @@ Examples:
 
 ```no-highlight
 curl  -i -b ~/cookiejar.txt -c ~/cookiejar.txt -X POST  
--H 'x-csrf-token: ycit8Wi89ZdAOo9KKaka/Pvd0vnx8TZzP8xSDDSw8J8bTfyn4jz38VN4Xcb7CF6xigRzDLaGVHbrSj80'  
 -F topologyConfig='{"topologyMainClass": "storm.starter.WordCountTopology", "topologyMainClassArgs": ["wordcount1"]}' 
 -F topologyJar=@examples/storm-starter/storm-starter-topologies-0.10.0-SNAPSHOT.jar 
 http://localhost:8080/api/v1/uploadTopology


[06/12] storm git commit: add STORM-686 to CHANGELOG

Posted by pt...@apache.org.
add STORM-686 to CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/12766913
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/12766913
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/12766913

Branch: refs/heads/0.10.x-branch
Commit: 12766913010670746fed792792bd553dba383e8f
Parents: e06c7f4
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 15 14:14:23 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 14:14:23 2015 -0400

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/12766913/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 20625f3..c27f673 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.10.0
+ * STORM-686. Add worker-launcher to storm-dist.
  * STORM-789: Send more topology context to Multi-Lang components via initial handshake
  * STORM-764: Have option to compress thrift heartbeat
  * JIRA STORM-766 (Include version info in the service page)


[05/12] storm git commit: add STORM-789 to changelog

Posted by pt...@apache.org.
add STORM-789 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e06c7f4f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e06c7f4f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e06c7f4f

Branch: refs/heads/0.10.x-branch
Commit: e06c7f4f3060df02e50cf68c9baf9c0615435f30
Parents: 4255ab5
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 15 14:05:23 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 14:05:23 2015 -0400

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e06c7f4f/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 456b69b..20625f3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.10.0
+ * STORM-789: Send more topology context to Multi-Lang components via initial handshake
  * STORM-764: Have option to compress thrift heartbeat
  * JIRA STORM-766 (Include version info in the service page)
  * STORM-765: Thrift serialization for local state. 


[12/12] storm git commit: add STORM-615 to changelog

Posted by pt...@apache.org.
add STORM-615 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/208ddbef
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/208ddbef
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/208ddbef

Branch: refs/heads/0.10.x-branch
Commit: 208ddbeffa3471e27ad05e8263d65d2ed90d8742
Parents: 29a32ff
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 15 14:34:02 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 14:34:02 2015 -0400

----------------------------------------------------------------------
 CHANGELOG.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/208ddbef/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b2b14af..6ba3b32 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
 ## 0.10.0
+ * STORM-615: Add REST API to upload topology.
  * STORM-807: quote args correctly in /bin/storm
- * STORM-686. Add worker-launcher to storm-dist.
+ * STORM-686: Add worker-launcher to storm-dist.
  * STORM-789: Send more topology context to Multi-Lang components via initial handshake
  * STORM-764: Have option to compress thrift heartbeat
  * JIRA STORM-766 (Include version info in the service page)


[03/12] storm git commit: Address comments for PR #525

Posted by pt...@apache.org.
Address comments for PR #525

- Fixed a couple style/formatting issues
- Made groupings get converted to dictionaries that contain 'type' and 'fields'
  keys (where only 'type' is required).


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c47d7d42
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c47d7d42
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c47d7d42

Branch: refs/heads/0.10.x-branch
Commit: c47d7d42d1a769277dfc4b6e11e5e07e3279b707
Parents: 6961be7
Author: Dan Blanchard <da...@parsely.com>
Authored: Mon Apr 20 15:52:43 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 14:04:07 2015 -0400

----------------------------------------------------------------------
 .../backtype/storm/task/TopologyContext.java    | 57 ++++++++++----------
 1 file changed, 29 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c47d7d42/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index c35d9db..8124651 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -145,25 +145,25 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
         return getComponentId(_taskId);
     }
 
-    /**
-     * Gets the declared output fields for the specified stream id for the component
-     * this task is a part of.
-     */
-    public Fields getThisOutputFields(String streamId) {
-        return getComponentOutputFields(getThisComponentId(), streamId);
-    }
-
-    /**
-     * Gets the declared output fields for the specified stream id for the component
-     * this task is a part of.
-     */
-    public Map<String, List<String>> getThisOutputFieldsForStreams() {
-    	Map<String, List<String>> streamToFields = new HashMap<String, List<String>>();
-    	for (String stream : this.getThisStreams()) {
-    		streamToFields.put(stream, this.getThisOutputFields(stream).toList());
-    	}
-    	return streamToFields;
-    }
+	/**
+	 * Gets the declared output fields for the specified stream id for the
+	 * component this task is a part of.
+	 */
+	public Fields getThisOutputFields(String streamId) {
+		return getComponentOutputFields(getThisComponentId(), streamId);
+	}
+
+	/**
+	 * Gets the declared output fields for the specified stream id for the
+	 * component this task is a part of.
+	 */
+	public Map<String, List<String>> getThisOutputFieldsForStreams() {
+		Map<String, List<String>> streamToFields = new HashMap<String, List<String>>();
+		for (String stream : this.getThisStreams()) {
+			streamToFields.put(stream, this.getThisOutputFields(stream).toList());
+		}
+		return streamToFields;
+	}
 
     /**
      * Gets the set of streams declared for the component of this task.
@@ -231,13 +231,14 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
         return _hooks;
     }
 
-    public Object groupingToJSONableObject(Grouping grouping) {
-    	if (grouping.is_set_fields()) {
-    		return grouping.get_fields();
-    	} else {
-    		return grouping.getSetField().toString();
-    	}
-    }
+	private static Map<String, Object> groupingToJSONableMap(Grouping grouping) {
+		Map groupingMap = new HashMap<String, Object>();
+		groupingMap.put("type", grouping.getSetField().toString());
+		if (grouping.is_set_fields()) {
+			groupingMap.put("fields", grouping.get_fields());
+		}
+		return groupingMap;
+	}
     
     @Override
     public String toJSONString() {
@@ -254,7 +255,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
         for (Map.Entry<String, Map<String, Grouping>> entry : this.getThisTargets().entrySet()) {
         	Map stringTargetMap = new HashMap<String, Object>();
         	for (Map.Entry<String, Grouping> innerEntry : entry.getValue().entrySet()) {
-        		stringTargetMap.put(innerEntry.getKey(), groupingToJSONableObject(innerEntry.getValue()));
+        		stringTargetMap.put(innerEntry.getKey(), groupingToJSONableMap(innerEntry.getValue()));
         	}
         	stringTargets.put(entry.getKey(), stringTargetMap);
         }
@@ -268,7 +269,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
         		stringSourceMap = new HashMap<String, Object>();
         		stringSources.put(gid.get_componentId(), stringSourceMap);
         	}
-        	stringSourceMap.put(gid.get_streamId(), groupingToJSONableObject(entry.getValue()));        	
+        	stringSourceMap.put(gid.get_streamId(), groupingToJSONableMap(entry.getValue()));        	
         }
         obj.put("source->stream->grouping", stringSources);
         return JSONValue.toJSONString(obj);


[08/12] storm git commit: add STORM-807 to changelog

Posted by pt...@apache.org.
add STORM-807 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/15c3006f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/15c3006f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/15c3006f

Branch: refs/heads/0.10.x-branch
Commit: 15c3006fee117f33f754e7f3d7f1f658f22820f7
Parents: 3844fce
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 15 14:19:33 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 14:19:33 2015 -0400

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/15c3006f/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c27f673..b2b14af 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.10.0
+ * STORM-807: quote args correctly in /bin/storm
  * STORM-686. Add worker-launcher to storm-dist.
  * STORM-789: Send more topology context to Multi-Lang components via initial handshake
  * STORM-764: Have option to compress thrift heartbeat