You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/21 05:06:51 UTC
[3/7] storm git commit: Fix timing issues in unit tests
Fix timing issues in unit tests
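Fix timing issues in WaterMarkEventGeneratorTest and WindowedBoltExecutorTest by setting the watermark interval to a high value and triggering the watermark event generator manually (calling run() directly) instead of relying on Thread.sleep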
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fa483838
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fa483838
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fa483838
Branch: refs/heads/master
Commit: fa4838381a121a2b83f0a01681296107400a198d
Parents: 06d4bb6
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Tue Dec 8 01:22:48 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Tue Dec 8 01:46:21 2015 +0530
----------------------------------------------------------------------
.../backtype/storm/topology/WindowedBoltExecutor.java | 3 ++-
.../storm/topology/WindowedBoltExecutorTest.java | 7 +++++--
.../storm/windowing/WaterMarkEventGeneratorTest.java | 13 +++++++------
3 files changed, 14 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fa483838/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
index ff2278e..7b45cbc 100644
--- a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
@@ -59,7 +59,8 @@ public class WindowedBoltExecutor implements IRichBolt {
private transient WindowManager<Tuple> windowManager;
private transient int maxLagMs;
private transient String tupleTsFieldName;
- private transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
+ // package level for unit tests
+ transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
public WindowedBoltExecutor(IWindowedBolt bolt) {
this.bolt = bolt;
http://git-wip-us.apache.org/repos/asf/storm/blob/fa483838/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java b/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
index 1c59f3a..d856a99 100644
--- a/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
+++ b/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
@@ -29,6 +29,7 @@ import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleImpl;
import backtype.storm.tuple.Values;
import backtype.storm.windowing.TupleWindow;
+import backtype.storm.windowing.WaterMarkEvent;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -104,7 +105,8 @@ public class WindowedBoltExecutorTest {
conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10);
conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, "ts");
conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5);
- conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 100);
+ // trigger manually to avoid timing issues
+ conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 100000);
executor.prepare(conf, getTopologyContext(), getOutputCollector());
}
@@ -119,7 +121,8 @@ public class WindowedBoltExecutorTest {
for (long ts : timstamps) {
executor.execute(getTuple("s1", new Fields("ts"), new Values(ts)));
}
- Thread.sleep(120);
+ //Thread.sleep(120);
+ executor.waterMarkEventGenerator.run();
//System.out.println(testWindowedBolt.tupleWindows);
assertEquals(3, testWindowedBolt.tupleWindows.size());
TupleWindow first = testWindowedBolt.tupleWindows.get(0);
http://git-wip-us.apache.org/repos/asf/storm/blob/fa483838/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java b/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
index 00201be..2e52236 100644
--- a/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
+++ b/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
@@ -50,7 +50,8 @@ public class WaterMarkEventGeneratorTest {
eventList.add(event);
}
};
- waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 50, 5,
+ // set watermark interval to a high value and trigger manually to fix timing issues
+ waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 100000, 5,
Collections.singleton(streamId("s1")));
}
@@ -58,7 +59,7 @@ public class WaterMarkEventGeneratorTest {
public void testTrackSingleStream() throws Exception {
waterMarkEventGenerator.track(streamId("s1"), 100);
waterMarkEventGenerator.track(streamId("s1"), 110);
- Thread.sleep(60);
+ waterMarkEventGenerator.run();
assertTrue(eventList.get(0).isWatermark());
assertEquals(105, eventList.get(0).getTimestamp());
}
@@ -68,7 +69,7 @@ public class WaterMarkEventGeneratorTest {
waterMarkEventGenerator.track(streamId("s1"), 100);
waterMarkEventGenerator.track(streamId("s1"), 110);
waterMarkEventGenerator.track(streamId("s1"), 104);
- Thread.sleep(60);
+ waterMarkEventGenerator.run();
assertTrue(eventList.get(0).isWatermark());
assertEquals(105, eventList.get(0).getTimestamp());
}
@@ -82,18 +83,18 @@ public class WaterMarkEventGeneratorTest {
waterMarkEventGenerator.track(streamId("s1"), 100);
waterMarkEventGenerator.track(streamId("s1"), 110);
- Thread.sleep(60);
+ waterMarkEventGenerator.run();
assertTrue(eventList.isEmpty());
waterMarkEventGenerator.track(streamId("s2"), 95);
waterMarkEventGenerator.track(streamId("s2"), 98);
- Thread.sleep(60);
+ waterMarkEventGenerator.run();
assertTrue(eventList.get(0).isWatermark());
assertEquals(93, eventList.get(0).getTimestamp());
}
@Test
public void testNoEvents() throws Exception {
- Thread.sleep(60);
+ waterMarkEventGenerator.run();
assertTrue(eventList.isEmpty());
}
}