You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/12/03 15:50:07 UTC
[07/10] storm git commit: STORM-1281: LocalCluster,
testing4j and testing.clj to java
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
index 431b9d8..45014fc 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
@@ -19,6 +19,7 @@ package org.apache.storm.starter;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.ShellSpout;
import org.apache.storm.task.ShellBolt;
@@ -110,12 +111,10 @@ public class WordCountTopologyNode {
else {
conf.setMaxTaskParallelism(3);
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("word-count", conf, builder.createTopology());
-
- Thread.sleep(10000);
-
- cluster.shutdown();
+ try (LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology("word-count", conf, builder.createTopology());) {
+ Thread.sleep(10000);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
index ba18f7c..70a23b8 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
@@ -20,6 +20,7 @@ package org.apache.storm.starter.trident;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
@@ -77,12 +78,10 @@ public class TridentHBaseWindowingStoreTopology {
HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
if (args.length == 0) {
- LocalCluster cluster = new LocalCluster();
- String topologyName = "wordCounterWithWindowing";
- cluster.submitTopology(topologyName, conf, buildTopology(windowStoreFactory));
- Utils.sleep(120 * 1000);
- cluster.killTopology(topologyName);
- cluster.shutdown();
+ try (LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology("wordCounterWithWindowing", conf, buildTopology(windowStoreFactory));) {
+ Utils.sleep(120 * 1000);
+ }
System.exit(0);
} else {
conf.setNumWorkers(3);
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
index c3661a2..5ddace8 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
@@ -19,6 +19,7 @@ package org.apache.storm.starter.trident;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
@@ -108,14 +109,14 @@ public class TridentMapExample {
Config conf = new Config();
conf.setMaxSpoutPending(20);
if (args.length == 0) {
- LocalDRPC drpc = new LocalDRPC();
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
- for (int i = 0; i < 100; i++) {
- System.out.println("DRPC RESULT: " + drpc.execute("words", "CAT THE DOG JUMPED"));
- Thread.sleep(1000);
+ try (LocalDRPC drpc = new LocalDRPC();
+ LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(drpc));) {
+ for (int i = 0; i < 100; i++) {
+ System.out.println("DRPC RESULT: " + drpc.execute("words", "CAT THE DOG JUMPED"));
+ Thread.sleep(1000);
+ }
}
- cluster.shutdown();
} else {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
index d985436..e5a775b 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
@@ -19,6 +19,7 @@ package org.apache.storm.starter.trident;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.starter.spout.RandomNumberGeneratorSpout;
@@ -113,10 +114,10 @@ public class TridentMinMaxOfDevicesTopology {
Config conf = new Config();
conf.setMaxSpoutPending(20);
if (args.length == 0) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("devices-topology", conf, topology);
- Utils.sleep(60 * 1000);
- cluster.shutdown();
+ try (LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology("devices-topology", conf, topology);) {
+ Utils.sleep(60 * 1000);
+ }
System.exit(0);
} else {
conf.setNumWorkers(3);
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
index 192b198..947b64b 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
@@ -19,6 +19,7 @@ package org.apache.storm.starter.trident;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.Stream;
@@ -94,10 +95,10 @@ public class TridentMinMaxOfVehiclesTopology {
Config conf = new Config();
conf.setMaxSpoutPending(20);
if (args.length == 0) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("vehicles-topology", conf, topology);
- Utils.sleep(60 * 1000);
- cluster.shutdown();
+ try (LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology("vehicles-topology", conf, topology);) {
+ Utils.sleep(60 * 1000);
+ }
System.exit(0);
} else {
conf.setNumWorkers(3);
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
index 056b2b6..6533a4e 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
@@ -19,6 +19,7 @@ package org.apache.storm.starter.trident;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.LocalDRPC;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.task.IMetricsContext;
@@ -136,21 +137,17 @@ public class TridentReach {
}
public static void main(String[] args) throws Exception {
- LocalDRPC drpc = new LocalDRPC();
Config conf = new Config();
- LocalCluster cluster = new LocalCluster();
+ try (LocalDRPC drpc = new LocalDRPC();
+ LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology("reach", conf, buildTopology(drpc));) {
- cluster.submitTopology("reach", conf, buildTopology(drpc));
+ Thread.sleep(2000);
- Thread.sleep(2000);
-
- System.out.println("REACH: " + drpc.execute("reach", "aaa"));
- System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
- System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));
-
-
- cluster.shutdown();
- drpc.shutdown();
+ System.out.println("REACH: " + drpc.execute("reach", "aaa"));
+ System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
+ System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
index 5aec01d..c43b4b0 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
@@ -20,6 +20,7 @@ package org.apache.storm.starter.trident;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.base.BaseWindowedBolt;
@@ -83,10 +84,10 @@ public class TridentWindowingInmemoryStoreTopology {
);
for (WindowConfig windowConfig : list) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("wordCounter", conf, buildTopology(mapState, windowConfig));
- Utils.sleep(60 * 1000);
- cluster.shutdown();
+ try (LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(mapState, windowConfig));) {
+ Utils.sleep(60 * 1000);
+ }
}
System.exit(0);
} else {
@@ -94,5 +95,4 @@ public class TridentWindowingInmemoryStoreTopology {
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(mapState, SlidingCountWindow.of(1000, 100)));
}
}
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
index ac46474..553c26f 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
@@ -19,6 +19,7 @@ package org.apache.storm.starter.trident;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
@@ -70,12 +71,13 @@ public class TridentWordCount {
Config conf = new Config();
conf.setMaxSpoutPending(20);
if (args.length == 0) {
- LocalDRPC drpc = new LocalDRPC();
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
- for (int i = 0; i < 100; i++) {
- System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
- Thread.sleep(1000);
+ try (LocalDRPC drpc = new LocalDRPC();
+ LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(drpc));) {
+ for (int i = 0; i < 100; i++) {
+ System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
+ Thread.sleep(1000);
+ }
}
}
else {
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/org/apache/storm/starter/util/StormRunner.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/util/StormRunner.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/util/StormRunner.java
index d7f2bf4..06ec4d7 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/util/StormRunner.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/util/StormRunner.java
@@ -19,6 +19,7 @@ package org.apache.storm.starter.util;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
@@ -34,11 +35,12 @@ public final class StormRunner {
public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds)
throws InterruptedException {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology(topologyName, conf, topology);
- Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
- cluster.killTopology(topologyName);
- cluster.shutdown();
+ try (LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology(topologyName, conf, topology);) {
+ Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
public static void runTopologyRemotely(StormTopology topology, String topologyName, Config conf)
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java b/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
index ba513dd..d073350 100644
--- a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
@@ -19,6 +19,7 @@ package storm.starter;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.starter.spout.RandomIntegerSpout;
@@ -134,12 +135,10 @@ public class StatefulTopology {
conf.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
} else {
- LocalCluster cluster = new LocalCluster();
- StormTopology topology = builder.createTopology();
- cluster.submitTopology("test", conf, topology);
- Utils.sleep(40000);
- cluster.killTopology("test");
- cluster.shutdown();
+ try (LocalCluster cluster = new LocalCluster();
+ LocalTopology topology = cluster.submitTopology("test", conf, builder.createTopology());) {
+ Utils.sleep(40000);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java b/examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java
index e18ef36..e43759c 100644
--- a/examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java
@@ -19,6 +19,7 @@ package storm.starter;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.starter.bolt.PrinterBolt;
@@ -101,12 +102,10 @@ public class StatefulWindowingTopology {
conf.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
} else {
- LocalCluster cluster = new LocalCluster();
- StormTopology topology = builder.createTopology();
- cluster.submitTopology("test", conf, topology);
- Utils.sleep(40000);
- cluster.killTopology("test");
- cluster.shutdown();
+ try (LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology("test", conf, builder.createTopology());) {
+ Utils.sleep(40000);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
index 5848d2f..982765f 100644
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
@@ -19,6 +19,7 @@ package org.apache.storm.flux;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.SubmitOptions;
@@ -204,11 +205,11 @@ public class Flux {
} else {
cluster = new LocalCluster();
}
- cluster.submitTopology(topologyName, conf, topology);
-
- Utils.sleep(sleepTime);
- cluster.killTopology(topologyName);
- cluster.shutdown();
+ try (LocalTopology topo = cluster.submitTopology(topologyName, conf, topology)) {
+ Utils.sleep(sleepTime);
+ } finally {
+ cluster.shutdown();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
index c1ceeba..5755de8 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
@@ -29,9 +29,8 @@ import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.storm.Config;
-import org.apache.storm.ILocalCluster;
import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.StormTopology;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.sql.TestUtils;
import org.apache.storm.sql.compiler.TestCompilerUtils;
import org.apache.storm.sql.runtime.ISqlTridentDataSource;
@@ -40,8 +39,10 @@ import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.time.ZoneOffset;
@@ -57,6 +58,20 @@ public class TestPlanCompiler {
private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
RelDataTypeSystem.DEFAULT);
private final DataContext dataContext = new StormDataContext();
+ private static LocalCluster cluster;
+
+ @BeforeClass
+ public static void staticSetup() throws Exception {
+ cluster = new LocalCluster();
+ }
+
+ @AfterClass
+ public static void staticCleanup() {
+ if (cluster!= null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
@Before
public void setUp() {
@@ -347,11 +362,8 @@ public class TestPlanCompiler {
final Config conf = new Config();
conf.setMaxSpoutPending(20);
- ILocalCluster cluster = new LocalCluster();
- StormTopology stormTopo = topo.build();
- try {
- Utils.setClassLoaderForJavaDeSerialize(proc.getClass().getClassLoader());
- cluster.submitTopology("storm-sql", conf, stormTopo);
+ Utils.setClassLoaderForJavaDeSerialize(proc.getClass().getClassLoader());
+ try (LocalTopology stormTopo = cluster.submitTopology("storm-sql", conf, topo.build())) {
waitForCompletion(1000 * 1000, new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
@@ -359,8 +371,10 @@ public class TestPlanCompiler {
}
});
} finally {
+ while(cluster.getClusterInfo().get_topologies_size() > 0) {
+ Thread.sleep(10);
+ }
Utils.resetClassLoaderForJavaDeSerialize();
- cluster.shutdown();
}
}
@@ -375,4 +389,4 @@ public class TestPlanCompiler {
Assert.assertTrue("Two lists are not same (even ignoring order)!\n"+ "Expected: " + expected + "\n" + "Actual: " + actual,
CollectionUtils.isEqualCollection(expected, actual));
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
index 88b2bf1..298353c 100644
--- a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
+++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
@@ -20,6 +20,7 @@ package org.apache.storm.druid;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.druid.bolt.DruidBeamBolt;
import org.apache.storm.druid.bolt.DruidBeamFactory;
@@ -69,12 +70,10 @@ public class SampleDruidBoltTopology {
} else {
conf.setMaxTaskParallelism(3);
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("druid-test", conf, topologyBuilder.createTopology());
-
- Thread.sleep(30000);
-
- cluster.shutdown();
+ try (LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology("druid-test", conf, topologyBuilder.createTopology());) {
+ Thread.sleep(30000);
+ }
System.exit(0);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
index 0e20ecd..3de18c1 100644
--- a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
+++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
@@ -20,6 +20,7 @@ package org.apache.storm.druid;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.druid.bolt.DruidBeamFactory;
import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
@@ -78,12 +79,10 @@ public class SampleDruidBoltTridentTopology {
} else {
conf.setMaxTaskParallelism(3);
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("druid-test", conf, tridentTopology.build());
-
- Thread.sleep(30000);
-
- cluster.shutdown();
+ try (LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology("druid-test", conf, tridentTopology.build());) {
+ Thread.sleep(30000);
+ }
System.exit(0);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
index 2c2261c..ae15634 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
@@ -20,6 +20,7 @@ package org.apache.storm.eventhubs.samples;
import org.apache.storm.StormSubmitter;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
@@ -135,14 +136,12 @@ public class EventCount {
} else {
config.setMaxTaskParallelism(2);
- LocalCluster localCluster = new LocalCluster();
- localCluster.submitTopology("test", config, topology);
-
- Thread.sleep(5000000);
-
- localCluster.shutdown();
+ try (LocalCluster localCluster = new LocalCluster();
+ LocalTopology topo = localCluster.submitTopology("test", config, topology);) {
+ Thread.sleep(5000000);
+ }
}
- }
+ }
protected void runScenario(String[] args) throws Exception{
readEHConfig(args);
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java b/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
index 3324aac..82dbd5b 100644
--- a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
+++ b/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
@@ -24,6 +24,7 @@ import javax.jms.TextMessage;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.jms.JmsMessageProducer;
import org.apache.storm.jms.JmsProvider;
@@ -120,11 +121,10 @@ public class ExampleJmsTopology {
conf.setDebug(true);
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("storm-jms-example", conf, builder.createTopology());
- Utils.sleep(60000);
- cluster.killTopology("storm-jms-example");
- cluster.shutdown();
+ try (LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology("storm-jms-example", conf, builder.createTopology());) {
+ Utils.sleep(60000);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
index 5a78137..10ad2c7 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
@@ -58,10 +58,9 @@ public class KafkaSpoutTopologyMainNamedTopics {
} else {
submitTopologyRemoteCluster(args[0], getTopolgyKafkaSpout(), getConfig());
}
-
}
- protected void submitTopologyLocalCluster(StormTopology topology, Config config) throws InterruptedException {
+ protected void submitTopologyLocalCluster(StormTopology topology, Config config) throws Exception {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, topology);
stopWaitingForInput();
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
index 0873601..049fce7 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.storm.kafka;
import org.junit.Test;
import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
import org.junit.After;
import org.junit.Before;
import static org.junit.Assert.assertEquals;
@@ -34,15 +35,19 @@ public class ExponentialBackoffMsgRetryManagerTest {
private static final Long TEST_OFFSET2 = 102L;
private static final Long TEST_OFFSET3 = 105L;
private static final Long TEST_NEW_OFFSET = 103L;
+ private SimulatedTime st;
@Before
public void setup() throws Exception {
- Time.startSimulating();
+ st = new SimulatedTime();
}
@After
public void cleanup() throws Exception {
- Time.stopSimulating();
+ if (st != null) {
+ st.close();
+ st = null;
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java b/external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
index 57725b4..0dd4d73 100644
--- a/external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
+++ b/external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.storm.mqtt;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.testing.IntegrationTest;
import org.apache.storm.topology.TopologyBuilder;
@@ -100,31 +101,31 @@ public class StormMqttIntegrationTest implements Serializable{
Topic[] topics = {new Topic("/integration-result", QoS.AT_LEAST_ONCE)};
byte[] qoses = connection.subscribe(topics);
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", new Config(), buildMqttTopology());
+ try (LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology("test", new Config(), buildMqttTopology());) {
- LOG.info("topology started");
- while(!spoutActivated) {
- Thread.sleep(500);
- }
+ LOG.info("topology started");
+ while(!spoutActivated) {
+ Thread.sleep(500);
+ }
- // publish a retained message to the broker
- MqttOptions options = new MqttOptions();
- options.setCleanConnection(false);
- MqttPublisher publisher = new MqttPublisher(options, true);
- publisher.connectMqtt("MqttPublisher");
- publisher.publish(new MqttMessage(TEST_TOPIC, "test".getBytes()));
+ // publish a retained message to the broker
+ MqttOptions options = new MqttOptions();
+ options.setCleanConnection(false);
+ MqttPublisher publisher = new MqttPublisher(options, true);
+ publisher.connectMqtt("MqttPublisher");
+ publisher.publish(new MqttMessage(TEST_TOPIC, "test".getBytes()));
- LOG.info("published message");
+ LOG.info("published message");
- Message message = connection.receive();
- LOG.info("Message recieved on topic: {}", message.getTopic());
- LOG.info("Payload: {}", new String(message.getPayload()));
- message.ack();
+ Message message = connection.receive();
+ LOG.info("Message recieved on topic: {}", message.getTopic());
+ LOG.info("Payload: {}", new String(message.getPayload()));
+ message.ack();
- Assert.assertArrayEquals(message.getPayload(), RESULT_PAYLOAD.getBytes());
- Assert.assertEquals(message.getTopic(), RESULT_TOPIC);
- cluster.shutdown();
+ Assert.assertArrayEquals(message.getPayload(), RESULT_PAYLOAD.getBytes());
+ Assert.assertEquals(message.getTopic(), RESULT_TOPIC);
+ }
}
public StormTopology buildMqttTopology(){
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java b/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java
index d464608..992bb5d 100644
--- a/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java
+++ b/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java
@@ -18,6 +18,7 @@
package org.apache.storm;
import org.apache.storm.generated.StormTopology;
+import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.TestWordSpout;
@@ -75,11 +76,10 @@ public class ExclamationTopology {
}
else {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", conf, topology);
- Utils.sleep(10000);
- cluster.killTopology("test");
- cluster.shutdown();
+ try (LocalCluster cluster = new LocalCluster();
+ LocalTopolgoy topo = cluster.submitTopology("test", conf, topology);) {
+ Utils.sleep(10000);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-clojure/src/clj/org/apache/storm/clojure.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/clj/org/apache/storm/clojure.clj b/storm-clojure/src/clj/org/apache/storm/clojure.clj
index 607fc24..1fe0c26 100644
--- a/storm-clojure/src/clj/org/apache/storm/clojure.clj
+++ b/storm-clojure/src/clj/org/apache/storm/clojure.clj
@@ -195,6 +195,24 @@
(let [values (tuple-values values collector stream)]
(.emitDirect ^SpoutOutputCollector (:output-collector collector) task stream values id)))
+(defmacro defalias
+ "Defines an alias for a var: a new var with the same root binding (if
+ any) and similar metadata. The metadata of the alias is its initial
+ metadata (as provided by def) merged into the metadata of the original."
+ ([name orig]
+ `(do
+ (alter-meta!
+ (if (.hasRoot (var ~orig))
+ (def ~name (.getRawRoot (var ~orig)))
+ (def ~name))
+ ;; When copying metadata, disregard {:macro false}.
+ ;; Workaround for http://www.assembla.com/spaces/clojure/tickets/273
+ #(conj (dissoc % :macro)
+ (apply dissoc (meta (var ~orig)) (remove #{:macro} (keys %)))))
+ (var ~name)))
+ ([name orig doc]
+ (list `defalias (with-meta name (assoc (meta name) :doc doc)) orig)))
+
(defalias topology thrift/mk-topology)
(defalias bolt-spec thrift/mk-bolt-spec)
(defalias spout-spec thrift/mk-spout-spec)
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-clojure/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/clj/org/apache/storm/testing.clj b/storm-clojure/src/clj/org/apache/storm/testing.clj
new file mode 100644
index 0000000..e080dd2
--- /dev/null
+++ b/storm-clojure/src/clj/org/apache/storm/testing.clj
@@ -0,0 +1,270 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.testing
+ (:import [org.apache.storm LocalCluster$Builder])
+ (:import [java.util.function UnaryOperator])
+ (:import [org.apache.storm.utils Time Time$SimulatedTime Utils RegisteredGlobalState])
+ (:import [org.apache.storm.testing InProcessZookeeper MkTupleParam TestJob MkClusterParam
+ TrackedTopology CompleteTopologyParam MockedSources])
+ (:import [org.apache.storm Thrift Testing Testing$Condition])
+ (:import [org.apache.storm.testing MockLeaderElector])
+ (:import [org.json.simple JSONValue])
+ (:use [org.apache.storm util config log])
+ (:use [org.apache.storm thrift]))
+
+(defnk add-supervisor
+ [cluster-map :ports 2 :conf {} :id nil]
+ (let [local-cluster (:local-cluster cluster-map)]
+ (.addSupervisor local-cluster ports conf id)))
+
+(defnk mock-leader-elector [:is-leader true :leader-name "test-host" :leader-port 9999]
+ (MockLeaderElector. is-leader leader-name leader-port))
+
+(defn local-cluster-state [local-cluster]
+ {:nimbus (.getNimbus local-cluster)
+ :daemon-conf (.getDaemonConf local-cluster)
+ :storm-cluster-state (.getClusterState local-cluster)
+ :local-cluster local-cluster})
+
+(defnk mk-mocked-nimbus
+ [:daemon-conf {} :inimbus nil :blob-store nil :cluster-state nil
+ :leader-elector nil :group-mapper nil :nimbus-daemon false :nimbus-wrapper nil]
+ (let [builder (doto (LocalCluster$Builder.)
+ (.withDaemonConf daemon-conf)
+ (.withINimbus inimbus)
+ (.withBlobStore blob-store)
+ (.withClusterState cluster-state)
+ (.withLeaderElector leader-elector)
+ (.withGroupMapper group-mapper)
+ (.withNimbusDaemon nimbus-daemon)
+ (.withNimbusWrapper (when nimbus-wrapper (reify UnaryOperator (apply [this nimbus] (nimbus-wrapper nimbus))))))
+ local-cluster (.build builder)]
+ (local-cluster-state local-cluster)))
+
+(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :group-mapper nil :supervisor-slot-port-min 1024 :nimbus-daemon false]
+ (let [builder (doto (LocalCluster$Builder.)
+ (.withSupervisors supervisors)
+ (.withPortsPerSupervisor ports-per-supervisor)
+ (.withDaemonConf daemon-conf)
+ (.withINimbus inimbus)
+ (.withGroupMapper group-mapper)
+ (.withSupervisorSlotPortMin supervisor-slot-port-min)
+ (.withNimbusDaemon nimbus-daemon))
+ local-cluster (.build builder)]
+ (local-cluster-state local-cluster)))
+
+(defn get-supervisor [cluster-map supervisor-id]
+ (let [local-cluster (:local-cluster cluster-map)]
+ (.getSupervisor local-cluster supervisor-id)))
+
+(defn kill-supervisor [cluster-map supervisor-id]
+ (let [local-cluster (:local-cluster cluster-map)]
+ (.killSupervisor local-cluster supervisor-id)))
+
+(defn kill-local-storm-cluster [cluster-map]
+ (let [local-cluster (:local-cluster cluster-map)]
+ (.close local-cluster)))
+
+(defmacro while-timeout [timeout-ms condition & body]
+ `(Testing/whileTimeout ~timeout-ms
+ (reify Testing$Condition (exec [this] ~condition))
+ (fn [] ~@body)))
+
+(defn wait-for-condition
+ ([apredicate]
+ (wait-for-condition Testing/TEST_TIMEOUT_MS apredicate))
+ ([timeout-ms apredicate]
+ (while-timeout timeout-ms (not (apredicate))
+ (Time/sleep 100))))
+
+(defn wait-until-cluster-waiting
+ "Wait until the cluster is idle. Should be used with time simulation."
+ ([cluster-map]
+ (let [local-cluster (:local-cluster cluster-map)]
+ (.waitForIdle local-cluster)))
+ ([cluster-map timeout-ms]
+ (let [local-cluster (:local-cluster cluster-map)]
+ (.waitForIdle local-cluster timeout-ms))))
+
+(defn advance-cluster-time
+ ([cluster-map secs increment-secs]
+ (let [local-cluster (:local-cluster cluster-map)]
+ (.advanceClusterTime local-cluster secs increment-secs)))
+ ([cluster-map secs]
+ (let [local-cluster (:local-cluster cluster-map)]
+ (.advanceClusterTime local-cluster secs))))
+
+(defmacro with-mocked-nimbus
+ [[nimbus-sym & args] & body]
+ `(let [~nimbus-sym (mk-mocked-nimbus ~@args)]
+ (try
+ ~@body
+ (catch Throwable t#
+ (log-error t# "Error in cluster")
+ (throw t#))
+ (finally
+ (let [keep-waiting?# (atom true)
+ f# (future (while @keep-waiting?# (simulate-wait ~nimbus-sym)))]
+ (kill-local-storm-cluster ~nimbus-sym)
+ (reset! keep-waiting?# false)
+ @f#)))))
+
+(defmacro with-local-cluster
+ [[cluster-sym & args] & body]
+ `(let [~cluster-sym (mk-local-storm-cluster ~@args)]
+ (try
+ ~@body
+ (catch Throwable t#
+ (log-error t# "Error in cluster")
+ (throw t#))
+ (finally
+ (let [keep-waiting?# (atom true)
+ f# (future (while @keep-waiting?# (simulate-wait ~cluster-sym)))]
+ (kill-local-storm-cluster ~cluster-sym)
+ (reset! keep-waiting?# false)
+ @f#)))))
+
+(defmacro with-simulated-time-local-cluster
+ [& args]
+ `(with-open [_# (Time$SimulatedTime.)]
+ (with-local-cluster ~@args)))
+
+(defmacro with-inprocess-zookeeper
+ [port-sym & body]
+ `(with-open [zks# (InProcessZookeeper. )]
+ (let [~port-sym (.getPort zks#)]
+ ~@body)))
+
+(defn submit-local-topology
+ [nimbus storm-name conf topology]
+ (when-not (Utils/isValidConf conf)
+ (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
+ (.submitTopology nimbus storm-name nil (JSONValue/toJSONString conf) topology))
+
+(defn submit-local-topology-with-opts
+ [nimbus storm-name conf topology submit-opts]
+ (when-not (Utils/isValidConf conf)
+ (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
+ (.submitTopologyWithOpts nimbus storm-name nil (JSONValue/toJSONString conf) topology submit-opts))
+
+(defn simulate-wait
+ [cluster-map]
+ (Testing/simulateWait (:local-cluster cluster-map)))
+
+(defn spout-objects [spec-map]
+ (for [[_ spout-spec] spec-map]
+ (-> spout-spec
+ .get_spout_object
+ (Thrift/deserializeComponentObject))))
+
+(defn capture-topology
+ [topology]
+ (let [cap-topo (Testing/captureTopology topology)]
+ {:topology (.topology cap-topo)
+ :capturer (.capturer cap-topo)}))
+
+(defnk complete-topology
+ [cluster-map topology
+ :mock-sources {}
+ :storm-conf {}
+ :cleanup-state true
+ :topology-name nil
+ :timeout-ms Testing/TEST_TIMEOUT_MS]
+ (Testing/completeTopology (:local-cluster cluster-map) topology,
+ (doto (CompleteTopologyParam.)
+ (.setStormConf storm-conf)
+ (.setTopologyName topology-name)
+ (.setTimeoutMs timeout-ms)
+ (.setMockedSources (MockedSources. mock-sources))
+ (.setCleanupState cleanup-state))))
+
+(defn read-tuples
+ ([results component-id stream-id]
+ (Testing/readTuples results component-id stream-id))
+ ([results component-id]
+ (Testing/readTuples results component-id )))
+
+(defn ms=
+ [a b]
+ (Testing/multiseteq a b))
+
+(def TRACKER-BOLT-ID "+++tracker-bolt")
+
+;; TODO: should override system-topology! and wrap everything there
+(defn mk-tracked-topology
+ ([tracked-cluster topology]
+ (let [tt (TrackedTopology. topology (:local-cluster tracked-cluster))]
+ {:topology (.getTopology tt)
+ :tracked-topo tt})))
+
+(defn increment-global!
+ [id key amt]
+ (-> (RegisteredGlobalState/getState id)
+ (get key)
+ (.addAndGet amt)))
+
+(defn global-amt
+ [id key]
+ (-> (RegisteredGlobalState/getState id)
+ (get key)
+ .get))
+
+(defnk mkClusterParam
+ [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :nimbus-daemon false]
+ ;;TODO do we need to support inimbus?, group-mapper, or supervisor-slot-port-min
+ (doto (MkClusterParam. )
+ (.setDaemonConf daemon-conf)
+ (.setNimbusDaemon nimbus-daemon)
+ (.setPortsPerSupervisor (int ports-per-supervisor))
+ (.setSupervisors (int supervisors))))
+
+(defmacro with-tracked-cluster
+ [[cluster-sym & cluster-args] & body]
+ `(Testing/withTrackedCluster
+ (mkClusterParam ~@cluster-args)
+ (reify TestJob
+ (run [this# lc#]
+ (let [~cluster-sym (local-cluster-state lc#)]
+ ~@body)))))
+
+(defn tracked-wait
+ "Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
+ ([tracked-map]
+ (Testing/trackedWait (:tracked-topo tracked-map)))
+ ([tracked-map amt]
+ (Testing/trackedWait (:tracked-topo tracked-map) (int amt)))
+ ([tracked-map amt timeout-ms]
+ (Testing/trackedWait (:tracked-topo tracked-map) (int amt) (int timeout-ms))))
+
+(defnk test-tuple
+ [values
+ :stream Utils/DEFAULT_STREAM_ID
+ :component "component"
+ :fields nil]
+ (Testing/testTuple
+ values
+ (doto (MkTupleParam. )
+ (.setStream stream)
+ (.setComponent component)
+ (.setFieldsList fields))))
+
+(defmacro with-timeout
+ [millis unit & body]
+ `(let [f# (future ~@body)]
+ (try
+ (.get f# ~millis ~unit)
+ (finally (future-cancel f#)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/clj/org/apache/storm/LocalCluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/LocalCluster.clj b/storm-core/src/clj/org/apache/storm/LocalCluster.clj
deleted file mode 100644
index bce2a2e..0000000
--- a/storm-core/src/clj/org/apache/storm/LocalCluster.clj
+++ /dev/null
@@ -1,106 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.LocalCluster
- (:use [org.apache.storm testing config])
- (:import [org.apache.storm.utils Utils])
- (:import [java.util Map])
- (:gen-class
- :init init
- :implements [org.apache.storm.ILocalCluster]
- :constructors {[] []
- [java.util.Map] []
- [String Long] []}
- :state state))
-
-(defn -init
- ([]
- (let [ret (mk-local-storm-cluster
- :daemon-conf
- {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})]
- [[] ret]))
- ([^String zk-host ^Long zk-port]
- (let [ret (mk-local-storm-cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true
- STORM-ZOOKEEPER-SERVERS (list zk-host)
- STORM-ZOOKEEPER-PORT zk-port})]
- [[] ret]))
- ([^Map stateMap]
- [[] stateMap]))
-
-(defn submit-hook [hook name conf topology]
- (let [topologyInfo (Utils/getTopologyInfo name nil conf)]
- (.notify hook topologyInfo conf topology)))
-
-(defn -submitTopology
- [this name conf topology]
- (submit-local-topology
- (:nimbus (. this state)) name conf topology)
- (let [hook (Utils/getConfiguredClass conf STORM-TOPOLOGY-SUBMISSION-NOTIFIER-PLUGIN)]
- (when hook (submit-hook hook name conf topology))))
-
-
-(defn -submitTopologyWithOpts
- [this name conf topology submit-opts]
- (submit-local-topology-with-opts
- (:nimbus (. this state)) name conf topology submit-opts))
-
-(defn -uploadNewCredentials
- [this name creds]
- (.uploadNewCredentials (:nimbus (. this state)) name creds))
-
-(defn -shutdown
- [this]
- (kill-local-storm-cluster (. this state)))
-
-(defn -killTopology
- [this name]
- (.killTopology (:nimbus (. this state)) name))
-
-(defn -getTopologyConf
- [this id]
- (.getTopologyConf (:nimbus (. this state)) id))
-
-(defn -getTopology
- [this id]
- (.getTopology (:nimbus (. this state)) id))
-
-(defn -getClusterInfo
- [this]
- (.getClusterInfo (:nimbus (. this state))))
-
-(defn -getTopologyInfo
- [this id]
- (.getTopologyInfo (:nimbus (. this state)) id))
-
-(defn -killTopologyWithOpts
- [this name opts]
- (.killTopologyWithOpts (:nimbus (. this state)) name opts))
-
-(defn -activate
- [this name]
- (.activate (:nimbus (. this state)) name))
-
-(defn -deactivate
- [this name]
- (.deactivate (:nimbus (. this state)) name))
-
-(defn -rebalance
- [this name opts]
- (.rebalance (:nimbus (. this state)) name opts))
-
-(defn -getState
- [this]
- (.state this))
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/config.clj b/storm-core/src/clj/org/apache/storm/config.clj
index d0c4d87..bfe47ed 100644
--- a/storm-core/src/clj/org/apache/storm/config.clj
+++ b/storm-core/src/clj/org/apache/storm/config.clj
@@ -15,14 +15,7 @@
;; limitations under the License.
(ns org.apache.storm.config
- (:import [java.io FileReader File IOException]
- [org.apache.storm.generated StormTopology])
- (:import [org.apache.storm Config])
- (:import [org.apache.storm.utils Utils LocalState ConfigUtils MutableInt])
- (:import [org.apache.storm.validation ConfigValidation])
- (:import [org.apache.commons.io FileUtils])
- (:require [clojure [string :as str]])
- (:use [org.apache.storm log util]))
+ (:import [org.apache.storm Config]))
(defn- clojure-config-name [name]
(.replace (.toUpperCase name) "_" "-"))
@@ -33,12 +26,3 @@
new-name (clojure-config-name name)]
(eval
`(def ~(symbol new-name) (. Config ~(symbol name))))))
-
-(def ALL-CONFIGS
- (dofor [f (seq (.getFields Config))]
- (.get f nil)))
-
-;; TODO this function and its callings will be replace when nimbus and supervisor move to Java
-(defn cluster-mode
- [conf & args]
- (keyword (conf STORM-CLUSTER-MODE)))
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj
deleted file mode 100644
index 6f041bb..0000000
--- a/storm-core/src/clj/org/apache/storm/converter.clj
+++ /dev/null
@@ -1,240 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.converter
- (:import [org.apache.storm.generated SupervisorInfo NodeInfo Assignment WorkerResources
- StormBase TopologyStatus ClusterWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions
- TopologyActionOptions DebugOptions ProfileRequest]
- [org.apache.storm.daemon.nimbus TopologyActions]
- [org.apache.storm.utils Utils]
- [org.apache.storm.stats StatsUtil])
- (:import [org.apache.storm.cluster ExecutorBeat])
- (:use [org.apache.storm util log])
- (:require [org.apache.storm.daemon [common :as common]]))
-
-(defn thriftify-supervisor-info [supervisor-info]
- (doto (SupervisorInfo.)
- (.set_time_secs (long (:time-secs supervisor-info)))
- (.set_hostname (:hostname supervisor-info))
- (.set_assignment_id (:assignment-id supervisor-info))
- (.set_used_ports (map long (:used-ports supervisor-info)))
- (.set_meta (map long (:meta supervisor-info)))
- (.set_scheduler_meta (:scheduler-meta supervisor-info))
- (.set_uptime_secs (long (:uptime-secs supervisor-info)))
- (.set_version (:version supervisor-info))
- (.set_resources_map (:resources-map supervisor-info))
- ))
-
-(defn clojurify-supervisor-info [^SupervisorInfo supervisor-info]
- (if supervisor-info
- (org.apache.storm.daemon.common.SupervisorInfo.
- (.get_time_secs supervisor-info)
- (.get_hostname supervisor-info)
- (.get_assignment_id supervisor-info)
- (if (.get_used_ports supervisor-info) (into [] (.get_used_ports supervisor-info)))
- (if (.get_meta supervisor-info) (into [] (.get_meta supervisor-info)))
- (if (.get_scheduler_meta supervisor-info) (into {} (.get_scheduler_meta supervisor-info)))
- (.get_uptime_secs supervisor-info)
- (.get_version supervisor-info)
- (if-let [res-map (.get_resources_map supervisor-info)] (into {} res-map)))))
-
-(defn thriftify-assignment [assignment]
- (let [thrift-assignment (doto (Assignment.)
- (.set_master_code_dir (:master-code-dir assignment))
- (.set_node_host (:node->host assignment))
- (.set_executor_node_port (into {}
- (map (fn [[k v]]
- [(map long k)
- (NodeInfo. (first v) (set (map long (rest v))))])
- (:executor->node+port assignment))))
- (.set_executor_start_time_secs
- (into {}
- (map (fn [[k v]]
- [(map long k) (long v)])
- (:executor->start-time-secs assignment)))))]
- (if (:worker->resources assignment)
- (.set_worker_resources thrift-assignment (into {} (map
- (fn [[node+port resources]]
- [(NodeInfo. (first node+port) (set (map long (rest node+port))))
- (doto (WorkerResources.)
- (.set_mem_on_heap (first resources))
- (.set_mem_off_heap (second resources))
- (.set_cpu (last resources)))])
- (:worker->resources assignment)))))
- thrift-assignment))
-
-(defn clojurify-task->node_port [task->node_port]
- (into {}
- (map-val
- (fn [nodeInfo]
- (concat [(.get_node nodeInfo)] (.get_port nodeInfo))) ;nodeInfo should be converted to [node,port1,port2..]
- task->node_port)))
-
-;TODO: when translating this function, you should replace the map-key with a proper for loop HERE
-(defn clojurify-executor->node_port [executor->node_port]
- (into {}
- (map-val
- (fn [nodeInfo]
- (concat [(.get_node nodeInfo)] (.get_port nodeInfo))) ;nodeInfo should be converted to [node,port1,port2..]
- (map-key
- (fn [list-of-executors]
- (into [] list-of-executors)) ; list of executors must be coverted to clojure vector to ensure it is sortable.
- executor->node_port))))
-
-(defn thriftify-executor->node_port [executor->node_port]
- (into {}
- (map (fn [[k v]]
- [(map long k)
- (NodeInfo. (first v) (set (map long (rest v))))])
- executor->node_port))
-)
-
-(defn clojurify-worker->resources [worker->resources]
- "convert worker info to be [node, port]
- convert resources to be [mem_on_heap mem_off_heap cpu]"
- (into {} (map
- (fn [[nodeInfo resources]]
- [(concat [(.get_node nodeInfo)] (.get_port nodeInfo))
- [(.get_mem_on_heap resources) (.get_mem_off_heap resources) (.get_cpu resources)]])
- worker->resources)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn clojurify-assignment [^Assignment assignment]
- (if assignment
- (org.apache.storm.daemon.common.Assignment.
- (.get_master_code_dir assignment)
- (into {} (.get_node_host assignment))
- (clojurify-executor->node_port (into {} (.get_executor_node_port assignment)))
- (map-key (fn [executor] (into [] executor))
- (into {} (.get_executor_start_time_secs assignment)))
- (clojurify-worker->resources (into {} (.get_worker_resources assignment))))))
-
-(defn convert-to-symbol-from-status [status]
- (if status {:type status} nil))
-
-(defn- convert-to-status-from-symbol [status]
- (if status
- (:type status)))
-
-(defn assoc-non-nil
- [m k v]
- (if v (assoc m k v) m))
-
-(defn clojurify-rebalance-options [^RebalanceOptions rebalance-options]
- (-> {:action TopologyActions/REBALANCE}
- (assoc-non-nil :delay-secs (if (.is_set_wait_secs rebalance-options) (.get_wait_secs rebalance-options)))
- (assoc-non-nil :num-workers (if (.is_set_num_workers rebalance-options) (.get_num_workers rebalance-options)))
- (assoc-non-nil :component->executors (if (.is_set_num_executors rebalance-options) (into {} (.get_num_executors rebalance-options))))))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn thriftify-rebalance-options [rebalance-options]
- (if rebalance-options
- (let [thrift-rebalance-options (RebalanceOptions.)]
- (if (:delay-secs rebalance-options)
- (.set_wait_secs thrift-rebalance-options (int (:delay-secs rebalance-options))))
- (if (:num-workers rebalance-options)
- (.set_num_workers thrift-rebalance-options (int (:num-workers rebalance-options))))
- (if (:component->executors rebalance-options)
- (.set_num_executors thrift-rebalance-options (map-val int (:component->executors rebalance-options))))
- thrift-rebalance-options)))
-
-(defn clojurify-kill-options [^KillOptions kill-options]
- (-> {:action TopologyActions/KILL}
- (assoc-non-nil :delay-secs (if (.is_set_wait_secs kill-options) (.get_wait_secs kill-options)))))
-
-(defn thriftify-kill-options [kill-options]
- (if kill-options
- (let [thrift-kill-options (KillOptions.)]
- (if (:delay-secs kill-options)
- (.set_wait_secs thrift-kill-options (int (:delay-secs kill-options))))
- thrift-kill-options)))
-
-(defn thriftify-topology-action-options [storm-base]
- (if (:topology-action-options storm-base)
- (let [ topology-action-options (:topology-action-options storm-base)
- action (:action topology-action-options)
- thrift-topology-action-options (TopologyActionOptions.)]
- (if (= action TopologyActions/KILL)
- (.set_kill_options thrift-topology-action-options (thriftify-kill-options topology-action-options)))
- (if (= action TopologyActions/REBALANCE)
- (.set_rebalance_options thrift-topology-action-options (thriftify-rebalance-options topology-action-options)))
- thrift-topology-action-options)))
-
-(defn clojurify-topology-action-options [^TopologyActionOptions topology-action-options]
- (if topology-action-options
- (or (and (.is_set_kill_options topology-action-options)
- (clojurify-kill-options
- (.get_kill_options topology-action-options)))
- (and (.is_set_rebalance_options topology-action-options)
- (clojurify-rebalance-options
- (.get_rebalance_options topology-action-options))))))
-
-(defn clojurify-debugoptions [^DebugOptions options]
- (if options
- {
- :enable (.is_enable options)
- :samplingpct (.get_samplingpct options)
- }
- ))
-
-(defn thriftify-debugoptions [options]
- (doto (DebugOptions.)
- (.set_enable (get options :enable false))
- (.set_samplingpct (get options :samplingpct 10))))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn thriftify-storm-base [storm-base]
- (doto (StormBase.)
- (.set_name (:storm-name storm-base))
- (.set_launch_time_secs (if (:launch-time-secs storm-base) (int (:launch-time-secs storm-base)) 0))
- (.set_status (convert-to-status-from-symbol (:status storm-base)))
- (.set_num_workers (if (:num-workers storm-base) (int (:num-workers storm-base)) 0))
- (.set_component_executors (map-val int (:component->executors storm-base)))
- (.set_owner (:owner storm-base))
- (.set_topology_action_options (thriftify-topology-action-options storm-base))
- (.set_prev_status (convert-to-status-from-symbol (:prev-status storm-base)))
- (.set_component_debug (map-val thriftify-debugoptions (:component->debug storm-base)))))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn clojurify-storm-base [^StormBase storm-base]
- (if storm-base
- (org.apache.storm.daemon.common.StormBase.
- (.get_name storm-base)
- (.get_launch_time_secs storm-base)
- (convert-to-symbol-from-status (.get_status storm-base))
- (.get_num_workers storm-base)
- (into {} (.get_component_executors storm-base))
- (.get_owner storm-base)
- (clojurify-topology-action-options (.get_topology_action_options storm-base))
- (convert-to-symbol-from-status (.get_prev_status storm-base))
- (map-val clojurify-debugoptions (.get_component_debug storm-base)))))
-
-(defn clojurify-profile-request
- [^ProfileRequest request]
- (when request
- {:host (.get_node (.get_nodeInfo request))
- :port (first (.get_port (.get_nodeInfo request)))
- :action (.get_action request)
- :timestamp (.get_time_stamp request)}))
-
-(defn thriftify-credentials [credentials]
- (doto (Credentials.)
- (.set_creds (if credentials credentials {}))))
-
-(defn clojurify-crdentials [^Credentials credentials]
- (if credentials
- (into {} (.get_creds credentials))
- nil
- ))
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
deleted file mode 100644
index 01a49b3..0000000
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ /dev/null
@@ -1,55 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.daemon.common
- (:use [org.apache.storm log config util])
- (:require [clojure.set :as set])
- (:import (org.apache.storm.task WorkerTopologyContext)
- (org.apache.storm.utils Utils ConfigUtils)
- (java.io InterruptedIOException)))
-
-;; the task id is the virtual port
-;; node->host is here so that tasks know who to talk to just from assignment
-;; this avoid situation where node goes down and task doesn't know what to do information-wise
-(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs worker->resources])
-
-
-;; component->executors is a map from spout/bolt id to number of executors for that component
-(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner topology-action-options prev-status component->debug])
-
-(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version resources-map])
-
-(defrecord ExecutorStats [^long processed
- ^long acked
- ^long emitted
- ^long transferred
- ^long failed])
-
-(defn new-executor-stats []
- (ExecutorStats. 0 0 0 0 0))
-
-(defmacro defserverfn [name & body]
- `(let [exec-fn# (fn ~@body)]
- (defn ~name [& args#]
- (try-cause
- (apply exec-fn# args#)
- (catch InterruptedIOException e#
- (throw e#))
- (catch InterruptedException e#
- (throw e#))
- (catch Throwable t#
- (log-error t# "Error on initialization of server " ~(str name))
- (Utils/exitProcess 13 "Error on initialization")
- )))))
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
deleted file mode 100644
index a02a8e2..0000000
--- a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
+++ /dev/null
@@ -1,28 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.daemon.local-supervisor
- (:import [org.apache.storm.daemon.supervisor Supervisor]
- [org.apache.storm.utils ConfigUtils])
- (:use [org.apache.storm.daemon common])
- (:gen-class))
-
-(defserverfn mk-local-supervisor [conf shared-context isupervisor]
- (if (not (ConfigUtils/isLocalMode conf))
- (throw
- (IllegalArgumentException. "Cannot start server in distrubuted mode!")))
- (let [supervisor-server (Supervisor. conf shared-context isupervisor)]
- (.launch supervisor-server)
- supervisor-server))
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/clj/org/apache/storm/internal/thrift.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/internal/thrift.clj b/storm-core/src/clj/org/apache/storm/internal/thrift.clj
index 07e0bdf..87a0441 100644
--- a/storm-core/src/clj/org/apache/storm/internal/thrift.clj
+++ b/storm-core/src/clj/org/apache/storm/internal/thrift.clj
@@ -50,24 +50,6 @@
[^Grouping grouping]
(grouping-constants (.getSetField grouping)))
-(defn nimbus-client-and-conn
- ([host port]
- (nimbus-client-and-conn host port nil))
- ([host port as-user]
- (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user)
- (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
- nimbusClient (NimbusClient. conf host port nil as-user)
- client (.getClient nimbusClient)
- transport (.transport nimbusClient)]
- [client transport] )))
-
-(defmacro with-nimbus-connection
- [[client-sym host port] & body]
- `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port)]
- (try
- ~@body
- (finally (.close conn#)))))
-
(defmacro with-configured-nimbus-connection
[client-sym & body]
`(let [conf# (clojurify-structure (ConfigUtils/readStormConfig))
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/clj/org/apache/storm/local_state_converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/local_state_converter.clj b/storm-core/src/clj/org/apache/storm/local_state_converter.clj
deleted file mode 100644
index e8eeaca..0000000
--- a/storm-core/src/clj/org/apache/storm/local_state_converter.clj
+++ /dev/null
@@ -1,24 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.local-state-converter
- (:import [org.apache.storm.generated ExecutorInfo]))
-
-(defn ->ExecutorInfo
- [[low high]] (ExecutorInfo. low high))
-
-(defn ->ExecutorInfo-list
- [executors]
- (map ->ExecutorInfo executors))
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/clj/org/apache/storm/log.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/log.clj b/storm-core/src/clj/org/apache/storm/log.clj
index 96570e3..7a006ef 100644
--- a/storm-core/src/clj/org/apache/storm/log.clj
+++ b/storm-core/src/clj/org/apache/storm/log.clj
@@ -15,9 +15,7 @@
;; limitations under the License.
(ns org.apache.storm.log
- (:require [clojure.tools.logging :as log])
- (:use [clojure pprint])
- (:import [java.io StringWriter]))
+ (:require [clojure.tools.logging :as log]))
(defmacro log-message
[& args]
@@ -31,26 +29,6 @@
[& args]
`(log/debug (str ~@args)))
-(defmacro log-warn-error
- [e & args]
- `(log/warn (str ~@args) ~e))
-
(defmacro log-warn
[& args]
`(log/warn (str ~@args)))
-
-(defn log-capture!
- [& args]
- (apply log/log-capture! args))
-
-(defn log-stream
- [& args]
- (apply log/log-stream args))
-
-(defmacro log-pprint
- [& args]
- `(let [^StringWriter writer# (StringWriter.)]
- (doall
- (for [object# [~@args]]
- (pprint object# writer#)))
- (log-message "\n" writer#)))