You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/21 18:00:21 UTC
[1/3] storm git commit: STORM-2748: Fix TickTupleTest to actually
test something
Repository: storm
Updated Branches:
refs/heads/master 50d55a951 -> 66ff5fd94
STORM-2748: Fix TickTupleTest to actually test something
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/54f0bf22
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/54f0bf22
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/54f0bf22
Branch: refs/heads/master
Commit: 54f0bf22fdcc7e68118ed5184aa8c1f65678e218
Parents: 50d55a9
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Sep 20 10:42:26 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Sep 20 12:06:44 2017 -0500
----------------------------------------------------------------------
.../java/org/apache/storm/TickTupleTest.java | 118 ++++++++++++-------
storm-server/src/test/resources/log4j2.xml | 6 +-
2 files changed, 75 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/54f0bf22/storm-server/src/test/java/org/apache/storm/TickTupleTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/TickTupleTest.java b/storm-server/src/test/java/org/apache/storm/TickTupleTest.java
index 2ee9a0c..daa706b 100644
--- a/storm-server/src/test/java/org/apache/storm/TickTupleTest.java
+++ b/storm-server/src/test/java/org/apache/storm/TickTupleTest.java
@@ -18,85 +18,113 @@
package org.apache.storm;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.ILocalCluster.ILocalTopology;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.Utils;
-import org.junit.Assert;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.TupleUtils;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.Map;
+import static org.junit.Assert.*;
public class TickTupleTest {
+ private final static Logger LOG = LoggerFactory.getLogger(TickTupleTest.class);
+ private static LinkedBlockingQueue<Long> tickTupleTimes = new LinkedBlockingQueue<>();
+ private static AtomicReference<Tuple> nonTickTuple = new AtomicReference<>(null);
@Test
public void testTickTupleWorksWithSystemBolt() throws Exception {
- ILocalCluster cluster = null;
- try {
- cluster = new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build();
+ try (ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build()){
StormTopology topology = createNoOpTopology();
Config topoConf = new Config();
- topoConf.putAll(Utils.readDefaultConfig());
- topoConf.put("storm.cluster.mode", "local");
topoConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
- cluster.submitTopology("test", topoConf, topology);
- cluster.advanceClusterTime(2);
- Assert.assertTrue("Test is passed", true);
- } finally {
- cluster.close();
+ try (ILocalTopology topo = cluster.submitTopology("test", topoConf, topology)) {
+ //Give the cluster some time to come up
+ long time = 0;
+ while (tickTupleTimes.size() <= 0) {
+ assert time <= 100_000 : "took over " + time + " ms of simulated time to get a message back...";
+ cluster.advanceClusterTime(10);
+ time += 10_000;
+ }
+ tickTupleTimes.clear();
+ cluster.advanceClusterTime(1);
+ time += 1000;
+ assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+ cluster.advanceClusterTime(1);
+ time += 1000;
+ assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+ cluster.advanceClusterTime(1);
+ time += 1000;
+ assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+ cluster.advanceClusterTime(1);
+ time += 1000;
+ assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+ cluster.advanceClusterTime(1);
+ time += 1000;
+ assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+ }
+ assertNull("The bolt got a tuple that is not a tick tuple " + nonTickTuple.get(), nonTickTuple.get());
}
-
}
- private IRichSpout makeNoOpSpout() {
- return new BaseRichSpout() {
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("tuple"));
- }
-
- @Override
- public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
- }
+ private static class NoopSpout extends BaseRichSpout {
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("tuple"));
+ }
- @Override
- public void nextTuple() {
- }
+ @Override
+ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
+ }
- private void writeObject(java.io.ObjectOutputStream stream) {
- }
- };
+ @Override
+ public void nextTuple() {
+ }
}
- private BaseRichBolt makeNoOpBolt() {
- return new BaseRichBolt() {
- @Override
- public void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector outputCollector) {}
- @Override
- public void execute(Tuple tuple) {}
+ private static class NoopBolt extends BaseRichBolt {
+ @Override
+ public void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector outputCollector) {}
- @Override
- public void cleanup() { }
+ @Override
+ public void execute(Tuple tuple) {
+ LOG.info("GOT {}", tuple);
+ if (TupleUtils.isTick(tuple)) {
+ try {
+ tickTupleTimes.put(Time.currentTimeMillis());
+ } catch (InterruptedException e) {
+ //Ignored
+ }
+ } else {
+ nonTickTuple.set(tuple);
+ }
+ }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer ofd) {}
+ @Override
+ public void cleanup() { }
- private void writeObject(java.io.ObjectOutputStream stream) {}
- };
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer ofd) {}
}
private StormTopology createNoOpTopology() {
TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("1", makeNoOpSpout());
- builder.setBolt("2", makeNoOpBolt()).fieldsGrouping("1", new Fields("tuple"));
+ builder.setSpout("1", new NoopSpout());
+ builder.setBolt("2", new NoopBolt()).fieldsGrouping("1", new Fields("tuple"));
return builder.createTopology();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/54f0bf22/storm-server/src/test/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/storm-server/src/test/resources/log4j2.xml b/storm-server/src/test/resources/log4j2.xml
index fe097c6..2348548 100644
--- a/storm-server/src/test/resources/log4j2.xml
+++ b/storm-server/src/test/resources/log4j2.xml
@@ -22,11 +22,9 @@
</Console>
</Appenders>
<Loggers>
- <!-- suppress ERROR org.apache.storm.blobstore.BlobStoreUtils - Could not update the blob with key: key when testing -->
- <Logger name="org.apache.storm.blobstore" level="FATAL" />
- <Root level="ERROR">
+ <Root level="INFO">
<appender-ref ref="console" />
</Root>
</Loggers>
</Configuration>
-
\ No newline at end of file
+
[3/3] storm git commit: Merge branch 'STORM-2748' of
https://github.com/revans2/incubator-storm into STORM-2748
Posted by bo...@apache.org.
Merge branch 'STORM-2748' of https://github.com/revans2/incubator-storm into STORM-2748
STORM-2748: Fix TickTupleTest to actually test something
This closes #2334
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/66ff5fd9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/66ff5fd9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/66ff5fd9
Branch: refs/heads/master
Commit: 66ff5fd9400aa1195c2e00409b96f991e869a824
Parents: 50d55a9 b182dc6
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Thu Sep 21 12:49:35 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Thu Sep 21 12:49:35 2017 -0500
----------------------------------------------------------------------
.../java/org/apache/storm/TickTupleTest.java | 108 +++++++++++--------
storm-server/src/test/resources/log4j2.xml | 6 +-
2 files changed, 65 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
[2/3] storm git commit: Addressed review comments
Posted by bo...@apache.org.
Addressed review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b182dc6a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b182dc6a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b182dc6a
Branch: refs/heads/master
Commit: b182dc6a76abf1a1208bf7df861bf5316468ed7c
Parents: 54f0bf2
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Sep 20 16:06:09 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Sep 20 16:06:09 2017 -0500
----------------------------------------------------------------------
.../java/org/apache/storm/TickTupleTest.java | 26 ++++++--------------
1 file changed, 8 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b182dc6a/storm-server/src/test/java/org/apache/storm/TickTupleTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/TickTupleTest.java b/storm-server/src/test/java/org/apache/storm/TickTupleTest.java
index daa706b..65d2529 100644
--- a/storm-server/src/test/java/org/apache/storm/TickTupleTest.java
+++ b/storm-server/src/test/java/org/apache/storm/TickTupleTest.java
@@ -53,7 +53,7 @@ public class TickTupleTest {
Config topoConf = new Config();
topoConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
try (ILocalTopology topo = cluster.submitTopology("test", topoConf, topology)) {
- //Give the cluster some time to come up
+ //Give the topology some time to come up
long time = 0;
while (tickTupleTimes.size() <= 0) {
assert time <= 100_000 : "took over " + time + " ms of simulated time to get a message back...";
@@ -61,21 +61,11 @@ public class TickTupleTest {
time += 10_000;
}
tickTupleTimes.clear();
- cluster.advanceClusterTime(1);
- time += 1000;
- assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
- cluster.advanceClusterTime(1);
- time += 1000;
- assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
- cluster.advanceClusterTime(1);
- time += 1000;
- assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
- cluster.advanceClusterTime(1);
- time += 1000;
- assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
- cluster.advanceClusterTime(1);
- time += 1000;
- assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+ for (int i = 0; i < 5; i++) {
+ cluster.advanceClusterTime(1);
+ time += 1_000;
+ assertEquals("Iteration " + i, (Long)time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS));
+ }
}
assertNull("The bolt got a tuple that is not a tick tuple " + nonTickTuple.get(), nonTickTuple.get());
}
@@ -123,8 +113,8 @@ public class TickTupleTest {
private StormTopology createNoOpTopology() {
TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("1", new NoopSpout());
- builder.setBolt("2", new NoopBolt()).fieldsGrouping("1", new Fields("tuple"));
+ builder.setSpout("Spout", new NoopSpout());
+ builder.setBolt("Bolt", new NoopBolt()).fieldsGrouping("Spout", new Fields("tuple"));
return builder.createTopology();
}
}