You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/18 17:12:39 UTC
[2/5] storm git commit: backport thrift.clj to Thrift.java
http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/thrift.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/thrift.clj b/storm-core/src/clj/org/apache/storm/thrift.clj
deleted file mode 100644
index 779c1d1..0000000
--- a/storm-core/src/clj/org/apache/storm/thrift.clj
+++ /dev/null
@@ -1,286 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.thrift
- (:import [java.util HashMap]
- [java.io Serializable]
- [org.apache.storm.generated NodeInfo Assignment])
- (:import [org.apache.storm.generated JavaObject Grouping Nimbus StormTopology
- StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface
- ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo
- GlobalStreamId ComponentObject ComponentObject$_Fields
- ShellComponent SupervisorInfo])
- (:import [org.apache.storm.utils Utils NimbusClient ConfigUtils])
- (:import [org.apache.storm Constants])
- (:import [org.apache.storm.security.auth ReqContext])
- (:import [org.apache.storm.grouping CustomStreamGrouping])
- (:import [org.apache.storm.topology TopologyBuilder])
- (:import [org.apache.storm.clojure RichShellBolt RichShellSpout])
- (:import [org.apache.thrift.transport TTransport]
- (org.json.simple JSONValue))
- (:use [org.apache.storm util config log zookeeper]))
-
-(defn instantiate-java-object
- [^JavaObject obj]
- (let [name (symbol (.get_full_class_name obj))
- args (map (memfn getFieldValue) (.get_args_list obj))]
- (eval `(new ~name ~@args))))
-
-(def grouping-constants
- {Grouping$_Fields/FIELDS :fields
- Grouping$_Fields/SHUFFLE :shuffle
- Grouping$_Fields/ALL :all
- Grouping$_Fields/NONE :none
- Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized
- Grouping$_Fields/CUSTOM_OBJECT :custom-object
- Grouping$_Fields/DIRECT :direct
- Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle})
-
-(defn grouping-type
- [^Grouping grouping]
- (grouping-constants (.getSetField grouping)))
-
-(defn field-grouping
- [^Grouping grouping]
- (when-not (= (grouping-type grouping) :fields)
- (throw (IllegalArgumentException. "Tried to get grouping fields from non fields grouping")))
- (.get_fields grouping))
-
-(defn global-grouping?
- [^Grouping grouping]
- (and (= :fields (grouping-type grouping))
- (empty? (field-grouping grouping))))
-
-(defn parallelism-hint
- [^ComponentCommon component-common]
- (let [phint (.get_parallelism_hint component-common)]
- (if-not (.is_set_parallelism_hint component-common) 1 phint)))
-
-(defn nimbus-client-and-conn
- ([host port]
- (nimbus-client-and-conn host port nil))
- ([host port as-user]
- (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user)
- (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
- nimbusClient (NimbusClient. conf host port nil as-user)
- client (.getClient nimbusClient)
- transport (.transport nimbusClient)]
- [client transport] )))
-
-(defmacro with-nimbus-connection
- [[client-sym host port] & body]
- `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port)]
- (try
- ~@body
- (finally (.close conn#)))))
-
-(defmacro with-configured-nimbus-connection
- [client-sym & body]
- `(let [conf# (clojurify-structure (ConfigUtils/readStormConfig))
- context# (ReqContext/context)
- user# (if (.principal context#) (.getName (.principal context#)))
- nimbusClient# (NimbusClient/getConfiguredClientAs conf# user#)
- ~client-sym (.getClient nimbusClient#)
- conn# (.transport nimbusClient#)
- ]
- (try
- ~@body
- (finally (.close conn#)))))
-
-(defn direct-output-fields
- [fields]
- (StreamInfo. fields true))
-
-(defn output-fields
- [fields]
- (StreamInfo. fields false))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn mk-output-spec
- [output-spec]
- (let [output-spec (if (map? output-spec)
- output-spec
- {Utils/DEFAULT_STREAM_ID output-spec})]
- (map-val
- (fn [out]
- (if (instance? StreamInfo out)
- out
- (StreamInfo. out false)))
- output-spec)))
-
-(defnk mk-plain-component-common
- [inputs output-spec parallelism-hint :conf nil]
- (let [ret (ComponentCommon. (HashMap. inputs) (HashMap. (mk-output-spec output-spec)))]
- (when parallelism-hint
- (.set_parallelism_hint ret parallelism-hint))
- (when conf
- (.set_json_conf ret (JSONValue/toJSONString conf)))
- ret))
-
-(defnk mk-spout-spec*
- [spout outputs :p nil :conf nil]
- (SpoutSpec. (ComponentObject/serialized_java (Utils/javaSerialize spout))
- (mk-plain-component-common {} outputs p :conf conf)))
-
-(defn mk-shuffle-grouping
- []
- (Grouping/shuffle (NullStruct.)))
-
-(defn mk-local-or-shuffle-grouping
- []
- (Grouping/local_or_shuffle (NullStruct.)))
-
-(defn mk-fields-grouping
- [fields]
- (Grouping/fields fields))
-
-(defn mk-global-grouping
- []
- (mk-fields-grouping []))
-
-(defn mk-direct-grouping
- []
- (Grouping/direct (NullStruct.)))
-
-(defn mk-all-grouping
- []
- (Grouping/all (NullStruct.)))
-
-(defn mk-none-grouping
- []
- (Grouping/none (NullStruct.)))
-
-(defn deserialized-component-object
- [^ComponentObject obj]
- (when (not= (.getSetField obj) ComponentObject$_Fields/SERIALIZED_JAVA)
- (throw (RuntimeException. "Cannot deserialize non-java-serialized object")))
- (Utils/javaDeserialize (.get_serialized_java obj) Serializable))
-
-(defn serialize-component-object
- [obj]
- (ComponentObject/serialized_java (Utils/javaSerialize obj)))
-
-(defn- mk-grouping
- [grouping-spec]
- (cond (nil? grouping-spec)
- (mk-none-grouping)
-
- (instance? Grouping grouping-spec)
- grouping-spec
-
- (instance? CustomStreamGrouping grouping-spec)
- (Grouping/custom_serialized (Utils/javaSerialize grouping-spec))
-
- (instance? JavaObject grouping-spec)
- (Grouping/custom_object grouping-spec)
-
- (sequential? grouping-spec)
- (mk-fields-grouping grouping-spec)
-
- (= grouping-spec :shuffle)
- (mk-shuffle-grouping)
-
- (= grouping-spec :local-or-shuffle)
- (mk-local-or-shuffle-grouping)
- (= grouping-spec :none)
- (mk-none-grouping)
-
- (= grouping-spec :all)
- (mk-all-grouping)
-
- (= grouping-spec :global)
- (mk-global-grouping)
-
- (= grouping-spec :direct)
- (mk-direct-grouping)
-
- true
- (throw (IllegalArgumentException.
- (str grouping-spec " is not a valid grouping")))))
-
-(defn- mk-inputs
- [inputs]
- (into {} (for [[stream-id grouping-spec] inputs]
- [(if (sequential? stream-id)
- (GlobalStreamId. (first stream-id) (second stream-id))
- (GlobalStreamId. stream-id Utils/DEFAULT_STREAM_ID))
- (mk-grouping grouping-spec)])))
-
-(defnk mk-bolt-spec*
- [inputs bolt outputs :p nil :conf nil]
- (let [common (mk-plain-component-common (mk-inputs inputs) outputs p :conf conf)]
- (Bolt. (ComponentObject/serialized_java (Utils/javaSerialize bolt))
- common)))
-
-(defnk mk-spout-spec
- [spout :parallelism-hint nil :p nil :conf nil]
- (let [parallelism-hint (if p p parallelism-hint)]
- {:obj spout :p parallelism-hint :conf conf}))
-
-(defn- shell-component-params
- [command script-or-output-spec kwargs]
- (if (string? script-or-output-spec)
- [(into-array String [command script-or-output-spec])
- (first kwargs)
- (rest kwargs)]
- [(into-array String command)
- script-or-output-spec
- kwargs]))
-
-(defnk mk-bolt-spec
- [inputs bolt :parallelism-hint nil :p nil :conf nil]
- (let [parallelism-hint (if p p parallelism-hint)]
- {:obj bolt :inputs inputs :p parallelism-hint :conf conf}))
-
-(defn mk-shell-bolt-spec
- [inputs command script-or-output-spec & kwargs]
- (let [[command output-spec kwargs]
- (shell-component-params command script-or-output-spec kwargs)]
- (apply mk-bolt-spec inputs
- (RichShellBolt. command (mk-output-spec output-spec)) kwargs)))
-
-(defn mk-shell-spout-spec
- [command script-or-output-spec & kwargs]
- (let [[command output-spec kwargs]
- (shell-component-params command script-or-output-spec kwargs)]
- (apply mk-spout-spec
- (RichShellSpout. command (mk-output-spec output-spec)) kwargs)))
-
-(defn- add-inputs
- [declarer inputs]
- (doseq [[id grouping] (mk-inputs inputs)]
- (.grouping declarer id grouping)))
-
-(defn mk-topology
- ([spout-map bolt-map]
- (let [builder (TopologyBuilder.)]
- (doseq [[name {spout :obj p :p conf :conf}] spout-map]
- (-> builder (.setSpout name spout (if-not (nil? p) (int p) p)) (.addConfigurations conf)))
- (doseq [[name {bolt :obj p :p conf :conf inputs :inputs}] bolt-map]
- (-> builder (.setBolt name bolt (if-not (nil? p) (int p) p)) (.addConfigurations conf) (add-inputs inputs)))
- (.createTopology builder)))
- ([spout-map bolt-map state-spout-map]
- (mk-topology spout-map bolt-map)))
-
-;; clojurify-structure is needed or else every element becomes the same after successive calls
-;; don't know why this happens
-(def STORM-TOPOLOGY-FIELDS
- (-> StormTopology/metaDataMap clojurify-structure keys))
-
-(def SPOUT-FIELDS
- [StormTopology$_Fields/SPOUTS
- StormTopology$_Fields/STATE_SPOUTS])
-
http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 1bf85d4..5b5acdb 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -49,7 +49,7 @@
(:require [compojure.route :as route]
[compojure.handler :as handler]
[ring.util.response :as resp]
- [org.apache.storm [thrift :as thrift]])
+ [org.apache.storm.internal [thrift :as thrift]])
(:require [metrics.meters :refer [defmeter mark!]])
(:import [org.apache.commons.lang StringEscapeUtils])
(:import [org.apache.logging.log4j Level])
http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/jvm/org/apache/storm/Thrift.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Thrift.java b/storm-core/src/jvm/org/apache/storm/Thrift.java
new file mode 100644
index 0000000..cde822f
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/Thrift.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm;
+
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.JavaObjectArg;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
+import org.apache.storm.generated.StreamInfo;
+
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.HashMap;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.storm.generated.JavaObject;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.StormTopology._Fields;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.NullStruct;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.ComponentObject;
+
+import org.apache.storm.task.IBolt;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.SpoutDeclarer;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.utils.Utils;
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.topology.TopologyBuilder;
+
+public class Thrift {
+ private static Logger LOG = LoggerFactory.getLogger(Thrift.class);
+
+ private static StormTopology._Fields[] STORM_TOPOLOGY_FIELDS = null;
+ private static StormTopology._Fields[] SPOUT_FIELDS =
+ { StormTopology._Fields.SPOUTS, StormTopology._Fields.STATE_SPOUTS };
+
+ static {
+ Set<_Fields> keys = StormTopology.metaDataMap.keySet();
+ keys.toArray(STORM_TOPOLOGY_FIELDS = new StormTopology._Fields[keys.size()]);
+ }
+
+ /**
+  * Returns the StormTopology field ids collected from StormTopology.metaDataMap.
+  * The returned array is the shared static array itself, not a copy.
+  */
+ public static StormTopology._Fields[] getTopologyFields() {
+ return STORM_TOPOLOGY_FIELDS;
+ }
+
+ /**
+  * Returns the spout-related StormTopology field ids (SPOUTS and STATE_SPOUTS).
+  * The returned array is the shared static array itself, not a copy.
+  */
+ public static StormTopology._Fields[] getSpoutFields() {
+ return SPOUT_FIELDS;
+ }
+
+ /**
+  * Holder for a spout together with the parallelism hint and configuration
+  * that buildTopology passes to the TopologyBuilder. Fields are set once in
+  * the constructor and never reassigned.
+  */
+ public static class SpoutDetails {
+     private final IRichSpout spout;
+     private final Integer parallelism;
+     private final Map conf;
+
+     /**
+      * @param spout       the spout instance
+      * @param parallelism parallelism hint; the shorter prepareSpoutDetails overloads pass null
+      * @param conf        component configuration; the shorter prepareSpoutDetails overloads pass null
+      */
+     public SpoutDetails(IRichSpout spout, Integer parallelism, Map conf) {
+         this.spout = spout;
+         this.parallelism = parallelism;
+         this.conf = conf;
+     }
+
+     /** @return the spout given at construction */
+     public IRichSpout getSpout() {
+         return spout;
+     }
+
+     /** @return the parallelism hint given at construction, possibly null */
+     public Integer getParallelism() {
+         return parallelism;
+     }
+
+     /** @return the configuration map given at construction, possibly null */
+     public Map getConf() {
+         return conf;
+     }
+ }
+
+ /**
+  * Holder for a bolt together with its configuration, parallelism hint and
+  * input subscriptions, as consumed by buildTopology. Fields are set once in
+  * the constructor and never reassigned.
+  */
+ public static class BoltDetails {
+     // Typed as Object: buildTopology decides at runtime whether to treat it as IRichBolt or IBasicBolt.
+     private final Object bolt;
+     private final Map conf;
+     private final Integer parallelism;
+     private final Map<GlobalStreamId, Grouping> inputs;
+
+     /**
+      * @param bolt        the bolt instance
+      * @param conf        component configuration; the shorter prepareBoltDetails overloads pass null
+      * @param parallelism parallelism hint; the shortest prepareBoltDetails overload passes null
+      * @param inputs      subscribed streams mapped to the grouping used for each
+      */
+     public BoltDetails(Object bolt, Map conf, Integer parallelism,
+                        Map<GlobalStreamId, Grouping> inputs) {
+         this.bolt = bolt;
+         this.conf = conf;
+         this.parallelism = parallelism;
+         this.inputs = inputs;
+     }
+
+     /** @return the bolt given at construction */
+     public Object getBolt() {
+         return bolt;
+     }
+
+     /** @return the configuration map given at construction, possibly null */
+     public Map getConf() {
+         return conf;
+     }
+
+     /** @return the stream-to-grouping map given at construction */
+     public Map<GlobalStreamId, Grouping> getInputs() {
+         return inputs;
+     }
+
+     /** @return the parallelism hint given at construction, possibly null */
+     public Integer getParallelism() {
+         return parallelism;
+     }
+ }
+
+ public static StreamInfo directOutputFields(List<String> fields) {
+ return new StreamInfo(fields, true);
+ }
+
+ public static StreamInfo outputFields(List<String> fields) {
+ return new StreamInfo(fields, false);
+ }
+
+ public static Grouping prepareShuffleGrouping() {
+ return Grouping.shuffle(new NullStruct());
+ }
+
+ public static Grouping prepareLocalOrShuffleGrouping() {
+ return Grouping.local_or_shuffle(new NullStruct());
+ }
+
+ /**
+  * Returns the Grouping produced by Grouping.fields for the given field names.
+  *
+  * @param fields field names to group on; an empty list is what isGlobalGrouping treats as global
+  */
+ public static Grouping prepareFieldsGrouping(List<String> fields) {
+ return Grouping.fields(fields);
+ }
+
+ /**
+  * Returns a fields grouping over an empty field list, which is the form
+  * isGlobalGrouping recognises as a global grouping.
+  */
+ public static Grouping prepareGlobalGrouping() {
+     List<String> noFields = new ArrayList<String>();
+     return prepareFieldsGrouping(noFields);
+ }
+
+ public static Grouping prepareDirectGrouping() {
+ return Grouping.direct(new NullStruct());
+ }
+
+ public static Grouping prepareAllGrouping() {
+ return Grouping.all(new NullStruct());
+ }
+
+ public static Grouping prepareNoneGrouping() {
+ return Grouping.none(new NullStruct());
+ }
+
+ /**
+  * Serializes obj with Utils.javaSerialize and wraps the result via Grouping.custom_serialized.
+  *
+  * @param obj the grouping object to serialize
+  *            (NOTE(review): presumably a CustomStreamGrouping; the parameter type does not enforce it)
+  */
+ public static Grouping prepareCustomStreamGrouping(Object obj) {
+ return Grouping.custom_serialized(Utils.javaSerialize(obj));
+ }
+
+ /**
+  * Wraps a thrift JavaObject description via Grouping.custom_object.
+  *
+  * @param obj the JavaObject to carry in the grouping
+  */
+ public static Grouping prepareCustomJavaObjectGrouping(JavaObject obj) {
+ return Grouping.custom_object(obj);
+ }
+
+ public static Object instantiateJavaObject(JavaObject obj) {
+
+ List<JavaObjectArg> args = obj.get_args_list();
+ Class[] paraTypes = new Class[args.size()];
+ Object[] paraValues = new Object[args.size()];
+ for (int i = 0; i < args.size(); i++) {
+ JavaObjectArg arg = args.get(i);
+ paraValues[i] = arg.getFieldValue();
+
+ if (arg.getSetField().equals(JavaObjectArg._Fields.INT_ARG)) {
+ paraTypes[i] = Integer.class;
+ } else if (arg.getSetField().equals(JavaObjectArg._Fields.LONG_ARG)) {
+ paraTypes[i] = Long.class;
+ } else if (arg.getSetField().equals(JavaObjectArg._Fields.STRING_ARG)) {
+ paraTypes[i] = String.class;
+ } else if (arg.getSetField().equals(JavaObjectArg._Fields.BOOL_ARG)) {
+ paraTypes[i] = Boolean.class;
+ } else if (arg.getSetField().equals(JavaObjectArg._Fields.BINARY_ARG)) {
+ paraTypes[i] = ByteBuffer.class;
+ } else if (arg.getSetField().equals(JavaObjectArg._Fields.DOUBLE_ARG)) {
+ paraTypes[i] = Double.class;
+ } else {
+ paraTypes[i] = Object.class;
+ }
+ }
+
+ try {
+ Class clazz = Class.forName(obj.get_full_class_name());
+ Constructor cons = clazz.getConstructor(paraTypes);
+ return cons.newInstance(paraValues);
+ } catch (Exception e) {
+ LOG.error("java object instantiation failed", e);
+ }
+
+ return null;
+
+ }
+
+ /**
+  * Returns which field of the Grouping union is currently set.
+  *
+  * @param grouping the grouping to inspect
+  */
+ public static Grouping._Fields groupingType(Grouping grouping) {
+ return grouping.getSetField();
+ }
+
+ public static List<String> fieldGrouping(Grouping grouping) {
+ if (!Grouping._Fields.FIELDS.equals(groupingType(grouping))) {
+ throw new IllegalArgumentException("Tried to get grouping fields from non fields grouping");
+ }
+ return grouping.get_fields();
+ }
+
+ /**
+  * Tells whether a grouping is global, i.e. a fields grouping whose field list is empty.
+  *
+  * @param grouping the grouping to test
+  * @return true only for a FIELDS grouping with no fields
+  */
+ public static boolean isGlobalGrouping(Grouping grouping) {
+     // Short-circuit keeps fieldGrouping from being called (and throwing) on non-fields groupings.
+     return Grouping._Fields.FIELDS.equals(groupingType(grouping))
+             && fieldGrouping(grouping).isEmpty();
+ }
+
+ /**
+  * Reads a component's parallelism hint, defaulting to 1 when none is set.
+  *
+  * @param componentCommon the component's common section
+  * @return the stored hint if is_set_parallelism_hint() is true, otherwise 1
+  */
+ public static int getParallelismHint(ComponentCommon componentCommon) {
+     return componentCommon.is_set_parallelism_hint()
+             ? componentCommon.get_parallelism_hint()
+             : 1;
+ }
+
+ public static ComponentObject serializeComponentObject(Object obj) {
+ return ComponentObject.serialized_java(Utils.javaSerialize(obj));
+ }
+
+ public static Object deserializeComponentObject(ComponentObject obj) {
+ if (obj.getSetField() != ComponentObject._Fields.SERIALIZED_JAVA) {
+ throw new RuntimeException("Cannot deserialize non-java-serialized object");
+ }
+ return Utils.javaDeserialize(obj.get_serialized_java(), Serializable.class);
+ }
+
+ /**
+  * Convenience overload: builds a ComponentCommon without a configuration map
+  * by delegating to the four-argument overload with conf = null.
+  *
+  * @param inputs          subscribed streams and their groupings
+  * @param outputs         declared output streams
+  * @param parallelismHint parallelism hint, passed through unchanged
+  */
+ public static ComponentCommon prepareComponentCommon(Map<GlobalStreamId, Grouping> inputs, Map<String,
+ StreamInfo> outputs, Integer parallelismHint) {
+ return prepareComponentCommon(inputs, outputs, parallelismHint, null);
+ }
+
+ /**
+  * Builds a ComponentCommon from copies of the given input and output maps.
+  *
+  * @param inputs          subscribed streams and their groupings; null is treated as empty
+  * @param outputs         declared output streams; null is treated as empty
+  * @param parallelismHint set on the result only when non-null
+  * @param conf            converted with JSONValue.toJSONString and set as json_conf only when non-null
+  * @return the populated ComponentCommon
+  */
+ public static ComponentCommon prepareComponentCommon(Map<GlobalStreamId, Grouping> inputs, Map<String, StreamInfo> outputs,
+                                                      Integer parallelismHint, Map conf) {
+     // Copy into fresh maps so the result never shares the caller's map instances.
+     Map<GlobalStreamId, Grouping> inputsCopy = new HashMap<>();
+     if (inputs != null) {
+         inputsCopy.putAll(inputs);
+     }
+     Map<String, StreamInfo> outputsCopy = new HashMap<>();
+     if (outputs != null) {
+         outputsCopy.putAll(outputs);
+     }
+
+     ComponentCommon common = new ComponentCommon(inputsCopy, outputsCopy);
+     if (parallelismHint != null) {
+         common.set_parallelism_hint(parallelismHint);
+     }
+     if (conf != null) {
+         common.set_json_conf(JSONValue.toJSONString(conf));
+     }
+     return common;
+ }
+
+ public static SpoutSpec prepareSerializedSpoutDetails(IRichSpout spout, Map<String, StreamInfo> outputs) {
+ return new SpoutSpec(ComponentObject.serialized_java
+ (Utils.javaSerialize(spout)), prepareComponentCommon(new HashMap(), outputs, null, null));
+ }
+
+ /**
+  * Builds a thrift Bolt holding the java-serialized bolt and a ComponentCommon
+  * made from the given inputs, outputs, parallelism hint and conf.
+  *
+  * @param inputs          subscribed streams and their groupings
+  * @param bolt            the bolt to serialize
+  * @param outputs         declared output streams
+  * @param parallelismHint parallelism hint, may be null
+  * @param conf            component configuration, may be null
+  */
+ public static Bolt prepareSerializedBoltDetails(Map<GlobalStreamId, Grouping> inputs, IBolt bolt, Map<String, StreamInfo> outputs,
+                                                 Integer parallelismHint, Map conf) {
+     ComponentCommon boltCommon = prepareComponentCommon(inputs, outputs, parallelismHint, conf);
+     ComponentObject serializedBolt = ComponentObject.serialized_java(Utils.javaSerialize(bolt));
+     return new Bolt(serializedBolt, boltCommon);
+ }
+
+ /**
+  * Convenience overload: bolt details with no parallelism hint and no conf
+  * (both passed on as null).
+  */
+ public static BoltDetails prepareBoltDetails(Map<GlobalStreamId, Grouping> inputs, Object bolt) {
+ return prepareBoltDetails(inputs, bolt, null, null);
+ }
+
+ /**
+  * Convenience overload: bolt details with a parallelism hint and no conf
+  * (conf passed on as null).
+  */
+ public static BoltDetails prepareBoltDetails(Map<GlobalStreamId, Grouping> inputs, Object bolt,
+ Integer parallelismHint) {
+ return prepareBoltDetails(inputs, bolt, parallelismHint, null);
+ }
+
+ /**
+  * Bundles a bolt with its inputs, parallelism hint and conf into a BoltDetails.
+  *
+  * @param inputs          subscribed streams and their groupings
+  * @param bolt            the bolt instance
+  * @param parallelismHint parallelism hint, may be null
+  * @param conf            component configuration, may be null
+  */
+ public static BoltDetails prepareBoltDetails(Map<GlobalStreamId, Grouping> inputs, Object bolt,
+                                              Integer parallelismHint, Map conf) {
+     // Note the argument order: BoltDetails takes (bolt, conf, parallelism, inputs).
+     return new BoltDetails(bolt, conf, parallelismHint, inputs);
+ }
+
+ /**
+  * Convenience overload: spout details with no parallelism hint and no conf
+  * (both passed on as null).
+  */
+ public static SpoutDetails prepareSpoutDetails(IRichSpout spout) {
+ return prepareSpoutDetails(spout, null, null);
+ }
+
+ /**
+  * Convenience overload: spout details with a parallelism hint and no conf
+  * (conf passed on as null).
+  */
+ public static SpoutDetails prepareSpoutDetails(IRichSpout spout, Integer parallelismHint) {
+ return prepareSpoutDetails(spout, parallelismHint, null);
+ }
+
+ /**
+  * Bundles a spout with its parallelism hint and conf into a SpoutDetails.
+  *
+  * @param spout           the spout instance
+  * @param parallelismHint parallelism hint, may be null
+  * @param conf            component configuration, may be null
+  */
+ public static SpoutDetails prepareSpoutDetails(IRichSpout spout, Integer parallelismHint, Map conf) {
+     return new SpoutDetails(spout, parallelismHint, conf);
+ }
+
+ /**
+  * Three-map overload of buildTopology. The stateMap argument is ignored:
+  * this simply delegates to buildTopology(spoutMap, boltMap).
+  *
+  * @param spoutMap spout id to spout details
+  * @param boltMap  bolt id to bolt details
+  * @param stateMap unused
+  */
+ public static StormTopology buildTopology(HashMap<String, SpoutDetails> spoutMap,
+ HashMap<String, BoltDetails> boltMap, HashMap<String, StateSpoutSpec> stateMap) {
+ return buildTopology(spoutMap, boltMap);
+ }
+
+ /**
+  * Registers every (stream, grouping) pair in inputs on the given declarer.
+  *
+  * @param declarer the bolt declarer to subscribe
+  * @param inputs   streams to subscribe to, mapped to the grouping for each; must not be null
+  */
+ private static void addInputs(BoltDeclarer declarer, Map<GlobalStreamId, Grouping> inputs) {
+     for (Entry<GlobalStreamId, Grouping> subscription : inputs.entrySet()) {
+         GlobalStreamId stream = subscription.getKey();
+         Grouping grouping = subscription.getValue();
+         declarer.grouping(stream, grouping);
+     }
+ }
+
+ /**
+  * Assembles a StormTopology with a TopologyBuilder from spout and bolt descriptions.
+  *
+  * Each spout is registered under its map key with its parallelism and conf.
+  * Each bolt is registered as an IRichBolt when it is one; anything else is
+  * cast to IBasicBolt, so any other type fails with a ClassCastException.
+  * The bolt's conf and inputs are then applied.
+  *
+  * @param spoutMap spout id to spout details
+  * @param boltMap  bolt id to bolt details
+  * @return the topology produced by TopologyBuilder.createTopology()
+  */
+ public static StormTopology buildTopology(Map<String, SpoutDetails> spoutMap, Map<String, BoltDetails> boltMap) {
+     TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+     for (Entry<String, SpoutDetails> spoutEntry : spoutMap.entrySet()) {
+         String spoutId = spoutEntry.getKey();
+         SpoutDetails spoutSpec = spoutEntry.getValue();
+         topologyBuilder.setSpout(spoutId, spoutSpec.getSpout(), spoutSpec.getParallelism())
+                 .addConfigurations(spoutSpec.getConf());
+     }
+
+     for (Entry<String, BoltDetails> boltEntry : boltMap.entrySet()) {
+         String boltId = boltEntry.getKey();
+         BoltDetails boltSpec = boltEntry.getValue();
+         Object bolt = boltSpec.getBolt();
+         // setBolt is overloaded on the bolt interface, so the cast selects the overload.
+         BoltDeclarer declarer = (bolt instanceof IRichBolt)
+                 ? topologyBuilder.setBolt(boltId, (IRichBolt) bolt, boltSpec.getParallelism())
+                 : topologyBuilder.setBolt(boltId, (IBasicBolt) bolt, boltSpec.getParallelism());
+         declarer.addConfigurations(boltSpec.getConf());
+         addInputs(declarer, boltSpec.getInputs());
+     }
+
+     return topologyBuilder.createTopology();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/jvm/org/apache/storm/testing/NGrouping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/NGrouping.java b/storm-core/src/jvm/org/apache/storm/testing/NGrouping.java
index 45b263d..06853fe 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/NGrouping.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/NGrouping.java
@@ -28,9 +28,7 @@ public class NGrouping implements CustomStreamGrouping {
int _n;
List<Integer> _outTasks;
- public NGrouping(int n) {
- _n = n;
- }
+ // Takes a boxed Integer, which is auto-unboxed into the int field _n; a null argument fails on unboxing.
+ // NOTE(review): presumably boxed so a reflective constructor lookup with Integer.class can find it — confirm.
+ public NGrouping(Integer n) {_n = n;}
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java b/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java
index eaf7dc8..4beec48 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java
@@ -28,18 +28,22 @@ import org.apache.storm.topology.OutputFieldsDeclarer;
public class PythonShellMetricsBolt extends ShellBolt implements IRichBolt {
private static final long serialVersionUID = 1999209252187463355L;
-
- public PythonShellMetricsBolt(String[] command) {
- super(command);
+
+ public PythonShellMetricsBolt(String[] args) {
+ super(args);
}
+ public PythonShellMetricsBolt(String command, String file) {
+ super(command, file);
+ }
+
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
super.prepare(stormConf, context, collector);
-
+
CountShellMetric cMetric = new CountShellMetric();
context.registerMetric("my-custom-shell-metric", cMetric, 5);
}
-
+
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java b/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java
index ed6de14..657baa6 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java
@@ -33,11 +33,15 @@ public class PythonShellMetricsSpout extends ShellSpout implements IRichSpout {
public PythonShellMetricsSpout(String[] command) {
super(command);
}
-
+
+ public PythonShellMetricsSpout(String command, String file) {
+ super(command, file);
+ }
+
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
super.open(conf, context, collector);
-
+
CountShellMetric cMetric = new CountShellMetric();
context.registerMetric("my-custom-shellspout-metric", cMetric, 5);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 9a849ea..eca9690 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -52,7 +52,6 @@ import org.apache.thrift.TSerializer;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
-import org.eclipse.jetty.util.log.Log;
import org.json.simple.JSONValue;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
@@ -1456,6 +1455,13 @@ public class Utils {
return number & Integer.MAX_VALUE;
}
+ /**
+  * Builds a GlobalStreamId for a component's stream, falling back to the
+  * default stream when no stream id is given.
+  *
+  * The parameters were previously named (streamId, componentId), which was the
+  * reverse of how they are used: the first argument fills GlobalStreamId's
+  * component slot and the second its stream slot. Only the names change here;
+  * argument positions and behaviour are the same.
+  *
+  * @param componentId id of the component that emits the stream
+  * @param streamId    id of the stream, or null to use DEFAULT_STREAM_ID
+  * @return new GlobalStreamId(componentId, streamId or DEFAULT_STREAM_ID)
+  */
+ public static GlobalStreamId getGlobalStreamId(String componentId, String streamId) {
+     if (streamId == null) {
+         return new GlobalStreamId(componentId, DEFAULT_STREAM_ID);
+     }
+     return new GlobalStreamId(componentId, streamId);
+ }
+
public static RuntimeException wrapInRuntime(Exception e){
if (e instanceof RuntimeException){
return (RuntimeException)e;
http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index 5ba6651..6dce7d6 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -15,26 +15,37 @@
;; limitations under the License.
(ns integration.org.apache.storm.integration-test
(:use [clojure test])
- (:import [org.apache.storm Config])
+ (:import [org.apache.storm Config Thrift])
(:import [org.apache.storm.topology TopologyBuilder])
(:import [org.apache.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions])
(:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount
TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
(:import [org.apache.storm.tuple Fields])
- (:use [org.apache.storm testing config clojure])
+ (:use [org.apache.storm testing config util])
+ (:use [org.apache.storm.internal clojure])
(:use [org.apache.storm.daemon common])
- (:require [org.apache.storm [thrift :as thrift]]))
+ (:import [org.apache.storm Thrift])
+ (:import [org.apache.storm.utils Utils]))
(deftest test-basic-topology
(doseq [zmq-on? [true false]]
(with-simulated-time-local-cluster [cluster :supervisors 4
:daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}]
- (let [topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
- {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
- "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
- "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
- })
+ (let [topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails
+ (TestWordSpout. true) (Integer. 3))}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareFieldsGrouping ["word"])}
+ (TestWordCounter.) (Integer. 4))
+ "3" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareGlobalGrouping)}
+ (TestGlobalCount.))
+ "4" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "2" nil)
+ (Thrift/prepareGlobalGrouping)}
+ (TestAggregatesCounter.))})
results (complete-topology cluster
topology
:mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
@@ -60,12 +71,14 @@
(deftest test-multi-tasks-per-executor
(with-simulated-time-local-cluster [cluster :supervisors 4]
- (let [topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true))}
- {"2" (thrift/mk-bolt-spec {"1" :all} emit-task-id
- :parallelism-hint 3
- :conf {TOPOLOGY-TASKS 6})
- })
+ (let [topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true))}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareAllGrouping)}
+ emit-task-id
+ (Integer. 3)
+ {TOPOLOGY-TASKS 6})})
results (complete-topology cluster
topology
:mock-sources {"1" [["a"]]})]
@@ -98,9 +111,11 @@
(let [feeder (feeder-spout ["field1"])
tracker (AckFailMapTracker.)
_ (.setAckFailDelegate feeder tracker)
- topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec feeder)}
- {"2" (thrift/mk-bolt-spec {"1" :global} ack-every-other)})]
+ topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails feeder)}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareGlobalGrouping)} ack-every-other)})]
(submit-local-topology (:nimbus cluster)
"timeout-tester"
{TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
@@ -117,24 +132,36 @@
)))
(defn mk-validate-topology-1 []
- (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
- {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
+ (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareFieldsGrouping ["word"])}
+ (TestWordCounter.) (Integer. 4))}))
(defn mk-invalidate-topology-1 []
- (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
- {"2" (thrift/mk-bolt-spec {"3" ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
+ (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "3" nil)
+ (Thrift/prepareFieldsGrouping ["word"])}
+ (TestWordCounter.) (Integer. 4))}))
(defn mk-invalidate-topology-2 []
- (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
- {"2" (thrift/mk-bolt-spec {"1" ["non-exists-field"]} (TestWordCounter.) :parallelism-hint 4)}))
+ (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareFieldsGrouping ["non-exists-field"])}
+ (TestWordCounter.) (Integer. 4))}))
(defn mk-invalidate-topology-3 []
- (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
- {"2" (thrift/mk-bolt-spec {["1" "non-exists-stream"] ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
+ (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" "non-exists-stream")
+ (Thrift/prepareFieldsGrouping ["word"])}
+ (TestWordCounter.) (Integer. 4))}))
(defn try-complete-wc-topology [cluster topology]
(try (do
@@ -164,10 +191,15 @@
(deftest test-system-stream
;; this test works because mocking a spout splits up the tuples evenly among the tasks
(with-simulated-time-local-cluster [cluster]
- (let [topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true) :p 3)}
- {"2" (thrift/mk-bolt-spec {"1" ["word"] ["1" "__system"] :global} identity-bolt :p 1)
- })
+ (let [topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails
+ (TestWordSpout. true) (Integer. 3))}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareFieldsGrouping ["word"])
+ (Utils/getGlobalStreamId "1" "__system")
+ (Thrift/prepareGlobalGrouping)}
+ identity-bolt (Integer. 1))})
results (complete-topology cluster
topology
:mock-sources {"1" [["a"] ["b"] ["c"]]}
@@ -218,20 +250,38 @@
[feeder3 checker3] (ack-tracking-feeder ["num"])
tracked (mk-tracked-topology
cluster
- (topology
- {"1" (spout-spec feeder1)
- "2" (spout-spec feeder2)
- "3" (spout-spec feeder3)}
- {"4" (bolt-spec {"1" :shuffle} (branching-bolt 2))
- "5" (bolt-spec {"2" :shuffle} (branching-bolt 4))
- "6" (bolt-spec {"3" :shuffle} (branching-bolt 1))
- "7" (bolt-spec
- {"4" :shuffle
- "5" :shuffle
- "6" :shuffle}
+ (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails feeder1)
+ "2" (Thrift/prepareSpoutDetails feeder2)
+ "3" (Thrift/prepareSpoutDetails feeder3)}
+ {"4" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareShuffleGrouping)}
+ (branching-bolt 2))
+ "5" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "2" nil)
+ (Thrift/prepareShuffleGrouping)}
+ (branching-bolt 4))
+ "6" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "3" nil)
+ (Thrift/prepareShuffleGrouping)}
+ (branching-bolt 1))
+ "7" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "4" nil)
+ (Thrift/prepareShuffleGrouping)
+ (Utils/getGlobalStreamId "5" nil)
+ (Thrift/prepareShuffleGrouping)
+ (Utils/getGlobalStreamId "6" nil)
+ (Thrift/prepareShuffleGrouping)}
(agg-bolt 3))
- "8" (bolt-spec {"7" :shuffle} (branching-bolt 2))
- "9" (bolt-spec {"8" :shuffle} ack-bolt)}
+ "8" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "7" nil)
+ (Thrift/prepareShuffleGrouping)}
+ (branching-bolt 2))
+ "9" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "8" nil)
+ (Thrift/prepareShuffleGrouping)}
+ ack-bolt)}
))]
(submit-local-topology (:nimbus cluster)
"acking-test1"
@@ -268,13 +318,21 @@
(let [[feeder checker] (ack-tracking-feeder ["num"])
tracked (mk-tracked-topology
cluster
- (topology
- {"1" (spout-spec feeder)}
- {"2" (bolt-spec {"1" :shuffle} identity-bolt)
- "3" (bolt-spec {"1" :shuffle} identity-bolt)
- "4" (bolt-spec
- {"2" :shuffle
- "3" :shuffle}
+ (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails feeder)}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareShuffleGrouping)}
+ identity-bolt)
+ "3" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareShuffleGrouping)}
+ identity-bolt)
+ "4" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "2" nil)
+ (Thrift/prepareShuffleGrouping)
+ (Utils/getGlobalStreamId "3" nil)
+ (Thrift/prepareShuffleGrouping)}
(agg-bolt 4))}))]
(submit-local-topology (:nimbus cluster)
"test-acking2"
@@ -314,10 +372,13 @@
(let [feeder (feeder-spout ["field1"])
tracker (AckFailMapTracker.)
_ (.setAckFailDelegate feeder tracker)
- topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec feeder)
- "2" (thrift/mk-spout-spec open-tracked-spout)}
- {"3" (thrift/mk-bolt-spec {"1" :global} prepare-tracked-bolt)})]
+ topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails feeder)
+ "2" (Thrift/prepareSpoutDetails open-tracked-spout)}
+ {"3" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareGlobalGrouping)}
+ prepare-tracked-bolt)})]
(reset! bolt-prepared? false)
(reset! spout-opened? false)
@@ -343,10 +404,16 @@
(let [[feeder checker] (ack-tracking-feeder ["num"])
tracked (mk-tracked-topology
cluster
- (topology
- {"1" (spout-spec feeder)}
- {"2" (bolt-spec {"1" :shuffle} dup-anchor)
- "3" (bolt-spec {"2" :shuffle} ack-bolt)}))]
+ (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails feeder)}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareShuffleGrouping)}
+ dup-anchor)
+ "3" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "2" nil)
+ (Thrift/prepareShuffleGrouping)}
+ ack-bolt)}))]
(submit-local-topology (:nimbus cluster)
"test"
{}
@@ -362,36 +429,6 @@
(checker 3)
)))
-;; (defspout ConstantSpout ["val"] {:prepare false}
-;; [collector]
-;; (Time/sleep 100)
-;; (emit-spout! collector [1]))
-
-;; (def errored (atom false))
-;; (def restarted (atom false))
-
-;; (defbolt local-error-checker {} [tuple collector]
-;; (when-not @errored
-;; (reset! errored true)
-;; (println "erroring")
-;; (throw (RuntimeException.)))
-;; (when-not @restarted (println "restarted"))
-;; (reset! restarted true))
-
-;; (deftest test-no-halt-local-mode
-;; (with-simulated-time-local-cluster [cluster]
-;; (let [topology (topology
-;; {1 (spout-spec ConstantSpout)}
-;; {2 (bolt-spec {1 :shuffle} local-error-checker)
-;; })]
-;; (submit-local-topology (:nimbus cluster)
-;; "test"
-;; {}
-;; topology)
-;; (while (not @restarted)
-;; (advance-time-ms! 100))
-;; )))
-
(defspout IncSpout ["word"]
[conf context collector]
(let [state (atom 0)]
@@ -416,23 +453,6 @@
)
)))
-;; (deftest test-clojure-spout
-;; (with-local-cluster [cluster]
-;; (let [nimbus (:nimbus cluster)
-;; top (topology
-;; {1 (spout-spec IncSpout)}
-;; {}
-;; )]
-;; (submit-local-topology nimbus
-;; "spout-test"
-;; {TOPOLOGY-DEBUG true
-;; TOPOLOGY-MESSAGE-TIMEOUT-SECS 3}
-;; top)
-;; (Thread/sleep 10000)
-;; (.killTopology nimbus "spout-test")
-;; (Thread/sleep 10000)
-;; )))
-
(deftest test-kryo-decorators-config
(with-simulated-time-local-cluster [cluster
:daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
@@ -513,11 +533,13 @@
(deftest test-hooks
(with-simulated-time-local-cluster [cluster]
- (let [topology (topology {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"])))
- }
- {"2" (bolt-spec {"1" :shuffle}
- hooks-bolt)
- })
+ (let [topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails
+ (TestPlannerSpout. (Fields. ["conf"])))}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareShuffleGrouping)}
+ hooks-bolt)})
results (complete-topology cluster
topology
:mock-sources {"1" [[1]
@@ -545,9 +567,12 @@
[feeder checker] (ack-tracking-feeder ["num"])
tracked (mk-tracked-topology
cluster
- (topology
- {"1" (spout-spec feeder)}
- {"2" (bolt-spec {"1" :shuffle} report-errors-bolt)}))
+ (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails feeder)}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareShuffleGrouping)}
+ report-errors-bolt)}))
_ (submit-local-topology (:nimbus cluster)
"test-errors"
{TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS 10
http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
index e86e893..3b1a48b 100644
--- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
@@ -15,15 +15,19 @@
;; limitations under the License.
(ns integration.org.apache.storm.testing4j-test
(:use [clojure.test])
- (:use [org.apache.storm config clojure testing])
+ (:use [org.apache.storm config testing util])
+ (:use [org.apache.storm.internal clojure])
(:require [integration.org.apache.storm.integration-test :as it])
- (:require [org.apache.storm.thrift :as thrift])
- (:import [org.apache.storm Testing Config ILocalCluster])
+ (:require [org.apache.storm.internal.thrift :as thrift])
+ (:import [org.apache.storm Testing Config ILocalCluster]
+ [org.apache.storm.generated GlobalStreamId])
(:import [org.apache.storm.tuple Values Tuple])
(:import [org.apache.storm.utils Time Utils])
(:import [org.apache.storm.testing MkClusterParam TestJob MockedSources TestWordSpout
TestWordCounter TestGlobalCount TestAggregatesCounter CompleteTopologyParam
- AckFailMapTracker MkTupleParam]))
+ AckFailMapTracker MkTupleParam])
+ (:import [org.apache.storm.utils Utils])
+ (:import [org.apache.storm Thrift]))
(deftest test-with-simulated-time
(is (= false (Time/isSimulating)))
@@ -69,12 +73,20 @@
(Testing/withSimulatedTimeLocalCluster
(reify TestJob
(^void run [this ^ILocalCluster cluster]
- (let [topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
- {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
- "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
- "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
- })
+ (let [topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
+ {"2" (Thrift/prepareBoltDetails
+ {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+ (Thrift/prepareFieldsGrouping ["word"])}
+ (TestWordCounter.) (Integer. 4))
+ "3" (Thrift/prepareBoltDetails
+ {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+ (Thrift/prepareGlobalGrouping)}
+ (TestGlobalCount.))
+ "4" (Thrift/prepareBoltDetails
+ {(GlobalStreamId. "2" Utils/DEFAULT_STREAM_ID)
+ (Thrift/prepareGlobalGrouping)}
+ (TestAggregatesCounter.))})
mocked-sources (doto (MockedSources.)
(.addMockData "1" (into-array Values [(Values. (into-array ["nathan"]))
(Values. (into-array ["bob"]))
@@ -106,13 +118,21 @@
(let [[feeder checker] (it/ack-tracking-feeder ["num"])
tracked (Testing/mkTrackedTopology
cluster
- (topology
- {"1" (spout-spec feeder)}
- {"2" (bolt-spec {"1" :shuffle} it/identity-bolt)
- "3" (bolt-spec {"1" :shuffle} it/identity-bolt)
- "4" (bolt-spec
- {"2" :shuffle
- "3" :shuffle}
+ (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails feeder)}
+ {"2" (Thrift/prepareBoltDetails
+ {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+ (Thrift/prepareShuffleGrouping)}
+ it/identity-bolt)
+ "3" (Thrift/prepareBoltDetails
+ {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+ (Thrift/prepareShuffleGrouping)}
+ it/identity-bolt)
+ "4" (Thrift/prepareBoltDetails
+ {(GlobalStreamId. "2" Utils/DEFAULT_STREAM_ID)
+ (Thrift/prepareShuffleGrouping)
+ (GlobalStreamId. "3" Utils/DEFAULT_STREAM_ID)
+ (Thrift/prepareShuffleGrouping)}
(it/agg-bolt 4))}))]
(.submitTopology cluster
"test-acking2"
@@ -139,9 +159,12 @@
(let [feeder (feeder-spout ["field1"])
tracker (AckFailMapTracker.)
_ (.setAckFailDelegate feeder tracker)
- topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec feeder)}
- {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
+ topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails feeder)}
+ {"2" (Thrift/prepareBoltDetails
+ {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+ (Thrift/prepareGlobalGrouping)}
+ it/ack-every-other)})
storm-conf (doto (Config.)
(.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10))]
(.submitTopology cluster
@@ -170,9 +193,12 @@
(let [feeder (feeder-spout ["field1"])
tracker (AckFailMapTracker.)
_ (.setAckFailDelegate feeder tracker)
- topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec feeder)}
- {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
+ topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails feeder)}
+ {"2" (Thrift/prepareBoltDetails
+ {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+ (Thrift/prepareGlobalGrouping)}
+ it/ack-every-other)})
storm-conf (doto (Config.)
(.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10)
(.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))]
http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/clojure_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/clojure_test.clj b/storm-core/test/clj/org/apache/storm/clojure_test.clj
index ccec825..13fdeb7 100644
--- a/storm-core/test/clj/org/apache/storm/clojure_test.clj
+++ b/storm-core/test/clj/org/apache/storm/clojure_test.clj
@@ -17,10 +17,12 @@
(:use [clojure test])
(:import [org.apache.storm.testing TestWordSpout TestPlannerSpout]
[org.apache.storm.tuple Fields])
- (:use [org.apache.storm testing clojure config])
+ (:use [org.apache.storm testing config])
+ (:use [org.apache.storm.internal clojure])
(:use [org.apache.storm.daemon common])
- (:require [org.apache.storm [thrift :as thrift]]))
-
+ (:require [org.apache.storm.internal [thrift :as thrift]])
+ (:import [org.apache.storm Thrift])
+ (:import [org.apache.storm.utils Utils]))
(defbolt lalala-bolt1 ["word"] [[val :as tuple] collector]
(let [ret (str val "lalala")]
@@ -56,15 +58,21 @@
(deftest test-clojure-bolt
(with-simulated-time-local-cluster [cluster :supervisors 4]
(let [nimbus (:nimbus cluster)
- topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. false))}
- {"2" (thrift/mk-bolt-spec {"1" :shuffle}
- lalala-bolt1)
- "3" (thrift/mk-bolt-spec {"1" :local-or-shuffle}
- lalala-bolt2)
- "4" (thrift/mk-bolt-spec {"1" :shuffle}
- (lalala-bolt3 "_nathan_"))}
- )
+ topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails (TestWordSpout. false))}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareShuffleGrouping)}
+ lalala-bolt1)
+ "3" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareLocalOrShuffleGrouping)}
+ lalala-bolt2)
+ "4" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareShuffleGrouping)}
+ (lalala-bolt3 "_nathan_"))}
+ )
results (complete-topology cluster
topology
:mock-sources {"1" [["david"]
@@ -91,11 +99,12 @@
(deftest test-map-emit
(with-simulated-time-local-cluster [cluster :supervisors 4]
- (let [topology (thrift/mk-topology
- {"words" (thrift/mk-spout-spec (TestWordSpout. false))}
- {"out" (thrift/mk-bolt-spec {"words" :shuffle}
- punctuator-bolt)}
- )
+ (let [topology (Thrift/buildTopology
+ {"words" (Thrift/prepareSpoutDetails (TestWordSpout. false))}
+ {"out" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "words" nil)
+ (Thrift/prepareShuffleGrouping)}
+ punctuator-bolt)})
results (complete-topology cluster
topology
:mock-sources {"words" [["foo"] ["bar"]]}
@@ -115,14 +124,19 @@
(deftest test-component-specific-config-clojure
(with-simulated-time-local-cluster [cluster]
- (let [topology (topology {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"])) :conf {TOPOLOGY-MESSAGE-TIMEOUT-SECS 40})
- }
- {"2" (bolt-spec {"1" :shuffle}
- (conf-query-bolt {"fake.config" 1
- TOPOLOGY-MAX-TASK-PARALLELISM 2
- TOPOLOGY-MAX-SPOUT-PENDING 10})
- :conf {TOPOLOGY-MAX-SPOUT-PENDING 3})
- })
+ (let [topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails
+ (TestPlannerSpout. (Fields. ["conf"]))
+ nil
+ {TOPOLOGY-MESSAGE-TIMEOUT-SECS 40})}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareShuffleGrouping)}
+ (conf-query-bolt {"fake.config" 1
+ TOPOLOGY-MAX-TASK-PARALLELISM 2
+ TOPOLOGY-MAX-SPOUT-PENDING 10})
+ nil
+ {TOPOLOGY-MAX-SPOUT-PENDING 3})})
results (complete-topology cluster
topology
:topology-name "test123"
http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj
index b146cb0..18e3a80 100644
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@ -30,7 +30,8 @@
(:require [conjure.core])
(:use [conjure core])
(:use [clojure test])
- (:use [org.apache.storm cluster config util testing thrift log]))
+ (:use [org.apache.storm cluster config util testing log])
+ (:use [org.apache.storm.internal thrift]))
(defn mk-config [zk-port]
(merge (clojurify-structure (ConfigUtils/readStormConfig))
http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/drpc_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/drpc_test.clj b/storm-core/test/clj/org/apache/storm/drpc_test.clj
index 3dcef7a..6024674 100644
--- a/storm-core/test/clj/org/apache/storm/drpc_test.clj
+++ b/storm-core/test/clj/org/apache/storm/drpc_test.clj
@@ -16,7 +16,8 @@
(ns org.apache.storm.drpc-test
(:use [clojure test])
(:import [org.apache.storm.drpc ReturnResults DRPCSpout
- LinearDRPCTopologyBuilder])
+ LinearDRPCTopologyBuilder]
+ [org.apache.storm.utils ConfigUtils Utils])
(:import [org.apache.storm.topology FailedException])
(:import [org.apache.storm.coordination CoordinatedBolt$FinishedCallback])
(:import [org.apache.storm LocalDRPC LocalCluster])
@@ -25,7 +26,9 @@
[org.apache.storm.utils.staticmocking ConfigUtilsInstaller])
(:import [org.apache.storm.generated DRPCExecutionException])
(:import [java.util.concurrent ConcurrentLinkedQueue])
- (:use [org.apache.storm config testing clojure])
+ (:import [org.apache.storm Thrift])
+ (:use [org.apache.storm config testing])
+ (:use [org.apache.storm.internal clojure])
(:use [org.apache.storm.daemon common drpc])
(:use [conjure core]))
@@ -40,12 +43,16 @@
(let [drpc (LocalDRPC.)
spout (DRPCSpout. "test" drpc)
cluster (LocalCluster.)
- topology (topology
- {"1" (spout-spec spout)}
- {"2" (bolt-spec {"1" :shuffle}
- exclamation-bolt)
- "3" (bolt-spec {"2" :shuffle}
- (ReturnResults.))})]
+ topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails spout)}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareShuffleGrouping)}
+ exclamation-bolt)
+ "3" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "2" nil)
+ (Thrift/prepareShuffleGrouping)} ; shuffle, same grouping as before the port
+ (ReturnResults.))})]
(.submitTopology cluster "test" {} topology)
(is (= "aaa!!!" (.execute drpc "test" "aaa")))
http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/grouping_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/grouping_test.clj b/storm-core/test/clj/org/apache/storm/grouping_test.clj
index f2a3f4b..61caf68 100644
--- a/storm-core/test/clj/org/apache/storm/grouping_test.clj
+++ b/storm-core/test/clj/org/apache/storm/grouping_test.clj
@@ -18,9 +18,11 @@
(:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestWordBytesCounter NGrouping]
[org.apache.storm.generated JavaObject JavaObjectArg])
(:import [org.apache.storm.grouping LoadMapping])
- (:use [org.apache.storm testing clojure log config])
+ (:use [org.apache.storm testing log config])
+ (:use [org.apache.storm.internal clojure])
(:use [org.apache.storm.daemon common executor])
- (:require [org.apache.storm [thrift :as thrift]]))
+ (:import [org.apache.storm Thrift])
+ (:import [org.apache.storm.utils Utils]))
(deftest test-shuffle
(let [shuffle-fn (mk-shuffle-grouper [(int 1) (int 2)] {TOPOLOGY-DISABLE-LOADAWARE-MESSAGING true} nil "comp" "stream")
@@ -77,12 +79,13 @@
(with-simulated-time-local-cluster [cluster :supervisors 4]
(let [spout-phint 4
bolt-phint 6
- topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true)
- :parallelism-hint spout-phint)}
- {"2" (thrift/mk-bolt-spec {"1" ["word"]}
- (TestWordBytesCounter.)
- :parallelism-hint bolt-phint)
+ topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails
+ (TestWordSpout. true) (Integer. spout-phint))}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareFieldsGrouping ["word"])}
+ (TestWordBytesCounter.) (Integer. bolt-phint)) ; bolt parallelism, not the spout's
})
results (complete-topology
cluster
@@ -101,12 +104,13 @@
(with-simulated-time-local-cluster [cluster :supervisors 4]
(let [spout-phint 4
bolt-phint 6
- topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true)
- :parallelism-hint spout-phint)}
- {"2" (thrift/mk-bolt-spec {"1" ["word"]}
- (TestWordBytesCounter.)
- :parallelism-hint bolt-phint)
+ topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails
+ (TestWordSpout. true) (Integer. spout-phint))}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareFieldsGrouping ["word"])}
+ (TestWordBytesCounter.) (Integer. bolt-phint))
})
results (complete-topology
cluster
@@ -127,15 +131,21 @@
(deftest test-custom-groupings
(with-simulated-time-local-cluster [cluster]
- (let [topology (topology
- {"1" (spout-spec (TestWordSpout. true))}
- {"2" (bolt-spec {"1" (NGrouping. 2)}
- id-bolt
- :p 4)
- "3" (bolt-spec {"1" (JavaObject. "org.apache.storm.testing.NGrouping"
- [(JavaObjectArg/int_arg 3)])}
- id-bolt
- :p 6)
+ (let [topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails
+ (TestWordSpout. true))}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareCustomStreamGrouping (NGrouping. (Integer. 2)))}
+ id-bolt
+ (Integer. 4))
+ "3" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareCustomJavaObjectGrouping
+ (JavaObject. "org.apache.storm.testing.NGrouping"
+ [(JavaObjectArg/int_arg 3)]))}
+ id-bolt
+ (Integer. 6))
})
results (complete-topology cluster
topology
http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
index f75a8e3..7fffd34 100644
--- a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
+++ b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
@@ -15,10 +15,11 @@
;; limitations under the License.
(ns org.apache.storm.messaging.netty-integration-test
(:use [clojure test])
- (:import [org.apache.storm.messaging TransportFactory])
+ (:import [org.apache.storm.messaging TransportFactory]
+ [org.apache.storm Thrift])
(:import [org.apache.storm.testing TestWordSpout TestGlobalCount])
- (:use [org.apache.storm testing config])
- (:require [org.apache.storm [thrift :as thrift]]))
+ (:import [org.apache.storm.utils Utils])
+ (:use [org.apache.storm testing util config]))
(deftest test-integration
(with-simulated-time-local-cluster [cluster :supervisors 4 :supervisor-slot-port-min 6710
@@ -31,10 +32,13 @@
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1}]
- (let [topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
- {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
- :parallelism-hint 6)})
+ (let [topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails
+ (TestWordSpout. true) (Integer. 4))}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareShuffleGrouping)}
+ (TestGlobalCount.) (Integer. 6))})
results (complete-topology cluster
topology
;; important for test that
http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/messaging_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/messaging_test.clj b/storm-core/test/clj/org/apache/storm/messaging_test.clj
index e987688..402ea7f 100644
--- a/storm-core/test/clj/org/apache/storm/messaging_test.clj
+++ b/storm-core/test/clj/org/apache/storm/messaging_test.clj
@@ -18,7 +18,8 @@
(:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestEventLogSpout TestEventOrderCheckBolt])
(:use [org.apache.storm testing config])
(:use [org.apache.storm.daemon common])
- (:require [org.apache.storm [thrift :as thrift]]))
+ (:import [org.apache.storm Thrift])
+ (:import [org.apache.storm.utils Utils]))
(deftest test-local-transport
(doseq [transport-on? [false true]]
@@ -28,10 +29,13 @@
(if transport-on? true false)
STORM-MESSAGING-TRANSPORT
"org.apache.storm.messaging.netty.Context"}]
- (let [topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 2)}
- {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
- :parallelism-hint 6)
+ (let [topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails
+ (TestWordSpout. true) (Integer. 2))}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareShuffleGrouping)}
+ (TestGlobalCount.) (Integer. 6))
})
results (complete-topology cluster
topology
http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/metrics_test.clj b/storm-core/test/clj/org/apache/storm/metrics_test.clj
index 9f051f6..c186288 100644
--- a/storm-core/test/clj/org/apache/storm/metrics_test.clj
+++ b/storm-core/test/clj/org/apache/storm/metrics_test.clj
@@ -25,10 +25,12 @@
(:import [org.apache.storm.metric.api.rpc CountShellMetric])
(:import [org.apache.storm.utils Utils])
- (:use [org.apache.storm testing clojure config])
+ (:use [org.apache.storm testing config])
+ (:use [org.apache.storm.internal clojure])
(:use [org.apache.storm.daemon common])
(:use [org.apache.storm.metric testing])
- (:require [org.apache.storm [thrift :as thrift]]))
+ (:import [org.apache.storm Thrift])
+ (:import [org.apache.storm.utils Utils]))
(defbolt acking-bolt {} {:prepare true}
[conf context collector]
@@ -105,9 +107,12 @@
"storm.zookeeper.session.timeout" 60000
}]
(let [feeder (feeder-spout ["field1"])
- topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec feeder)}
- {"2" (thrift/mk-bolt-spec {"1" :global} count-acks)})]
+ topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails feeder)}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareGlobalGrouping)}
+ count-acks)})]
(submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
(.feed feeder ["a"] 1)
@@ -133,9 +138,12 @@
"storm.zookeeper.session.timeout" 60000
}]
(let [feeder (feeder-spout ["field1"])
- topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec feeder)}
- {"2" (thrift/mk-bolt-spec {"1" :all} count-acks :p 1 :conf {TOPOLOGY-TASKS 2})})]
+ topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails feeder)}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareAllGrouping)}
+ count-acks (Integer. 1) {TOPOLOGY-TASKS 2})})]
(submit-local-topology (:nimbus cluster) "metrics-tester-with-multitasks" {} topology)
(.feed feeder ["a"] 1)
@@ -154,10 +162,9 @@
(assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster))))
(defn mk-shell-bolt-with-metrics-spec
- [inputs command & kwargs]
- (let [command (into-array String command)]
- (apply thrift/mk-bolt-spec inputs
- (PythonShellMetricsBolt. command) kwargs)))
+ [inputs command file]
+ (Thrift/prepareBoltDetails inputs
+ (PythonShellMetricsBolt. command file)))
(deftest test-custom-metric-with-multilang-py
(with-simulated-time-local-cluster
@@ -167,9 +174,12 @@
"storm.zookeeper.session.timeout" 60000
}]
(let [feeder (feeder-spout ["field1"])
- topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec feeder)}
- {"2" (mk-shell-bolt-with-metrics-spec {"1" :global} ["python" "tester_bolt_metrics.py"])})]
+ topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails feeder)}
+ {"2" (mk-shell-bolt-with-metrics-spec
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareGlobalGrouping)}
+ "python" "tester_bolt_metrics.py")})]
(submit-local-topology (:nimbus cluster) "shell-metrics-tester" {} topology)
(.feed feeder ["a"] 1)
@@ -189,9 +199,8 @@
)))
(defn mk-shell-spout-with-metrics-spec
- [command & kwargs]
- (let [command (into-array String command)]
- (apply thrift/mk-spout-spec (PythonShellMetricsSpout. command) kwargs)))
+ [command file]
+ (Thrift/prepareSpoutDetails (PythonShellMetricsSpout. command file)))
(deftest test-custom-metric-with-spout-multilang-py
(with-simulated-time-local-cluster
@@ -199,9 +208,12 @@
[{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
"storm.zookeeper.connection.timeout" 30000
"storm.zookeeper.session.timeout" 60000}]
- (let [topology (thrift/mk-topology
- {"1" (mk-shell-spout-with-metrics-spec ["python" "tester_spout_metrics.py"])}
- {"2" (thrift/mk-bolt-spec {"1" :all} count-acks)})]
+ (let [topology (Thrift/buildTopology
+ {"1" (mk-shell-spout-with-metrics-spec "python" "tester_spout_metrics.py")}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareAllGrouping)}
+ count-acks)})]
(submit-local-topology (:nimbus cluster) "shell-spout-metrics-tester" {} topology)
(advance-cluster-time cluster 7)
@@ -216,9 +228,12 @@
TOPOLOGY-STATS-SAMPLE-RATE 1.0
TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
(let [feeder (feeder-spout ["field1"])
- topology (thrift/mk-topology
- {"myspout" (thrift/mk-spout-spec feeder)}
- {"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} acking-bolt)})]
+ topology (Thrift/buildTopology
+ {"myspout" (Thrift/prepareSpoutDetails feeder)}
+ {"mybolt" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "myspout" nil)
+ (Thrift/prepareShuffleGrouping)}
+ acking-bolt)})]
(submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
(.feed feeder ["a"] 1)
@@ -255,9 +270,12 @@
(let [feeder (feeder-spout ["field1"])
tracker (AckFailMapTracker.)
_ (.setAckFailDelegate feeder tracker)
- topology (thrift/mk-topology
- {"myspout" (thrift/mk-spout-spec feeder)}
- {"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} ack-every-other)})]
+ topology (Thrift/buildTopology
+ {"myspout" (Thrift/prepareSpoutDetails feeder)}
+ {"mybolt" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "myspout" nil)
+ (Thrift/prepareShuffleGrouping)}
+ ack-every-other)})]
(submit-local-topology (:nimbus cluster)
"metrics-tester"
{}
@@ -307,9 +325,12 @@
(let [feeder (feeder-spout ["field1"])
tracker (AckFailMapTracker.)
_ (.setAckFailDelegate feeder tracker)
- topology (thrift/mk-topology
- {"myspout" (thrift/mk-spout-spec feeder)}
- {"mybolt" (thrift/mk-bolt-spec {"myspout" :global} ack-every-other)})]
+ topology (Thrift/buildTopology
+ {"myspout" (Thrift/prepareSpoutDetails feeder)}
+ {"mybolt" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "myspout" nil)
+ (Thrift/prepareGlobalGrouping)}
+ ack-every-other)})]
(submit-local-topology (:nimbus cluster)
"timeout-tester"
{TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
@@ -341,8 +362,8 @@
[{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
(let [feeder (feeder-spout ["field1"])
- topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec feeder)}
+ topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails feeder)}
{})]
(submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)