You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/21 18:00:21 UTC

[1/3] storm git commit: STORM-2748: Fix TickTupleTest to actually test something

Repository: storm
Updated Branches:
  refs/heads/master 50d55a951 -> 66ff5fd94


STORM-2748: Fix TickTupleTest to actually test something


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/54f0bf22
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/54f0bf22
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/54f0bf22

Branch: refs/heads/master
Commit: 54f0bf22fdcc7e68118ed5184aa8c1f65678e218
Parents: 50d55a9
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Sep 20 10:42:26 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Sep 20 12:06:44 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/storm/TickTupleTest.java    | 118 ++++++++++++-------
 storm-server/src/test/resources/log4j2.xml      |   6 +-
 2 files changed, 75 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/54f0bf22/storm-server/src/test/java/org/apache/storm/TickTupleTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/TickTupleTest.java b/storm-server/src/test/java/org/apache/storm/TickTupleTest.java
index 2ee9a0c..daa706b 100644
--- a/storm-server/src/test/java/org/apache/storm/TickTupleTest.java
+++ b/storm-server/src/test/java/org/apache/storm/TickTupleTest.java
@@ -18,85 +18,113 @@
 
 package org.apache.storm;
 
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.ILocalCluster.ILocalTopology;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.Utils;
-import org.junit.Assert;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.TupleUtils;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.Map;
+import static org.junit.Assert.*;
 
 public class TickTupleTest {
+    private final static Logger LOG = LoggerFactory.getLogger(TickTupleTest.class);
+    private static LinkedBlockingQueue<Long> tickTupleTimes = new LinkedBlockingQueue<>();
+    private static AtomicReference<Tuple> nonTickTuple = new AtomicReference<>(null);
 
     @Test
     public void testTickTupleWorksWithSystemBolt() throws Exception {
-        ILocalCluster cluster = null;
-        try {
-            cluster =  new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build();
+        try (ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build()){
             StormTopology topology = createNoOpTopology();
             Config topoConf = new Config();
-            topoConf.putAll(Utils.readDefaultConfig());
-            topoConf.put("storm.cluster.mode", "local");
             topoConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
-            cluster.submitTopology("test", topoConf,  topology);
-            cluster.advanceClusterTime(2);
-            Assert.assertTrue("Test is passed", true);
-        } finally {
-            cluster.close();
+            try (ILocalTopology topo = cluster.submitTopology("test", topoConf,  topology)) {
+                //Give the cluster some time to come up
+                long time = 0;
+                while (tickTupleTimes.size() <= 0) {
+                    assert time <= 100_000 : "took over " + time + " ms of simulated time to get a message back...";
+                    cluster.advanceClusterTime(10);
+                    time += 10_000;
+                }
+                tickTupleTimes.clear();
+                cluster.advanceClusterTime(1);
+                time += 1000;
+                assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+                cluster.advanceClusterTime(1);
+                time += 1000;
+                assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+                cluster.advanceClusterTime(1);
+                time += 1000;
+                assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+                cluster.advanceClusterTime(1);
+                time += 1000;
+                assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+                cluster.advanceClusterTime(1);
+                time += 1000;
+                assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+            }
+            assertNull("The bolt got a tuple that is not a tick tuple " + nonTickTuple.get(), nonTickTuple.get());
         }
-
     }
 
-    private IRichSpout makeNoOpSpout() {
-        return new BaseRichSpout() {
-            @Override
-            public void declareOutputFields(OutputFieldsDeclarer declarer) {
-                declarer.declare(new Fields("tuple"));
-            }
-
-            @Override
-            public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
-            }
+    private static class NoopSpout extends BaseRichSpout {
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("tuple"));
+        }
 
-            @Override
-            public void nextTuple() {
-            }
+        @Override
+        public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
+        }
 
-            private void writeObject(java.io.ObjectOutputStream stream) {
-            }
-        };
+        @Override
+        public void nextTuple() {
+        }
     }
 
-    private BaseRichBolt makeNoOpBolt() {
-        return new BaseRichBolt() {
-            @Override
-            public void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector outputCollector) {}
-            @Override
-            public void execute(Tuple tuple) {}
+    private static class NoopBolt extends BaseRichBolt {
+        @Override
+        public void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector outputCollector) {}
 
-            @Override
-            public void cleanup() { }
+        @Override
+        public void execute(Tuple tuple) {
+            LOG.info("GOT {}", tuple);
+            if (TupleUtils.isTick(tuple)) {
+                try {
+                    tickTupleTimes.put(Time.currentTimeMillis());
+                } catch (InterruptedException e) {
+                    //Ignored
+                }
+            } else {
+                nonTickTuple.set(tuple);
+            }
+        }
 
-            @Override
-            public void declareOutputFields(OutputFieldsDeclarer ofd) {}
+        @Override
+        public void cleanup() { }
 
-            private void writeObject(java.io.ObjectOutputStream stream) {}
-        };
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer ofd) {}
     }
 
     private StormTopology createNoOpTopology() {
         TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("1", makeNoOpSpout());
-        builder.setBolt("2", makeNoOpBolt()).fieldsGrouping("1", new Fields("tuple"));
+        builder.setSpout("1", new NoopSpout());
+        builder.setBolt("2", new NoopBolt()).fieldsGrouping("1", new Fields("tuple"));
         return builder.createTopology();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/54f0bf22/storm-server/src/test/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/storm-server/src/test/resources/log4j2.xml b/storm-server/src/test/resources/log4j2.xml
index fe097c6..2348548 100644
--- a/storm-server/src/test/resources/log4j2.xml
+++ b/storm-server/src/test/resources/log4j2.xml
@@ -22,11 +22,9 @@
     </Console>
   </Appenders>
   <Loggers>
-    <!-- suppress ERROR org.apache.storm.blobstore.BlobStoreUtils - Could not update the blob with key: key when testing -->
-    <Logger name="org.apache.storm.blobstore" level="FATAL" />
-    <Root level="ERROR">
+    <Root level="INFO">
       <appender-ref ref="console" />
     </Root>
   </Loggers>
 </Configuration>
-      
\ No newline at end of file
+      


[3/3] storm git commit: Merge branch 'STORM-2748' of https://github.com/revans2/incubator-storm into STORM-2748

Posted by bo...@apache.org.
Merge branch 'STORM-2748' of https://github.com/revans2/incubator-storm into STORM-2748

STORM-2748: Fix TickTupleTest to actually test something

This closes #2334


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/66ff5fd9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/66ff5fd9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/66ff5fd9

Branch: refs/heads/master
Commit: 66ff5fd9400aa1195c2e00409b96f991e869a824
Parents: 50d55a9 b182dc6
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Thu Sep 21 12:49:35 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Thu Sep 21 12:49:35 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/storm/TickTupleTest.java    | 108 +++++++++++--------
 storm-server/src/test/resources/log4j2.xml      |   6 +-
 2 files changed, 65 insertions(+), 49 deletions(-)
----------------------------------------------------------------------



[2/3] storm git commit: Addressed review comments

Posted by bo...@apache.org.
Addressed review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b182dc6a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b182dc6a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b182dc6a

Branch: refs/heads/master
Commit: b182dc6a76abf1a1208bf7df861bf5316468ed7c
Parents: 54f0bf2
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Sep 20 16:06:09 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Sep 20 16:06:09 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/storm/TickTupleTest.java    | 26 ++++++--------------
 1 file changed, 8 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b182dc6a/storm-server/src/test/java/org/apache/storm/TickTupleTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/TickTupleTest.java b/storm-server/src/test/java/org/apache/storm/TickTupleTest.java
index daa706b..65d2529 100644
--- a/storm-server/src/test/java/org/apache/storm/TickTupleTest.java
+++ b/storm-server/src/test/java/org/apache/storm/TickTupleTest.java
@@ -53,7 +53,7 @@ public class TickTupleTest {
             Config topoConf = new Config();
             topoConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
             try (ILocalTopology topo = cluster.submitTopology("test", topoConf,  topology)) {
-                //Give the cluster some time to come up
+                //Give the topology some time to come up
                 long time = 0;
                 while (tickTupleTimes.size() <= 0) {
                     assert time <= 100_000 : "took over " + time + " ms of simulated time to get a message back...";
@@ -61,21 +61,11 @@ public class TickTupleTest {
                     time += 10_000;
                 }
                 tickTupleTimes.clear();
-                cluster.advanceClusterTime(1);
-                time += 1000;
-                assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
-                cluster.advanceClusterTime(1);
-                time += 1000;
-                assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
-                cluster.advanceClusterTime(1);
-                time += 1000;
-                assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
-                cluster.advanceClusterTime(1);
-                time += 1000;
-                assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
-                cluster.advanceClusterTime(1);
-                time += 1000;
-                assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue());
+                for (int i = 0; i < 5; i++) {
+                    cluster.advanceClusterTime(1);
+                    time += 1_000;
+                    assertEquals("Iteration " + i, (Long)time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS));
+                }
             }
             assertNull("The bolt got a tuple that is not a tick tuple " + nonTickTuple.get(), nonTickTuple.get());
         }
@@ -123,8 +113,8 @@ public class TickTupleTest {
 
     private StormTopology createNoOpTopology() {
         TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("1", new NoopSpout());
-        builder.setBolt("2", new NoopBolt()).fieldsGrouping("1", new Fields("tuple"));
+        builder.setSpout("Spout", new NoopSpout());
+        builder.setBolt("Bolt", new NoopBolt()).fieldsGrouping("Spout", new Fields("tuple"));
         return builder.createTopology();
     }
 }