You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/12/03 15:50:04 UTC
[04/10] storm git commit: STORM-1281: LocalCluster,
testing4j and testing.clj to java
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/CompleteTopologyParam.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/CompleteTopologyParam.java b/storm-core/src/jvm/org/apache/storm/testing/CompleteTopologyParam.java
index c124d58..00d0e57 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/CompleteTopologyParam.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/CompleteTopologyParam.java
@@ -17,7 +17,10 @@
*/
package org.apache.storm.testing;
+import java.util.Map;
+
import org.apache.storm.Config;
+import org.apache.storm.Testing;
/**
* The param class for the `Testing.completeTopology`.
@@ -26,15 +29,15 @@ public class CompleteTopologyParam {
/**
* The mocked spout sources
*/
- private MockedSources mockedSources;
+ private MockedSources mockedSources = new MockedSources();
/**
* the config for the topology when it was submitted to the cluster
*/
- private Config stormConf;
+ private Map<String, Object> stormConf = new Config();
/**
* whether cleanup the state?
*/
- private Boolean cleanupState;
+ private boolean cleanupState = true;
/**
* the topology name you want to submit to the cluster
*/
@@ -43,29 +46,39 @@ public class CompleteTopologyParam {
/**
* the timeout of topology you want to submit to the cluster
*/
- private Integer timeoutMs;
+ private int timeoutMs = Testing.TEST_TIMEOUT_MS;
public MockedSources getMockedSources() {
return mockedSources;
}
public void setMockedSources(MockedSources mockedSources) {
+ if (mockedSources == null) {
+ mockedSources = new MockedSources();
+ }
+
this.mockedSources = mockedSources;
}
- public Config getStormConf() {
+ public Map<String, Object> getStormConf() {
return stormConf;
}
- public void setStormConf(Config stormConf) {
+ public void setStormConf(Map<String, Object> stormConf) {
+ if (stormConf == null) {
+ stormConf = new Config();
+ }
this.stormConf = stormConf;
}
- public Boolean getCleanupState() {
+ public boolean getCleanupState() {
return cleanupState;
}
public void setCleanupState(Boolean cleanupState) {
+ if (cleanupState == null) {
+ cleanupState = true;
+ }
this.cleanupState = cleanupState;
}
@@ -82,6 +95,9 @@ public class CompleteTopologyParam {
}
public void setTimeoutMs(Integer timeoutMs) {
+ if (timeoutMs == null) {
+ timeoutMs = Testing.TEST_TIMEOUT_MS;
+ }
this.timeoutMs = timeoutMs;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java b/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
index 02872d0..7801ca0 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
@@ -37,6 +37,10 @@ public class FeederSpout extends BaseRichSpout {
private SpoutOutputCollector _collector;
private AckFailDelegate _ackFailDelegate;
+ public FeederSpout(List<String> outFields) {
+ this(new Fields(outFields));
+ }
+
public FeederSpout(Fields outFields) {
_id = InprocMessaging.acquireNewPort();
_outFields = outFields;
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/FixedTupleSpout.java b/storm-core/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
index 87fc52d..feac43e 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
@@ -30,7 +30,7 @@ import java.util.Map;
import java.util.UUID;
import static org.apache.storm.utils.Utils.get;
-public class FixedTupleSpout implements IRichSpout {
+public class FixedTupleSpout implements IRichSpout, CompletableSpout {
private static final Map<String, Integer> acked = new HashMap<String, Integer>();
private static final Map<String, Integer> failed = new HashMap<String, Integer>();
@@ -176,4 +176,9 @@ public class FixedTupleSpout implements IRichSpout {
public Map<String, Object> getComponentConfiguration() {
return null;
}
+
+ @Override
+ public boolean isExhausted() {
+ return getSourceTuples().size() == getCompleted();
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/InProcessZookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/InProcessZookeeper.java b/storm-core/src/jvm/org/apache/storm/testing/InProcessZookeeper.java
new file mode 100644
index 0000000..dafde30
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/testing/InProcessZookeeper.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.testing;
+
+import java.util.List;
+
+import org.apache.storm.zookeeper.Zookeeper;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+
+/**
+ * A local Zookeeper instance available for testing.
+ * try (InProcessZookeeper zk = new InProcessZookeeper()) {
+ * // Run Tests...
+ * }
+ */
+public class InProcessZookeeper implements AutoCloseable {
+
+ private final TmpPath zkTmp;
+ private final NIOServerCnxnFactory zookeeper;
+ private final long zkPort;
+
+ public InProcessZookeeper() throws Exception {
+ zkTmp = new TmpPath();
+ @SuppressWarnings("unchecked")
+ List<Object> portAndHandle = Zookeeper.mkInprocessZookeeper(zkTmp.getPath(), null);
+ zkPort = (Long) portAndHandle.get(0);
+ zookeeper = (NIOServerCnxnFactory) portAndHandle.get(1);
+ }
+
+ /**
+ * @return the port ZK is listening on (localhost)
+ */
+ public long getPort() {
+ return zkPort;
+ }
+
+ @Override
+ public void close() throws Exception {
+ Zookeeper.shutdownInprocessZookeeper(zookeeper);
+ zkTmp.close();
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/MkClusterParam.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/MkClusterParam.java b/storm-core/src/jvm/org/apache/storm/testing/MkClusterParam.java
index c3946fa..7e1e206 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/MkClusterParam.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/MkClusterParam.java
@@ -34,7 +34,7 @@ public class MkClusterParam {
/**
* cluster config
*/
- private Map daemonConf;
+ private Map<String, Object> daemonConf;
private Boolean nimbusDaemon;
@@ -53,10 +53,10 @@ public class MkClusterParam {
public void setPortsPerSupervisor(Integer portsPerSupervisor) {
this.portsPerSupervisor = portsPerSupervisor;
}
- public Map getDaemonConf() {
+ public Map<String, Object> getDaemonConf() {
return daemonConf;
}
- public void setDaemonConf(Map daemonConf) {
+ public void setDaemonConf(Map<String, Object> daemonConf) {
this.daemonConf = daemonConf;
}
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/MkTupleParam.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/MkTupleParam.java b/storm-core/src/jvm/org/apache/storm/testing/MkTupleParam.java
index ac3e1d3..93aba72 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/MkTupleParam.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/MkTupleParam.java
@@ -18,6 +18,7 @@
package org.apache.storm.testing;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
public class MkTupleParam {
@@ -42,10 +43,16 @@ public class MkTupleParam {
public List<String> getFields() {
return fields;
}
+
+ public void setFieldsList(List<String> fields) {
+ if (fields != null) {
+ this.fields = new ArrayList<>(fields);
+ } else {
+ this.fields = null;
+ }
+ }
+
public void setFields(String... fields) {
- this.fields = new ArrayList<String>();
- for (int i = 0; i < fields.length; i++) {
- this.fields.add(fields[i]);
- }
+ this.fields = new ArrayList<>(Arrays.asList(fields));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/MockLeaderElector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/MockLeaderElector.java b/storm-core/src/jvm/org/apache/storm/testing/MockLeaderElector.java
new file mode 100644
index 0000000..ade29a9
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/testing/MockLeaderElector.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.testing;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.nimbus.ILeaderElector;
+import org.apache.storm.nimbus.NimbusInfo;
+
+public class MockLeaderElector implements ILeaderElector {
+ private final boolean isLeader;
+ private final NimbusInfo leaderAddress;
+
+ public MockLeaderElector() {
+ this(true, "test-host", 9999);
+ }
+
+ public MockLeaderElector(boolean isLeader) {
+ this(isLeader, "test-host", 9999);
+ }
+
+ public MockLeaderElector(boolean isLeader, String host, int port) {
+ this.isLeader = isLeader;
+ this.leaderAddress = new NimbusInfo(host, port, true);
+ }
+
+ @Override
+ public void prepare(Map conf) {
+ //NOOP
+ }
+
+ @Override
+ public void addToLeaderLockQueue() throws Exception {
+ //NOOP
+ }
+
+ @Override
+ public void removeFromLeaderLockQueue() throws Exception {
+ //NOOP
+ }
+
+ @Override
+ public boolean isLeader() throws Exception {
+ return isLeader;
+ }
+
+ @Override
+ public NimbusInfo getLeader() {
+ return leaderAddress;
+ }
+
+ @Override
+ public List<NimbusInfo> getAllNimbuses() throws Exception {
+ return Arrays.asList(leaderAddress);
+ }
+
+ @Override
+ public void close() {
+ //NOOP
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/MockedSources.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/MockedSources.java b/storm-core/src/jvm/org/apache/storm/testing/MockedSources.java
index 1063ad2..61df4b6 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/MockedSources.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/MockedSources.java
@@ -31,6 +31,14 @@ public class MockedSources {
*/
private Map<String, List<FixedTuple>> data = new HashMap<String, List<FixedTuple>>();
+ public MockedSources() {
+ //Empty
+ }
+
+ public MockedSources(Map<String, List<FixedTuple>> data) {
+ this.data = new HashMap<>(data);
+ }
+
/**
* add mock data for the spout.
*
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/TestEventLogSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/TestEventLogSpout.java b/storm-core/src/jvm/org/apache/storm/testing/TestEventLogSpout.java
index 0e04a6a..3f310e3 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/TestEventLogSpout.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/TestEventLogSpout.java
@@ -34,7 +34,7 @@ import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
-public class TestEventLogSpout extends BaseRichSpout {
+public class TestEventLogSpout extends BaseRichSpout implements CompletableSpout {
public static Logger LOG = LoggerFactory.getLogger(TestEventLogSpout.class);
private static final Map<String, Integer> acked = new HashMap<String, Integer>();
@@ -136,4 +136,9 @@ public class TestEventLogSpout extends BaseRichSpout {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("source", "eventId"));
}
+
+ @Override
+ public boolean isExhausted() {
+ return completed();
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/TmpPath.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/TmpPath.java b/storm-core/src/jvm/org/apache/storm/testing/TmpPath.java
new file mode 100644
index 0000000..ccb1b2f
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/testing/TmpPath.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.testing;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TmpPath implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(TmpPath.class);
+
+ public static String localTempPath() {
+ StringBuilder ret = new StringBuilder().append(System.getProperty("java.io.tmpdir"));
+ if (!Utils.isOnWindows()) {
+ ret.append("/");
+ }
+ ret.append(Utils.uuid());
+ return ret.toString();
+ }
+
+ private final File path;
+
+ public TmpPath() {
+ this(localTempPath());
+ }
+
+ public TmpPath(String path) {
+ this.path = new File(path);
+ }
+
+ public String getPath() {
+ return path.getAbsolutePath();
+ }
+
+ public File getFile() {
+ return path;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (path.exists()) {
+ try {
+ FileUtils.forceDelete(path);
+ } catch (Exception e) {
+ //on windows, the host process still holds lock on the logfile
+ LOG.info(e.getMessage());
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/TrackedTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/TrackedTopology.java b/storm-core/src/jvm/org/apache/storm/testing/TrackedTopology.java
index ff3a32c..00bd1e7 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/TrackedTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/TrackedTopology.java
@@ -17,18 +17,119 @@
*/
package org.apache.storm.testing;
-import java.util.HashMap;
-import java.util.Map;
+import static org.apache.storm.Testing.whileTimeout;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.storm.ILocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.Thrift;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
-import clojure.lang.Keyword;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.utils.RegisteredGlobalState;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class TrackedTopology extends HashMap{
- public TrackedTopology(Map map) {
- super(map);
+/**
+ * A tracked topology keeps metrics for every bolt and spout.
+ * This allows a test to know how many tuples have been fully processed.
+ * Metrics are tracked on a per cluster basis. So only one tracked topology
+ * should be run in a tracked cluster to avoid conflicts.
+ */
+public class TrackedTopology {
+ private static final Logger LOG = LoggerFactory.getLogger(TrackedTopology.class);
+ private final StormTopology topology;
+ private final AtomicInteger lastSpoutCommit;
+ private final ILocalCluster cluster;
+
+ /**
+ * Create a new topology to be tracked.
+ * @param origTopo the original topology.
+ * @param cluster a cluster that should have been launched with tracking enabled.
+ */
+ public TrackedTopology(StormTopology origTopo, ILocalCluster cluster) {
+ LOG.warn("CLUSTER {} - {}", cluster, cluster.getTrackedId());
+ this.cluster = cluster;
+ lastSpoutCommit = new AtomicInteger(0);
+ String id = cluster.getTrackedId();
+ topology = origTopo.deepCopy();
+ for (Bolt bolt: topology.get_bolts().values()) {
+ IRichBolt obj = (IRichBolt) Thrift.deserializeComponentObject(bolt.get_bolt_object());
+ bolt.set_bolt_object(Thrift.serializeComponentObject(new BoltTracker(obj, id)));
+ }
+ for (SpoutSpec spout: topology.get_spouts().values()) {
+ IRichSpout obj = (IRichSpout) Thrift.deserializeComponentObject(spout.get_spout_object());
+ spout.set_spout_object(Thrift.serializeComponentObject(new SpoutTracker(obj, id)));
+ }
}
+
+ public StormTopology getTopology() {
+ return topology;
+ }
- public StormTopology getTopology() {
- return (StormTopology)get(Keyword.intern("topology"));
+ public ILocalCluster getCluster() {
+ return cluster;
}
+
+ /**
+ * Wait for 1 tuple to be fully processed
+ */
+ public void trackedWait() {
+ trackedWait(1, Testing.TEST_TIMEOUT_MS);
+ }
+
+ /**
+ * Wait for amt tuples to be fully processed.
+ */
+ public void trackedWait(int amt) {
+ trackedWait(amt, Testing.TEST_TIMEOUT_MS);
+ }
+
+ /**
+ * Wait for amt tuples to be fully processed, or until the timeoutMs timeout is reached.
+ */
+ public void trackedWait(int amt, int timeoutMs) {
+ final int target = amt + lastSpoutCommit.get();
+ final String id = cluster.getTrackedId();
+ Random rand = ThreadLocalRandom.current();
+ whileTimeout(timeoutMs,
+ () -> {
+ int se = globalAmt(id, "spout-emitted");
+ int transferred = globalAmt(id, "transferred");
+ int processed = globalAmt(id, "processed");
+ LOG.info("emitted {} target {} transferred {} processed {}", se, target, transferred, processed);
+ return (target != se) || (transferred != processed);
+ },
+ () -> {
+ Time.advanceTimeSecs(1);
+ try {
+ Thread.sleep(rand.nextInt(200));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ lastSpoutCommit.set(target);
+ }
+
+ /**
+ * Read a metric from the tracked cluster (NOT JUST THIS TOPOLOGY)
+ * @param key one of "spout-emitted", "processed", or "transferred"
+ * @return the amount of that metric
+ */
+ public int globalAmt(String key) {
+ return globalAmt(cluster.getTrackedId(), key);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static int globalAmt(String id, String key) {
+ LOG.warn("Reading tracked metrics for ID {}", id);
+ return ((ConcurrentHashMap<String, AtomicInteger>)RegisteredGlobalState.getState(id)).get(key).get();
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
index 40ede4c..a86036e 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
@@ -88,10 +88,10 @@ import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_STREAM_ID;
* conf.put(Config.TOPOLOGY_WORKERS, 4);
* conf.put(Config.TOPOLOGY_DEBUG, true);
*
- * LocalCluster cluster = new LocalCluster();
- * cluster.submitTopology("mytopology", conf, builder.createTopology());
- * Utils.sleep(10000);
- * cluster.shutdown();
+ * try (LocalCluster cluster = new LocalCluster();
+ * LocalTopology topo = cluster.submitTopology("mytopology", conf, builder.createTopology());){
+ * Utils.sleep(10000);
+ * }
* ```
*
* The pattern for `TopologyBuilder` is to map component ids to components using the setSpout
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/utils/Time.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Time.java b/storm-core/src/jvm/org/apache/storm/utils/Time.java
index 9b656ee..b6c48c1 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Time.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Time.java
@@ -26,36 +26,55 @@ import org.slf4j.LoggerFactory;
public class Time {
- public static final Logger LOG = LoggerFactory.getLogger(Time.class);
-
+ private static final Logger LOG = LoggerFactory.getLogger(Time.class);
private static AtomicBoolean simulating = new AtomicBoolean(false);
private static AtomicLong autoAdvanceOnSleep = new AtomicLong(0);
- //TODO: should probably use weak references here or something
private static volatile Map<Thread, AtomicLong> threadSleepTimes;
private static final Object sleepTimesLock = new Object();
+ private static AtomicLong simulatedCurrTimeMs;
- private static AtomicLong simulatedCurrTimeMs; //should this be a thread local that's allowed to keep advancing?
-
- public static void startSimulating() {
- synchronized(sleepTimesLock) {
- simulating.set(true);
- simulatedCurrTimeMs = new AtomicLong(0);
- threadSleepTimes = new ConcurrentHashMap<>();
+ public static class SimulatedTime implements AutoCloseable {
+
+ public SimulatedTime() {
+ this(null);
+ }
+
+ public SimulatedTime(Number ms) {
+ synchronized(Time.sleepTimesLock) {
+ Time.simulating.set(true);
+ Time.simulatedCurrTimeMs = new AtomicLong(0);
+ Time.threadSleepTimes = new ConcurrentHashMap<>();
+ if (ms != null) {
+ Time.autoAdvanceOnSleep.set(ms.longValue());
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ synchronized(Time.sleepTimesLock) {
+ Time.simulating.set(false);
+ Time.autoAdvanceOnSleep.set(0);
+ Time.threadSleepTimes = null;
+ }
}
}
- public static void startSimulatingAutoAdvanceOnSleep(long ms) {
- synchronized(sleepTimesLock) {
- startSimulating();
- autoAdvanceOnSleep.set(ms);
+ @Deprecated
+ public static void startSimulating() {
+ synchronized(Time.sleepTimesLock) {
+ Time.simulating.set(true);
+ Time.simulatedCurrTimeMs = new AtomicLong(0);
+ Time.threadSleepTimes = new ConcurrentHashMap<>();
}
}
+ @Deprecated
public static void stopSimulating() {
- synchronized(sleepTimesLock) {
- simulating.set(false);
- autoAdvanceOnSleep.set(0);
- threadSleepTimes = null;
+ synchronized(Time.sleepTimesLock) {
+ Time.simulating.set(false);
+ Time.autoAdvanceOnSleep.set(0);
+ Time.threadSleepTimes = null;
}
}
@@ -145,6 +164,10 @@ public class Time {
LOG.debug("Advanced simulated time to {}", newTime);
}
+ public static void advanceTimeSecs(long secs) {
+ advanceTime(secs * 1_000);
+ }
+
public static boolean isThreadWaiting(Thread t) {
if(!simulating.get()) throw new IllegalStateException("Must be in simulation mode");
AtomicLong time;
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index 925f81b..9c54aed 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -15,24 +15,26 @@
;; limitations under the License.
(ns integration.org.apache.storm.integration-test
(:use [clojure test])
- (:import [org.apache.storm Config Thrift])
+ (:import [org.apache.storm Testing LocalCluster$Builder Config Thrift])
(:import [org.apache.storm.topology TopologyBuilder])
(:import [org.apache.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions])
- (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount
+ (:import [org.apache.storm.testing TrackedTopology MockedSources TestWordCounter TestWordSpout TestGlobalCount FeederSpout CompleteTopologyParam
TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
(:import [org.apache.storm.utils Time])
(:import [org.apache.storm.tuple Fields])
(:import [org.apache.storm.cluster StormClusterStateImpl])
(:use [org.apache.storm.internal clojure])
- (:use [org.apache.storm testing config util])
+ (:use [org.apache.storm config util])
(:import [org.apache.storm Thrift])
(:import [org.apache.storm.utils Utils])
(:import [org.apache.storm.daemon StormCommon]))
(deftest test-basic-topology
(doseq [zmq-on? [true false]]
- (with-simulated-time-local-cluster [cluster :supervisors 4
- :daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}]
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withSupervisors 4)
+ (.withDaemonConf {STORM-LOCAL-MODE-ZMQ zmq-on?})))]
(let [topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestWordSpout. true) (Integer. 3))}
@@ -48,19 +50,20 @@
{(Utils/getGlobalStreamId "2" nil)
(Thrift/prepareGlobalGrouping)}
(TestAggregatesCounter.))})
- results (complete-topology cluster
+ results (Testing/completeTopology cluster
topology
- :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
- :storm-conf {TOPOLOGY-WORKERS 2
- TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE true})]
- (is (ms= [["nathan"] ["bob"] ["joey"] ["nathan"]]
- (read-tuples results "1")))
- (is (ms= [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
- (read-tuples results "2")))
+ (doto (CompleteTopologyParam.)
+ (.setMockedSources (MockedSources. {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}))
+ (.setStormConf {TOPOLOGY-WORKERS 2
+ TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE true})))]
+ (is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]]
+ (Testing/readTuples results "1")))
+ (is (Testing/multiseteq [["nathan" (int 1)] ["nathan" (int 2)] ["bob" (int 1)] ["joey" (int 1)]]
+ (Testing/readTuples results "2")))
(is (= [[1] [2] [3] [4]]
- (read-tuples results "3")))
+ (Testing/readTuples results "3")))
(is (= [[1] [2] [3] [4]]
- (read-tuples results "4")))
+ (Testing/readTuples results "4")))
))))
(defbolt emit-task-id ["tid"] {:prepare true}
@@ -73,7 +76,9 @@
))))
(deftest test-multi-tasks-per-executor
- (with-simulated-time-local-cluster [cluster :supervisors 4]
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withSupervisors 4)))]
(let [topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails (TestWordSpout. true))}
{"2" (Thrift/prepareBoltDetails
@@ -82,11 +87,12 @@
emit-task-id
(Integer. 3)
{TOPOLOGY-TASKS 6})})
- results (complete-topology cluster
+ results (Testing/completeTopology cluster
topology
- :mock-sources {"1" [["a"]]})]
- (is (ms= [[0] [1] [2] [3] [4] [5]]
- (read-tuples results "2")))
+ (doto (CompleteTopologyParam.)
+ (.setMockedSources (MockedSources. {"1" [["a"]]}))))]
+ (is (Testing/multiseteq [[(int 0)] [(int 1)] [(int 2)] [(int 3)] [(int 4)] [(int 5)]]
+ (Testing/readTuples results "2")))
)))
(defbolt ack-every-other {} {:prepare true}
@@ -119,8 +125,11 @@
(assert-loop #(.isFailed tracker %) ids))
(deftest test-timeout
- (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
- (let [feeder (feeder-spout ["field1"])
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withSupervisors 4)
+ (.withDaemonConf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})))]
+ (let [feeder (FeederSpout. ["field1"])
tracker (AckFailMapTracker.)
_ (.setAckFailDelegate feeder tracker)
topology (Thrift/buildTopology
@@ -128,18 +137,18 @@
{"2" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareGlobalGrouping)} ack-every-other)})]
- (submit-local-topology (:nimbus cluster)
+ (.submitTopology cluster
"timeout-tester"
{TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
topology)
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(.feed feeder ["a"] 1)
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
- (advance-cluster-time cluster 9)
+ (.advanceClusterTime cluster 9)
(assert-acked tracker 1 3)
(is (not (.isFailed tracker 2)))
- (advance-cluster-time cluster 12)
+ (.advanceClusterTime cluster 12)
(assert-failed tracker 2)
)))
@@ -158,8 +167,10 @@
)))))
(deftest test-reset-timeout
- (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
- (let [feeder (feeder-spout ["field1"])
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withDaemonConf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})))]
+ (let [feeder (FeederSpout. ["field1"])
tracker (AckFailMapTracker.)
_ (.setAckFailDelegate feeder tracker)
topology (Thrift/buildTopology
@@ -167,16 +178,16 @@
{"2" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareGlobalGrouping)} extend-timeout-twice)})]
- (submit-local-topology (:nimbus cluster)
+ (.submitTopology cluster
"timeout-tester"
{TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
topology)
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(.feed feeder ["a"] 1)
- (advance-cluster-time cluster 21)
+ (.advanceClusterTime cluster 21)
(is (not (.isFailed tracker 1)))
(is (not (.isAcked tracker 1)))
- (advance-cluster-time cluster 5)
+ (.advanceClusterTime cluster 5)
(assert-acked tracker 1)
)))
@@ -214,15 +225,18 @@
(defn try-complete-wc-topology [cluster topology]
(try (do
- (complete-topology cluster
+ (Testing/completeTopology cluster
topology
- :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
- :storm-conf {TOPOLOGY-WORKERS 2})
+ (doto (CompleteTopologyParam.)
+ (.setMockedSources (MockedSources. {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}))
+ (.setStormConf {TOPOLOGY-WORKERS 2})))
false)
(catch InvalidTopologyException e true)))
(deftest test-validate-topology-structure
- (with-simulated-time-local-cluster [cluster :supervisors 4]
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSupervisors 4)
+ (.withSimulatedTime)))]
(let [any-error1? (try-complete-wc-topology cluster (mk-validate-topology-1))
any-error2? (try-complete-wc-topology cluster (mk-invalidate-topology-1))
any-error3? (try-complete-wc-topology cluster (mk-invalidate-topology-2))
@@ -239,7 +253,8 @@
(deftest test-system-stream
;; this test works because mocking a spout splits up the tuples evenly among the tasks
- (with-simulated-time-local-cluster [cluster]
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)))]
(let [topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestWordSpout. true) (Integer. 3))}
@@ -249,17 +264,18 @@
(Utils/getGlobalStreamId "1" "__system")
(Thrift/prepareGlobalGrouping)}
identity-bolt (Integer. 1))})
- results (complete-topology cluster
+ results (Testing/completeTopology cluster
topology
- :mock-sources {"1" [["a"] ["b"] ["c"]]}
- :storm-conf {TOPOLOGY-WORKERS 2})]
- (is (ms= [["a"] ["b"] ["c"] ["startup"] ["startup"] ["startup"]]
- (read-tuples results "2")))
+ (doto (CompleteTopologyParam.)
+ (.setMockedSources (MockedSources. {"1" [["a"] ["b"] ["c"]]}))
+ (.setStormConf {TOPOLOGY-WORKERS 2})))]
+ (is (Testing/multiseteq [["a"] ["b"] ["c"] ["startup"] ["startup"] ["startup"]]
+ (Testing/readTuples results "2")))
)))
(defn ack-tracking-feeder [fields]
(let [tracker (AckTracker.)]
- [(doto (feeder-spout fields)
+ [(doto (FeederSpout. fields)
(.setAckFailDelegate tracker))
(fn [val]
(is (= (.getNumAcks tracker) val))
@@ -293,12 +309,11 @@
(ack! collector tuple))
(deftest test-acking
- (with-tracked-cluster [cluster]
+ (with-open [cluster (.build (.withSimulatedTime (.withTracked (LocalCluster$Builder. ))))]
(let [[feeder1 checker1] (ack-tracking-feeder ["num"])
[feeder2 checker2] (ack-tracking-feeder ["num"])
[feeder3 checker3] (ack-tracking-feeder ["num"])
- tracked (mk-tracked-topology
- cluster
+ tracked (TrackedTopology.
(Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails feeder1)
"2" (Thrift/prepareSpoutDetails feeder2)
@@ -331,42 +346,40 @@
{(Utils/getGlobalStreamId "8" nil)
(Thrift/prepareShuffleGrouping)}
ack-bolt)}
- ))]
- (submit-local-topology (:nimbus cluster)
- "acking-test1"
- {}
- (:topology tracked))
- (advance-cluster-time cluster 11)
+ )
+ cluster)]
+ (.submitTopology cluster
+ "acking-test1"
+ {}
+ tracked)
+ (.advanceClusterTime cluster 11)
(.feed feeder1 [1])
- (tracked-wait tracked 1)
+ (Testing/trackedWait tracked (int 1))
(checker1 0)
(.feed feeder2 [1])
- (tracked-wait tracked 1)
+ (Testing/trackedWait tracked (int 1))
(checker1 1)
(checker2 1)
(.feed feeder1 [1])
- (tracked-wait tracked 1)
+ (Testing/trackedWait tracked (int 1))
(checker1 0)
(.feed feeder1 [1])
- (tracked-wait tracked 1)
+ (Testing/trackedWait tracked (int 1))
(checker1 1)
(.feed feeder3 [1])
- (tracked-wait tracked 1)
+ (Testing/trackedWait tracked (int 1))
(checker1 0)
(checker3 0)
(.feed feeder2 [1])
- (tracked-wait tracked 1)
+ (Testing/trackedWait tracked (int 1))
(checker1 1)
(checker2 1)
- (checker3 1)
-
- )))
+ (checker3 1))))
(deftest test-ack-branching
- (with-tracked-cluster [cluster]
+ (with-open [cluster (.build (.withSimulatedTime (.withTracked (LocalCluster$Builder. ))))]
(let [[feeder checker] (ack-tracking-feeder ["num"])
- tracked (mk-tracked-topology
- cluster
+ tracked (TrackedTopology.
(Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails feeder)}
{"2" (Thrift/prepareBoltDetails
@@ -382,19 +395,19 @@
(Thrift/prepareShuffleGrouping)
(Utils/getGlobalStreamId "3" nil)
(Thrift/prepareShuffleGrouping)}
- (agg-bolt 4))}))]
- (submit-local-topology (:nimbus cluster)
- "test-acking2"
- {}
- (:topology tracked))
- (advance-cluster-time cluster 11)
+ (agg-bolt 4))})
+ cluster)]
+ (.submitTopology cluster
+ "test-acking2"
+ {}
+ tracked)
+ (.advanceClusterTime cluster 11)
(.feed feeder [1])
- (tracked-wait tracked 1)
+ (Testing/trackedWait tracked (int 1))
(checker 0)
(.feed feeder [1])
- (tracked-wait tracked 1)
- (checker 2)
- )))
+ (Testing/trackedWait tracked (int 1))
+ (checker 2))))
(defbolt dup-anchor ["num"]
[tuple collector]
@@ -418,8 +431,10 @@
)))
(deftest test-submit-inactive-topology
- (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
- (let [feeder (feeder-spout ["field1"])
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withDaemonConf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})))]
+ (let [feeder (FeederSpout. ["field1"])
tracker (AckFailMapTracker.)
_ (.setAckFailDelegate feeder tracker)
topology (Thrift/buildTopology
@@ -432,28 +447,27 @@
(reset! bolt-prepared? false)
(reset! spout-opened? false)
- (submit-local-topology-with-opts (:nimbus cluster)
+ (.submitTopologyWithOpts cluster
"test"
{TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
topology
(SubmitOptions. TopologyInitialStatus/INACTIVE))
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(.feed feeder ["a"] 1)
- (advance-cluster-time cluster 9)
+ (.advanceClusterTime cluster 9)
(is (not @bolt-prepared?))
(is (not @spout-opened?))
- (.activate (:nimbus cluster) "test")
+ (.activate (.getNimbus cluster) "test")
- (advance-cluster-time cluster 12)
+ (.advanceClusterTime cluster 12)
(assert-acked tracker 1)
(is @bolt-prepared?)
(is @spout-opened?))))
(deftest test-acking-self-anchor
- (with-tracked-cluster [cluster]
+ (with-open [cluster (.build (.withSimulatedTime (.withTracked (LocalCluster$Builder. ))))]
(let [[feeder checker] (ack-tracking-feeder ["num"])
- tracked (mk-tracked-topology
- cluster
+ tracked (TrackedTopology.
(Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails feeder)}
{"2" (Thrift/prepareBoltDetails
@@ -463,21 +477,21 @@
"3" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "2" nil)
(Thrift/prepareShuffleGrouping)}
- ack-bolt)}))]
- (submit-local-topology (:nimbus cluster)
- "test"
- {}
- (:topology tracked))
- (advance-cluster-time cluster 11)
+ ack-bolt)})
+ cluster)]
+ (.submitTopology cluster
+ "test"
+ {}
+ tracked)
+ (.advanceClusterTime cluster 11)
(.feed feeder [1])
- (tracked-wait tracked 1)
+ (Testing/trackedWait tracked (int 1))
(checker 1)
(.feed feeder [1])
(.feed feeder [1])
(.feed feeder [1])
- (tracked-wait tracked 3)
- (checker 3)
- )))
+ (Testing/trackedWait tracked (int 3))
+ (checker 3))))
(defspout IncSpout ["word"]
[conf context collector]
@@ -504,22 +518,25 @@
)))
(deftest test-kryo-decorators-config
- (with-simulated-time-local-cluster [cluster
- :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
- TOPOLOGY-KRYO-DECORATORS ["this-is-overriden"]}]
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withDaemonConf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
+ TOPOLOGY-KRYO-DECORATORS ["this-is-overriden"]})))]
(let [builder (TopologyBuilder.)
_ (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
_ (-> builder (.setBolt "2" (TestConfBolt. {TOPOLOGY-KRYO-DECORATORS ["one" "two"]})) (.shuffleGrouping "1"))
- results (complete-topology cluster
+ results (Testing/completeTopology cluster
(.createTopology builder)
- :storm-conf {TOPOLOGY-KRYO-DECORATORS ["one" "three"]}
- :mock-sources {"1" [[TOPOLOGY-KRYO-DECORATORS]]})]
+ (doto (CompleteTopologyParam.)
+ (.setStormConf {TOPOLOGY-KRYO-DECORATORS ["one" "three"]})
+ (.setMockedSources (MockedSources. {"1" [[TOPOLOGY-KRYO-DECORATORS]]}))))]
(is (= {"topology.kryo.decorators" (list "one" "two" "three")}
- (->> (read-tuples results "2") (apply concat) (apply hash-map)))))))
+ (->> (Testing/readTuples results "2") (apply concat) (apply hash-map)))))))
(deftest test-component-specific-config
- (with-simulated-time-local-cluster [cluster
- :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true}]
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withDaemonConf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true})))]
(let [builder (TopologyBuilder.)
_ (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
_ (-> builder
@@ -533,14 +550,15 @@
(.shuffleGrouping "1")
(.setMaxTaskParallelism (int 2))
(.addConfiguration "fake.config2" 987))
- results (complete-topology cluster
+ results (Testing/completeTopology cluster
(.createTopology builder)
- :storm-conf {TOPOLOGY-KRYO-REGISTER [{"fake.type" "good.serializer", "fake.type3" "a.serializer3"}]}
- :mock-sources {"1" [["fake.config"]
+ (doto (CompleteTopologyParam.)
+ (.setStormConf {TOPOLOGY-KRYO-REGISTER [{"fake.type" "good.serializer", "fake.type3" "a.serializer3"}]})
+ (.setMockedSources (MockedSources. {"1" [["fake.config"]
[TOPOLOGY-MAX-TASK-PARALLELISM]
[TOPOLOGY-MAX-SPOUT-PENDING]
["fake.config2"]
- [TOPOLOGY-KRYO-REGISTER]]})]
+ [TOPOLOGY-KRYO-REGISTER]]}))))]
(is (= {"fake.config" 123
"fake.config2" 987
TOPOLOGY-MAX-TASK-PARALLELISM 2
@@ -548,7 +566,7 @@
TOPOLOGY-KRYO-REGISTER {"fake.type" "good.serializer"
"fake.type2" "a.serializer"
"fake.type3" "a.serializer3"}}
- (->> (read-tuples results "2")
+ (->> (Testing/readTuples results "2")
(apply concat)
(apply hash-map)))))))
@@ -582,7 +600,8 @@
))))
(deftest test-hooks
- (with-simulated-time-local-cluster [cluster]
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)))]
(let [topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. (Fields. ["conf"])))}
@@ -590,18 +609,19 @@
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareShuffleGrouping)}
hooks-bolt)})
- results (complete-topology cluster
+ results (Testing/completeTopology cluster
topology
- :mock-sources {"1" [[1]
+ (doto (CompleteTopologyParam.)
+ (.setMockedSources (MockedSources. {"1" [[1]
[1]
[1]
[1]
- ]})]
+ ]}))))]
(is (= [[0 0 0 0]
[2 1 0 1]
[4 1 1 2]
[6 2 1 3]]
- (read-tuples results "2")
+ (Testing/readTuples results "2")
)))))
(defbolt report-errors-bolt {}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
index 342bba6..5b437c7 100644
--- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
@@ -15,7 +15,7 @@
;; limitations under the License.
(ns integration.org.apache.storm.testing4j-test
(:use [clojure.test])
- (:use [org.apache.storm config testing util])
+ (:use [org.apache.storm config util])
(:use [org.apache.storm.internal clojure])
(:require [integration.org.apache.storm.integration-test :as it])
(:require [org.apache.storm.internal.thrift :as thrift])
@@ -23,7 +23,7 @@
[org.apache.storm.generated GlobalStreamId])
(:import [org.apache.storm.tuple Values Tuple])
(:import [org.apache.storm.utils Time Utils])
- (:import [org.apache.storm.testing MkClusterParam TestJob MockedSources TestWordSpout
+ (:import [org.apache.storm.testing MkClusterParam TestJob MockedSources TestWordSpout FeederSpout
TestWordCounter TestGlobalCount TestAggregatesCounter CompleteTopologyParam
AckFailMapTracker MkTupleParam])
(:import [org.apache.storm.utils Utils])
@@ -45,8 +45,7 @@
(Testing/withLocalCluster mk-cluster-param (reify TestJob
(^void run [this ^ILocalCluster cluster]
(is (not (nil? cluster)))
- (is (not (nil? (.getState cluster))))
- (is (not (nil? (:nimbus (.getState cluster))))))))))
+ (is (not (nil? (.getNimbus cluster)))))))))
(deftest test-with-simulated-time-local-cluster
(let [mk-cluster-param (doto (MkClusterParam.)
@@ -58,8 +57,7 @@
(Testing/withSimulatedTimeLocalCluster mk-cluster-param (reify TestJob
(^void run [this ^ILocalCluster cluster]
(is (not (nil? cluster)))
- (is (not (nil? (.getState cluster))))
- (is (not (nil? (:nimbus (.getState cluster)))))
+ (is (not (nil? (.getNimbus cluster))))
(is (Time/isSimulating)))))
(is (not (Time/isSimulating)))))
@@ -96,8 +94,8 @@
complete-topology-param)]
(is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]]
(Testing/readTuples results "1")))
- (is (Testing/multiseteq [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
- (read-tuples results "2")))
+ (is (Testing/multiseteq [["nathan" (int 1)] ["nathan" (int 2)] ["bob" (int 1)] ["joey" (int 1)]]
+ (Testing/readTuples results "2")))
(is (= [[1] [2] [3] [4]]
(Testing/readTuples results "3")))
(is (= [[1] [2] [3] [4]]
@@ -143,7 +141,7 @@
"test-acking2"
(Config.)
(.getTopology tracked))
- (advance-cluster-time (.getState cluster) 11)
+ (.advanceClusterTime cluster (int 11))
(.feed feeder [1])
(Testing/trackedWait tracked (int 1))
(checker 0)
@@ -161,7 +159,7 @@
mk-cluster-param
(reify TestJob
(^void run [this ^ILocalCluster cluster]
- (let [feeder (feeder-spout ["field1"])
+ (let [feeder (FeederSpout. ["field1"])
tracker (AckFailMapTracker.)
_ (.setAckFailDelegate feeder tracker)
topology (Thrift/buildTopology
@@ -195,7 +193,7 @@
mk-cluster-param
(reify TestJob
(^void run [this ^ILocalCluster cluster]
- (let [feeder (feeder-spout ["field1"])
+ (let [feeder (FeederSpout. ["field1"])
tracker (AckFailMapTracker.)
_ (.setAckFailDelegate feeder tracker)
topology (Thrift/buildTopology
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
index c55903e..d55009b 100644
--- a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
@@ -15,18 +15,23 @@
;; limitations under the License.
(ns integration.org.apache.storm.trident.integration-test
(:use [clojure test])
- (:require [org.apache.storm [testing :as t]])
+ (:import [org.apache.storm Testing LocalCluster$Builder LocalCluster LocalDRPC])
(:import [org.apache.storm.trident.testing Split CountAsAggregator StringLength TrueFilter
- MemoryMapState$Factory])
+ MemoryMapState$Factory FeederCommitterBatchSpout FeederBatchSpout])
(:import [org.apache.storm.trident.state StateSpec])
- (:import [org.apache.storm.trident.operation.impl CombinerAggStateUpdater]
+ (:import [org.apache.storm.trident TridentTopology]
+ [org.apache.storm.trident.operation.impl CombinerAggStateUpdater]
[org.apache.storm.trident.operation BaseFunction]
+ [org.apache.storm.trident.operation.builtin Count Sum Equals MapGet Debug FilterNull FirstN TupleCollectionGet]
+ [org.apache.storm.tuple Fields]
[org.json.simple.parser JSONParser]
+ [org.json.simple JSONValue]
[org.apache.storm Config])
- (:use [org.apache.storm.trident testing]
- [org.apache.storm log util config]))
+ (:use [org.apache.storm log util config]))
-(bootstrap-imports)
+(defn exec-drpc [^LocalDRPC drpc function-name args]
+ (if-let [res (.execute drpc function-name args)]
+ (clojurify-structure (JSONValue/parse res))))
(defmacro letlocals
[& body]
@@ -41,58 +46,58 @@
~(first lexpr))))
(deftest test-memory-map-get-tuples
- (t/with-local-cluster [cluster]
- (with-drpc [drpc]
+ (with-open [cluster (LocalCluster. )]
+ (with-open [drpc (LocalDRPC.)]
(letlocals
(bind topo (TridentTopology.))
- (bind feeder (feeder-spout ["sentence"]))
+ (bind feeder (FeederBatchSpout. ["sentence"]))
(bind word-counts
(-> topo
(.newStream "tester" feeder)
- (.each (fields "sentence") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+ (.each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
+ (.groupBy (Fields. ["word"]))
+ (.persistentAggregate (MemoryMapState$Factory.) (Count.) (Fields. ["count"]))
(.parallelismHint 6)
))
(-> topo
(.newDRPCStream "all-tuples" drpc)
(.broadcast)
- (.stateQuery word-counts (fields "args") (TupleCollectionGet.) (fields "word" "count"))
- (.project (fields "word" "count")))
- (with-topology [cluster topo storm-topo]
- (feed feeder [["hello the man said"] ["the"]])
+ (.stateQuery word-counts (Fields. ["args"]) (TupleCollectionGet.) (Fields. ["word" "count"]))
+ (.project (Fields. ["word" "count"])))
+ (with-open [storm-topo (.submitTopology cluster "testing" {} (.build topo))]
+ (.feed feeder [["hello the man said"] ["the"]])
(is (= #{["hello" 1] ["said" 1] ["the" 2] ["man" 1]}
(into #{} (exec-drpc drpc "all-tuples" "man"))))
- (feed feeder [["the foo"]])
+ (.feed feeder [["the foo"]])
(is (= #{["hello" 1] ["said" 1] ["the" 3] ["man" 1] ["foo" 1]}
(into #{} (exec-drpc drpc "all-tuples" "man")))))))))
(deftest test-word-count
- (t/with-local-cluster [cluster]
- (with-drpc [drpc]
+ (with-open [cluster (LocalCluster. )]
+ (with-open [drpc (LocalDRPC.)]
(letlocals
(bind topo (TridentTopology.))
- (bind feeder (feeder-spout ["sentence"]))
+ (bind feeder (FeederBatchSpout. ["sentence"]))
(bind word-counts
(-> topo
(.newStream "tester" feeder)
- (.each (fields "sentence") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+ (.each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
+ (.groupBy (Fields. ["word"]))
+ (.persistentAggregate (MemoryMapState$Factory.) (Count.) (Fields. ["count"]))
(.parallelismHint 6)
))
(-> topo
(.newDRPCStream "words" drpc)
- (.each (fields "args") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
- (.aggregate (fields "count") (Sum.) (fields "sum"))
- (.project (fields "sum")))
- (with-topology [cluster topo storm-topo]
- (feed feeder [["hello the man said"] ["the"]])
+ (.each (Fields. ["args"]) (Split.) (Fields. ["word"]))
+ (.groupBy (Fields. ["word"]))
+ (.stateQuery word-counts (Fields. ["word"]) (MapGet.) (Fields. ["count"]))
+ (.aggregate (Fields. ["count"]) (Sum.) (Fields. ["sum"]))
+ (.project (Fields. ["sum"])))
+ (with-open [storm-topo (.submitTopology cluster "testing" {} (.build topo))]
+ (.feed feeder [["hello the man said"] ["the"]])
(is (= [[2]] (exec-drpc drpc "words" "the")))
(is (= [[1]] (exec-drpc drpc "words" "hello")))
- (feed feeder [["the man on the moon"] ["where are you"]])
+ (.feed feeder [["the man on the moon"] ["where are you"]])
(is (= [[4]] (exec-drpc drpc "words" "the")))
(is (= [[2]] (exec-drpc drpc "words" "man")))
(is (= [[8]] (exec-drpc drpc "words" "man where you the")))
@@ -101,56 +106,56 @@
;; this test reproduces a bug where committer spouts freeze processing when
;; there's at least one repartitioning after the spout
(deftest test-word-count-committer-spout
- (t/with-local-cluster [cluster]
- (with-drpc [drpc]
+ (with-open [cluster (LocalCluster. )]
+ (with-open [drpc (LocalDRPC.)]
(letlocals
(bind topo (TridentTopology.))
- (bind feeder (feeder-committer-spout ["sentence"]))
+ (bind feeder (FeederCommitterBatchSpout. ["sentence"]))
(.setWaitToEmit feeder false) ;;this causes lots of empty batches
(bind word-counts
(-> topo
(.newStream "tester" feeder)
(.parallelismHint 2)
- (.each (fields "sentence") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+ (.each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
+ (.groupBy (Fields. ["word"]))
+ (.persistentAggregate (MemoryMapState$Factory.) (Count.) (Fields. ["count"]))
(.parallelismHint 6)
))
(-> topo
(.newDRPCStream "words" drpc)
- (.each (fields "args") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
- (.aggregate (fields "count") (Sum.) (fields "sum"))
- (.project (fields "sum")))
- (with-topology [cluster topo storm-topo]
- (feed feeder [["hello the man said"] ["the"]])
+ (.each (Fields. ["args"]) (Split.) (Fields. ["word"]))
+ (.groupBy (Fields. ["word"]))
+ (.stateQuery word-counts (Fields. ["word"]) (MapGet.) (Fields. ["count"]))
+ (.aggregate (Fields. ["count"]) (Sum.) (Fields. ["sum"]))
+ (.project (Fields. ["sum"])))
+ (with-open [storm-topo (.submitTopology cluster "testing" {} (.build topo))]
+ (.feed feeder [["hello the man said"] ["the"]])
(is (= [[2]] (exec-drpc drpc "words" "the")))
(is (= [[1]] (exec-drpc drpc "words" "hello")))
(Thread/sleep 1000) ;; this is necessary to reproduce the bug where committer spouts freeze processing
- (feed feeder [["the man on the moon"] ["where are you"]])
+ (.feed feeder [["the man on the moon"] ["where are you"]])
(is (= [[4]] (exec-drpc drpc "words" "the")))
(is (= [[2]] (exec-drpc drpc "words" "man")))
(is (= [[8]] (exec-drpc drpc "words" "man where you the")))
- (feed feeder [["the the"]])
+ (.feed feeder [["the the"]])
(is (= [[6]] (exec-drpc drpc "words" "the")))
- (feed feeder [["the"]])
+ (.feed feeder [["the"]])
(is (= [[7]] (exec-drpc drpc "words" "the")))
)))))
(deftest test-count-agg
- (t/with-local-cluster [cluster]
- (with-drpc [drpc]
+ (with-open [cluster (LocalCluster. )]
+ (with-open [drpc (LocalDRPC.)]
(letlocals
(bind topo (TridentTopology.))
(-> topo
(.newDRPCStream "numwords" drpc)
- (.each (fields "args") (Split.) (fields "word"))
- (.aggregate (CountAsAggregator.) (fields "count"))
+ (.each (Fields. ["args"]) (Split.) (Fields. ["word"]))
+ (.aggregate (CountAsAggregator.) (Fields. ["count"]))
(.parallelismHint 2) ;;this makes sure batchGlobal is working correctly
- (.project (fields "count")))
- (with-topology [cluster topo storm-topo]
+ (.project (Fields. ["count"])))
+ (with-open [storm-topo (.submitTopology cluster "testing" {} (.build topo))]
(doseq [i (range 100)]
(is (= [[1]] (exec-drpc drpc "numwords" "the"))))
(is (= [[0]] (exec-drpc drpc "numwords" "")))
@@ -158,76 +163,76 @@
)))))
(deftest test-split-merge
- (t/with-local-cluster [cluster]
- (with-drpc [drpc]
+ (with-open [cluster (LocalCluster. )]
+ (with-open [drpc (LocalDRPC.)]
(letlocals
(bind topo (TridentTopology.))
(bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
(bind s1
(-> drpc-stream
- (.each (fields "args") (Split.) (fields "word"))
- (.project (fields "word"))))
+ (.each (Fields. ["args"]) (Split.) (Fields. ["word"]))
+ (.project (Fields. ["word"]))))
(bind s2
(-> drpc-stream
- (.each (fields "args") (StringLength.) (fields "len"))
- (.project (fields "len"))))
+ (.each (Fields. ["args"]) (StringLength.) (Fields. ["len"]))
+ (.project (Fields. ["len"]))))
(.merge topo [s1 s2])
- (with-topology [cluster topo storm-topo]
- (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
- (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
+ (with-open [storm-topo (.submitTopology cluster "testing" {} (.build topo))]
+ (is (Testing/multiseteq [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
+ (is (Testing/multiseteq [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
)))))
(deftest test-multiple-groupings-same-stream
- (t/with-local-cluster [cluster]
- (with-drpc [drpc]
+ (with-open [cluster (LocalCluster. )]
+ (with-open [drpc (LocalDRPC.)]
(letlocals
(bind topo (TridentTopology.))
(bind drpc-stream (-> topo (.newDRPCStream "tester" drpc)
- (.each (fields "args") (TrueFilter.))))
+ (.each (Fields. ["args"]) (TrueFilter.))))
(bind s1
(-> drpc-stream
- (.groupBy (fields "args"))
- (.aggregate (CountAsAggregator.) (fields "count"))))
+ (.groupBy (Fields. ["args"]))
+ (.aggregate (CountAsAggregator.) (Fields. ["count"]))))
(bind s2
(-> drpc-stream
- (.groupBy (fields "args"))
- (.aggregate (CountAsAggregator.) (fields "count"))))
+ (.groupBy (Fields. ["args"]))
+ (.aggregate (CountAsAggregator.) (Fields. ["count"]))))
(.merge topo [s1 s2])
- (with-topology [cluster topo storm-topo]
- (is (t/ms= [["the" 1] ["the" 1]] (exec-drpc drpc "tester" "the")))
- (is (t/ms= [["aaaaa" 1] ["aaaaa" 1]] (exec-drpc drpc "tester" "aaaaa")))
+ (with-open [storm-topo (.submitTopology cluster "testing" {} (.build topo))]
+ (is (Testing/multiseteq [["the" 1] ["the" 1]] (exec-drpc drpc "tester" "the")))
+ (is (Testing/multiseteq [["aaaaa" 1] ["aaaaa" 1]] (exec-drpc drpc "tester" "aaaaa")))
)))))
(deftest test-multi-repartition
- (t/with-local-cluster [cluster]
- (with-drpc [drpc]
+ (with-open [cluster (LocalCluster. )]
+ (with-open [drpc (LocalDRPC.)]
(letlocals
(bind topo (TridentTopology.))
(bind drpc-stream (-> topo (.newDRPCStream "tester" drpc)
- (.each (fields "args") (Split.) (fields "word"))
+ (.each (Fields. ["args"]) (Split.) (Fields. ["word"]))
(.localOrShuffle)
(.shuffle)
- (.aggregate (CountAsAggregator.) (fields "count"))
+ (.aggregate (CountAsAggregator.) (Fields. ["count"]))
))
- (with-topology [cluster topo storm-topo]
- (is (t/ms= [[2]] (exec-drpc drpc "tester" "the man")))
- (is (t/ms= [[1]] (exec-drpc drpc "tester" "aaa")))
+ (with-open [storm-topo (.submitTopology cluster "testing" {} (.build topo))]
+ (is (Testing/multiseteq [[2]] (exec-drpc drpc "tester" "the man")))
+ (is (Testing/multiseteq [[1]] (exec-drpc drpc "tester" "aaa")))
)))))
(deftest test-stream-projection-validation
- (t/with-local-cluster [cluster]
+ (with-open [cluster (LocalCluster. )]
(letlocals
- (bind feeder (feeder-committer-spout ["sentence"]))
+ (bind feeder (FeederCommitterBatchSpout. ["sentence"]))
(bind topo (TridentTopology.))
;; valid projection fields will not throw exceptions
(bind word-counts
(-> topo
(.newStream "tester" feeder)
- (.each (fields "sentence") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+ (.each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
+ (.groupBy (Fields. ["word"]))
+ (.persistentAggregate (MemoryMapState$Factory.) (Count.) (Fields. ["count"]))
(.parallelismHint 6)
))
(bind stream (-> topo
@@ -235,63 +240,63 @@
;; test .each
(is (thrown? IllegalArgumentException
(-> stream
- (.each (fields "sentence1") (Split.) (fields "word")))))
+ (.each (Fields. ["sentence1"]) (Split.) (Fields. ["word"])))))
;; test .groupBy
(is (thrown? IllegalArgumentException
(-> stream
- (.each (fields "sentence") (Split.) (fields "word"))
- (.groupBy (fields "word1")))))
+ (.each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
+ (.groupBy (Fields. ["word1"])))))
;; test .aggregate
(is (thrown? IllegalArgumentException
(-> stream
- (.each (fields "sentence") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.aggregate (fields "word1") (Count.) (fields "count")))))
+ (.each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
+ (.groupBy (Fields. ["word"]))
+ (.aggregate (Fields. ["word1"]) (Count.) (Fields. ["count"])))))
;; test .project
(is (thrown? IllegalArgumentException
(-> stream
- (.project (fields "sentence1")))))
+ (.project (Fields. ["sentence1"])))))
;; test .partitionBy
(is (thrown? IllegalArgumentException
(-> stream
- (.partitionBy (fields "sentence1")))))
+ (.partitionBy (Fields. ["sentence1"])))))
;; test .partitionAggregate
(is (thrown? IllegalArgumentException
(-> stream
- (.each (fields "sentence") (Split.) (fields "word"))
- (.partitionAggregate (fields "word1") (Count.) (fields "count")))))
+ (.each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
+ (.partitionAggregate (Fields. ["word1"]) (Count.) (Fields. ["count"])))))
;; test .persistentAggregate
(is (thrown? IllegalArgumentException
(-> stream
- (.each (fields "sentence") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.persistentAggregate (StateSpec. (MemoryMapState$Factory.)) (fields "non-existent") (Count.) (fields "count")))))
+ (.each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
+ (.groupBy (Fields. ["word"]))
+ (.persistentAggregate (StateSpec. (MemoryMapState$Factory.)) (Fields. ["non-existent"]) (Count.) (Fields. ["count"])))))
;; test .partitionPersist
(is (thrown? IllegalArgumentException
(-> stream
- (.each (fields "sentence") (Split.) (fields "word"))
- (.groupBy (fields "word"))
+ (.each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
+ (.groupBy (Fields. ["word"]))
(.partitionPersist (StateSpec. (MemoryMapState$Factory.))
- (fields "non-existent")
+ (Fields. ["non-existent"])
(CombinerAggStateUpdater. (Count.))
- (fields "count")))))
+ (Fields. ["count"])))))
;; test .stateQuery
- (with-drpc [drpc]
+ (with-open [drpc (LocalDRPC.)]
(is (thrown? IllegalArgumentException
(-> topo
(.newDRPCStream "words" drpc)
- (.each (fields "args") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.stateQuery word-counts (fields "word1") (MapGet.) (fields "count"))))))
+ (.each (Fields. ["args"]) (Split.) (Fields. ["word"]))
+ (.groupBy (Fields. ["word"]))
+ (.stateQuery word-counts (Fields. ["word1"]) (MapGet.) (Fields. ["count"]))))))
)))
(deftest test-set-component-resources
- (t/with-local-cluster [cluster]
- (with-drpc [drpc]
+ (with-open [cluster (LocalCluster. )]
+ (with-open [drpc (LocalDRPC.)]
(letlocals
(bind topo (TridentTopology.))
- (bind feeder (feeder-spout ["sentence"]))
+ (bind feeder (FeederBatchSpout. ["sentence"]))
(bind add-bang (proxy [BaseFunction] []
(execute [tuple collector]
(. collector emit (str (. tuple getString 0) "!")))))
@@ -304,18 +309,18 @@
(parallelismHint 5)
(setCPULoad 20)
(setMemoryLoad 512 256)
- (each (fields "sentence") (Split.) (fields "word"))
+ (each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
(setCPULoad 10)
(setMemoryLoad 512)
- (each (fields "word") add-bang (fields "word!"))
+ (each (Fields. ["word"]) add-bang (Fields. ["word!"]))
(parallelismHint 10)
(setCPULoad 50)
(setMemoryLoad 1024)
- (groupBy (fields "word!"))
- (persistentAggregate (memory-map-state) (Count.) (fields "count"))
+ (groupBy (Fields. ["word!"]))
+ (persistentAggregate (MemoryMapState$Factory.) (Count.) (Fields. ["count"]))
(setCPULoad 100)
(setMemoryLoad 2048)))
- (with-topology [cluster topo storm-topo]
+ (with-open [storm-topo (.submitTopology cluster "testing" {} (.build topo))]
(let [parse-fn (fn [[k v]]
[k (clojurify-structure (. (JSONParser.) parse (.. v get_common get_json_conf)))])
@@ -364,23 +369,4 @@
(get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
100.0)))))))))
-;; (deftest test-split-merge
-;; (t/with-local-cluster [cluster]
-;; (with-drpc [drpc]
-;; (letlocals
-;; (bind topo (TridentTopology.))
-;; (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
-;; (bind s1
-;; (-> drpc-stream
-;; (.each (fields "args") (Split.) (fields "word"))
-;; (.project (fields "word"))))
-;; (bind s2
-;; (-> drpc-stream
-;; (.each (fields "args") (StringLength.) (fields "len"))
-;; (.project (fields "len"))))
-;;
-;; (.merge topo [s1 s2])
-;; (with-topology [cluster topo]
-;; (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
-;; (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
-;; )))))
+
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/clojure_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/clojure_test.clj b/storm-core/test/clj/org/apache/storm/clojure_test.clj
deleted file mode 100644
index 13fdeb7..0000000
--- a/storm-core/test/clj/org/apache/storm/clojure_test.clj
+++ /dev/null
@@ -1,159 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.clojure-test
- (:use [clojure test])
- (:import [org.apache.storm.testing TestWordSpout TestPlannerSpout]
- [org.apache.storm.tuple Fields])
- (:use [org.apache.storm testing config])
- (:use [org.apache.storm.internal clojure])
- (:use [org.apache.storm.daemon common])
- (:require [org.apache.storm.internal [thrift :as thrift]])
- (:import [org.apache.storm Thrift])
- (:import [org.apache.storm.utils Utils]))
-
-(defbolt lalala-bolt1 ["word"] [[val :as tuple] collector]
- (let [ret (str val "lalala")]
- (emit-bolt! collector [ret] :anchor tuple)
- (ack! collector tuple)
- ))
-
-(defbolt lalala-bolt2 ["word"] {:prepare true}
- [conf context collector]
- (let [state (atom nil)]
- (reset! state "lalala")
- (bolt
- (execute [tuple]
- (let [ret (-> (.getValue tuple 0) (str @state))]
- (emit-bolt! collector [ret] :anchor tuple)
- (ack! collector tuple)
- ))
- )))
-
-(defbolt lalala-bolt3 ["word"] {:prepare true :params [prefix]}
- [conf context collector]
- (let [state (atom nil)]
- (bolt
- (prepare [_ _ _]
- (reset! state (str prefix "lalala")))
- (execute [{val "word" :as tuple}]
- (let [ret (-> (.getValue tuple 0) (str @state))]
- (emit-bolt! collector [ret] :anchor tuple)
- (ack! collector tuple)
- )))
- ))
-
-(deftest test-clojure-bolt
- (with-simulated-time-local-cluster [cluster :supervisors 4]
- (let [nimbus (:nimbus cluster)
- topology (Thrift/buildTopology
- {"1" (Thrift/prepareSpoutDetails (TestWordSpout. false))}
- {"2" (Thrift/prepareBoltDetails
- {(Utils/getGlobalStreamId "1" nil)
- (Thrift/prepareShuffleGrouping)}
- lalala-bolt1)
- "3" (Thrift/prepareBoltDetails
- {(Utils/getGlobalStreamId "1" nil)
- (Thrift/prepareLocalOrShuffleGrouping)}
- lalala-bolt2)
- "4" (Thrift/prepareBoltDetails
- {(Utils/getGlobalStreamId "1" nil)
- (Thrift/prepareShuffleGrouping)}
- (lalala-bolt3 "_nathan_"))}
- )
- results (complete-topology cluster
- topology
- :mock-sources {"1" [["david"]
- ["adam"]
- ]}
- )]
- (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "2")))
- (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "3")))
- (is (ms= [["david_nathan_lalala"] ["adam_nathan_lalala"]] (read-tuples results "4")))
- )))
-
-(defbolt punctuator-bolt ["word" "period" "question" "exclamation"]
- [tuple collector]
- (if (= (:word tuple) "bar")
- (do
- (emit-bolt! collector {:word "bar" :period "bar" :question "bar"
- "exclamation" "bar"})
- (ack! collector tuple))
- (let [ res (assoc tuple :period (str (:word tuple) "."))
- res (assoc res :exclamation (str (:word tuple) "!"))
- res (assoc res :question (str (:word tuple) "?")) ]
- (emit-bolt! collector res)
- (ack! collector tuple))))
-
-(deftest test-map-emit
- (with-simulated-time-local-cluster [cluster :supervisors 4]
- (let [topology (Thrift/buildTopology
- {"words" (Thrift/prepareSpoutDetails (TestWordSpout. false))}
- {"out" (Thrift/prepareBoltDetails
- {(Utils/getGlobalStreamId "words" nil)
- (Thrift/prepareShuffleGrouping)}
- punctuator-bolt)})
- results (complete-topology cluster
- topology
- :mock-sources {"words" [["foo"] ["bar"]]}
- )]
- (is (ms= [["foo" "foo." "foo?" "foo!"]
- ["bar" "bar" "bar" "bar"]] (read-tuples results "out"))))))
-
-(defbolt conf-query-bolt ["conf" "val"] {:prepare true :params [conf] :conf conf}
- [conf context collector]
- (bolt
- (execute [tuple]
- (let [name (.getValue tuple 0)
- val (if (= name "!MAX_MSG_TIMEOUT") (.maxTopologyMessageTimeout context) (get conf name))]
- (emit-bolt! collector [name val] :anchor tuple)
- (ack! collector tuple))
- )))
-
-(deftest test-component-specific-config-clojure
- (with-simulated-time-local-cluster [cluster]
- (let [topology (Thrift/buildTopology
- {"1" (Thrift/prepareSpoutDetails
- (TestPlannerSpout. (Fields. ["conf"]))
- nil
- {TOPOLOGY-MESSAGE-TIMEOUT-SECS 40})}
- {"2" (Thrift/prepareBoltDetails
- {(Utils/getGlobalStreamId "1" nil)
- (Thrift/prepareShuffleGrouping)}
- (conf-query-bolt {"fake.config" 1
- TOPOLOGY-MAX-TASK-PARALLELISM 2
- TOPOLOGY-MAX-SPOUT-PENDING 10})
- nil
- {TOPOLOGY-MAX-SPOUT-PENDING 3})})
- results (complete-topology cluster
- topology
- :topology-name "test123"
- :storm-conf {TOPOLOGY-MAX-TASK-PARALLELISM 10
- TOPOLOGY-MESSAGE-TIMEOUT-SECS 30}
- :mock-sources {"1" [["fake.config"]
- [TOPOLOGY-MAX-TASK-PARALLELISM]
- [TOPOLOGY-MAX-SPOUT-PENDING]
- ["!MAX_MSG_TIMEOUT"]
- [TOPOLOGY-NAME]
- ]})]
- (is (= {"fake.config" 1
- TOPOLOGY-MAX-TASK-PARALLELISM 2
- TOPOLOGY-MAX-SPOUT-PENDING 3
- "!MAX_MSG_TIMEOUT" 40
- TOPOLOGY-NAME "test123"}
- (->> (read-tuples results "2")
- (apply concat)
- (apply hash-map))
- )))))