You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/21 05:06:51 UTC

[3/7] storm git commit: Fix timing issues in unit tests

Fix timing issues in unit tests

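Fix timing issues in WaterMarkEventGeneratorTest and WindowedBoltExecutorTest by setting the watermark event interval to a high value and triggering the watermark event generator manually (calling run() directly) instead of relying on Thread.sleep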


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fa483838
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fa483838
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fa483838

Branch: refs/heads/master
Commit: fa4838381a121a2b83f0a01681296107400a198d
Parents: 06d4bb6
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Tue Dec 8 01:22:48 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Tue Dec 8 01:46:21 2015 +0530

----------------------------------------------------------------------
 .../backtype/storm/topology/WindowedBoltExecutor.java  |  3 ++-
 .../storm/topology/WindowedBoltExecutorTest.java       |  7 +++++--
 .../storm/windowing/WaterMarkEventGeneratorTest.java   | 13 +++++++------
 3 files changed, 14 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fa483838/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
index ff2278e..7b45cbc 100644
--- a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
@@ -59,7 +59,8 @@ public class WindowedBoltExecutor implements IRichBolt {
     private transient WindowManager<Tuple> windowManager;
     private transient int maxLagMs;
     private transient String tupleTsFieldName;
-    private transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
+    // package level for unit tests
+    transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
 
     public WindowedBoltExecutor(IWindowedBolt bolt) {
         this.bolt = bolt;

http://git-wip-us.apache.org/repos/asf/storm/blob/fa483838/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java b/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
index 1c59f3a..d856a99 100644
--- a/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
+++ b/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
@@ -29,6 +29,7 @@ import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.TupleImpl;
 import backtype.storm.tuple.Values;
 import backtype.storm.windowing.TupleWindow;
+import backtype.storm.windowing.WaterMarkEvent;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -104,7 +105,8 @@ public class WindowedBoltExecutorTest {
         conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10);
         conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, "ts");
         conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5);
-        conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 100);
+        // trigger manually to avoid timing issues
+        conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 100000);
         executor.prepare(conf, getTopologyContext(), getOutputCollector());
     }
 
@@ -119,7 +121,8 @@ public class WindowedBoltExecutorTest {
         for (long ts : timstamps) {
             executor.execute(getTuple("s1", new Fields("ts"), new Values(ts)));
         }
-        Thread.sleep(120);
+        //Thread.sleep(120);
+        executor.waterMarkEventGenerator.run();
         //System.out.println(testWindowedBolt.tupleWindows);
         assertEquals(3, testWindowedBolt.tupleWindows.size());
         TupleWindow first = testWindowedBolt.tupleWindows.get(0);

http://git-wip-us.apache.org/repos/asf/storm/blob/fa483838/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java b/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
index 00201be..2e52236 100644
--- a/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
+++ b/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
@@ -50,7 +50,8 @@ public class WaterMarkEventGeneratorTest {
                 eventList.add(event);
             }
         };
-        waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 50, 5,
+        // set watermark interval to a high value and trigger manually to fix timing issues
+        waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 100000, 5,
                                                                 Collections.singleton(streamId("s1")));
     }
 
@@ -58,7 +59,7 @@ public class WaterMarkEventGeneratorTest {
     public void testTrackSingleStream() throws Exception {
         waterMarkEventGenerator.track(streamId("s1"), 100);
         waterMarkEventGenerator.track(streamId("s1"), 110);
-        Thread.sleep(60);
+        waterMarkEventGenerator.run();
         assertTrue(eventList.get(0).isWatermark());
         assertEquals(105, eventList.get(0).getTimestamp());
     }
@@ -68,7 +69,7 @@ public class WaterMarkEventGeneratorTest {
         waterMarkEventGenerator.track(streamId("s1"), 100);
         waterMarkEventGenerator.track(streamId("s1"), 110);
         waterMarkEventGenerator.track(streamId("s1"), 104);
-        Thread.sleep(60);
+        waterMarkEventGenerator.run();
         assertTrue(eventList.get(0).isWatermark());
         assertEquals(105, eventList.get(0).getTimestamp());
     }
@@ -82,18 +83,18 @@ public class WaterMarkEventGeneratorTest {
 
         waterMarkEventGenerator.track(streamId("s1"), 100);
         waterMarkEventGenerator.track(streamId("s1"), 110);
-        Thread.sleep(60);
+        waterMarkEventGenerator.run();
         assertTrue(eventList.isEmpty());
         waterMarkEventGenerator.track(streamId("s2"), 95);
         waterMarkEventGenerator.track(streamId("s2"), 98);
-        Thread.sleep(60);
+        waterMarkEventGenerator.run();
         assertTrue(eventList.get(0).isWatermark());
         assertEquals(93, eventList.get(0).getTimestamp());
     }
 
     @Test
     public void testNoEvents() throws Exception {
-        Thread.sleep(60);
+        waterMarkEventGenerator.run();
         assertTrue(eventList.isEmpty());
     }
 }