You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by revans2 <gi...@git.apache.org> on 2017/09/20 15:43:36 UTC
[GitHub] storm pull request #2334: STORM-2748: Fix TickTupleTest to actually test som...
GitHub user revans2 opened a pull request:
https://github.com/apache/storm/pull/2334
STORM-2748: Fix TickTupleTest to actually test something
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/revans2/incubator-storm STORM-2748
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/storm/pull/2334.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2334
----
commit b1bc23521e686216043c42f1f1f7d31cc4a92a26
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date: 2017-09-20T15:42:26Z
STORM-2748: Fix TickTupleTest to actually test something
----
---
[GitHub] storm pull request #2334: STORM-2748: Fix TickTupleTest to actually test som...
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2334#discussion_r140039574
--- Diff: storm-server/src/test/java/org/apache/storm/TickTupleTest.java ---
@@ -18,85 +18,113 @@
package org.apache.storm;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.ILocalCluster.ILocalTopology;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.Utils;
-import org.junit.Assert;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.TupleUtils;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.Map;
+import static org.junit.Assert.*;
public class TickTupleTest {
+ private final static Logger LOG = LoggerFactory.getLogger(TickTupleTest.class);
+ private static LinkedBlockingQueue<Long> tickTupleTimes = new LinkedBlockingQueue<>();
+ private static AtomicReference<Tuple> nonTickTuple = new AtomicReference<>(null);
@Test
public void testTickTupleWorksWithSystemBolt() throws Exception {
- ILocalCluster cluster = null;
- try {
- cluster = new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build();
+ try (ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build()){
StormTopology topology = createNoOpTopology();
Config topoConf = new Config();
- topoConf.putAll(Utils.readDefaultConfig());
- topoConf.put("storm.cluster.mode", "local");
topoConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
- cluster.submitTopology("test", topoConf, topology);
- cluster.advanceClusterTime(2);
- Assert.assertTrue("Test is passed", true);
- } finally {
- cluster.close();
+ try (ILocalTopology topo = cluster.submitTopology("test", topoConf, topology)) {
+ //Give the cluster some time to come up
+ long time = 0;
+ while (tickTupleTimes.size() <= 0) {
+ assert time <= 100_000 : "took over " + time + " ms of simulated time to get a message back...";
+ cluster.advanceClusterTime(10);
+ time += 10_000;
+ }
+ tickTupleTimes.clear();
+ cluster.advanceClusterTime(1);
+ time += 1000;
--- End diff --
I think this can be put in a loop now, the following lines don't differ anymore.
---
[GitHub] storm pull request #2334: STORM-2748: Fix TickTupleTest to actually test som...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/storm/pull/2334
---
[GitHub] storm issue #2334: STORM-2748: Fix TickTupleTest to actually test something
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:
https://github.com/apache/storm/pull/2334
@srdo I finally have the test passing predictably on travis. I also fixed the issues that you saw. Feel free to take another look.
---
[GitHub] storm issue #2334: STORM-2748: Fix TickTupleTest to actually test something
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:
https://github.com/apache/storm/pull/2334
@srdo thanks for the review. Trying to get the test to work on the slower VMs is still a work in progress. I'll let you know why I actually have it passing consistently.
---
[GitHub] storm pull request #2334: STORM-2748: Fix TickTupleTest to actually test som...
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2334#discussion_r140027547
--- Diff: storm-server/src/test/java/org/apache/storm/TickTupleTest.java ---
@@ -18,85 +18,103 @@
package org.apache.storm;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.ILocalCluster.ILocalTopology;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.Utils;
-import org.junit.Assert;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.TupleUtils;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.Map;
+import static org.junit.Assert.*;
public class TickTupleTest {
+ private final static Logger LOG = LoggerFactory.getLogger(TickTupleTest.class);
+ private static LinkedBlockingQueue<Long> tickTupleTimes = new LinkedBlockingQueue<>();
+ private static AtomicReference<Tuple> nonTickTuple = new AtomicReference<>(null);
@Test
public void testTickTupleWorksWithSystemBolt() throws Exception {
- ILocalCluster cluster = null;
- try {
- cluster = new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build();
+ try (ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build()){
StormTopology topology = createNoOpTopology();
Config topoConf = new Config();
- topoConf.putAll(Utils.readDefaultConfig());
- topoConf.put("storm.cluster.mode", "local");
topoConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
- cluster.submitTopology("test", topoConf, topology);
- cluster.advanceClusterTime(2);
- Assert.assertTrue("Test is passed", true);
- } finally {
- cluster.close();
+ try (ILocalTopology topo = cluster.submitTopology("test", topoConf, topology)) {
+ //Give the cluster some time to come up
+ cluster.advanceClusterTime(40);
+ tickTupleTimes.clear();
+ cluster.advanceClusterTime(1);
+ assertEquals(41000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+ cluster.advanceClusterTime(1);
+ assertEquals(42000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+ cluster.advanceClusterTime(1);
+ assertEquals(43000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+ cluster.advanceClusterTime(1);
+ assertEquals(44000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+ cluster.advanceClusterTime(1);
+ assertEquals(45000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+ }
+ assertNull("The bolt got a tuple that is not a tick tuple " + nonTickTuple.get(), nonTickTuple.get());
}
-
}
- private IRichSpout makeNoOpSpout() {
- return new BaseRichSpout() {
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("tuple"));
- }
-
- @Override
- public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
- }
+ private static class NoopSpout extends BaseRichSpout {
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("tuple"));
+ }
- @Override
- public void nextTuple() {
- }
+ @Override
+ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
+ }
- private void writeObject(java.io.ObjectOutputStream stream) {
- }
- };
+ @Override
+ public void nextTuple() {
+ }
}
- private BaseRichBolt makeNoOpBolt() {
- return new BaseRichBolt() {
- @Override
- public void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector outputCollector) {}
- @Override
- public void execute(Tuple tuple) {}
+ private static class NoopBlot extends BaseRichBolt {
+ @Override
+ public void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector outputCollector) {}
- @Override
- public void cleanup() { }
+ @Override
+ public void execute(Tuple tuple) {
+ LOG.error("GOT {}", tuple);
--- End diff --
Nit: Info or debug might be nicer so a grep for errors in the logs doesn't produce this line.
---
[GitHub] storm issue #2334: STORM-2748: Fix TickTupleTest to actually test something
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:
https://github.com/apache/storm/pull/2334
Thanks
---
[GitHub] storm pull request #2334: STORM-2748: Fix TickTupleTest to actually test som...
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2334#discussion_r140026143
--- Diff: storm-server/src/test/java/org/apache/storm/TickTupleTest.java ---
@@ -18,85 +18,103 @@
package org.apache.storm;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.ILocalCluster.ILocalTopology;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.Utils;
-import org.junit.Assert;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.TupleUtils;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.Map;
+import static org.junit.Assert.*;
public class TickTupleTest {
+ private final static Logger LOG = LoggerFactory.getLogger(TickTupleTest.class);
+ private static LinkedBlockingQueue<Long> tickTupleTimes = new LinkedBlockingQueue<>();
+ private static AtomicReference<Tuple> nonTickTuple = new AtomicReference<>(null);
@Test
public void testTickTupleWorksWithSystemBolt() throws Exception {
- ILocalCluster cluster = null;
- try {
- cluster = new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build();
+ try (ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build()){
StormTopology topology = createNoOpTopology();
Config topoConf = new Config();
- topoConf.putAll(Utils.readDefaultConfig());
- topoConf.put("storm.cluster.mode", "local");
topoConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
- cluster.submitTopology("test", topoConf, topology);
- cluster.advanceClusterTime(2);
- Assert.assertTrue("Test is passed", true);
- } finally {
- cluster.close();
+ try (ILocalTopology topo = cluster.submitTopology("test", topoConf, topology)) {
+ //Give the cluster some time to come up
+ cluster.advanceClusterTime(40);
+ tickTupleTimes.clear();
+ cluster.advanceClusterTime(1);
+ assertEquals(41000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+ cluster.advanceClusterTime(1);
+ assertEquals(42000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+ cluster.advanceClusterTime(1);
+ assertEquals(43000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+ cluster.advanceClusterTime(1);
+ assertEquals(44000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+ cluster.advanceClusterTime(1);
+ assertEquals(45000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+ }
+ assertNull("The bolt got a tuple that is not a tick tuple " + nonTickTuple.get(), nonTickTuple.get());
}
-
}
- private IRichSpout makeNoOpSpout() {
- return new BaseRichSpout() {
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("tuple"));
- }
-
- @Override
- public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
- }
+ private static class NoopSpout extends BaseRichSpout {
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("tuple"));
+ }
- @Override
- public void nextTuple() {
- }
+ @Override
+ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
+ }
- private void writeObject(java.io.ObjectOutputStream stream) {
- }
- };
+ @Override
+ public void nextTuple() {
+ }
}
- private BaseRichBolt makeNoOpBolt() {
- return new BaseRichBolt() {
- @Override
- public void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector outputCollector) {}
- @Override
- public void execute(Tuple tuple) {}
+ private static class NoopBlot extends BaseRichBolt {
--- End diff --
Nit: Blot -> Bolt
---
[GitHub] storm issue #2334: STORM-2748: Fix TickTupleTest to actually test something
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:
https://github.com/apache/storm/pull/2334
+1
---
[GitHub] storm issue #2334: STORM-2748: Fix TickTupleTest to actually test something
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:
https://github.com/apache/storm/pull/2334
@srdo I turned it into a loop like you requested
---