You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/18 17:12:39 UTC

[2/5] storm git commit: backport thrift.clj to Thrift.java

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/thrift.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/thrift.clj b/storm-core/src/clj/org/apache/storm/thrift.clj
deleted file mode 100644
index 779c1d1..0000000
--- a/storm-core/src/clj/org/apache/storm/thrift.clj
+++ /dev/null
@@ -1,286 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.thrift
-  (:import [java.util HashMap]
-           [java.io Serializable]
-           [org.apache.storm.generated NodeInfo Assignment])
-  (:import [org.apache.storm.generated JavaObject Grouping Nimbus StormTopology
-            StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface
-            ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo
-            GlobalStreamId ComponentObject ComponentObject$_Fields
-            ShellComponent SupervisorInfo])
-  (:import [org.apache.storm.utils Utils NimbusClient ConfigUtils])
-  (:import [org.apache.storm Constants])
-  (:import [org.apache.storm.security.auth ReqContext])
-  (:import [org.apache.storm.grouping CustomStreamGrouping])
-  (:import [org.apache.storm.topology TopologyBuilder])
-  (:import [org.apache.storm.clojure RichShellBolt RichShellSpout])
-  (:import [org.apache.thrift.transport TTransport]
-           (org.json.simple JSONValue))
-  (:use [org.apache.storm util config log zookeeper]))
-
-(defn instantiate-java-object
-  [^JavaObject obj]
-  (let [name (symbol (.get_full_class_name obj))
-        args (map (memfn getFieldValue) (.get_args_list obj))]
-    (eval `(new ~name ~@args))))
-
-(def grouping-constants
-  {Grouping$_Fields/FIELDS :fields
-   Grouping$_Fields/SHUFFLE :shuffle
-   Grouping$_Fields/ALL :all
-   Grouping$_Fields/NONE :none
-   Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized
-   Grouping$_Fields/CUSTOM_OBJECT :custom-object
-   Grouping$_Fields/DIRECT :direct
-   Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle})
-
-(defn grouping-type
-  [^Grouping grouping]
-  (grouping-constants (.getSetField grouping)))
-
-(defn field-grouping
-  [^Grouping grouping]
-  (when-not (= (grouping-type grouping) :fields)
-    (throw (IllegalArgumentException. "Tried to get grouping fields from non fields grouping")))
-  (.get_fields grouping))
-
-(defn global-grouping?
-  [^Grouping grouping]
-  (and (= :fields (grouping-type grouping))
-       (empty? (field-grouping grouping))))
-
-(defn parallelism-hint
-  [^ComponentCommon component-common]
-  (let [phint (.get_parallelism_hint component-common)]
-    (if-not (.is_set_parallelism_hint component-common) 1 phint)))
-
-(defn nimbus-client-and-conn
-  ([host port]
-    (nimbus-client-and-conn host port nil))
-  ([host port as-user]
-  (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user)
-  (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
-        nimbusClient (NimbusClient. conf host port nil as-user)
-        client (.getClient nimbusClient)
-        transport (.transport nimbusClient)]
-        [client transport] )))
-
-(defmacro with-nimbus-connection
-  [[client-sym host port] & body]
-  `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port)]
-    (try
-      ~@body
-    (finally (.close conn#)))))
-
-(defmacro with-configured-nimbus-connection
-  [client-sym & body]
-  `(let [conf# (clojurify-structure (ConfigUtils/readStormConfig))
-         context# (ReqContext/context)
-         user# (if (.principal context#) (.getName (.principal context#)))
-         nimbusClient# (NimbusClient/getConfiguredClientAs conf# user#)
-         ~client-sym (.getClient nimbusClient#)
-         conn# (.transport nimbusClient#)
-         ]
-     (try
-       ~@body
-     (finally (.close conn#)))))
-
-(defn direct-output-fields
-  [fields]
-  (StreamInfo. fields true))
-
-(defn output-fields
-  [fields]
-  (StreamInfo. fields false))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn mk-output-spec
-  [output-spec]
-  (let [output-spec (if (map? output-spec)
-                      output-spec
-                      {Utils/DEFAULT_STREAM_ID output-spec})]
-    (map-val
-      (fn [out]
-        (if (instance? StreamInfo out)
-          out
-          (StreamInfo. out false)))
-      output-spec)))
-
-(defnk mk-plain-component-common
-  [inputs output-spec parallelism-hint :conf nil]
-  (let [ret (ComponentCommon. (HashMap. inputs) (HashMap. (mk-output-spec output-spec)))]
-    (when parallelism-hint
-      (.set_parallelism_hint ret parallelism-hint))
-    (when conf
-      (.set_json_conf ret (JSONValue/toJSONString conf)))
-    ret))
-
-(defnk mk-spout-spec*
-  [spout outputs :p nil :conf nil]
-  (SpoutSpec. (ComponentObject/serialized_java (Utils/javaSerialize spout))
-              (mk-plain-component-common {} outputs p :conf conf)))
-
-(defn mk-shuffle-grouping
-  []
-  (Grouping/shuffle (NullStruct.)))
-
-(defn mk-local-or-shuffle-grouping
-  []
-  (Grouping/local_or_shuffle (NullStruct.)))
-
-(defn mk-fields-grouping
-  [fields]
-  (Grouping/fields fields))
-
-(defn mk-global-grouping
-  []
-  (mk-fields-grouping []))
-
-(defn mk-direct-grouping
-  []
-  (Grouping/direct (NullStruct.)))
-
-(defn mk-all-grouping
-  []
-  (Grouping/all (NullStruct.)))
-
-(defn mk-none-grouping
-  []
-  (Grouping/none (NullStruct.)))
-
-(defn deserialized-component-object
-  [^ComponentObject obj]
-  (when (not= (.getSetField obj) ComponentObject$_Fields/SERIALIZED_JAVA)
-    (throw (RuntimeException. "Cannot deserialize non-java-serialized object")))
-  (Utils/javaDeserialize (.get_serialized_java obj) Serializable))
-
-(defn serialize-component-object
-  [obj]
-  (ComponentObject/serialized_java (Utils/javaSerialize obj)))
-
-(defn- mk-grouping
-  [grouping-spec]
-  (cond (nil? grouping-spec)
-        (mk-none-grouping)
-
-        (instance? Grouping grouping-spec)
-        grouping-spec
-
-        (instance? CustomStreamGrouping grouping-spec)
-        (Grouping/custom_serialized (Utils/javaSerialize grouping-spec))
-
-        (instance? JavaObject grouping-spec)
-        (Grouping/custom_object grouping-spec)
-
-        (sequential? grouping-spec)
-        (mk-fields-grouping grouping-spec)
-
-        (= grouping-spec :shuffle)
-        (mk-shuffle-grouping)
-
-        (= grouping-spec :local-or-shuffle)
-        (mk-local-or-shuffle-grouping)
-        (= grouping-spec :none)
-        (mk-none-grouping)
-
-        (= grouping-spec :all)
-        (mk-all-grouping)
-
-        (= grouping-spec :global)
-        (mk-global-grouping)
-
-        (= grouping-spec :direct)
-        (mk-direct-grouping)
-
-        true
-        (throw (IllegalArgumentException.
-                 (str grouping-spec " is not a valid grouping")))))
-
-(defn- mk-inputs
-  [inputs]
-  (into {} (for [[stream-id grouping-spec] inputs]
-             [(if (sequential? stream-id)
-                (GlobalStreamId. (first stream-id) (second stream-id))
-                (GlobalStreamId. stream-id Utils/DEFAULT_STREAM_ID))
-              (mk-grouping grouping-spec)])))
-
-(defnk mk-bolt-spec*
-  [inputs bolt outputs :p nil :conf nil]
-  (let [common (mk-plain-component-common (mk-inputs inputs) outputs p :conf conf)]
-    (Bolt. (ComponentObject/serialized_java (Utils/javaSerialize bolt))
-           common)))
-
-(defnk mk-spout-spec
-  [spout :parallelism-hint nil :p nil :conf nil]
-  (let [parallelism-hint (if p p parallelism-hint)]
-    {:obj spout :p parallelism-hint :conf conf}))
-
-(defn- shell-component-params
-  [command script-or-output-spec kwargs]
-  (if (string? script-or-output-spec)
-    [(into-array String [command script-or-output-spec])
-     (first kwargs)
-     (rest kwargs)]
-    [(into-array String command)
-     script-or-output-spec
-     kwargs]))
-
-(defnk mk-bolt-spec
-  [inputs bolt :parallelism-hint nil :p nil :conf nil]
-  (let [parallelism-hint (if p p parallelism-hint)]
-    {:obj bolt :inputs inputs :p parallelism-hint :conf conf}))
-
-(defn mk-shell-bolt-spec
-  [inputs command script-or-output-spec & kwargs]
-  (let [[command output-spec kwargs]
-        (shell-component-params command script-or-output-spec kwargs)]
-    (apply mk-bolt-spec inputs
-           (RichShellBolt. command (mk-output-spec output-spec)) kwargs)))
-
-(defn mk-shell-spout-spec
-  [command script-or-output-spec & kwargs]
-  (let [[command output-spec kwargs]
-        (shell-component-params command script-or-output-spec kwargs)]
-    (apply mk-spout-spec
-           (RichShellSpout. command (mk-output-spec output-spec)) kwargs)))
-
-(defn- add-inputs
-  [declarer inputs]
-  (doseq [[id grouping] (mk-inputs inputs)]
-    (.grouping declarer id grouping)))
-
-(defn mk-topology
-  ([spout-map bolt-map]
-   (let [builder (TopologyBuilder.)]
-     (doseq [[name {spout :obj p :p conf :conf}] spout-map]
-       (-> builder (.setSpout name spout (if-not (nil? p) (int p) p)) (.addConfigurations conf)))
-     (doseq [[name {bolt :obj p :p conf :conf inputs :inputs}] bolt-map]
-       (-> builder (.setBolt name bolt (if-not (nil? p) (int p) p)) (.addConfigurations conf) (add-inputs inputs)))
-     (.createTopology builder)))
-  ([spout-map bolt-map state-spout-map]
-   (mk-topology spout-map bolt-map)))
-
-;; clojurify-structure is needed or else every element becomes the same after successive calls
-;; don't know why this happens
-(def STORM-TOPOLOGY-FIELDS
-  (-> StormTopology/metaDataMap clojurify-structure keys))
-
-(def SPOUT-FIELDS
-  [StormTopology$_Fields/SPOUTS
-   StormTopology$_Fields/STATE_SPOUTS])
-

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 1bf85d4..5b5acdb 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -49,7 +49,7 @@
   (:require [compojure.route :as route]
             [compojure.handler :as handler]
             [ring.util.response :as resp]
-            [org.apache.storm [thrift :as thrift]])
+            [org.apache.storm.internal [thrift :as thrift]])
   (:require [metrics.meters :refer [defmeter mark!]])
   (:import [org.apache.commons.lang StringEscapeUtils])
   (:import [org.apache.logging.log4j Level])

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/jvm/org/apache/storm/Thrift.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Thrift.java b/storm-core/src/jvm/org/apache/storm/Thrift.java
new file mode 100644
index 0000000..cde822f
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/Thrift.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm;
+
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.JavaObjectArg;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
+import org.apache.storm.generated.StreamInfo;
+
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.HashMap;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.storm.generated.JavaObject;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.StormTopology._Fields;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.NullStruct;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.ComponentObject;
+
+import org.apache.storm.task.IBolt;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.SpoutDeclarer;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.utils.Utils;
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.topology.TopologyBuilder;
+
+public class Thrift {
+    private static Logger LOG = LoggerFactory.getLogger(Thrift.class);
+
+    private static StormTopology._Fields[] STORM_TOPOLOGY_FIELDS = null;
+    private static StormTopology._Fields[] SPOUT_FIELDS =
+            { StormTopology._Fields.SPOUTS, StormTopology._Fields.STATE_SPOUTS };
+
+    static {
+        Set<_Fields> keys = StormTopology.metaDataMap.keySet();
+        keys.toArray(STORM_TOPOLOGY_FIELDS = new StormTopology._Fields[keys.size()]);
+    }
+
+    public static StormTopology._Fields[] getTopologyFields() {
+        return STORM_TOPOLOGY_FIELDS;
+    }
+
+    public static StormTopology._Fields[] getSpoutFields() {
+        return SPOUT_FIELDS;
+    }
+
+    public static class SpoutDetails {
+        private IRichSpout spout;
+        private Integer parallelism;
+        private Map conf;
+
+        public SpoutDetails(IRichSpout spout, Integer parallelism, Map conf) {
+            this.spout = spout;
+            this.parallelism = parallelism;
+            this.conf = conf;
+        }
+
+        public IRichSpout getSpout() {
+            return spout;
+        }
+
+        public Integer getParallelism() {
+            return parallelism;
+        }
+
+        public Map getConf() {
+            return conf;
+        }
+    }
+
+    public static class BoltDetails {
+        private Object bolt;
+        private Map conf;
+        private Integer parallelism;
+        private Map<GlobalStreamId, Grouping> inputs;
+
+        public BoltDetails(Object bolt, Map conf, Integer parallelism,
+                           Map<GlobalStreamId, Grouping> inputs) {
+            this.bolt = bolt;
+            this.conf = conf;
+            this.parallelism = parallelism;
+            this.inputs = inputs;
+        }
+
+        public Object getBolt() {
+            return bolt;
+        }
+
+        public Map getConf() {
+            return conf;
+        }
+
+        public Map<GlobalStreamId, Grouping> getInputs() {
+            return inputs;
+        }
+
+        public Integer getParallelism() {
+            return parallelism;
+        }
+    }
+
+    public static StreamInfo directOutputFields(List<String> fields) {
+        return new StreamInfo(fields, true);
+    }
+
+    public static StreamInfo outputFields(List<String> fields) {
+        return new StreamInfo(fields, false);
+    }
+
+    public static Grouping prepareShuffleGrouping() {
+        return Grouping.shuffle(new NullStruct());
+    }
+
+    public static Grouping prepareLocalOrShuffleGrouping() {
+        return Grouping.local_or_shuffle(new NullStruct());
+    }
+
+    public static Grouping prepareFieldsGrouping(List<String> fields) {
+        return Grouping.fields(fields);
+    }
+
+    public static Grouping prepareGlobalGrouping() {
+        return prepareFieldsGrouping(new ArrayList<String>());
+    }
+
+    public static Grouping prepareDirectGrouping() {
+        return Grouping.direct(new NullStruct());
+    }
+
+    public static Grouping prepareAllGrouping() {
+        return Grouping.all(new NullStruct());
+    }
+
+    public static Grouping prepareNoneGrouping() {
+        return Grouping.none(new NullStruct());
+    }
+
+    public static Grouping prepareCustomStreamGrouping(Object obj) {
+        return Grouping.custom_serialized(Utils.javaSerialize(obj));
+    }
+
+    public static Grouping prepareCustomJavaObjectGrouping(JavaObject obj) {
+        return Grouping.custom_object(obj);
+    }
+
+    public static Object instantiateJavaObject(JavaObject obj) {
+
+        List<JavaObjectArg> args = obj.get_args_list();
+        Class[] paraTypes = new Class[args.size()];
+        Object[] paraValues = new Object[args.size()];
+        for (int i = 0; i < args.size(); i++) {
+            JavaObjectArg arg = args.get(i);
+            paraValues[i] = arg.getFieldValue();
+
+            if (arg.getSetField().equals(JavaObjectArg._Fields.INT_ARG)) {
+                paraTypes[i] = Integer.class;
+            } else if (arg.getSetField().equals(JavaObjectArg._Fields.LONG_ARG)) {
+                paraTypes[i] = Long.class;
+            } else if (arg.getSetField().equals(JavaObjectArg._Fields.STRING_ARG)) {
+                paraTypes[i] = String.class;
+            } else if (arg.getSetField().equals(JavaObjectArg._Fields.BOOL_ARG)) {
+                paraTypes[i] = Boolean.class;
+            } else if (arg.getSetField().equals(JavaObjectArg._Fields.BINARY_ARG)) {
+                paraTypes[i] = ByteBuffer.class;
+            } else if (arg.getSetField().equals(JavaObjectArg._Fields.DOUBLE_ARG)) {
+                paraTypes[i] = Double.class;
+            } else {
+                paraTypes[i] = Object.class;
+            }
+        }
+
+        try {
+            Class clazz = Class.forName(obj.get_full_class_name());
+            Constructor cons = clazz.getConstructor(paraTypes);
+            return cons.newInstance(paraValues);
+        } catch (Exception e) {
+            LOG.error("java object instantiation failed", e);
+        }
+
+        return null;
+
+    }
+
+    public static Grouping._Fields groupingType(Grouping grouping) {
+        return grouping.getSetField();
+    }
+
+    public static List<String> fieldGrouping(Grouping grouping) {
+        if (!Grouping._Fields.FIELDS.equals(groupingType(grouping))) {
+            throw new IllegalArgumentException("Tried to get grouping fields from non fields grouping");
+        }
+        return grouping.get_fields();
+    }
+
+    public static boolean isGlobalGrouping(Grouping grouping) {
+        if (Grouping._Fields.FIELDS.equals(groupingType(grouping))) {
+            return fieldGrouping(grouping).isEmpty();
+        }
+
+        return false;
+    }
+
+    public static int getParallelismHint(ComponentCommon componentCommon) {
+        if (!componentCommon.is_set_parallelism_hint()) {
+            return 1;
+        } else {
+            return componentCommon.get_parallelism_hint();
+        }
+    }
+
+    public static ComponentObject serializeComponentObject(Object obj) {
+        return ComponentObject.serialized_java(Utils.javaSerialize(obj));
+    }
+
+    public static Object deserializeComponentObject(ComponentObject obj) {
+        if (obj.getSetField() != ComponentObject._Fields.SERIALIZED_JAVA) {
+            throw new RuntimeException("Cannot deserialize non-java-serialized object");
+        }
+        return Utils.javaDeserialize(obj.get_serialized_java(), Serializable.class);
+    }
+
+    public static ComponentCommon prepareComponentCommon(Map<GlobalStreamId, Grouping> inputs, Map<String,
+            StreamInfo> outputs, Integer parallelismHint) {
+        return prepareComponentCommon(inputs, outputs, parallelismHint, null);
+    }
+
+    public static ComponentCommon prepareComponentCommon(Map<GlobalStreamId, Grouping> inputs, Map<String, StreamInfo> outputs,
+                                                         Integer parallelismHint, Map conf) {
+        Map<GlobalStreamId, Grouping> mappedInputs = new HashMap<>();
+        Map<String, StreamInfo> mappedOutputs = new HashMap<>();
+        if (inputs != null && !inputs.isEmpty()) {
+            mappedInputs.putAll(inputs);
+        }
+        if (outputs !=null && !outputs.isEmpty()) {
+            mappedOutputs.putAll(outputs);
+        }
+        ComponentCommon component = new ComponentCommon(mappedInputs, mappedOutputs);
+        if (parallelismHint != null) {
+            component.set_parallelism_hint(parallelismHint);
+        }
+        if (conf != null) {
+            component.set_json_conf(JSONValue.toJSONString(conf));
+        }
+        return component;
+    }
+
+    public static SpoutSpec prepareSerializedSpoutDetails(IRichSpout spout, Map<String, StreamInfo> outputs) {
+        return new SpoutSpec(ComponentObject.serialized_java
+                (Utils.javaSerialize(spout)), prepareComponentCommon(new HashMap(), outputs, null, null));
+    }
+
+    public static Bolt prepareSerializedBoltDetails(Map<GlobalStreamId, Grouping> inputs, IBolt bolt, Map<String, StreamInfo> outputs,
+                                                    Integer parallelismHint, Map conf) {
+        ComponentCommon common = prepareComponentCommon(inputs, outputs, parallelismHint, conf);
+        return new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common);
+    }
+
+    public static BoltDetails prepareBoltDetails(Map<GlobalStreamId, Grouping> inputs, Object bolt) {
+        return prepareBoltDetails(inputs, bolt, null, null);
+    }
+
+    public static BoltDetails prepareBoltDetails(Map<GlobalStreamId, Grouping> inputs, Object bolt,
+                                                 Integer parallelismHint) {
+        return prepareBoltDetails(inputs, bolt, parallelismHint, null);
+    }
+
+    public static BoltDetails prepareBoltDetails(Map<GlobalStreamId, Grouping> inputs, Object bolt,
+                                                 Integer parallelismHint, Map conf) {
+        BoltDetails details = new BoltDetails(bolt, conf, parallelismHint, inputs);
+        return details;
+    }
+
+    public static SpoutDetails prepareSpoutDetails(IRichSpout spout) {
+        return prepareSpoutDetails(spout, null, null);
+    }
+
+    public static SpoutDetails prepareSpoutDetails(IRichSpout spout, Integer parallelismHint) {
+        return prepareSpoutDetails(spout, parallelismHint, null);
+    }
+
+    public static SpoutDetails prepareSpoutDetails(IRichSpout spout, Integer parallelismHint, Map conf) {
+        SpoutDetails details = new SpoutDetails(spout, parallelismHint, conf);
+        return details;
+    }
+
+    public static StormTopology buildTopology(HashMap<String, SpoutDetails> spoutMap,
+                                              HashMap<String, BoltDetails> boltMap, HashMap<String, StateSpoutSpec> stateMap) {
+        return buildTopology(spoutMap, boltMap);
+    }
+
+    private static void addInputs(BoltDeclarer declarer, Map<GlobalStreamId, Grouping> inputs) {
+        for(Entry<GlobalStreamId, Grouping> entry : inputs.entrySet()) {
+            declarer.grouping(entry.getKey(), entry.getValue());
+        }
+    }
+
+    public static StormTopology buildTopology(Map<String, SpoutDetails> spoutMap, Map<String, BoltDetails> boltMap) {
+        TopologyBuilder builder = new TopologyBuilder();
+        for (Entry<String, SpoutDetails> entry : spoutMap.entrySet()) {
+            String spoutID = entry.getKey();
+            SpoutDetails spec = entry.getValue();
+            SpoutDeclarer spoutDeclarer = builder.setSpout(spoutID, spec.getSpout(), spec.getParallelism());
+            spoutDeclarer.addConfigurations(spec.getConf());
+        }
+        for (Entry<String, BoltDetails> entry : boltMap.entrySet()) {
+            String spoutID = entry.getKey();
+            BoltDetails spec = entry.getValue();
+            BoltDeclarer boltDeclarer = null;
+            if (spec.bolt instanceof IRichBolt) {
+                boltDeclarer = builder.setBolt(spoutID, (IRichBolt)spec.getBolt(), spec.getParallelism());
+            } else {
+                boltDeclarer = builder.setBolt(spoutID, (IBasicBolt)spec.getBolt(), spec.getParallelism());
+            }
+            boltDeclarer.addConfigurations(spec.getConf());
+            addInputs(boltDeclarer, spec.getInputs());
+        }
+        return builder.createTopology();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/jvm/org/apache/storm/testing/NGrouping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/NGrouping.java b/storm-core/src/jvm/org/apache/storm/testing/NGrouping.java
index 45b263d..06853fe 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/NGrouping.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/NGrouping.java
@@ -28,9 +28,7 @@ public class NGrouping implements CustomStreamGrouping {
     int _n;
     List<Integer> _outTasks;
     
-    public NGrouping(int n) {
-        _n = n;
-    }
+    public NGrouping(Integer n) {_n = n;}
     
     @Override
     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java b/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java
index eaf7dc8..4beec48 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java
@@ -28,18 +28,22 @@ import org.apache.storm.topology.OutputFieldsDeclarer;
 
 public class PythonShellMetricsBolt extends ShellBolt implements IRichBolt {
 	private static final long serialVersionUID = 1999209252187463355L;
-	
-	public PythonShellMetricsBolt(String[] command) {
-		super(command);
+
+	public PythonShellMetricsBolt(String[] args) {
+		super(args);
 	}
 
+    public PythonShellMetricsBolt(String command, String file) {
+        super(command, file);
+    }
+
 	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
 		super.prepare(stormConf, context, collector);
-		
+
 		CountShellMetric cMetric = new CountShellMetric();
 		context.registerMetric("my-custom-shell-metric", cMetric, 5);
 	}
-	
+
 	public void declareOutputFields(OutputFieldsDeclarer declarer) {
 	}
 

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java b/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java
index ed6de14..657baa6 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java
@@ -33,11 +33,15 @@ public class PythonShellMetricsSpout extends ShellSpout implements IRichSpout {
 	public PythonShellMetricsSpout(String[] command) {
 		super(command);
 	}
-	
+
+    public PythonShellMetricsSpout(String command, String file) {
+        super(command, file);
+    }
+
 	@Override
 	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
 		super.open(conf, context, collector);
-	
+
 		CountShellMetric cMetric = new CountShellMetric();
 		context.registerMetric("my-custom-shellspout-metric", cMetric, 5);
 	}

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 9a849ea..eca9690 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -52,7 +52,6 @@ import org.apache.thrift.TSerializer;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
-import org.eclipse.jetty.util.log.Log;
 import org.json.simple.JSONValue;
 import org.json.simple.parser.ParseException;
 import org.slf4j.Logger;
@@ -1456,6 +1455,13 @@ public class Utils {
         return number & Integer.MAX_VALUE;
     }
 
+    public static GlobalStreamId getGlobalStreamId(String streamId, String componentId) {
+        if (componentId == null) {
+            return new GlobalStreamId(streamId, DEFAULT_STREAM_ID);
+        }
+        return new GlobalStreamId(streamId, componentId);
+    }
+
     public static RuntimeException wrapInRuntime(Exception e){
         if (e instanceof RuntimeException){
             return (RuntimeException)e;

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index 5ba6651..6dce7d6 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -15,26 +15,37 @@
 ;; limitations under the License.
 (ns integration.org.apache.storm.integration-test
   (:use [clojure test])
-  (:import [org.apache.storm Config])
+  (:import [org.apache.storm Config Thrift])
   (:import [org.apache.storm.topology TopologyBuilder])
   (:import [org.apache.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions])
   (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount
             TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
   (:import [org.apache.storm.tuple Fields])
-  (:use [org.apache.storm testing config clojure])
+  (:use [org.apache.storm testing config util])
+  (:use [org.apache.storm.internal clojure])
   (:use [org.apache.storm.daemon common])
-  (:require [org.apache.storm [thrift :as thrift]]))
+  (:import [org.apache.storm Thrift])
+  (:import [org.apache.storm.utils Utils]))
 
 (deftest test-basic-topology
   (doseq [zmq-on? [true false]]
     (with-simulated-time-local-cluster [cluster :supervisors 4
                                         :daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}]
-      (let [topology (thrift/mk-topology
-                      {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-                      {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
-                       "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
-                       "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
-                       })
+      (let [topology (Thrift/buildTopology
+                      {"1" (Thrift/prepareSpoutDetails
+                             (TestWordSpout. true) (Integer. 3))}
+                      {"2" (Thrift/prepareBoltDetails
+                             {(Utils/getGlobalStreamId "1" nil)
+                              (Thrift/prepareFieldsGrouping ["word"])}
+                             (TestWordCounter.) (Integer. 4))
+                       "3" (Thrift/prepareBoltDetails
+                             {(Utils/getGlobalStreamId "1" nil)
+                              (Thrift/prepareGlobalGrouping)}
+                             (TestGlobalCount.))
+                       "4" (Thrift/prepareBoltDetails
+                             {(Utils/getGlobalStreamId "2" nil)
+                              (Thrift/prepareGlobalGrouping)}
+                             (TestAggregatesCounter.))})
             results (complete-topology cluster
                                        topology
                                        :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
@@ -60,12 +71,14 @@
 
 (deftest test-multi-tasks-per-executor
   (with-simulated-time-local-cluster [cluster :supervisors 4]
-    (let [topology (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestWordSpout. true))}
-                    {"2" (thrift/mk-bolt-spec {"1" :all} emit-task-id
-                      :parallelism-hint 3
-                      :conf {TOPOLOGY-TASKS 6})
-                     })
+    (let [topology (Thrift/buildTopology
+                    {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true))}
+                    {"2" (Thrift/prepareBoltDetails
+                           {(Utils/getGlobalStreamId "1" nil)
+                            (Thrift/prepareAllGrouping)}
+                           emit-task-id
+                           (Integer. 3)
+                           {TOPOLOGY-TASKS 6})})
           results (complete-topology cluster
                                      topology
                                      :mock-sources {"1" [["a"]]})]
@@ -98,9 +111,11 @@
     (let [feeder (feeder-spout ["field1"])
           tracker (AckFailMapTracker.)
           _ (.setAckFailDelegate feeder tracker)
-          topology (thrift/mk-topology
-                     {"1" (thrift/mk-spout-spec feeder)}
-                     {"2" (thrift/mk-bolt-spec {"1" :global} ack-every-other)})]      
+          topology (Thrift/buildTopology
+                     {"1" (Thrift/prepareSpoutDetails feeder)}
+                     {"2" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "1" nil)
+                             (Thrift/prepareGlobalGrouping)} ack-every-other)})]
       (submit-local-topology (:nimbus cluster)
                              "timeout-tester"
                              {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
@@ -117,24 +132,36 @@
       )))
 
 (defn mk-validate-topology-1 []
-  (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-                    {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
+  (Thrift/buildTopology
+                    {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
+                    {"2" (Thrift/prepareBoltDetails
+                           {(Utils/getGlobalStreamId "1" nil)
+                            (Thrift/prepareFieldsGrouping ["word"])}
+                           (TestWordCounter.) (Integer. 4))}))
 
 (defn mk-invalidate-topology-1 []
-  (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-                    {"2" (thrift/mk-bolt-spec {"3" ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
+  (Thrift/buildTopology
+                    {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
+                    {"2" (Thrift/prepareBoltDetails
+                           {(Utils/getGlobalStreamId "3" nil)
+                            (Thrift/prepareFieldsGrouping ["word"])}
+                           (TestWordCounter.) (Integer. 4))}))
 
 (defn mk-invalidate-topology-2 []
-  (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-                    {"2" (thrift/mk-bolt-spec {"1" ["non-exists-field"]} (TestWordCounter.) :parallelism-hint 4)}))
+  (Thrift/buildTopology
+                    {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
+                    {"2" (Thrift/prepareBoltDetails
+                           {(Utils/getGlobalStreamId "1" nil)
+                            (Thrift/prepareFieldsGrouping ["non-exists-field"])}
+                           (TestWordCounter.) (Integer. 4))}))
 
 (defn mk-invalidate-topology-3 []
-  (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-                    {"2" (thrift/mk-bolt-spec {["1" "non-exists-stream"] ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
+  (Thrift/buildTopology
+                    {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
+                    {"2" (Thrift/prepareBoltDetails
+                           {(Utils/getGlobalStreamId "1" "non-exists-stream")
+                            (Thrift/prepareFieldsGrouping ["word"])}
+                           (TestWordCounter.) (Integer. 4))}))
 
 (defn try-complete-wc-topology [cluster topology]
   (try (do
@@ -164,10 +191,15 @@
 (deftest test-system-stream
   ;; this test works because mocking a spout splits up the tuples evenly among the tasks
   (with-simulated-time-local-cluster [cluster]
-      (let [topology (thrift/mk-topology
-                      {"1" (thrift/mk-spout-spec (TestWordSpout. true) :p 3)}
-                      {"2" (thrift/mk-bolt-spec {"1" ["word"] ["1" "__system"] :global} identity-bolt :p 1)
-                       })
+      (let [topology (Thrift/buildTopology
+                       {"1" (Thrift/prepareSpoutDetails
+                              (TestWordSpout. true) (Integer. 3))}
+                       {"2" (Thrift/prepareBoltDetails
+                              {(Utils/getGlobalStreamId "1" nil)
+                               (Thrift/prepareFieldsGrouping ["word"])
+                               (Utils/getGlobalStreamId "1" "__system")
+                               (Thrift/prepareGlobalGrouping)}
+                               identity-bolt (Integer. 1))})
             results (complete-topology cluster
                                        topology
                                        :mock-sources {"1" [["a"] ["b"] ["c"]]}
@@ -218,20 +250,38 @@
           [feeder3 checker3] (ack-tracking-feeder ["num"])
           tracked (mk-tracked-topology
                    cluster
-                   (topology
-                     {"1" (spout-spec feeder1)
-                      "2" (spout-spec feeder2)
-                      "3" (spout-spec feeder3)}
-                     {"4" (bolt-spec {"1" :shuffle} (branching-bolt 2))
-                      "5" (bolt-spec {"2" :shuffle} (branching-bolt 4))
-                      "6" (bolt-spec {"3" :shuffle} (branching-bolt 1))
-                      "7" (bolt-spec
-                            {"4" :shuffle
-                            "5" :shuffle
-                            "6" :shuffle}
+                   (Thrift/buildTopology
+                     {"1" (Thrift/prepareSpoutDetails feeder1)
+                      "2" (Thrift/prepareSpoutDetails feeder2)
+                      "3" (Thrift/prepareSpoutDetails feeder3)}
+                     {"4" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "1" nil)
+                             (Thrift/prepareShuffleGrouping)}
+                            (branching-bolt 2))
+                      "5" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "2" nil)
+                             (Thrift/prepareShuffleGrouping)}
+                            (branching-bolt 4))
+                      "6" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "3" nil)
+                             (Thrift/prepareShuffleGrouping)}
+                            (branching-bolt 1))
+                      "7" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "4" nil)
+                             (Thrift/prepareShuffleGrouping)
+                             (Utils/getGlobalStreamId "5" nil)
+                             (Thrift/prepareShuffleGrouping)
+                             (Utils/getGlobalStreamId "6" nil)
+                             (Thrift/prepareShuffleGrouping)}
                             (agg-bolt 3))
-                      "8" (bolt-spec {"7" :shuffle} (branching-bolt 2))
-                      "9" (bolt-spec {"8" :shuffle} ack-bolt)}
+                      "8" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "7" nil)
+                             (Thrift/prepareShuffleGrouping)}
+                            (branching-bolt 2))
+                      "9" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "8" nil)
+                             (Thrift/prepareShuffleGrouping)}
+                            ack-bolt)}
                      ))]
       (submit-local-topology (:nimbus cluster)
                              "acking-test1"
@@ -268,13 +318,21 @@
     (let [[feeder checker] (ack-tracking-feeder ["num"])
           tracked (mk-tracked-topology
                    cluster
-                   (topology
-                     {"1" (spout-spec feeder)}
-                     {"2" (bolt-spec {"1" :shuffle} identity-bolt)
-                      "3" (bolt-spec {"1" :shuffle} identity-bolt)
-                      "4" (bolt-spec
-                            {"2" :shuffle
-                             "3" :shuffle}
+                   (Thrift/buildTopology
+                     {"1" (Thrift/prepareSpoutDetails feeder)}
+                     {"2" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "1" nil)
+                             (Thrift/prepareShuffleGrouping)}
+                            identity-bolt)
+                      "3" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "1" nil)
+                             (Thrift/prepareShuffleGrouping)}
+                            identity-bolt)
+                      "4" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "2" nil)
+                             (Thrift/prepareShuffleGrouping)
+                             (Utils/getGlobalStreamId "3" nil)
+                             (Thrift/prepareShuffleGrouping)}
                              (agg-bolt 4))}))]
       (submit-local-topology (:nimbus cluster)
                              "test-acking2"
@@ -314,10 +372,13 @@
     (let [feeder (feeder-spout ["field1"])
           tracker (AckFailMapTracker.)
           _ (.setAckFailDelegate feeder tracker)
-          topology (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec feeder)
-                     "2" (thrift/mk-spout-spec open-tracked-spout)}
-                    {"3" (thrift/mk-bolt-spec {"1" :global} prepare-tracked-bolt)})]
+          topology (Thrift/buildTopology
+                    {"1" (Thrift/prepareSpoutDetails feeder)
+                     "2" (Thrift/prepareSpoutDetails open-tracked-spout)}
+                    {"3" (Thrift/prepareBoltDetails
+                           {(Utils/getGlobalStreamId "1" nil)
+                            (Thrift/prepareGlobalGrouping)}
+                           prepare-tracked-bolt)})]
       (reset! bolt-prepared? false)
       (reset! spout-opened? false)      
       
@@ -343,10 +404,16 @@
     (let [[feeder checker] (ack-tracking-feeder ["num"])
           tracked (mk-tracked-topology
                    cluster
-                   (topology
-                     {"1" (spout-spec feeder)}
-                     {"2" (bolt-spec {"1" :shuffle} dup-anchor)
-                      "3" (bolt-spec {"2" :shuffle} ack-bolt)}))]
+                   (Thrift/buildTopology
+                     {"1" (Thrift/prepareSpoutDetails feeder)}
+                     {"2" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "1" nil)
+                             (Thrift/prepareShuffleGrouping)}
+                            dup-anchor)
+                      "3" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "2" nil)
+                             (Thrift/prepareShuffleGrouping)}
+                            ack-bolt)}))]
       (submit-local-topology (:nimbus cluster)
                              "test"
                              {}
@@ -362,36 +429,6 @@
       (checker 3)
       )))
 
-;; (defspout ConstantSpout ["val"] {:prepare false}
-;;   [collector]
-;;   (Time/sleep 100)
-;;   (emit-spout! collector [1]))
-
-;; (def errored (atom false))
-;; (def restarted (atom false))
-
-;; (defbolt local-error-checker {} [tuple collector]
-;;   (when-not @errored
-;;     (reset! errored true)
-;;     (println "erroring")
-;;     (throw (RuntimeException.)))
-;;   (when-not @restarted (println "restarted"))
-;;   (reset! restarted true))
-
-;; (deftest test-no-halt-local-mode
-;;   (with-simulated-time-local-cluster [cluster]
-;;       (let [topology (topology
-;;                       {1 (spout-spec ConstantSpout)}
-;;                       {2 (bolt-spec {1 :shuffle} local-error-checker)
-;;                        })]
-;;         (submit-local-topology (:nimbus cluster)
-;;                                "test"
-;;                                {}
-;;                                topology)
-;;         (while (not @restarted)
-;;           (advance-time-ms! 100))
-;;         )))
-
 (defspout IncSpout ["word"]
   [conf context collector]
   (let [state (atom 0)]
@@ -416,23 +453,6 @@
        )
      )))
 
-;; (deftest test-clojure-spout
-;;   (with-local-cluster [cluster]
-;;     (let [nimbus (:nimbus cluster)
-;;           top (topology
-;;                {1 (spout-spec IncSpout)}
-;;                {}
-;;                )]
-;;       (submit-local-topology nimbus
-;;                              "spout-test"
-;;                              {TOPOLOGY-DEBUG true
-;;                               TOPOLOGY-MESSAGE-TIMEOUT-SECS 3}
-;;                              top)
-;;       (Thread/sleep 10000)
-;;       (.killTopology nimbus "spout-test")
-;;       (Thread/sleep 10000)
-;;       )))
-
 (deftest test-kryo-decorators-config
   (with-simulated-time-local-cluster [cluster
                                       :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
@@ -513,11 +533,13 @@
 
 (deftest test-hooks
   (with-simulated-time-local-cluster [cluster]
-    (let [topology (topology {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"])))
-                              }
-                             {"2" (bolt-spec {"1" :shuffle}
-                                             hooks-bolt)
-                              })
+    (let [topology (Thrift/buildTopology
+                     {"1" (Thrift/prepareSpoutDetails
+                            (TestPlannerSpout. (Fields. ["conf"])))}
+                     {"2" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "1" nil)
+                             (Thrift/prepareShuffleGrouping)}
+                            hooks-bolt)})
           results (complete-topology cluster
                                      topology
                                      :mock-sources {"1" [[1]
@@ -545,9 +567,12 @@
             [feeder checker] (ack-tracking-feeder ["num"])
             tracked (mk-tracked-topology
                      cluster
-                     (topology
-                       {"1" (spout-spec feeder)}
-                       {"2" (bolt-spec {"1" :shuffle} report-errors-bolt)}))
+                     (Thrift/buildTopology
+                       {"1" (Thrift/prepareSpoutDetails feeder)}
+                       {"2" (Thrift/prepareBoltDetails
+                              {(Utils/getGlobalStreamId "1" nil)
+                               (Thrift/prepareShuffleGrouping)}
+                              report-errors-bolt)}))
             _       (submit-local-topology (:nimbus cluster)
                                              "test-errors"
                                              {TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS 10

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
index e86e893..3b1a48b 100644
--- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
@@ -15,15 +15,19 @@
 ;; limitations under the License.
 (ns integration.org.apache.storm.testing4j-test
   (:use [clojure.test])
-  (:use [org.apache.storm config clojure testing])
+  (:use [org.apache.storm config testing util])
+  (:use [org.apache.storm.internal clojure])
   (:require [integration.org.apache.storm.integration-test :as it])
-  (:require [org.apache.storm.thrift :as thrift])
-  (:import [org.apache.storm Testing Config ILocalCluster])
+  (:require [org.apache.storm.internal.thrift :as thrift])
+  (:import [org.apache.storm Testing Config ILocalCluster]
+           [org.apache.storm.generated GlobalStreamId])
   (:import [org.apache.storm.tuple Values Tuple])
   (:import [org.apache.storm.utils Time Utils])
   (:import [org.apache.storm.testing MkClusterParam TestJob MockedSources TestWordSpout
             TestWordCounter TestGlobalCount TestAggregatesCounter CompleteTopologyParam
-            AckFailMapTracker MkTupleParam]))
+            AckFailMapTracker MkTupleParam])
+  (:import [org.apache.storm.utils Utils])
+  (:import [org.apache.storm Thrift]))
 
 (deftest test-with-simulated-time
   (is (= false (Time/isSimulating)))
@@ -69,12 +73,20 @@
     (Testing/withSimulatedTimeLocalCluster
      (reify TestJob
        (^void run [this ^ILocalCluster cluster]
-         (let [topology (thrift/mk-topology
-                         {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-                         {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
-                          "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
-                          "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
-                          })
+         (let [topology (Thrift/buildTopology
+                         {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
+                         {"2" (Thrift/prepareBoltDetails
+                                {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+                                 (Thrift/prepareFieldsGrouping ["word"])}
+                                (TestWordCounter.) (Integer. 4))
+                          "3" (Thrift/prepareBoltDetails
+                                {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+                                 (Thrift/prepareGlobalGrouping)}
+                                (TestGlobalCount.))
+                          "4" (Thrift/prepareBoltDetails
+                                {(GlobalStreamId. "2" Utils/DEFAULT_STREAM_ID)
+                                 (Thrift/prepareGlobalGrouping)}
+                                (TestAggregatesCounter.))})
                mocked-sources (doto (MockedSources.)
                                 (.addMockData "1" (into-array Values [(Values. (into-array ["nathan"]))
                                                                       (Values. (into-array ["bob"]))
@@ -106,13 +118,21 @@
        (let [[feeder checker] (it/ack-tracking-feeder ["num"])
              tracked (Testing/mkTrackedTopology
                       cluster
-                      (topology
-                       {"1" (spout-spec feeder)}
-                       {"2" (bolt-spec {"1" :shuffle} it/identity-bolt)
-                        "3" (bolt-spec {"1" :shuffle} it/identity-bolt)
-                        "4" (bolt-spec
-                             {"2" :shuffle
-                              "3" :shuffle}
+                      (Thrift/buildTopology
+                       {"1" (Thrift/prepareSpoutDetails feeder)}
+                       {"2" (Thrift/prepareBoltDetails
+                              {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+                               (Thrift/prepareShuffleGrouping)}
+                              it/identity-bolt)
+                        "3" (Thrift/prepareBoltDetails
+                              {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+                               (Thrift/prepareShuffleGrouping)}
+                              it/identity-bolt)
+                        "4" (Thrift/prepareBoltDetails
+                             {(GlobalStreamId. "2" Utils/DEFAULT_STREAM_ID)
+                              (Thrift/prepareShuffleGrouping)
+                              (GlobalStreamId. "3" Utils/DEFAULT_STREAM_ID)
+                              (Thrift/prepareShuffleGrouping)}
                              (it/agg-bolt 4))}))]
          (.submitTopology cluster
                           "test-acking2"
@@ -139,9 +159,12 @@
          (let [feeder (feeder-spout ["field1"])
                tracker (AckFailMapTracker.)
                _ (.setAckFailDelegate feeder tracker)
-               topology (thrift/mk-topology
-                         {"1" (thrift/mk-spout-spec feeder)}
-                         {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
+               topology (Thrift/buildTopology
+                         {"1" (Thrift/prepareSpoutDetails feeder)}
+                         {"2" (Thrift/prepareBoltDetails
+                                {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+                                 (Thrift/prepareGlobalGrouping)}
+                                it/ack-every-other)})
                storm-conf (doto (Config.)
                             (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10))]
            (.submitTopology cluster
@@ -170,9 +193,12 @@
           (let [feeder (feeder-spout ["field1"])
                 tracker (AckFailMapTracker.)
                 _ (.setAckFailDelegate feeder tracker)
-                topology (thrift/mk-topology
-                           {"1" (thrift/mk-spout-spec feeder)}
-                           {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
+                topology (Thrift/buildTopology
+                           {"1" (Thrift/prepareSpoutDetails feeder)}
+                           {"2" (Thrift/prepareBoltDetails
+                                  {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+                                   (Thrift/prepareGlobalGrouping)}
+                                  it/ack-every-other)})
                 storm-conf (doto (Config.)
                              (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10)
                              (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))]

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/clojure_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/clojure_test.clj b/storm-core/test/clj/org/apache/storm/clojure_test.clj
index ccec825..13fdeb7 100644
--- a/storm-core/test/clj/org/apache/storm/clojure_test.clj
+++ b/storm-core/test/clj/org/apache/storm/clojure_test.clj
@@ -17,10 +17,12 @@
   (:use [clojure test])
   (:import [org.apache.storm.testing TestWordSpout TestPlannerSpout]
            [org.apache.storm.tuple Fields])
-  (:use [org.apache.storm testing clojure config])
+  (:use [org.apache.storm testing config])
+  (:use [org.apache.storm.internal clojure])
   (:use [org.apache.storm.daemon common])
-  (:require [org.apache.storm [thrift :as thrift]]))
-
+  (:require [org.apache.storm.internal [thrift :as thrift]])
+  (:import [org.apache.storm Thrift])
+  (:import [org.apache.storm.utils Utils]))
 
 (defbolt lalala-bolt1 ["word"] [[val :as tuple] collector]
   (let [ret (str val "lalala")]
@@ -56,15 +58,21 @@
 (deftest test-clojure-bolt
   (with-simulated-time-local-cluster [cluster :supervisors 4]
     (let [nimbus (:nimbus cluster)
-          topology (thrift/mk-topology
-                      {"1" (thrift/mk-spout-spec (TestWordSpout. false))}
-                      {"2" (thrift/mk-bolt-spec {"1" :shuffle}
-                                              lalala-bolt1)
-                       "3" (thrift/mk-bolt-spec {"1" :local-or-shuffle}
-                                              lalala-bolt2)
-                       "4" (thrift/mk-bolt-spec {"1" :shuffle}
-                                              (lalala-bolt3 "_nathan_"))}
-                      )
+          topology (Thrift/buildTopology
+                      {"1" (Thrift/prepareSpoutDetails (TestWordSpout. false))}
+                      {"2" (Thrift/prepareBoltDetails
+                             {(Utils/getGlobalStreamId "1" nil)
+                              (Thrift/prepareShuffleGrouping)}
+                             lalala-bolt1)
+                       "3" (Thrift/prepareBoltDetails
+                             {(Utils/getGlobalStreamId "1" nil)
+                              (Thrift/prepareLocalOrShuffleGrouping)}
+                             lalala-bolt2)
+                       "4" (Thrift/prepareBoltDetails
+                             {(Utils/getGlobalStreamId "1" nil)
+                              (Thrift/prepareShuffleGrouping)}
+                             (lalala-bolt3 "_nathan_"))}
+                     )
           results (complete-topology cluster
                                      topology
                                      :mock-sources {"1" [["david"]
@@ -91,11 +99,12 @@
 
 (deftest test-map-emit
   (with-simulated-time-local-cluster [cluster :supervisors 4]
-    (let [topology (thrift/mk-topology
-                      {"words" (thrift/mk-spout-spec (TestWordSpout. false))}
-                      {"out" (thrift/mk-bolt-spec {"words" :shuffle}
-                                              punctuator-bolt)}
-                      )
+    (let [topology (Thrift/buildTopology
+                      {"words" (Thrift/prepareSpoutDetails (TestWordSpout. false))}
+                      {"out" (Thrift/prepareBoltDetails
+                               {(Utils/getGlobalStreamId "words" nil)
+                                (Thrift/prepareShuffleGrouping)}
+                               punctuator-bolt)})
           results (complete-topology cluster
                                      topology
                                      :mock-sources {"words" [["foo"] ["bar"]]}
@@ -115,14 +124,19 @@
 
 (deftest test-component-specific-config-clojure
   (with-simulated-time-local-cluster [cluster]
-    (let [topology (topology {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"])) :conf {TOPOLOGY-MESSAGE-TIMEOUT-SECS 40})
-                              }
-                             {"2" (bolt-spec {"1" :shuffle}
-                                             (conf-query-bolt {"fake.config" 1
-                                                               TOPOLOGY-MAX-TASK-PARALLELISM 2
-                                                               TOPOLOGY-MAX-SPOUT-PENDING 10})
-                                             :conf {TOPOLOGY-MAX-SPOUT-PENDING 3})
-                              })
+    (let [topology (Thrift/buildTopology
+                     {"1" (Thrift/prepareSpoutDetails
+                            (TestPlannerSpout. (Fields. ["conf"]))
+                            nil
+                            {TOPOLOGY-MESSAGE-TIMEOUT-SECS 40})}
+                     {"2" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "1" nil)
+                             (Thrift/prepareShuffleGrouping)}
+                            (conf-query-bolt {"fake.config" 1
+                                              TOPOLOGY-MAX-TASK-PARALLELISM 2
+                                              TOPOLOGY-MAX-SPOUT-PENDING 10})
+                            nil
+                            {TOPOLOGY-MAX-SPOUT-PENDING 3})})
           results (complete-topology cluster
                                      topology
                                      :topology-name "test123"

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj
index b146cb0..18e3a80 100644
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@ -30,7 +30,8 @@
   (:require [conjure.core])
   (:use [conjure core])
   (:use [clojure test])
-  (:use [org.apache.storm cluster config util testing thrift log]))
+  (:use [org.apache.storm cluster config util testing log])
+  (:use [org.apache.storm.internal thrift]))
 
 (defn mk-config [zk-port]
   (merge (clojurify-structure (ConfigUtils/readStormConfig))

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/drpc_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/drpc_test.clj b/storm-core/test/clj/org/apache/storm/drpc_test.clj
index 3dcef7a..6024674 100644
--- a/storm-core/test/clj/org/apache/storm/drpc_test.clj
+++ b/storm-core/test/clj/org/apache/storm/drpc_test.clj
@@ -16,7 +16,8 @@
 (ns org.apache.storm.drpc-test
   (:use [clojure test])
   (:import [org.apache.storm.drpc ReturnResults DRPCSpout
-            LinearDRPCTopologyBuilder])
+            LinearDRPCTopologyBuilder]
+           [org.apache.storm.utils ConfigUtils Utils])
   (:import [org.apache.storm.topology FailedException])
   (:import [org.apache.storm.coordination CoordinatedBolt$FinishedCallback])
   (:import [org.apache.storm LocalDRPC LocalCluster])
@@ -25,7 +26,9 @@
            [org.apache.storm.utils.staticmocking ConfigUtilsInstaller])
   (:import [org.apache.storm.generated DRPCExecutionException])
   (:import [java.util.concurrent ConcurrentLinkedQueue])
-  (:use [org.apache.storm config testing clojure])
+  (:import [org.apache.storm Thrift])
+  (:use [org.apache.storm config testing])
+  (:use [org.apache.storm.internal clojure])
   (:use [org.apache.storm.daemon common drpc])
   (:use [conjure core]))
 
@@ -40,12 +43,16 @@
   (let [drpc (LocalDRPC.)
         spout (DRPCSpout. "test" drpc)
         cluster (LocalCluster.)
-        topology (topology
-                  {"1" (spout-spec spout)}
-                  {"2" (bolt-spec {"1" :shuffle}
-                                exclamation-bolt)
-                   "3" (bolt-spec {"2" :shuffle}
-                                (ReturnResults.))})]
+        topology (Thrift/buildTopology
+                  {"1" (Thrift/prepareSpoutDetails spout)}
+                  {"2" (Thrift/prepareBoltDetails
+                         {(Utils/getGlobalStreamId "1" nil)
+                          (Thrift/prepareShuffleGrouping)}
+                         exclamation-bolt)
+                   "3" (Thrift/prepareBoltDetails
+                         {(Utils/getGlobalStreamId "2" nil)
+                          (Thrift/prepareGlobalGrouping)}
+                         (ReturnResults.))})]
     (.submitTopology cluster "test" {} topology)
 
     (is (= "aaa!!!" (.execute drpc "test" "aaa")))

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/grouping_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/grouping_test.clj b/storm-core/test/clj/org/apache/storm/grouping_test.clj
index f2a3f4b..61caf68 100644
--- a/storm-core/test/clj/org/apache/storm/grouping_test.clj
+++ b/storm-core/test/clj/org/apache/storm/grouping_test.clj
@@ -18,9 +18,11 @@
   (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestWordBytesCounter NGrouping]
            [org.apache.storm.generated JavaObject JavaObjectArg])
   (:import [org.apache.storm.grouping LoadMapping])
-  (:use [org.apache.storm testing clojure log config])
+  (:use [org.apache.storm testing log config])
+  (:use [org.apache.storm.internal clojure])
   (:use [org.apache.storm.daemon common executor])
-  (:require [org.apache.storm [thrift :as thrift]]))
+  (:import [org.apache.storm Thrift])
+  (:import [org.apache.storm.utils Utils]))
 
 (deftest test-shuffle
  (let [shuffle-fn (mk-shuffle-grouper [(int 1) (int 2)] {TOPOLOGY-DISABLE-LOADAWARE-MESSAGING true} nil "comp" "stream")
@@ -77,12 +79,13 @@
   (with-simulated-time-local-cluster [cluster :supervisors 4]
     (let [spout-phint 4
           bolt-phint 6
-          topology (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestWordSpout. true)
-                                               :parallelism-hint spout-phint)}
-                    {"2" (thrift/mk-bolt-spec {"1" ["word"]}
-                                              (TestWordBytesCounter.)
-                                              :parallelism-hint bolt-phint)
+          topology (Thrift/buildTopology
+                    {"1" (Thrift/prepareSpoutDetails
+                           (TestWordSpout. true) (Integer. spout-phint))}
+                    {"2" (Thrift/prepareBoltDetails
+                           {(Utils/getGlobalStreamId "1" nil)
+                            (Thrift/prepareFieldsGrouping ["word"])}
+                           (TestWordBytesCounter.) (Integer. spout-phint))
                      })
           results (complete-topology
                     cluster
@@ -101,12 +104,13 @@
   (with-simulated-time-local-cluster [cluster :supervisors 4]
     (let [spout-phint 4
           bolt-phint 6
-          topology (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestWordSpout. true)
-                                               :parallelism-hint spout-phint)}
-                    {"2" (thrift/mk-bolt-spec {"1" ["word"]}
-                                              (TestWordBytesCounter.)
-                                              :parallelism-hint bolt-phint)
+          topology (Thrift/buildTopology
+                    {"1" (Thrift/prepareSpoutDetails
+                           (TestWordSpout. true) (Integer. spout-phint))}
+                    {"2" (Thrift/prepareBoltDetails
+                           {(Utils/getGlobalStreamId "1" nil)
+                            (Thrift/prepareFieldsGrouping ["word"])}
+                           (TestWordBytesCounter.) (Integer. bolt-phint))
                      })
           results (complete-topology
                     cluster
@@ -127,15 +131,21 @@
 
 (deftest test-custom-groupings
   (with-simulated-time-local-cluster [cluster]
-    (let [topology (topology
-                    {"1" (spout-spec (TestWordSpout. true))}
-                    {"2" (bolt-spec {"1" (NGrouping. 2)}
-                                  id-bolt
-                                  :p 4)
-                     "3" (bolt-spec {"1" (JavaObject. "org.apache.storm.testing.NGrouping"
-                                                      [(JavaObjectArg/int_arg 3)])}
-                                  id-bolt
-                                  :p 6)
+    (let [topology (Thrift/buildTopology
+                    {"1" (Thrift/prepareSpoutDetails
+                           (TestWordSpout. true))}
+                    {"2" (Thrift/prepareBoltDetails
+                           {(Utils/getGlobalStreamId "1" nil)
+                            (Thrift/prepareCustomStreamGrouping (NGrouping. (Integer. 2)))}
+                           id-bolt
+                           (Integer. 4))
+                     "3" (Thrift/prepareBoltDetails
+                           {(Utils/getGlobalStreamId "1" nil)
+                            (Thrift/prepareCustomJavaObjectGrouping
+                              (JavaObject. "org.apache.storm.testing.NGrouping"
+                              [(JavaObjectArg/int_arg 3)]))}
+                           id-bolt
+                           (Integer. 6))
                      })
           results (complete-topology cluster
                                      topology

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
index f75a8e3..7fffd34 100644
--- a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
+++ b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
@@ -15,10 +15,11 @@
 ;; limitations under the License.
 (ns org.apache.storm.messaging.netty-integration-test
   (:use [clojure test])
-  (:import [org.apache.storm.messaging TransportFactory])
+  (:import [org.apache.storm.messaging TransportFactory]
+           [org.apache.storm Thrift])
   (:import [org.apache.storm.testing TestWordSpout TestGlobalCount])
-  (:use [org.apache.storm testing config])
-  (:require [org.apache.storm [thrift :as thrift]]))
+  (:import [org.apache.storm.utils Utils])
+  (:use [org.apache.storm testing util config]))
 
 (deftest test-integration
   (with-simulated-time-local-cluster [cluster :supervisors 4 :supervisor-slot-port-min 6710
@@ -31,10 +32,13 @@
                                                     STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
                                                     STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
                                                     STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1}]
-    (let [topology (thrift/mk-topology
-                     {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
-                     {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
-                                               :parallelism-hint 6)})
+    (let [topology (Thrift/buildTopology
+                     {"1" (Thrift/prepareSpoutDetails
+                            (TestWordSpout. true) (Integer. 4))}
+                     {"2" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "1" nil)
+                             (Thrift/prepareShuffleGrouping)}
+                            (TestGlobalCount.) (Integer. 6))})
           results (complete-topology cluster
                                      topology
                                      ;; important for test that

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/messaging_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/messaging_test.clj b/storm-core/test/clj/org/apache/storm/messaging_test.clj
index e987688..402ea7f 100644
--- a/storm-core/test/clj/org/apache/storm/messaging_test.clj
+++ b/storm-core/test/clj/org/apache/storm/messaging_test.clj
@@ -18,7 +18,8 @@
   (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestEventLogSpout TestEventOrderCheckBolt])
   (:use [org.apache.storm testing config])
   (:use [org.apache.storm.daemon common])
-  (:require [org.apache.storm [thrift :as thrift]]))
+  (:import [org.apache.storm Thrift])
+  (:import [org.apache.storm.utils Utils]))
 
 (deftest test-local-transport
   (doseq [transport-on? [false true]] 
@@ -28,10 +29,13 @@
                                                       (if transport-on? true false) 
                                                       STORM-MESSAGING-TRANSPORT 
                                                       "org.apache.storm.messaging.netty.Context"}]
-      (let [topology (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 2)}
-                       {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
-                                                 :parallelism-hint 6)
+      (let [topology (Thrift/buildTopology
+                       {"1" (Thrift/prepareSpoutDetails
+                              (TestWordSpout. true) (Integer. 2))}
+                       {"2" (Thrift/prepareBoltDetails
+                              {(Utils/getGlobalStreamId "1" nil)
+                               (Thrift/prepareShuffleGrouping)}
+                              (TestGlobalCount.) (Integer. 6))
                         })
             results (complete-topology cluster
                                        topology

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/metrics_test.clj b/storm-core/test/clj/org/apache/storm/metrics_test.clj
index 9f051f6..c186288 100644
--- a/storm-core/test/clj/org/apache/storm/metrics_test.clj
+++ b/storm-core/test/clj/org/apache/storm/metrics_test.clj
@@ -25,10 +25,12 @@
   (:import [org.apache.storm.metric.api.rpc CountShellMetric])
   (:import [org.apache.storm.utils Utils])
   
-  (:use [org.apache.storm testing clojure config])
+  (:use [org.apache.storm testing config])
+  (:use [org.apache.storm.internal clojure])
   (:use [org.apache.storm.daemon common])
   (:use [org.apache.storm.metric testing])
-  (:require [org.apache.storm [thrift :as thrift]]))
+  (:import [org.apache.storm Thrift])
+  (:import [org.apache.storm.utils Utils]))
 
 (defbolt acking-bolt {} {:prepare true}
   [conf context collector]  
@@ -105,9 +107,12 @@
                            "storm.zookeeper.session.timeout" 60000
                            }]
     (let [feeder (feeder-spout ["field1"])
-          topology (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec feeder)}
-                    {"2" (thrift/mk-bolt-spec {"1" :global} count-acks)})]      
+          topology (Thrift/buildTopology
+                    {"1" (Thrift/prepareSpoutDetails feeder)}
+                    {"2" (Thrift/prepareBoltDetails
+                           {(Utils/getGlobalStreamId "1" nil)
+                            (Thrift/prepareGlobalGrouping)}
+                           count-acks)})]
       (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
 
       (.feed feeder ["a"] 1)
@@ -133,9 +138,12 @@
                            "storm.zookeeper.session.timeout" 60000
                            }]
     (let [feeder (feeder-spout ["field1"])
-          topology (thrift/mk-topology
-                     {"1" (thrift/mk-spout-spec feeder)}
-                     {"2" (thrift/mk-bolt-spec {"1" :all} count-acks :p 1 :conf {TOPOLOGY-TASKS 2})})]
+          topology (Thrift/buildTopology
+                     {"1" (Thrift/prepareSpoutDetails feeder)}
+                     {"2" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "1" nil)
+                             (Thrift/prepareAllGrouping)}
+                            count-acks (Integer. 1) {TOPOLOGY-TASKS 2})})]
       (submit-local-topology (:nimbus cluster) "metrics-tester-with-multitasks" {} topology)
 
       (.feed feeder ["a"] 1)
@@ -154,10 +162,9 @@
       (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster))))
 
 (defn mk-shell-bolt-with-metrics-spec
-  [inputs command & kwargs]
-  (let [command (into-array String command)]
-    (apply thrift/mk-bolt-spec inputs
-         (PythonShellMetricsBolt. command) kwargs)))
+  [inputs command file]
+    (Thrift/prepareBoltDetails inputs
+         (PythonShellMetricsBolt. command file)))
 
 (deftest test-custom-metric-with-multilang-py
   (with-simulated-time-local-cluster 
@@ -167,9 +174,12 @@
                        "storm.zookeeper.session.timeout" 60000
                        }]
     (let [feeder (feeder-spout ["field1"])
-          topology (thrift/mk-topology
-                     {"1" (thrift/mk-spout-spec feeder)}
-                     {"2" (mk-shell-bolt-with-metrics-spec {"1" :global} ["python" "tester_bolt_metrics.py"])})]
+          topology (Thrift/buildTopology
+                     {"1" (Thrift/prepareSpoutDetails feeder)}
+                     {"2" (mk-shell-bolt-with-metrics-spec
+                            {(Utils/getGlobalStreamId "1" nil)
+                             (Thrift/prepareGlobalGrouping)}
+                            "python" "tester_bolt_metrics.py")})]
       (submit-local-topology (:nimbus cluster) "shell-metrics-tester" {} topology)
 
       (.feed feeder ["a"] 1)
@@ -189,9 +199,8 @@
       )))
 
 (defn mk-shell-spout-with-metrics-spec
-  [command & kwargs]
-  (let [command (into-array String command)]
-    (apply thrift/mk-spout-spec (PythonShellMetricsSpout. command) kwargs)))
+  [command file]
+    (Thrift/prepareSpoutDetails (PythonShellMetricsSpout. command file)))
 
 (deftest test-custom-metric-with-spout-multilang-py
   (with-simulated-time-local-cluster 
@@ -199,9 +208,12 @@
                        [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
                        "storm.zookeeper.connection.timeout" 30000
                        "storm.zookeeper.session.timeout" 60000}]
-    (let [topology (thrift/mk-topology
-                     {"1" (mk-shell-spout-with-metrics-spec ["python" "tester_spout_metrics.py"])}
-                     {"2" (thrift/mk-bolt-spec {"1" :all} count-acks)})]
+    (let [topology (Thrift/buildTopology
+                     {"1" (mk-shell-spout-with-metrics-spec "python" "tester_spout_metrics.py")}
+                     {"2" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "1" nil)
+                             (Thrift/prepareAllGrouping)}
+                            count-acks)})]
       (submit-local-topology (:nimbus cluster) "shell-spout-metrics-tester" {} topology)
 
       (advance-cluster-time cluster 7)
@@ -216,9 +228,12 @@
                            TOPOLOGY-STATS-SAMPLE-RATE 1.0
                            TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
     (let [feeder (feeder-spout ["field1"])
-          topology (thrift/mk-topology
-                    {"myspout" (thrift/mk-spout-spec feeder)}
-                    {"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} acking-bolt)})]      
+          topology (Thrift/buildTopology
+                    {"myspout" (Thrift/prepareSpoutDetails feeder)}
+                    {"mybolt" (Thrift/prepareBoltDetails
+                                {(Utils/getGlobalStreamId "myspout" nil)
+                                 (Thrift/prepareShuffleGrouping)}
+                                acking-bolt)})]
       (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
       
       (.feed feeder ["a"] 1)
@@ -255,9 +270,12 @@
     (let [feeder (feeder-spout ["field1"])
           tracker (AckFailMapTracker.)
           _ (.setAckFailDelegate feeder tracker)
-          topology (thrift/mk-topology
-                    {"myspout" (thrift/mk-spout-spec feeder)}
-                    {"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} ack-every-other)})]      
+          topology (Thrift/buildTopology
+                    {"myspout" (Thrift/prepareSpoutDetails feeder)}
+                    {"mybolt" (Thrift/prepareBoltDetails
+                                {(Utils/getGlobalStreamId "myspout" nil)
+                                 (Thrift/prepareShuffleGrouping)}
+                                ack-every-other)})]
       (submit-local-topology (:nimbus cluster)
                              "metrics-tester"
                              {}
@@ -307,9 +325,12 @@
     (let [feeder (feeder-spout ["field1"])
           tracker (AckFailMapTracker.)
           _ (.setAckFailDelegate feeder tracker)
-          topology (thrift/mk-topology
-                    {"myspout" (thrift/mk-spout-spec feeder)}
-                    {"mybolt" (thrift/mk-bolt-spec {"myspout" :global} ack-every-other)})]      
+          topology (Thrift/buildTopology
+                    {"myspout" (Thrift/prepareSpoutDetails feeder)}
+                    {"mybolt" (Thrift/prepareBoltDetails
+                                {(Utils/getGlobalStreamId "myspout" nil)
+                                 (Thrift/prepareGlobalGrouping)}
+                                ack-every-other)})]
       (submit-local-topology (:nimbus cluster)
                              "timeout-tester"
                              {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
@@ -341,8 +362,8 @@
                            [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
                            TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
     (let [feeder (feeder-spout ["field1"])
-          topology (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec feeder)}
+          topology (Thrift/buildTopology
+                    {"1" (Thrift/prepareSpoutDetails feeder)}
                     {})]      
       (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)