You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/12 08:29:10 UTC
[02/10] storm git commit: STORM-2447: add in storm local to avoid having server on worker classpath
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
index 947b64b..0d77914 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
@@ -17,9 +17,12 @@
*/
package org.apache.storm.starter.trident;
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.Stream;
@@ -29,12 +32,6 @@ import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-import java.io.Serializable;
-import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
/**
* This class demonstrates different usages of
@@ -94,16 +91,8 @@ public class TridentMinMaxOfVehiclesTopology {
StormTopology topology = buildVehiclesTopology();
Config conf = new Config();
conf.setMaxSpoutPending(20);
- if (args.length == 0) {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("vehicles-topology", conf, topology);) {
- Utils.sleep(60 * 1000);
- }
- System.exit(0);
- } else {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopologyWithProgressBar("vehicles-topology", conf, topology);
- }
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopologyWithProgressBar("vehicles-topology", conf, topology);
}
static class SpeedComparator implements Comparator<TridentTuple>, Serializable {
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
index 6533a4e..7294091 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
@@ -17,14 +17,16 @@
*/
package org.apache.storm.starter.trident;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.LocalDRPC;
+import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
@@ -37,117 +39,115 @@ import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.state.map.ReadOnlyMapState;
import org.apache.storm.trident.tuple.TridentTuple;
-
-import java.util.*;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.DRPCClient;
public class TridentReach {
- public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
- put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
- put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
- put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
- }};
-
- public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
- put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
- put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
- put("tim", Arrays.asList("alex"));
- put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
- put("adam", Arrays.asList("david", "carissa"));
- put("mike", Arrays.asList("john", "bob"));
- put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
- }};
-
- public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> {
- public static class Factory implements StateFactory {
- Map _map;
-
- public Factory(Map map) {
- _map = map;
- }
-
- @Override
- public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- return new StaticSingleKeyMapState(_map);
- }
+ public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
+ put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
+ put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
+ put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
+ }};
+
+ public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
+ put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
+ put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
+ put("tim", Arrays.asList("alex"));
+ put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
+ put("adam", Arrays.asList("david", "carissa"));
+ put("mike", Arrays.asList("john", "bob"));
+ put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
+ }};
+
+ public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> {
+ public static class Factory implements StateFactory {
+ Map _map;
+
+ public Factory(Map map) {
+ _map = map;
+ }
+
+ @Override
+ public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+ return new StaticSingleKeyMapState(_map);
+ }
- }
-
- Map _map;
-
- public StaticSingleKeyMapState(Map map) {
- _map = map;
- }
+ }
+ Map _map;
- @Override
- public List<Object> multiGet(List<List<Object>> keys) {
- List<Object> ret = new ArrayList();
- for (List<Object> key : keys) {
- Object singleKey = key.get(0);
- ret.add(_map.get(singleKey));
- }
- return ret;
- }
+ public StaticSingleKeyMapState(Map map) {
+ _map = map;
+ }
- }
- public static class One implements CombinerAggregator<Integer> {
- @Override
- public Integer init(TridentTuple tuple) {
- return 1;
- }
+ @Override
+ public List<Object> multiGet(List<List<Object>> keys) {
+ List<Object> ret = new ArrayList();
+ for (List<Object> key : keys) {
+ Object singleKey = key.get(0);
+ ret.add(_map.get(singleKey));
+ }
+ return ret;
+ }
- @Override
- public Integer combine(Integer val1, Integer val2) {
- return 1;
}
- @Override
- public Integer zero() {
- return 1;
- }
- }
+ public static class One implements CombinerAggregator<Integer> {
+ @Override
+ public Integer init(TridentTuple tuple) {
+ return 1;
+ }
- public static class ExpandList extends BaseFunction {
+ @Override
+ public Integer combine(Integer val1, Integer val2) {
+ return 1;
+ }
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- List l = (List) tuple.getValue(0);
- if (l != null) {
- for (Object o : l) {
- collector.emit(new Values(o));
+ @Override
+ public Integer zero() {
+ return 1;
}
- }
}
- }
+ public static class ExpandList extends BaseFunction {
- public static StormTopology buildTopology(LocalDRPC drpc) {
- TridentTopology topology = new TridentTopology();
- TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB));
- TridentState tweetersToFollowers = topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB));
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ List l = (List) tuple.getValue(0);
+ if (l != null) {
+ for (Object o : l) {
+ collector.emit(new Values(o));
+ }
+ }
+ }
+ }
- topology.newDRPCStream("reach", drpc).stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields(
- "tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")).shuffle().stateQuery(
- tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")).each(new Fields("followers"),
- new ExpandList(), new Fields("follower")).groupBy(new Fields("follower")).aggregate(new One(), new Fields(
- "one")).aggregate(new Fields("one"), new Sum(), new Fields("reach"));
- return topology.build();
- }
+ public static StormTopology buildTopology() {
+ TridentTopology topology = new TridentTopology();
+ TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB));
+ TridentState tweetersToFollowers = topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB));
- public static void main(String[] args) throws Exception {
- Config conf = new Config();
- try (LocalDRPC drpc = new LocalDRPC();
- LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("reach", conf, buildTopology(drpc));) {
+ topology.newDRPCStream("reach").stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields(
+ "tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")).shuffle().stateQuery(
+ tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")).each(new Fields("followers"),
+ new ExpandList(), new Fields("follower")).groupBy(new Fields("follower")).aggregate(new One(), new Fields(
+ "one")).aggregate(new Fields("one"), new Sum(), new Fields("reach"));
+ return topology.build();
+ }
- Thread.sleep(2000);
+ public static void main(String[] args) throws Exception {
+ Config conf = new Config();
+ StormSubmitter.submitTopology("reach", conf, buildTopology());
+ try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
+ Thread.sleep(2000);
- System.out.println("REACH: " + drpc.execute("reach", "aaa"));
- System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
- System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));
+ System.out.println("REACH: " + drpc.execute("reach", "aaa"));
+ System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
+ System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));
+ }
}
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
index c43b4b0..2da29bf 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
@@ -19,11 +19,8 @@
package org.apache.storm.starter.trident;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.Consumer;
@@ -33,17 +30,13 @@ import org.apache.storm.trident.testing.Split;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
import org.apache.storm.trident.windowing.WindowsStoreFactory;
-import org.apache.storm.trident.windowing.config.*;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.WindowConfig;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
/**
* Sample application of trident windowing which uses inmemory store for storing tuples in window.
*/
@@ -74,25 +67,12 @@ public class TridentWindowingInmemoryStoreTopology {
public static void main(String[] args) throws Exception {
Config conf = new Config();
WindowsStoreFactory mapState = new InMemoryWindowsStoreFactory();
-
- if (args.length == 0) {
- List<? extends WindowConfig> list = Arrays.asList(
- SlidingCountWindow.of(1000, 100)
- ,TumblingCountWindow.of(1000)
- ,SlidingDurationWindow.of(new BaseWindowedBolt.Duration(6, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS))
- ,TumblingDurationWindow.of(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS))
- );
-
- for (WindowConfig windowConfig : list) {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(mapState, windowConfig));) {
- Utils.sleep(60 * 1000);
- }
- }
- System.exit(0);
- } else {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(mapState, SlidingCountWindow.of(1000, 100)));
+ String topoName = "wordCounter";
+ if (args.length > 0) {
+ topoName = args[0];
}
+
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, conf, buildTopology(mapState, SlidingCountWindow.of(1000, 100)));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
index 553c26f..0f86e1f 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
@@ -18,9 +18,6 @@
package org.apache.storm.starter.trident;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.TridentState;
@@ -35,54 +32,53 @@ import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.DRPCClient;
public class TridentWordCount {
- public static class Split extends BaseFunction {
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- String sentence = tuple.getString(0);
- for (String word : sentence.split(" ")) {
- collector.emit(new Values(word));
- }
+ public static class Split extends BaseFunction {
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ String sentence = tuple.getString(0);
+ for (String word : sentence.split(" ")) {
+ collector.emit(new Values(word));
+ }
+ }
}
- }
- public static StormTopology buildTopology(LocalDRPC drpc) {
- FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
- new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
- new Values("how many apples can you eat"), new Values("to be or not to be the person"));
- spout.setCycle(true);
+ public static StormTopology buildTopology() {
+ FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+ new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+ new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+ spout.setCycle(true);
- TridentTopology topology = new TridentTopology();
- TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
- new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
- new Count(), new Fields("count")).parallelismHint(16);
+ TridentTopology topology = new TridentTopology();
+ TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+ new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
+ new Count(), new Fields("count")).parallelismHint(16);
- topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word"))
- .groupBy(new Fields("word"))
- .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
- .each(new Fields("count"), new FilterNull())
- .project(new Fields("word", "count"));
- return topology.build();
- }
+ topology.newDRPCStream("words").each(new Fields("args"), new Split(), new Fields("word"))
+ .groupBy(new Fields("word"))
+ .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
+ .each(new Fields("count"), new FilterNull())
+ .project(new Fields("word", "count"));
+ return topology.build();
+ }
- public static void main(String[] args) throws Exception {
- Config conf = new Config();
- conf.setMaxSpoutPending(20);
- if (args.length == 0) {
- try (LocalDRPC drpc = new LocalDRPC();
- LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(drpc));) {
- for (int i = 0; i < 100; i++) {
- System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
- Thread.sleep(1000);
+ public static void main(String[] args) throws Exception {
+ Config conf = new Config();
+ conf.setMaxSpoutPending(20);
+ String topoName = "wordCounter";
+ if (args.length > 0) {
+ topoName = args[0];
+ }
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, conf, buildTopology());
+ try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
+ for (int i = 0; i < 10; i++) {
+ System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
+ Thread.sleep(1000);
+ }
}
- }
- }
- else {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
}
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml
index 7fe6071..deaefb8 100755
--- a/external/storm-eventhubs/pom.xml
+++ b/external/storm-eventhubs/pom.xml
@@ -65,13 +65,6 @@
<scope>${provided.scope}</scope>
</dependency>
<dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-server</artifactId>
- <version>${project.version}</version>
- <type>jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java
index ae15634..4709b5e 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java
@@ -17,20 +17,17 @@
*******************************************************************************/
package org.apache.storm.eventhubs.samples;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
+import java.io.FileReader;
+import java.util.Properties;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
import org.apache.storm.eventhubs.samples.bolt.GlobalCountBolt;
import org.apache.storm.eventhubs.samples.bolt.PartialCountBolt;
import org.apache.storm.eventhubs.spout.EventHubSpout;
import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-
-import java.io.FileReader;
-import java.util.Properties;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
/**
* The basic scenario topology that uses EventHubSpout with PartialCountBolt
@@ -124,23 +121,17 @@ public class EventCount {
}
protected void submitTopology(String[] args, StormTopology topology) throws Exception {
- Config config = new Config();
- config.setDebug(false);
- //Enable metrics
- config.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
+ Config config = new Config();
+ config.setDebug(false);
+ //Enable metrics
+ config.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
-
- if (args != null && args.length > 0) {
- config.setNumWorkers(numWorkers);
- StormSubmitter.submitTopology(args[0], config, topology);
- } else {
- config.setMaxTaskParallelism(2);
-
- try (LocalCluster localCluster = new LocalCluster();
- LocalTopology topo = localCluster.submitTopology("test", config, topology);) {
- Thread.sleep(5000000);
+ String topoName = "test";
+ if (args.length > 0) {
+ topoName = args[0];
}
- }
+ config.setNumWorkers(numWorkers);
+ StormSubmitter.submitTopology(topoName, config, topology);
}
protected void runScenario(String[] args) throws Exception{
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java b/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
index 222bf2d..1d38ada 100644
--- a/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
@@ -17,22 +17,33 @@
*/
package org.apache.storm.flux;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
+import org.apache.storm.flux.model.BoltDef;
+import org.apache.storm.flux.model.ExecutionContext;
+import org.apache.storm.flux.model.SpoutDef;
+import org.apache.storm.flux.model.StreamDef;
+import org.apache.storm.flux.model.TopologyDef;
+import org.apache.storm.flux.parser.FluxParser;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.SubmitOptions;
import org.apache.storm.generated.TopologyInitialStatus;
-import org.apache.storm.utils.Utils;
-import org.apache.commons.cli.*;
-import org.apache.storm.flux.model.*;
-import org.apache.storm.flux.parser.FluxParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.*;
-
/**
* Flux entry point.
*
@@ -40,10 +51,6 @@ import java.io.*;
public class Flux {
private static final Logger LOG = LoggerFactory.getLogger(Flux.class);
- private static final Long DEFAULT_LOCAL_SLEEP_TIME = 60000l;
-
- private static final Long DEFAULT_ZK_PORT = 2181l;
-
private static final String OPTION_LOCAL = "local";
private static final String OPTION_REMOTE = "remote";
private static final String OPTION_RESOURCE = "resource";
@@ -169,45 +176,8 @@ public class Flux {
}
StormSubmitter.submitTopology(topologyName, conf, topology, submitOptions, null);
} else {
- LOG.info("Running in local mode...");
-
- String sleepStr = cmd.getOptionValue(OPTION_SLEEP);
- Long sleepTime = DEFAULT_LOCAL_SLEEP_TIME;
- if (sleepStr != null) {
- sleepTime = Long.parseLong(sleepStr);
- }
- LOG.debug("Sleep time: {}", sleepTime);
- LocalCluster cluster = null;
-
- // in-process or external zookeeper
- if(cmd.hasOption(OPTION_ZOOKEEPER)){
- String zkStr = cmd.getOptionValue(OPTION_ZOOKEEPER);
- LOG.info("Using ZooKeeper at '{}' instead of in-process one.", zkStr);
- long zkPort = DEFAULT_ZK_PORT;
- String zkHost = null;
- if(zkStr.contains(":")){
- String[] hostPort = zkStr.split(":");
- zkHost = hostPort[0];
- zkPort = hostPort.length > 1 ? Long.parseLong(hostPort[1]) : DEFAULT_ZK_PORT;
-
- } else {
- zkHost = zkStr;
- }
- // the following constructor is only available in 0.9.3 and later
- try {
- cluster = new LocalCluster(zkHost, zkPort);
- } catch (NoSuchMethodError e){
- LOG.error("The --zookeeper option can only be used with Apache Storm 0.9.3 and later.");
- System.exit(1);
- }
- } else {
- cluster = new LocalCluster();
- }
- try (LocalTopology topo = cluster.submitTopology(topologyName, conf, topology)) {
- Utils.sleep(sleepTime);
- } finally {
- cluster.shutdown();
- }
+ LOG.error("To run in local mode run with 'storm local' instead of 'storm jar'");
+ return;
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/flux/pom.xml
----------------------------------------------------------------------
diff --git a/flux/pom.xml b/flux/pom.xml
index d1f330f..4e04fa8 100644
--- a/flux/pom.xml
+++ b/flux/pom.xml
@@ -44,16 +44,9 @@
</modules>
<dependencies>
- <!--
- Since Flux uses LocalCluster to provide the feature: running topology locally...
- User should notice that configured topology will be run with 'storm-client' dependencies
- when adding Flux into topology dependency.
- If user want to run topology with 'storm-server' dependencies, user can just include
- 'storm-server' as 'compile' scope, and exclude 'storm-client' from 'storm-server'.
- -->
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-server</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
<scope>${provided.scope}</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1281171..91017f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -320,11 +320,13 @@
<module>storm-buildtools/maven-shade-clojure-transformer</module>
<module>storm-buildtools/storm-maven-plugins</module>
<module>storm-client</module>
+ <module>storm-client-misc</module>
<module>storm-server</module>
<module>storm-core</module>
<module>storm-webapp</module>
<module>storm-rename-hack</module>
<module>storm-clojure</module>
+ <module>storm-clojure-test</module>
<module>storm-submit-tools</module>
<module>flux</module>
<module>sql</module>
@@ -368,7 +370,6 @@
<module>examples/storm-pmml-examples</module>
<module>examples/storm-jms-examples</module>
<module>examples/storm-perf</module>
- <module>storm-client-misc</module>
</modules>
<dependencies>
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
index 5abb04a..f359722 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
@@ -30,6 +30,7 @@ import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.DRPCClient;
import org.apache.storm.utils.ExtendedThreadPoolExecutor;
import org.apache.storm.utils.ServiceRegistry;
import java.util.ArrayList;
@@ -59,8 +60,8 @@ public class DRPCSpout extends BaseRichSpout {
List<DRPCInvocationsClient> _clients = new ArrayList<>();
transient LinkedList<Future<Void>> _futures = null;
transient ExecutorService _backround = null;
- String _function;
- String _local_drpc_id = null;
+ final String _function;
+ final String _local_drpc_id;
private static class DRPCMessageId {
String id;
@@ -75,6 +76,11 @@ public class DRPCSpout extends BaseRichSpout {
public DRPCSpout(String function) {
_function = function;
+ if (DRPCClient.isLocalOverride()) {
+ _local_drpc_id = DRPCClient.getOverrideServiceId();
+ } else {
+ _local_drpc_id = null;
+ }
}
public DRPCSpout(String function, ILocalDRPC drpc) {
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
index 9a207b4..d84c5fa 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
@@ -56,7 +56,7 @@ public class ThriftClient implements AutoCloseable {
port = type.getPort(storm_conf);
}
- if (port<=0) {
+ if (port<=0 && !type.isFake()) {
throw new IllegalArgumentException("invalid port: "+port);
}
@@ -66,7 +66,9 @@ public class ThriftClient implements AutoCloseable {
_conf = storm_conf;
_type = type;
_asUser = asUser;
- reconnect();
+ if (!type.isFake()) {
+ reconnect();
+ }
}
public synchronized TTransport transport() {
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
index 0108d73..19d8dd6 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
@@ -31,7 +31,8 @@ public enum ThriftConnectionType {
DRPC(Config.DRPC_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_PORT, Config.DRPC_QUEUE_SIZE,
Config.DRPC_WORKER_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null),
DRPC_INVOCATIONS(Config.DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_INVOCATIONS_PORT, null,
- Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null);
+ Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null),
+ LOCAL_FAKE;
private final String _transConf;
private final String _portConf;
@@ -39,17 +40,32 @@ public enum ThriftConnectionType {
private final String _threadsConf;
private final String _buffConf;
private final String _socketTimeoutConf;
+ private final boolean _isFake;
+ ThriftConnectionType() {
+ this(null, null, null, null, null, null, true);
+ }
+
+ ThriftConnectionType(String transConf, String portConf, String qConf,
+ String threadsConf, String buffConf, String socketTimeoutConf) {
+ this(transConf, portConf, qConf, threadsConf, buffConf, socketTimeoutConf, false);
+ }
+
ThriftConnectionType(String transConf, String portConf, String qConf,
- String threadsConf, String buffConf, String socketTimeoutConf) {
+ String threadsConf, String buffConf, String socketTimeoutConf, boolean isFake) {
_transConf = transConf;
_portConf = portConf;
_qConf = qConf;
_threadsConf = threadsConf;
_buffConf = buffConf;
_socketTimeoutConf = socketTimeoutConf;
+ _isFake = isFake;
}
+ public boolean isFake() {
+ return _isFake;
+ }
+
public String getTransportPlugin(Map conf) {
String ret = (String)conf.get(_transConf);
if (ret == null) {
@@ -59,6 +75,9 @@ public enum ThriftConnectionType {
}
public int getPort(Map conf) {
+ if (_isFake) {
+ return -1;
+ }
return ObjectReader.getInt(conf.get(_portConf));
}
@@ -70,10 +89,16 @@ public enum ThriftConnectionType {
}
public int getNumThreads(Map conf) {
+ if (_isFake) {
+ return 1;
+ }
return ObjectReader.getInt(conf.get(_threadsConf));
}
public int getMaxBufferSize(Map conf) {
+ if (_isFake) {
+ return 1;
+ }
return ObjectReader.getInt(conf.get(_buffConf));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-client/src/jvm/org/apache/storm/topology/ConfigurableTopology.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/ConfigurableTopology.java b/storm-client/src/jvm/org/apache/storm/topology/ConfigurableTopology.java
new file mode 100644
index 0000000..3641e07
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/topology/ConfigurableTopology.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.utils.Utils;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * Extensions of this class take a
+ * reference to one or more configuration files. The main() method should call
+ * ConfigurableTopology.start() and it must instantiate a TopologyBuilder in the
+ * run() method.
+ *
+ * <pre>
+ * {
+ * public class MyTopology extends ConfigurableTopology {
+ *
+ * public static void main(String[] args) throws Exception {
+ * ConfigurableTopology.start(new MyTopology(), args);
+ * }
+ *
+ * @Override
+ * protected int run(String[] args) {
+ * TopologyBuilder builder = new TopologyBuilder();
+ *
+ * // build topology as usual
+ *
+ * return submit("crawl", conf, builder);
+ * }
+ * }
+ * </pre>
+ **/
+public abstract class ConfigurableTopology {
+
+ protected Config conf = new Config();
+
+ public static void start(ConfigurableTopology topology, String args[]) {
+ String[] remainingArgs = topology.parse(args);
+ try {
+ topology.run(remainingArgs);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected Config getConf() {
+ return conf;
+ }
+
+ protected abstract int run(String args[]) throws Exception;
+
+ /** Submits the topology with the name taken from the configuration **/
+ protected int submit(Config conf, TopologyBuilder builder) {
+ String name = (String) Utils.get(conf, Config.TOPOLOGY_NAME, null);
+ if (StringUtils.isBlank(name))
+ throw new RuntimeException(
+ "No value found for " + Config.TOPOLOGY_NAME);
+ return submit(name, conf, builder);
+ }
+
+ /** Submits the topology under a specific name **/
+ protected int submit(String name, Config conf, TopologyBuilder builder) {
+ try {
+ StormSubmitter.submitTopology(name, conf,
+ builder.createTopology());
+ } catch (Exception e) {
+ e.printStackTrace();
+ return -1;
+ }
+ return 0;
+ }
+
+ private String[] parse(String args[]) {
+
+ List<String> newArgs = new ArrayList<>();
+ Collections.addAll(newArgs, args);
+
+ Iterator<String> iter = newArgs.iterator();
+ while (iter.hasNext()) {
+ String param = iter.next();
+ if (param.equals("-conf")) {
+ if (!iter.hasNext()) {
+ throw new RuntimeException("Conf file not specified");
+ }
+ iter.remove();
+ String resource = iter.next();
+ try {
+ loadConf(resource, conf);
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException("File not found : " + resource);
+ }
+ iter.remove();
+ }
+ }
+
+ return newArgs.toArray(new String[newArgs.size()]);
+ }
+
+ public static Config loadConf(String resource, Config conf)
+ throws FileNotFoundException {
+ Yaml yaml = new Yaml();
+ Map<String, Object> ret = (Map<String, Object>) yaml.load(new InputStreamReader(
+ new FileInputStream(resource), Charset.defaultCharset()));
+ if (ret == null) {
+ ret = new HashMap<>();
+ }
+ // If the config consists of a single key 'config', its values are used
+ // instead. This means that the same config files can be used with Flux
+ // and the ConfigurableTopology.
+ else {
+ if (ret.size() == 1) {
+ Object confNode = ret.get("config");
+ if (confNode != null && confNode instanceof Map) {
+ ret = (Map<String, Object>) ret;
+ }
+ }
+ }
+ conf.putAll(ret);
+ return conf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-client/src/jvm/org/apache/storm/utils/DRPCClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/DRPCClient.java b/storm-client/src/jvm/org/apache/storm/utils/DRPCClient.java
index 7f83789..1338231 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/DRPCClient.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/DRPCClient.java
@@ -17,34 +17,90 @@
*/
package org.apache.storm.utils;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.ILocalDRPC;
+import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.DRPCExecutionException;
import org.apache.storm.generated.DistributedRPC;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransport;
import org.apache.storm.security.auth.ThriftClient;
import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
-import java.util.Map;
-
public class DRPCClient extends ThriftClient implements DistributedRPC.Iface {
- private TTransport conn;
- private DistributedRPC.Client client;
+ private static volatile ILocalDRPC _localOverrideClient = null;
+
+ public static class LocalOverride implements AutoCloseable {
+ public LocalOverride(ILocalDRPC client) {
+ _localOverrideClient = client;
+ }
+
+ @Override
+ public void close() throws Exception {
+ _localOverrideClient = null;
+ }
+ }
+
+ /**
+ * @return true if new clients will be overridden to connect to a local cluster
+ * and not the configured remote cluster.
+ */
+ public static boolean isLocalOverride() {
+ return _localOverrideClient != null;
+ }
+
+ /**
+ * @return the service ID of the local override DRPC instance
+ */
+ public static String getOverrideServiceId() {
+ return _localOverrideClient.getServiceId();
+ }
+
+ public static DRPCClient getConfiguredClient(Map conf) throws TTransportException {
+ DistributedRPC.Iface override = _localOverrideClient;
+ if (override != null) {
+ return new DRPCClient(override);
+ }
+
+ List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS);
+ Collections.shuffle(servers);
+ String host = servers.get(0);
+ int port = Integer.parseInt(conf.get(Config.DRPC_PORT).toString());
+ return new DRPCClient(conf, host, port);
+ }
+
+ private DistributedRPC.Iface client;
private String host;
private int port;
- private Integer timeout;
+ private DRPCClient(DistributedRPC.Iface override) {
+ super(new HashMap<>(), ThriftConnectionType.LOCAL_FAKE,
+ "localhost", 1234, null, null);
+ this.host = "localhost";
+ this.port = 1234;
+ this.client = override;
+ }
+
public DRPCClient(Map conf, String host, int port) throws TTransportException {
this(conf, host, port, null);
_retryForever = true;
}
public DRPCClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
- super(conf, ThriftConnectionType.DRPC, host, port, timeout, null);
+ super(conf, _localOverrideClient != null ? ThriftConnectionType.LOCAL_FAKE : ThriftConnectionType.DRPC,
+ host, port, timeout, null);
this.host = host;
this.port = port;
- this.client = new DistributedRPC.Client(_protocol);
+ if (_localOverrideClient != null) {
+ this.client = _localOverrideClient;
+ } else {
+ this.client = new DistributedRPC.Client(_protocol);
+ }
_retryForever = true;
}
@@ -60,7 +116,7 @@ public class DRPCClient extends ThriftClient implements DistributedRPC.Iface {
return client.execute(func, args);
}
- public DistributedRPC.Client getClient() {
+ public DistributedRPC.Iface getClient() {
return client;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java b/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
index 6699ba8..cb2e1af 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
@@ -29,15 +29,41 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.Principal;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class NimbusClient extends ThriftClient {
- private Nimbus.Client _client;
+ private static volatile Nimbus.Iface _localOverrideClient = null;
+
+ public static final class LocalOverride implements AutoCloseable {
+ public LocalOverride(Nimbus.Iface client) {
+ _localOverrideClient = client;
+ }
+
+ @Override
+ public void close() throws Exception {
+ _localOverrideClient = null;
+ }
+ }
+
+ /**
+ * @return true if new clients will be overridden to connect to a local cluster
+ * and not the configured remote cluster.
+ */
+ public static boolean isLocalOverride() {
+ return _localOverrideClient != null;
+ }
+
+ private Nimbus.Iface _client;
+ /**
+ * Indicates if this is a special client that is overwritten for local mode.
+ */
+ public final boolean _isLocal;
private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class);
public interface WithNimbus {
- public void run(Nimbus.Client client) throws Exception;
+ public void run(Nimbus.Iface client) throws Exception;
}
public static void withConfiguredClient(WithNimbus cb) throws Exception {
@@ -58,6 +84,10 @@ public class NimbusClient extends ThriftClient {
}
public static NimbusClient getConfiguredClientAs(Map conf, String asUser) {
+ Nimbus.Iface override = _localOverrideClient;
+ if (override != null) {
+ return new NimbusClient(override);
+ }
if (conf.containsKey(Config.STORM_DO_AS_USER)) {
if (asUser != null && !asUser.isEmpty()) {
LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence."
@@ -121,19 +151,28 @@ public class NimbusClient extends ThriftClient {
public NimbusClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, null);
_client = new Nimbus.Client(_protocol);
+ _isLocal = false;
}
public NimbusClient(Map conf, String host, Integer port, Integer timeout, String asUser) throws TTransportException {
super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, asUser);
_client = new Nimbus.Client(_protocol);
+ _isLocal = false;
}
public NimbusClient(Map conf, String host) throws TTransportException {
super(conf, ThriftConnectionType.NIMBUS, host, null, null, null);
_client = new Nimbus.Client(_protocol);
+ _isLocal = false;
+ }
+
+ private NimbusClient(Nimbus.Iface client) {
+ super(new HashMap<>(), ThriftConnectionType.LOCAL_FAKE, "localhost", null, null, null);
+ _client = client;
+ _isLocal = true;
}
- public Nimbus.Client getClient() {
+ public Nimbus.Iface getClient() {
return _client;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 57101dc..771dc70 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -982,7 +982,7 @@ public class Utils {
}
}
- public static String getTopologyId(String name, Nimbus.Client client) {
+ public static String getTopologyId(String name, Nimbus.Iface client) {
try {
ClusterSummary summary = client.getClusterInfo();
for(TopologySummary s : summary.get_topologies()) {
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-clojure-test/pom.xml
----------------------------------------------------------------------
diff --git a/storm-clojure-test/pom.xml b/storm-clojure-test/pom.xml
new file mode 100644
index 0000000..743b72b
--- /dev/null
+++ b/storm-clojure-test/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>storm-clojure-test</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-clojure</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>com.theoryinpractise</groupId>
+ <artifactId>clojure-maven-plugin</artifactId>
+ <extensions>true</extensions>
+ <configuration>
+ <sourceDirectories>
+ <sourceDirectory>src/clj</sourceDirectory>
+ </sourceDirectories>
+ </configuration>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-clojure-test/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-clojure-test/src/clj/org/apache/storm/testing.clj b/storm-clojure-test/src/clj/org/apache/storm/testing.clj
new file mode 100644
index 0000000..1e374ed
--- /dev/null
+++ b/storm-clojure-test/src/clj/org/apache/storm/testing.clj
@@ -0,0 +1,270 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.testing
+ (:import [org.apache.storm LocalCluster$Builder])
+ (:import [java.util.function UnaryOperator])
+ (:import [org.apache.storm.utils Time Time$SimulatedTime RegisteredGlobalState Utils])
+ (:import [org.apache.storm.testing InProcessZookeeper MkTupleParam TestJob MkClusterParam
+ TrackedTopology CompleteTopologyParam MockedSources])
+ (:import [org.apache.storm Thrift Testing Testing$Condition])
+ (:import [org.apache.storm.testing MockLeaderElector])
+ (:import [org.json.simple JSONValue])
+ (:use [org.apache.storm util config log])
+ (:use [org.apache.storm thrift]))
+
+(defnk add-supervisor
+ [cluster-map :ports 2 :conf {} :id nil]
+ (let [local-cluster (:local-cluster cluster-map)]
+ (.addSupervisor local-cluster ports conf id)))
+
+(defnk mock-leader-elector [:is-leader true :leader-name "test-host" :leader-port 9999]
+ (MockLeaderElector. is-leader leader-name leader-port))
+
+(defn local-cluster-state [local-cluster]
+ {:nimbus (.getNimbus local-cluster)
+ :daemon-conf (.getDaemonConf local-cluster)
+ :storm-cluster-state (.getClusterState local-cluster)
+ :local-cluster local-cluster})
+
+(defnk mk-mocked-nimbus
+ [:daemon-conf {} :inimbus nil :blob-store nil :cluster-state nil
+ :leader-elector nil :group-mapper nil :nimbus-daemon false :nimbus-wrapper nil]
+ (let [builder (doto (LocalCluster$Builder.)
+ (.withDaemonConf daemon-conf)
+ (.withINimbus inimbus)
+ (.withBlobStore blob-store)
+ (.withClusterState cluster-state)
+ (.withLeaderElector leader-elector)
+ (.withGroupMapper group-mapper)
+ (.withNimbusDaemon nimbus-daemon)
+ (.withNimbusWrapper (when nimbus-wrapper (reify UnaryOperator (apply [this nimbus] (nimbus-wrapper nimbus))))))
+ local-cluster (.build builder)]
+ (local-cluster-state local-cluster)))
+
+(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :group-mapper nil :supervisor-slot-port-min 1024 :nimbus-daemon false]
+ (let [builder (doto (LocalCluster$Builder.)
+ (.withSupervisors supervisors)
+ (.withPortsPerSupervisor ports-per-supervisor)
+ (.withDaemonConf daemon-conf)
+ (.withINimbus inimbus)
+ (.withGroupMapper group-mapper)
+ (.withSupervisorSlotPortMin supervisor-slot-port-min)
+ (.withNimbusDaemon nimbus-daemon))
+ local-cluster (.build builder)]
+ (local-cluster-state local-cluster)))
+
+(defn get-supervisor [cluster-map supervisor-id]
+ (let [local-cluster (:local-cluster cluster-map)]
+ (.getSupervisor local-cluster supervisor-id)))
+
+(defn kill-supervisor [cluster-map supervisor-id]
+ (let [local-cluster (:local-cluster cluster-map)]
+ (.killSupervisor local-cluster supervisor-id)))
+
+(defn kill-local-storm-cluster [cluster-map]
+ (let [local-cluster (:local-cluster cluster-map)]
+ (.close local-cluster)))
+
+(defmacro while-timeout [timeout-ms condition & body]
+ `(Testing/whileTimeout ~timeout-ms
+ (reify Testing$Condition (exec [this] ~condition))
+ (fn [] ~@body)))
+
+(defn wait-for-condition
+ ([apredicate]
+ (wait-for-condition Testing/TEST_TIMEOUT_MS apredicate))
+ ([timeout-ms apredicate]
+ (while-timeout timeout-ms (not (apredicate))
+ (Time/sleep 100))))
+
+(defn wait-until-cluster-waiting
+ "Wait until the cluster is idle. Should be used with time simulation."
+ ([cluster-map]
+ (let [local-cluster (:local-cluster cluster-map)]
+ (.waitForIdle local-cluster)))
+ ([cluster-map timeout-ms]
+ (let [local-cluster (:local-cluster cluster-map)]
+ (.waitForIdle local-cluster timeout-ms))))
+
+(defn advance-cluster-time
+ ([cluster-map secs increment-secs]
+ (let [local-cluster (:local-cluster cluster-map)]
+ (.advanceClusterTime local-cluster secs increment-secs)))
+ ([cluster-map secs]
+ (let [local-cluster (:local-cluster cluster-map)]
+ (.advanceClusterTime local-cluster secs))))
+
+(defmacro with-mocked-nimbus
+ [[nimbus-sym & args] & body]
+ `(let [~nimbus-sym (mk-mocked-nimbus ~@args)]
+ (try
+ ~@body
+ (catch Throwable t#
+ (log-error t# "Error in cluster")
+ (throw t#))
+ (finally
+ (let [keep-waiting?# (atom true)
+ f# (future (while @keep-waiting?# (simulate-wait ~nimbus-sym)))]
+ (kill-local-storm-cluster ~nimbus-sym)
+ (reset! keep-waiting?# false)
+ @f#)))))
+
+(defmacro with-local-cluster
+ [[cluster-sym & args] & body]
+ `(let [~cluster-sym (mk-local-storm-cluster ~@args)]
+ (try
+ ~@body
+ (catch Throwable t#
+ (log-error t# "Error in cluster")
+ (throw t#))
+ (finally
+ (let [keep-waiting?# (atom true)
+ f# (future (while @keep-waiting?# (simulate-wait ~cluster-sym)))]
+ (kill-local-storm-cluster ~cluster-sym)
+ (reset! keep-waiting?# false)
+ @f#)))))
+
+(defmacro with-simulated-time-local-cluster
+ [& args]
+ `(with-open [_# (Time$SimulatedTime.)]
+ (with-local-cluster ~@args)))
+
+(defmacro with-inprocess-zookeeper
+ [port-sym & body]
+ `(with-open [zks# (InProcessZookeeper. )]
+ (let [~port-sym (.getPort zks#)]
+ ~@body)))
+
+(defn submit-local-topology
+ [nimbus storm-name conf topology]
+ (when-not (Utils/isValidConf conf)
+ (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
+ (.submitTopology nimbus storm-name nil (JSONValue/toJSONString conf) topology))
+
+(defn submit-local-topology-with-opts
+ [nimbus storm-name conf topology submit-opts]
+ (when-not (Utils/isValidConf conf)
+ (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
+ (.submitTopologyWithOpts nimbus storm-name nil (JSONValue/toJSONString conf) topology submit-opts))
+
+(defn simulate-wait
+ [cluster-map]
+ (Testing/simulateWait (:local-cluster cluster-map)))
+
+(defn spout-objects [spec-map]
+ (for [[_ spout-spec] spec-map]
+ (-> spout-spec
+ .get_spout_object
+ (Thrift/deserializeComponentObject))))
+
+(defn capture-topology
+ [topology]
+ (let [cap-topo (Testing/captureTopology topology)]
+ {:topology (.topology cap-topo)
+ :capturer (.capturer cap-topo)}))
+
+(defnk complete-topology
+ [cluster-map topology
+ :mock-sources {}
+ :storm-conf {}
+ :cleanup-state true
+ :topology-name nil
+ :timeout-ms Testing/TEST_TIMEOUT_MS]
+ (Testing/completeTopology (:local-cluster cluster-map) topology,
+ (doto (CompleteTopologyParam.)
+ (.setStormConf storm-conf)
+ (.setTopologyName topology-name)
+ (.setTimeoutMs timeout-ms)
+ (.setMockedSources (MockedSources. mock-sources))
+ (.setCleanupState cleanup-state))))
+
+(defn read-tuples
+ ([results component-id stream-id]
+ (Testing/readTuples results component-id stream-id))
+ ([results component-id]
+ (Testing/readTuples results component-id )))
+
+(defn ms=
+ [a b]
+ (Testing/multiseteq a b))
+
+(def TRACKER-BOLT-ID "+++tracker-bolt")
+
+;; TODO: should override system-topology! and wrap everything there
+(defn mk-tracked-topology
+ ([tracked-cluster topology]
+ (let [tt (TrackedTopology. topology (:local-cluster tracked-cluster))]
+ {:topology (.getTopology tt)
+ :tracked-topo tt})))
+
+(defn increment-global!
+ [id key amt]
+ (-> (RegisteredGlobalState/getState id)
+ (get key)
+ (.addAndGet amt)))
+
+(defn global-amt
+ [id key]
+ (-> (RegisteredGlobalState/getState id)
+ (get key)
+ .get))
+
+(defnk mkClusterParam
+ [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :nimbus-daemon false]
+ ;;TODO do we need to support inimbus?, group-mapper, or supervisor-slot-port-min
+ (doto (MkClusterParam. )
+ (.setDaemonConf daemon-conf)
+ (.setNimbusDaemon nimbus-daemon)
+ (.setPortsPerSupervisor (int ports-per-supervisor))
+ (.setSupervisors (int supervisors))))
+
+(defmacro with-tracked-cluster
+ [[cluster-sym & cluster-args] & body]
+ `(Testing/withTrackedCluster
+ (mkClusterParam ~@cluster-args)
+ (reify TestJob
+ (run [this# lc#]
+ (let [~cluster-sym (local-cluster-state lc#)]
+ ~@body)))))
+
+(defn tracked-wait
+ "Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
+ ([tracked-map]
+ (Testing/trackedWait (:tracked-topo tracked-map)))
+ ([tracked-map amt]
+ (Testing/trackedWait (:tracked-topo tracked-map) (int amt)))
+ ([tracked-map amt timeout-ms]
+ (Testing/trackedWait (:tracked-topo tracked-map) (int amt) (int timeout-ms))))
+
+(defnk test-tuple
+ [values
+ :stream Utils/DEFAULT_STREAM_ID
+ :component "component"
+ :fields nil]
+ (Testing/testTuple
+ values
+ (doto (MkTupleParam. )
+ (.setStream stream)
+ (.setComponent component)
+ (.setFieldsList fields))))
+
+(defmacro with-timeout
+ [millis unit & body]
+ `(let [f# (future ~@body)]
+ (try
+ (.get f# ~millis ~unit)
+ (finally (future-cancel f#)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-clojure/pom.xml
----------------------------------------------------------------------
diff --git a/storm-clojure/pom.xml b/storm-clojure/pom.xml
index 1d50df0..cef8fd0 100644
--- a/storm-clojure/pom.xml
+++ b/storm-clojure/pom.xml
@@ -35,17 +35,8 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
+ <groupId>org.clojure</groupId>
+ <artifactId>tools.logging</artifactId>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
@@ -54,7 +45,7 @@
</dependency>
</dependencies>
- <build>
+ <build>
<plugins>
<plugin>
<groupId>com.theoryinpractise</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-clojure/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/clj/org/apache/storm/config.clj b/storm-clojure/src/clj/org/apache/storm/config.clj
new file mode 100644
index 0000000..bfe47ed
--- /dev/null
+++ b/storm-clojure/src/clj/org/apache/storm/config.clj
@@ -0,0 +1,28 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.config
+ (:import [org.apache.storm Config]))
+
+(defn- clojure-config-name [name]
+ (.replace (.toUpperCase name) "_" "-"))
+
+; define clojure constants for every configuration parameter
+(doseq [f (seq (.getFields Config))]
+ (let [name (.getName f)
+ new-name (clojure-config-name name)]
+ (eval
+ `(def ~(symbol new-name) (. Config ~(symbol name))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-clojure/src/clj/org/apache/storm/log.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/clj/org/apache/storm/log.clj b/storm-clojure/src/clj/org/apache/storm/log.clj
new file mode 100644
index 0000000..7a006ef
--- /dev/null
+++ b/storm-clojure/src/clj/org/apache/storm/log.clj
@@ -0,0 +1,34 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.log
+ (:require [clojure.tools.logging :as log]))
+
+(defmacro log-message
+ [& args]
+ `(log/info (str ~@args)))
+
+(defmacro log-error
+ [e & args]
+ `(log/log :error ~e (str ~@args)))
+
+(defmacro log-debug
+ [& args]
+ `(log/debug (str ~@args)))
+
+(defmacro log-warn
+ [& args]
+ `(log/warn (str ~@args)))
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-clojure/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/clj/org/apache/storm/testing.clj b/storm-clojure/src/clj/org/apache/storm/testing.clj
deleted file mode 100644
index 1e374ed..0000000
--- a/storm-clojure/src/clj/org/apache/storm/testing.clj
+++ /dev/null
@@ -1,270 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.testing
- (:import [org.apache.storm LocalCluster$Builder])
- (:import [java.util.function UnaryOperator])
- (:import [org.apache.storm.utils Time Time$SimulatedTime RegisteredGlobalState Utils])
- (:import [org.apache.storm.testing InProcessZookeeper MkTupleParam TestJob MkClusterParam
- TrackedTopology CompleteTopologyParam MockedSources])
- (:import [org.apache.storm Thrift Testing Testing$Condition])
- (:import [org.apache.storm.testing MockLeaderElector])
- (:import [org.json.simple JSONValue])
- (:use [org.apache.storm util config log])
- (:use [org.apache.storm thrift]))
-
-(defnk add-supervisor
- [cluster-map :ports 2 :conf {} :id nil]
- (let [local-cluster (:local-cluster cluster-map)]
- (.addSupervisor local-cluster ports conf id)))
-
-(defnk mock-leader-elector [:is-leader true :leader-name "test-host" :leader-port 9999]
- (MockLeaderElector. is-leader leader-name leader-port))
-
-(defn local-cluster-state [local-cluster]
- {:nimbus (.getNimbus local-cluster)
- :daemon-conf (.getDaemonConf local-cluster)
- :storm-cluster-state (.getClusterState local-cluster)
- :local-cluster local-cluster})
-
-(defnk mk-mocked-nimbus
- [:daemon-conf {} :inimbus nil :blob-store nil :cluster-state nil
- :leader-elector nil :group-mapper nil :nimbus-daemon false :nimbus-wrapper nil]
- (let [builder (doto (LocalCluster$Builder.)
- (.withDaemonConf daemon-conf)
- (.withINimbus inimbus)
- (.withBlobStore blob-store)
- (.withClusterState cluster-state)
- (.withLeaderElector leader-elector)
- (.withGroupMapper group-mapper)
- (.withNimbusDaemon nimbus-daemon)
- (.withNimbusWrapper (when nimbus-wrapper (reify UnaryOperator (apply [this nimbus] (nimbus-wrapper nimbus))))))
- local-cluster (.build builder)]
- (local-cluster-state local-cluster)))
-
-(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :group-mapper nil :supervisor-slot-port-min 1024 :nimbus-daemon false]
- (let [builder (doto (LocalCluster$Builder.)
- (.withSupervisors supervisors)
- (.withPortsPerSupervisor ports-per-supervisor)
- (.withDaemonConf daemon-conf)
- (.withINimbus inimbus)
- (.withGroupMapper group-mapper)
- (.withSupervisorSlotPortMin supervisor-slot-port-min)
- (.withNimbusDaemon nimbus-daemon))
- local-cluster (.build builder)]
- (local-cluster-state local-cluster)))
-
-(defn get-supervisor [cluster-map supervisor-id]
- (let [local-cluster (:local-cluster cluster-map)]
- (.getSupervisor local-cluster supervisor-id)))
-
-(defn kill-supervisor [cluster-map supervisor-id]
- (let [local-cluster (:local-cluster cluster-map)]
- (.killSupervisor local-cluster supervisor-id)))
-
-(defn kill-local-storm-cluster [cluster-map]
- (let [local-cluster (:local-cluster cluster-map)]
- (.close local-cluster)))
-
-(defmacro while-timeout [timeout-ms condition & body]
- `(Testing/whileTimeout ~timeout-ms
- (reify Testing$Condition (exec [this] ~condition))
- (fn [] ~@body)))
-
-(defn wait-for-condition
- ([apredicate]
- (wait-for-condition Testing/TEST_TIMEOUT_MS apredicate))
- ([timeout-ms apredicate]
- (while-timeout timeout-ms (not (apredicate))
- (Time/sleep 100))))
-
-(defn wait-until-cluster-waiting
- "Wait until the cluster is idle. Should be used with time simulation."
- ([cluster-map]
- (let [local-cluster (:local-cluster cluster-map)]
- (.waitForIdle local-cluster)))
- ([cluster-map timeout-ms]
- (let [local-cluster (:local-cluster cluster-map)]
- (.waitForIdle local-cluster timeout-ms))))
-
-(defn advance-cluster-time
- ([cluster-map secs increment-secs]
- (let [local-cluster (:local-cluster cluster-map)]
- (.advanceClusterTime local-cluster secs increment-secs)))
- ([cluster-map secs]
- (let [local-cluster (:local-cluster cluster-map)]
- (.advanceClusterTime local-cluster secs))))
-
-(defmacro with-mocked-nimbus
- [[nimbus-sym & args] & body]
- `(let [~nimbus-sym (mk-mocked-nimbus ~@args)]
- (try
- ~@body
- (catch Throwable t#
- (log-error t# "Error in cluster")
- (throw t#))
- (finally
- (let [keep-waiting?# (atom true)
- f# (future (while @keep-waiting?# (simulate-wait ~nimbus-sym)))]
- (kill-local-storm-cluster ~nimbus-sym)
- (reset! keep-waiting?# false)
- @f#)))))
-
-(defmacro with-local-cluster
- [[cluster-sym & args] & body]
- `(let [~cluster-sym (mk-local-storm-cluster ~@args)]
- (try
- ~@body
- (catch Throwable t#
- (log-error t# "Error in cluster")
- (throw t#))
- (finally
- (let [keep-waiting?# (atom true)
- f# (future (while @keep-waiting?# (simulate-wait ~cluster-sym)))]
- (kill-local-storm-cluster ~cluster-sym)
- (reset! keep-waiting?# false)
- @f#)))))
-
-(defmacro with-simulated-time-local-cluster
- [& args]
- `(with-open [_# (Time$SimulatedTime.)]
- (with-local-cluster ~@args)))
-
-(defmacro with-inprocess-zookeeper
- [port-sym & body]
- `(with-open [zks# (InProcessZookeeper. )]
- (let [~port-sym (.getPort zks#)]
- ~@body)))
-
-(defn submit-local-topology
- [nimbus storm-name conf topology]
- (when-not (Utils/isValidConf conf)
- (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
- (.submitTopology nimbus storm-name nil (JSONValue/toJSONString conf) topology))
-
-(defn submit-local-topology-with-opts
- [nimbus storm-name conf topology submit-opts]
- (when-not (Utils/isValidConf conf)
- (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
- (.submitTopologyWithOpts nimbus storm-name nil (JSONValue/toJSONString conf) topology submit-opts))
-
-(defn simulate-wait
- [cluster-map]
- (Testing/simulateWait (:local-cluster cluster-map)))
-
-(defn spout-objects [spec-map]
- (for [[_ spout-spec] spec-map]
- (-> spout-spec
- .get_spout_object
- (Thrift/deserializeComponentObject))))
-
-(defn capture-topology
- [topology]
- (let [cap-topo (Testing/captureTopology topology)]
- {:topology (.topology cap-topo)
- :capturer (.capturer cap-topo)}))
-
-(defnk complete-topology
- [cluster-map topology
- :mock-sources {}
- :storm-conf {}
- :cleanup-state true
- :topology-name nil
- :timeout-ms Testing/TEST_TIMEOUT_MS]
- (Testing/completeTopology (:local-cluster cluster-map) topology,
- (doto (CompleteTopologyParam.)
- (.setStormConf storm-conf)
- (.setTopologyName topology-name)
- (.setTimeoutMs timeout-ms)
- (.setMockedSources (MockedSources. mock-sources))
- (.setCleanupState cleanup-state))))
-
-(defn read-tuples
- ([results component-id stream-id]
- (Testing/readTuples results component-id stream-id))
- ([results component-id]
- (Testing/readTuples results component-id )))
-
-(defn ms=
- [a b]
- (Testing/multiseteq a b))
-
-(def TRACKER-BOLT-ID "+++tracker-bolt")
-
-;; TODO: should override system-topology! and wrap everything there
-(defn mk-tracked-topology
- ([tracked-cluster topology]
- (let [tt (TrackedTopology. topology (:local-cluster tracked-cluster))]
- {:topology (.getTopology tt)
- :tracked-topo tt})))
-
-(defn increment-global!
- [id key amt]
- (-> (RegisteredGlobalState/getState id)
- (get key)
- (.addAndGet amt)))
-
-(defn global-amt
- [id key]
- (-> (RegisteredGlobalState/getState id)
- (get key)
- .get))
-
-(defnk mkClusterParam
- [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :nimbus-daemon false]
- ;;TODO do we need to support inimbus?, group-mapper, or supervisor-slot-port-min
- (doto (MkClusterParam. )
- (.setDaemonConf daemon-conf)
- (.setNimbusDaemon nimbus-daemon)
- (.setPortsPerSupervisor (int ports-per-supervisor))
- (.setSupervisors (int supervisors))))
-
-(defmacro with-tracked-cluster
- [[cluster-sym & cluster-args] & body]
- `(Testing/withTrackedCluster
- (mkClusterParam ~@cluster-args)
- (reify TestJob
- (run [this# lc#]
- (let [~cluster-sym (local-cluster-state lc#)]
- ~@body)))))
-
-(defn tracked-wait
- "Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
- ([tracked-map]
- (Testing/trackedWait (:tracked-topo tracked-map)))
- ([tracked-map amt]
- (Testing/trackedWait (:tracked-topo tracked-map) (int amt)))
- ([tracked-map amt timeout-ms]
- (Testing/trackedWait (:tracked-topo tracked-map) (int amt) (int timeout-ms))))
-
-(defnk test-tuple
- [values
- :stream Utils/DEFAULT_STREAM_ID
- :component "component"
- :fields nil]
- (Testing/testTuple
- values
- (doto (MkTupleParam. )
- (.setStream stream)
- (.setComponent component)
- (.setFieldsList fields))))
-
-(defmacro with-timeout
- [millis unit & body]
- `(let [f# (future ~@body)]
- (try
- (.get f# ~millis ~unit)
- (finally (future-cancel f#)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-clojure/src/clj/org/apache/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/clj/org/apache/storm/util.clj b/storm-clojure/src/clj/org/apache/storm/util.clj
new file mode 100644
index 0000000..9ad1f10
--- /dev/null
+++ b/storm-clojure/src/clj/org/apache/storm/util.clj
@@ -0,0 +1,134 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.util
+ (:import [java.util Map List HashMap])
+ (:import [org.apache.storm.generated ErrorInfo])
+ (:import [org.apache.storm.utils Utils])
+ (:import [java.util List Set])
+ (:use [clojure walk])
+ (:use [org.apache.storm log]))
+
+;; name-with-attributes by Konrad Hinsen:
+(defn name-with-attributes
+ "To be used in macro definitions.
+ Handles optional docstrings and attribute maps for a name to be defined
+ in a list of macro arguments. If the first macro argument is a string,
+ it is added as a docstring to name and removed from the macro argument
+ list. If afterwards the first macro argument is a map, its entries are
+ added to the name's metadata map and the map is removed from the
+ macro argument list. The return value is a vector containing the name
+ with its extended metadata map and the list of unprocessed macro
+ arguments."
+ [name macro-args]
+ (let [[docstring macro-args] (if (string? (first macro-args))
+ [(first macro-args) (next macro-args)]
+ [nil macro-args])
+ [attr macro-args] (if (map? (first macro-args))
+ [(first macro-args) (next macro-args)]
+ [{} macro-args])
+ attr (if docstring
+ (assoc attr :doc docstring)
+ attr)
+ attr (if (meta name)
+ (conj (meta name) attr)
+ attr)]
+ [(with-meta name attr) macro-args]))
+
+(defmacro defnk
+ "Define a function accepting keyword arguments. Symbols up to the first
+ keyword in the parameter list are taken as positional arguments. Then
+ an alternating sequence of keywords and defaults values is expected. The
+ values of the keyword arguments are available in the function body by
+ virtue of the symbol corresponding to the keyword (cf. :keys destructuring).
+ defnk accepts an optional docstring as well as an optional metadata map."
+ [fn-name & fn-tail]
+ (let [[fn-name [args & body]] (name-with-attributes fn-name fn-tail)
+ [pos kw-vals] (split-with symbol? args)
+ syms (map #(-> % name symbol) (take-nth 2 kw-vals))
+ values (take-nth 2 (rest kw-vals))
+ sym-vals (apply hash-map (interleave syms values))
+ de-map {:keys (vec syms) :or sym-vals}]
+ `(defn ~fn-name
+ [~@pos & options#]
+ (let [~de-map (apply hash-map options#)]
+ ~@body))))
+
+(defmacro thrown-cause?
+ [klass & body]
+ `(try
+ ~@body
+ false
+ (catch Throwable t#
+ (let [tc# (Utils/exceptionCauseIsInstanceOf ~klass t#)]
+ (if (not tc#) (log-error t# "Exception did not match " ~klass))
+ tc#))))
+
+(defn clojurify-structure
+ [s]
+ (if s
+ (prewalk (fn [x]
+ (cond (instance? Map x) (into {} x)
+ (instance? List x) (vec x)
+ (instance? Set x) (into #{} x)
+ ;; (Boolean. false) does not evaluate to false in an if.
+ ;; This fixes that.
+ (instance? Boolean x) (boolean x)
+ true x))
+ s)))
+; moved this function from convert.clj due to a cyclic load dependency
+(defn clojurify-error [^ErrorInfo error]
+ (if error
+ {
+ :error (.get_error error)
+ :time-secs (.get_error_time_secs error)
+ :host (.get_host error)
+ :port (.get_port error)
+ }
+ ))
+
+;TODO: We're keeping this function around until all the code using it is properly translated to Java
+;TODO: by properly having the for loop IN THE JAVA FUNCTION that originally used this function.
+(defn map-val
+ [afn amap]
+ (into {}
+ (for [[k v] amap]
+ [k (afn v)])))
+
+;TODO: We're keeping this function around until all the code using it is properly translated to Java
+;TODO: by properly having the for loop IN THE JAVA FUNCTION that originally used this function.
+(defn filter-key
+ [afn amap]
+ (into {} (filter (fn [[k v]] (afn k)) amap)))
+
+;TODO: Once all the other clojure functions (100+ locations) are translated to java, this function becomes moot.
+(def not-nil? (complement nil?))
+
+(defmacro dofor [& body]
+ `(doall (for ~@body)))
+
+(defmacro -<>
+ ([x] x)
+ ([x form] (if (seq? form)
+ (with-meta
+ (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
+ (concat begin [x] end))
+ (meta form))
+ (list form x)))
+ ([x form & more] `(-<> (-<> ~x ~form) ~@more)))
+
+(defn hashmap-to-persistent [^HashMap m]
+ (zipmap (.keySet m) (.values m)))
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index b5f7086..0014aa5 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -377,7 +377,7 @@
</dependencies>
<build>
<sourceDirectory>src/jvm</sourceDirectory>
- <testSourceDirectory>test/jvm</testSourceDirectory>
+ <testSourceDirectory>test/jvm</testSourceDirectory>
<resources>
<resource>
<directory>../</directory>
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-core/src/jvm/org/apache/storm/command/Activate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/Activate.java b/storm-core/src/jvm/org/apache/storm/command/Activate.java
index 6a64bf6..45d4a99 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Activate.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Activate.java
@@ -31,7 +31,7 @@ public class Activate {
NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
@Override
- public void run(Nimbus.Client nimbus) throws Exception {
+ public void run(Nimbus.Iface nimbus) throws Exception {
nimbus.activate(name);
LOG.info("Activated topology: {}", name);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-core/src/jvm/org/apache/storm/command/Deactivate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/Deactivate.java b/storm-core/src/jvm/org/apache/storm/command/Deactivate.java
index 6b9dd11..4eafb83 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Deactivate.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Deactivate.java
@@ -31,7 +31,7 @@ public class Deactivate {
NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
@Override
- public void run(Nimbus.Client nimbus) throws Exception {
+ public void run(Nimbus.Iface nimbus) throws Exception {
nimbus.deactivate(name);
LOG.info("Deactivated topology: {}", name);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-core/src/jvm/org/apache/storm/command/GetErrors.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/GetErrors.java b/storm-core/src/jvm/org/apache/storm/command/GetErrors.java
index ee15446..05c0ee4 100644
--- a/storm-core/src/jvm/org/apache/storm/command/GetErrors.java
+++ b/storm-core/src/jvm/org/apache/storm/command/GetErrors.java
@@ -40,7 +40,7 @@ public class GetErrors {
NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
@Override
- public void run(Nimbus.Client client) throws Exception {
+ public void run(Nimbus.Iface client) throws Exception {
GetInfoOptions opts = new GetInfoOptions();
opts.set_num_err_choice(NumErrorsChoice.ONE);
String topologyId = Utils.getTopologyId(name, client);