You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by revans2 <gi...@git.apache.org> on 2017/09/20 15:43:36 UTC

[GitHub] storm pull request #2334: STORM-2748: Fix TickTupleTest to actually test som...

GitHub user revans2 opened a pull request:

    https://github.com/apache/storm/pull/2334

    STORM-2748: Fix TickTupleTest to actually test something

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/revans2/incubator-storm STORM-2748

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2334.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2334
    
----
commit b1bc23521e686216043c42f1f1f7d31cc4a92a26
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2017-09-20T15:42:26Z

    STORM-2748: Fix TickTupleTest to actually test something

----


---

[GitHub] storm pull request #2334: STORM-2748: Fix TickTupleTest to actually test som...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2334#discussion_r140039574
  
    --- Diff: storm-server/src/test/java/org/apache/storm/TickTupleTest.java ---
    @@ -18,85 +18,113 @@
     
     package org.apache.storm;
     
    +import java.util.Map;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import org.apache.storm.ILocalCluster.ILocalTopology;
     import org.apache.storm.generated.StormTopology;
     import org.apache.storm.spout.SpoutOutputCollector;
     import org.apache.storm.task.OutputCollector;
     import org.apache.storm.task.TopologyContext;
    -import org.apache.storm.topology.IRichSpout;
     import org.apache.storm.topology.OutputFieldsDeclarer;
     import org.apache.storm.topology.TopologyBuilder;
     import org.apache.storm.topology.base.BaseRichBolt;
     import org.apache.storm.topology.base.BaseRichSpout;
     import org.apache.storm.tuple.Fields;
     import org.apache.storm.tuple.Tuple;
    -import org.apache.storm.utils.Utils;
    -import org.junit.Assert;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.TupleUtils;
     import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
    -import java.util.Map;
    +import static org.junit.Assert.*;
     
     public class TickTupleTest {
    +    private final static Logger LOG = LoggerFactory.getLogger(TickTupleTest.class);
    +    private static LinkedBlockingQueue<Long> tickTupleTimes = new LinkedBlockingQueue<>();
    +    private static AtomicReference<Tuple> nonTickTuple = new AtomicReference<>(null);
     
         @Test
         public void testTickTupleWorksWithSystemBolt() throws Exception {
    -        ILocalCluster cluster = null;
    -        try {
    -            cluster =  new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build();
    +        try (ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build()){
                 StormTopology topology = createNoOpTopology();
                 Config topoConf = new Config();
    -            topoConf.putAll(Utils.readDefaultConfig());
    -            topoConf.put("storm.cluster.mode", "local");
                 topoConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
    -            cluster.submitTopology("test", topoConf,  topology);
    -            cluster.advanceClusterTime(2);
    -            Assert.assertTrue("Test is passed", true);
    -        } finally {
    -            cluster.close();
    +            try (ILocalTopology topo = cluster.submitTopology("test", topoConf,  topology)) {
    +                //Give the cluster some time to come up
    +                long time = 0;
    +                while (tickTupleTimes.size() <= 0) {
    +                    assert time <= 100_000 : "took over " + time + " ms of simulated time to get a message back...";
    +                    cluster.advanceClusterTime(10);
    +                    time += 10_000;
    +                }
    +                tickTupleTimes.clear();
    +                cluster.advanceClusterTime(1);
    +                time += 1000;
    --- End diff --
    
    I think this can be put in a loop now, the following lines don't differ anymore.


---

[GitHub] storm pull request #2334: STORM-2748: Fix TickTupleTest to actually test som...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2334


---

[GitHub] storm issue #2334: STORM-2748: Fix TickTupleTest to actually test something

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2334
  
    @srdo I finally have the test passing predictably on travis.  I also fixed the issues that you saw.  Feel free to take another look.


---

[GitHub] storm issue #2334: STORM-2748: Fix TickTupleTest to actually test something

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2334
  
    @srdo thanks for the review.  Trying to get the test to work on the slower VMs is still a work in progress.  I'll let you know why I actually have it passing consistently.


---

[GitHub] storm pull request #2334: STORM-2748: Fix TickTupleTest to actually test som...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2334#discussion_r140027547
  
    --- Diff: storm-server/src/test/java/org/apache/storm/TickTupleTest.java ---
    @@ -18,85 +18,103 @@
     
     package org.apache.storm;
     
    +import java.util.Map;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import org.apache.storm.ILocalCluster.ILocalTopology;
     import org.apache.storm.generated.StormTopology;
     import org.apache.storm.spout.SpoutOutputCollector;
     import org.apache.storm.task.OutputCollector;
     import org.apache.storm.task.TopologyContext;
    -import org.apache.storm.topology.IRichSpout;
     import org.apache.storm.topology.OutputFieldsDeclarer;
     import org.apache.storm.topology.TopologyBuilder;
     import org.apache.storm.topology.base.BaseRichBolt;
     import org.apache.storm.topology.base.BaseRichSpout;
     import org.apache.storm.tuple.Fields;
     import org.apache.storm.tuple.Tuple;
    -import org.apache.storm.utils.Utils;
    -import org.junit.Assert;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.TupleUtils;
     import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
    -import java.util.Map;
    +import static org.junit.Assert.*;
     
     public class TickTupleTest {
    +    private final static Logger LOG = LoggerFactory.getLogger(TickTupleTest.class);
    +    private static LinkedBlockingQueue<Long> tickTupleTimes = new LinkedBlockingQueue<>();
    +    private static AtomicReference<Tuple> nonTickTuple = new AtomicReference<>(null);
     
         @Test
         public void testTickTupleWorksWithSystemBolt() throws Exception {
    -        ILocalCluster cluster = null;
    -        try {
    -            cluster =  new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build();
    +        try (ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build()){
                 StormTopology topology = createNoOpTopology();
                 Config topoConf = new Config();
    -            topoConf.putAll(Utils.readDefaultConfig());
    -            topoConf.put("storm.cluster.mode", "local");
                 topoConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
    -            cluster.submitTopology("test", topoConf,  topology);
    -            cluster.advanceClusterTime(2);
    -            Assert.assertTrue("Test is passed", true);
    -        } finally {
    -            cluster.close();
    +            try (ILocalTopology topo = cluster.submitTopology("test", topoConf,  topology)) {
    +                //Give the cluster some time to come up
    +                cluster.advanceClusterTime(40);
    +                tickTupleTimes.clear();
    +                cluster.advanceClusterTime(1);
    +                assertEquals(41000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
    +                cluster.advanceClusterTime(1);
    +                assertEquals(42000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
    +                cluster.advanceClusterTime(1);
    +                assertEquals(43000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
    +                cluster.advanceClusterTime(1);
    +                assertEquals(44000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
    +                cluster.advanceClusterTime(1);
    +                assertEquals(45000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
    +            }
    +            assertNull("The bolt got a tuple that is not a tick tuple " + nonTickTuple.get(), nonTickTuple.get());
             }
    -
         }
     
    -    private IRichSpout makeNoOpSpout() {
    -        return new BaseRichSpout() {
    -            @Override
    -            public void declareOutputFields(OutputFieldsDeclarer declarer) {
    -                declarer.declare(new Fields("tuple"));
    -            }
    -
    -            @Override
    -            public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
    -            }
    +    private static class NoopSpout extends BaseRichSpout {
    +        @Override
    +        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +            declarer.declare(new Fields("tuple"));
    +        }
     
    -            @Override
    -            public void nextTuple() {
    -            }
    +        @Override
    +        public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
    +        }
     
    -            private void writeObject(java.io.ObjectOutputStream stream) {
    -            }
    -        };
    +        @Override
    +        public void nextTuple() {
    +        }
         }
     
    -    private BaseRichBolt makeNoOpBolt() {
    -        return new BaseRichBolt() {
    -            @Override
    -            public void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector outputCollector) {}
    -            @Override
    -            public void execute(Tuple tuple) {}
    +    private static class NoopBlot extends BaseRichBolt {
    +        @Override
    +        public void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector outputCollector) {}
     
    -            @Override
    -            public void cleanup() { }
    +        @Override
    +        public void execute(Tuple tuple) {
    +            LOG.error("GOT {}", tuple);
    --- End diff --
    
    Nit: Info or debug might be nicer so a grep for errors in the logs doesn't produce this line.


---

[GitHub] storm issue #2334: STORM-2748: Fix TickTupleTest to actually test something

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2334
  
    Thanks


---

[GitHub] storm pull request #2334: STORM-2748: Fix TickTupleTest to actually test som...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2334#discussion_r140026143
  
    --- Diff: storm-server/src/test/java/org/apache/storm/TickTupleTest.java ---
    @@ -18,85 +18,103 @@
     
     package org.apache.storm;
     
    +import java.util.Map;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import org.apache.storm.ILocalCluster.ILocalTopology;
     import org.apache.storm.generated.StormTopology;
     import org.apache.storm.spout.SpoutOutputCollector;
     import org.apache.storm.task.OutputCollector;
     import org.apache.storm.task.TopologyContext;
    -import org.apache.storm.topology.IRichSpout;
     import org.apache.storm.topology.OutputFieldsDeclarer;
     import org.apache.storm.topology.TopologyBuilder;
     import org.apache.storm.topology.base.BaseRichBolt;
     import org.apache.storm.topology.base.BaseRichSpout;
     import org.apache.storm.tuple.Fields;
     import org.apache.storm.tuple.Tuple;
    -import org.apache.storm.utils.Utils;
    -import org.junit.Assert;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.TupleUtils;
     import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
    -import java.util.Map;
    +import static org.junit.Assert.*;
     
     public class TickTupleTest {
    +    private final static Logger LOG = LoggerFactory.getLogger(TickTupleTest.class);
    +    private static LinkedBlockingQueue<Long> tickTupleTimes = new LinkedBlockingQueue<>();
    +    private static AtomicReference<Tuple> nonTickTuple = new AtomicReference<>(null);
     
         @Test
         public void testTickTupleWorksWithSystemBolt() throws Exception {
    -        ILocalCluster cluster = null;
    -        try {
    -            cluster =  new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build();
    +        try (ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build()){
                 StormTopology topology = createNoOpTopology();
                 Config topoConf = new Config();
    -            topoConf.putAll(Utils.readDefaultConfig());
    -            topoConf.put("storm.cluster.mode", "local");
                 topoConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
    -            cluster.submitTopology("test", topoConf,  topology);
    -            cluster.advanceClusterTime(2);
    -            Assert.assertTrue("Test is passed", true);
    -        } finally {
    -            cluster.close();
    +            try (ILocalTopology topo = cluster.submitTopology("test", topoConf,  topology)) {
    +                //Give the cluster some time to come up
    +                cluster.advanceClusterTime(40);
    +                tickTupleTimes.clear();
    +                cluster.advanceClusterTime(1);
    +                assertEquals(41000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
    +                cluster.advanceClusterTime(1);
    +                assertEquals(42000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
    +                cluster.advanceClusterTime(1);
    +                assertEquals(43000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
    +                cluster.advanceClusterTime(1);
    +                assertEquals(44000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
    +                cluster.advanceClusterTime(1);
    +                assertEquals(45000, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
    +            }
    +            assertNull("The bolt got a tuple that is not a tick tuple " + nonTickTuple.get(), nonTickTuple.get());
             }
    -
         }
     
    -    private IRichSpout makeNoOpSpout() {
    -        return new BaseRichSpout() {
    -            @Override
    -            public void declareOutputFields(OutputFieldsDeclarer declarer) {
    -                declarer.declare(new Fields("tuple"));
    -            }
    -
    -            @Override
    -            public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
    -            }
    +    private static class NoopSpout extends BaseRichSpout {
    +        @Override
    +        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +            declarer.declare(new Fields("tuple"));
    +        }
     
    -            @Override
    -            public void nextTuple() {
    -            }
    +        @Override
    +        public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
    +        }
     
    -            private void writeObject(java.io.ObjectOutputStream stream) {
    -            }
    -        };
    +        @Override
    +        public void nextTuple() {
    +        }
         }
     
    -    private BaseRichBolt makeNoOpBolt() {
    -        return new BaseRichBolt() {
    -            @Override
    -            public void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector outputCollector) {}
    -            @Override
    -            public void execute(Tuple tuple) {}
    +    private static class NoopBlot extends BaseRichBolt {
    --- End diff --
    
    Nit: Blot -> Bolt


---

[GitHub] storm issue #2334: STORM-2748: Fix TickTupleTest to actually test something

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2334
  
    +1


---

[GitHub] storm issue #2334: STORM-2748: Fix TickTupleTest to actually test something

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2334
  
    @srdo I turned it into a loop like you requested


---