You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/21 05:06:53 UTC

[5/7] storm git commit: Addressing review comments

Addressing review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e764a2bc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e764a2bc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e764a2bc

Branch: refs/heads/master
Commit: e764a2bcc93b0f39f1480b0bfbde7c44a74248f1
Parents: 60ff46e
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Tue Dec 15 14:16:43 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Tue Dec 15 14:16:43 2015 +0530

----------------------------------------------------------------------
 docs/documentation/Windowing.md                 |  3 ++-
 .../storm/topology/WindowedBoltExecutor.java    |  7 +++++--
 .../windowing/WaterMarkEventGenerator.java      | 14 +++++++++----
 .../windowing/WaterMarkEventGeneratorTest.java  | 21 ++++++++++++++++++--
 4 files changed, 36 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e764a2bc/docs/documentation/Windowing.md
----------------------------------------------------------------------
diff --git a/docs/documentation/Windowing.md b/docs/documentation/Windowing.md
index 54e5e2d..803e5ca 100644
--- a/docs/documentation/Windowing.md
+++ b/docs/documentation/Windowing.md
@@ -145,7 +145,8 @@ If the field is not present in the tuple an exception will be thrown. Along with
 can also be specified which indicates the max time limit for tuples with out of order timestamps. 
 
 E.g. If the lag is 5 secs and a tuple `t1` arrived with timestamp `06:00:05` no tuples may arrive with tuple timestamp earlier than `06:00:00`. If a tuple
-arrives with timestamp 05:59:59 after `t1` and the window has moved past `t1`, it will be treated as a late tuple and not processed. 
+arrives with timestamp 05:59:59 after `t1` and the window has moved past `t1`, it will be treated as a late tuple and not processed. Currently the late
+ tuples are just logged in the worker log files at INFO level.
 
 ```java
 /**

http://git-wip-us.apache.org/repos/asf/storm/blob/e764a2bc/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
index 7b45cbc..b753c2c 100644
--- a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
@@ -236,8 +236,11 @@ public class WindowedBoltExecutor implements IRichBolt {
     public void execute(Tuple input) {
         if (isTupleTs()) {
             long ts = input.getLongByField(tupleTsFieldName);
-            windowManager.add(input, ts);
-            waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts);
+            if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
+                windowManager.add(input, ts);
+            } else {
+                LOG.info("Received a late tuple {} with ts {}. This will not processed.", input, ts);
+            }
         } else {
             windowManager.add(input);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/e764a2bc/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java b/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java
index 9820da6..88781db 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java
@@ -57,12 +57,18 @@ public class WaterMarkEventGenerator<T> implements Runnable {
         this.inputStreams = inputStreams;
     }
 
-    public void track(GlobalStreamId stream, long ts) {
+    /**
+     * Tracks the timestamp of the event in the stream, returns
+     * true if the event can be considered for processing or
+     * false if it's a late event.
+     */
+    public boolean track(GlobalStreamId stream, long ts) {
         Long currentVal = streamToTs.get(stream);
         if (currentVal == null || ts > currentVal) {
             streamToTs.put(stream, ts);
         }
         checkFailures();
+        return ts >= lastWaterMarkTs;
     }
 
     @Override
@@ -70,7 +76,7 @@ public class WaterMarkEventGenerator<T> implements Runnable {
         try {
             long waterMarkTs = computeWaterMarkTs();
             if (waterMarkTs > lastWaterMarkTs) {
-                this.windowManager.add(new WaterMarkEvent<T>(waterMarkTs - eventTsLag));
+                this.windowManager.add(new WaterMarkEvent<T>(waterMarkTs));
                 lastWaterMarkTs = waterMarkTs;
             }
         } catch (Throwable th) {
@@ -83,7 +89,7 @@ public class WaterMarkEventGenerator<T> implements Runnable {
      * Computes the min ts across all streams.
      */
     private long computeWaterMarkTs() {
-        long ts = Long.MIN_VALUE;
+        long ts = 0;
         // only if some data has arrived on each input stream
         if(streamToTs.size() >= inputStreams.size()) {
             ts = Long.MAX_VALUE;
@@ -91,7 +97,7 @@ public class WaterMarkEventGenerator<T> implements Runnable {
                 ts = Math.min(ts, entry.getValue());
             }
         }
-        return ts;
+        return ts - eventTsLag;
     }
 
     private void checkFailures() {

http://git-wip-us.apache.org/repos/asf/storm/blob/e764a2bc/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java b/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
index 2e52236..a517045 100644
--- a/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
+++ b/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
@@ -31,7 +31,7 @@ import java.util.Set;
 import static org.junit.Assert.*;
 
 /**
- * Unit tests for {@link WaterMarkEventGeneratorTest}
+ * Unit tests for {@link WaterMarkEventGenerator}
  */
 public class WaterMarkEventGeneratorTest {
     WaterMarkEventGenerator<Integer> waterMarkEventGenerator;
@@ -79,7 +79,7 @@ public class WaterMarkEventGeneratorTest {
         Set<GlobalStreamId> streams = new HashSet<>();
         streams.add(streamId("s1"));
         streams.add(streamId("s2"));
-        waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 50, 5, streams);
+        waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 100000, 5, streams);
 
         waterMarkEventGenerator.track(streamId("s1"), 100);
         waterMarkEventGenerator.track(streamId("s1"), 110);
@@ -97,4 +97,21 @@ public class WaterMarkEventGeneratorTest {
         waterMarkEventGenerator.run();
         assertTrue(eventList.isEmpty());
     }
+
+    @Test
+    public void testLateEvent() throws Exception {
+        assertTrue(waterMarkEventGenerator.track(streamId("s1"), 100));
+        assertTrue(waterMarkEventGenerator.track(streamId("s1"), 110));
+        waterMarkEventGenerator.run();
+        assertTrue(eventList.get(0).isWatermark());
+        assertEquals(105, eventList.get(0).getTimestamp());
+        eventList.clear();
+        assertTrue(waterMarkEventGenerator.track(streamId("s1"), 105));
+        assertTrue(waterMarkEventGenerator.track(streamId("s1"), 106));
+        assertTrue(waterMarkEventGenerator.track(streamId("s1"), 115));
+        assertFalse(waterMarkEventGenerator.track(streamId("s1"), 104));
+        waterMarkEventGenerator.run();
+        assertTrue(eventList.get(0).isWatermark());
+        assertEquals(110, eventList.get(0).getTimestamp());
+    }
 }