You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/21 05:06:49 UTC

[1/7] storm git commit: STORM-1187 Support windowing based on tuple ts

Repository: storm
Updated Branches:
  refs/heads/master 3e76fd5b9 -> 8f9ed06db


http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java b/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
new file mode 100644
index 0000000..00201be
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import backtype.storm.generated.GlobalStreamId;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for {@link WaterMarkEventGeneratorTest}
+ */
+public class WaterMarkEventGeneratorTest {
+    WaterMarkEventGenerator<Integer> waterMarkEventGenerator;
+    WindowManager<Integer> windowManager;
+    List<Event<Integer>> eventList = new ArrayList<>();
+
+    private GlobalStreamId streamId(String component) {
+        return new GlobalStreamId(component, "default");
+    }
+
+    @Before
+    public void setUp() {
+        windowManager = new WindowManager<Integer>(null) {
+            @Override
+            public void add(Event<Integer> event) {
+                eventList.add(event);
+            }
+        };
+        waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 50, 5,
+                                                                Collections.singleton(streamId("s1")));
+    }
+
+    @Test
+    public void testTrackSingleStream() throws Exception {
+        waterMarkEventGenerator.track(streamId("s1"), 100);
+        waterMarkEventGenerator.track(streamId("s1"), 110);
+        Thread.sleep(60);
+        assertTrue(eventList.get(0).isWatermark());
+        assertEquals(105, eventList.get(0).getTimestamp());
+    }
+
+    @Test
+    public void testTrackSingleStreamOutOfOrder() throws Exception {
+        waterMarkEventGenerator.track(streamId("s1"), 100);
+        waterMarkEventGenerator.track(streamId("s1"), 110);
+        waterMarkEventGenerator.track(streamId("s1"), 104);
+        Thread.sleep(60);
+        assertTrue(eventList.get(0).isWatermark());
+        assertEquals(105, eventList.get(0).getTimestamp());
+    }
+
+    @Test
+    public void testTrackTwoStreams() throws Exception {
+        Set<GlobalStreamId> streams = new HashSet<>();
+        streams.add(streamId("s1"));
+        streams.add(streamId("s2"));
+        waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 50, 5, streams);
+
+        waterMarkEventGenerator.track(streamId("s1"), 100);
+        waterMarkEventGenerator.track(streamId("s1"), 110);
+        Thread.sleep(60);
+        assertTrue(eventList.isEmpty());
+        waterMarkEventGenerator.track(streamId("s2"), 95);
+        waterMarkEventGenerator.track(streamId("s2"), 98);
+        Thread.sleep(60);
+        assertTrue(eventList.get(0).isWatermark());
+        assertEquals(93, eventList.get(0).getTimestamp());
+    }
+
+    @Test
+    public void testNoEvents() throws Exception {
+        Thread.sleep(60);
+        assertTrue(eventList.isEmpty());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/test/jvm/backtype/storm/windowing/WindowManagerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/windowing/WindowManagerTest.java b/storm-core/test/jvm/backtype/storm/windowing/WindowManagerTest.java
index 6531817..b1584bc 100644
--- a/storm-core/test/jvm/backtype/storm/windowing/WindowManagerTest.java
+++ b/storm-core/test/jvm/backtype/storm/windowing/WindowManagerTest.java
@@ -22,8 +22,11 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import static backtype.storm.topology.base.BaseWindowedBolt.Count;
@@ -44,16 +47,26 @@ public class WindowManagerTest {
         List<Integer> onActivationNewEvents = Collections.emptyList();
         List<Integer> onActivationExpiredEvents = Collections.emptyList();
 
+        // all events since last clear
+        List<List<Integer>> allOnExpiryEvents = new ArrayList<>();
+        List<List<Integer>> allOnActivationEvents = new ArrayList<>();
+        List<List<Integer>> allOnActivationNewEvents = new ArrayList<>();
+        List<List<Integer>> allOnActivationExpiredEvents = new ArrayList<>();
+
         @Override
         public void onExpiry(List<Integer> events) {
             onExpiryEvents = events;
+            allOnExpiryEvents.add(events);
         }
 
         @Override
         public void onActivation(List<Integer> events, List<Integer> newEvents, List<Integer> expired) {
             onActivationEvents = events;
+            allOnActivationEvents.add(events);
             onActivationNewEvents = newEvents;
+            allOnActivationNewEvents.add(newEvents);
             onActivationExpiredEvents = expired;
+            allOnActivationExpiredEvents.add(expired);
         }
 
         void clear() {
@@ -61,6 +74,11 @@ public class WindowManagerTest {
             onActivationEvents = Collections.emptyList();
             onActivationNewEvents = Collections.emptyList();
             onActivationExpiredEvents = Collections.emptyList();
+
+            allOnExpiryEvents.clear();
+            allOnActivationEvents.clear();
+            allOnActivationNewEvents.clear();
+            allOnActivationExpiredEvents.clear();
         }
     }
 
@@ -77,8 +95,9 @@ public class WindowManagerTest {
 
     @Test
     public void testCountBasedWindow() throws Exception {
-        windowManager.setWindowLength(new Count(5));
-        windowManager.setSlidingInterval(new Count(2));
+        EvictionPolicy<Integer> evictionPolicy = new CountEvictionPolicy<Integer>(5);
+        windowManager.setEvictionPolicy(evictionPolicy);
+        windowManager.setTriggerPolicy(new CountTriggerPolicy<Integer>(2, windowManager, evictionPolicy));
         windowManager.add(1);
         windowManager.add(2);
         // nothing expired yet
@@ -116,8 +135,8 @@ public class WindowManagerTest {
     public void testExpireThreshold() throws Exception {
         int threshold = WindowManager.EXPIRE_EVENTS_THRESHOLD;
         int windowLength = 5;
-        windowManager.setWindowLength(new Count(5));
-        windowManager.setSlidingInterval(new Duration(1, TimeUnit.HOURS));
+        windowManager.setEvictionPolicy(new CountEvictionPolicy<Integer>(5));
+        windowManager.setTriggerPolicy(new TimeTriggerPolicy<Integer>(new Duration(1, TimeUnit.HOURS).value, windowManager));
         for (int i : seq(1, 5)) {
             windowManager.add(i);
         }
@@ -136,8 +155,13 @@ public class WindowManagerTest {
 
     @Test
     public void testTimeBasedWindow() throws Exception {
-        windowManager.setWindowLength(new Duration(1, TimeUnit.SECONDS));
-        windowManager.setSlidingInterval(new Duration(100, TimeUnit.MILLISECONDS));
+        EvictionPolicy<Integer> evictionPolicy = new TimeEvictionPolicy<Integer>(new Duration(1, TimeUnit.SECONDS).value);
+        windowManager.setEvictionPolicy(evictionPolicy);
+        /*
+         * Don't wait for Timetrigger to fire since this could lead to timing issues in unit tests.
+         * Set it to a large value and trigger manually.
+          */
+        windowManager.setTriggerPolicy(new TimeTriggerPolicy<Integer>(new Duration(1, TimeUnit.DAYS).value, windowManager, evictionPolicy));
         long now = System.currentTimeMillis();
 
         // add with past ts
@@ -156,8 +180,9 @@ public class WindowManagerTest {
         for (int i : seq(WindowManager.EXPIRE_EVENTS_THRESHOLD + 1, WindowManager.EXPIRE_EVENTS_THRESHOLD + 100)) {
             windowManager.add(i, now - 1000);
         }
-        // wait for time trigger
-        Thread.sleep(120);
+        // simulate the time trigger by setting the reference time and invoking onTrigger() manually
+        evictionPolicy.setContext(now + 100);
+        windowManager.onTrigger();
 
         // 100 events with past ts should expire
         assertEquals(100, listener.onExpiryEvents.size());
@@ -178,8 +203,9 @@ public class WindowManagerTest {
             windowManager.add(i, now);
         }
         activationsEvents.addAll(newEvents);
-        // wait for time trigger
-        Thread.sleep(120);
+        // simulate the time trigger by setting the reference time and invoking onTrigger() manually
+        evictionPolicy.setContext(now + 200);
+        windowManager.onTrigger();
         assertTrue(listener.onExpiryEvents.isEmpty());
         assertEquals(activationsEvents, listener.onActivationEvents);
         assertEquals(newEvents, listener.onActivationNewEvents);
@@ -189,22 +215,34 @@ public class WindowManagerTest {
 
     @Test
     public void testTimeBasedWindowExpiry() throws Exception {
-        windowManager.setWindowLength(new Duration(100, TimeUnit.MILLISECONDS));
-        windowManager.setSlidingInterval(new Duration(60, TimeUnit.MILLISECONDS));
+        EvictionPolicy<Integer> evictionPolicy = new TimeEvictionPolicy<Integer>(new Duration(100, TimeUnit.MILLISECONDS).value);
+        windowManager.setEvictionPolicy(evictionPolicy);
+        /*
+         * Don't wait for Timetrigger to fire since this could lead to timing issues in unit tests.
+         * Set it to a large value and trigger manually.
+          */
+        windowManager.setTriggerPolicy(new TimeTriggerPolicy<Integer>(new Duration(1, TimeUnit.DAYS).value, windowManager));
+        long now = System.currentTimeMillis();
         // add 10 events
         for (int i : seq(1, 10)) {
             windowManager.add(i);
         }
-        Thread.sleep(70);
+        // simulate the time trigger by setting the reference time and invoking onTrigger() manually
+        evictionPolicy.setContext(now + 60);
+        windowManager.onTrigger();
+
         assertEquals(seq(1, 10), listener.onActivationEvents);
         assertTrue(listener.onActivationExpiredEvents.isEmpty());
         listener.clear();
         // wait so all events expire
-        Thread.sleep(70);
-        assertEquals(seq(1, 10), listener.onActivationExpiredEvents);
+        evictionPolicy.setContext(now + 120);
+        windowManager.onTrigger();
+
+        assertEquals(seq(1, 10), listener.onExpiryEvents);
         assertTrue(listener.onActivationEvents.isEmpty());
         listener.clear();
-        Thread.sleep(70);
+        evictionPolicy.setContext(now + 180);
+        windowManager.onTrigger();
         assertTrue(listener.onActivationExpiredEvents.isEmpty());
         assertTrue(listener.onActivationEvents.isEmpty());
 
@@ -212,8 +250,9 @@ public class WindowManagerTest {
 
     @Test
     public void testTumblingWindow() throws Exception {
-        windowManager.setWindowLength(new Count(3));
-        windowManager.setSlidingInterval(new Count(3));
+        EvictionPolicy<Integer> evictionPolicy = new CountEvictionPolicy<Integer>(3);
+        windowManager.setEvictionPolicy(evictionPolicy);
+        windowManager.setTriggerPolicy(new CountTriggerPolicy<Integer>(3, windowManager, evictionPolicy));
         windowManager.add(1);
         windowManager.add(2);
         // nothing expired yet
@@ -236,6 +275,211 @@ public class WindowManagerTest {
 
     }
 
+
+    @Test
+    public void testEventTimeBasedWindow() throws Exception {
+        EvictionPolicy<Integer> evictionPolicy = new WatermarkTimeEvictionPolicy<>(20);
+        windowManager.setEvictionPolicy(evictionPolicy);
+        windowManager.setTriggerPolicy(new WatermarkTimeTriggerPolicy<Integer>(10, windowManager, evictionPolicy, windowManager));
+
+        windowManager.add(1, 603);
+        windowManager.add(2, 605);
+        windowManager.add(3, 607);
+
+        // This should trigger the scan to find
+        // the next aligned window end ts, but not produce any activations
+        windowManager.add(new WaterMarkEvent<Integer>(609));
+        assertEquals(Collections.emptyList(), listener.allOnActivationEvents);
+
+        windowManager.add(4, 618);
+        windowManager.add(5, 626);
+        windowManager.add(6, 636);
+        // send a watermark event, which should trigger three windows.
+        windowManager.add(new WaterMarkEvent<Integer>(631));
+
+//        System.out.println(listener.allOnActivationEvents);
+        assertEquals(3, listener.allOnActivationEvents.size());
+        assertEquals(seq(1,3), listener.allOnActivationEvents.get(0));
+        assertEquals(seq(1, 4), listener.allOnActivationEvents.get(1));
+        assertEquals(seq(4, 5), listener.allOnActivationEvents.get(2));
+
+        assertEquals(Collections.emptyList(), listener.allOnActivationExpiredEvents.get(0));
+        assertEquals(Collections.emptyList(), listener.allOnActivationExpiredEvents.get(1));
+        assertEquals(seq(1, 3), listener.allOnActivationExpiredEvents.get(2));
+
+        assertEquals(seq(1, 3), listener.allOnActivationNewEvents.get(0));
+        assertEquals(seq(4, 4), listener.allOnActivationNewEvents.get(1));
+        assertEquals(seq(5, 5), listener.allOnActivationNewEvents.get(2));
+
+        assertEquals(seq(1, 3), listener.allOnExpiryEvents.get(0));
+
+        // add more events with a gap in ts
+        windowManager.add(7, 825);
+        windowManager.add(8, 826);
+        windowManager.add(9, 827);
+        windowManager.add(10, 839);
+
+        listener.clear();
+        windowManager.add(new WaterMarkEvent<Integer>(834));
+
+        assertEquals(3, listener.allOnActivationEvents.size());
+        assertEquals(seq(5, 6), listener.allOnActivationEvents.get(0));
+        assertEquals(seq(6, 6), listener.allOnActivationEvents.get(1));
+        assertEquals(seq(7, 9), listener.allOnActivationEvents.get(2));
+
+        assertEquals(seq(4, 4), listener.allOnActivationExpiredEvents.get(0));
+        assertEquals(seq(5, 5), listener.allOnActivationExpiredEvents.get(1));
+        assertEquals(Collections.emptyList(), listener.allOnActivationExpiredEvents.get(2));
+
+        assertEquals(seq(6,6), listener.allOnActivationNewEvents.get(0));
+        assertEquals(Collections.emptyList(), listener.allOnActivationNewEvents.get(1));
+        assertEquals(seq(7, 9), listener.allOnActivationNewEvents.get(2));
+
+        assertEquals(seq(4, 4), listener.allOnExpiryEvents.get(0));
+        assertEquals(seq(5, 5), listener.allOnExpiryEvents.get(1));
+        assertEquals(seq(6, 6), listener.allOnExpiryEvents.get(2));
+    }
+
+    @Test
+    public void testCountBasedWindowWithEventTs() throws Exception {
+        EvictionPolicy<Integer> evictionPolicy = new WatermarkCountEvictionPolicy<>(3, windowManager);
+        windowManager.setEvictionPolicy(evictionPolicy);
+        windowManager.setTriggerPolicy(new WatermarkTimeTriggerPolicy<Integer>(10, windowManager, evictionPolicy, windowManager));
+
+        windowManager.add(1, 603);
+        windowManager.add(2, 605);
+        windowManager.add(3, 607);
+        windowManager.add(4, 618);
+        windowManager.add(5, 626);
+        windowManager.add(6, 636);
+        // send a watermark event, which should trigger three windows.
+        windowManager.add(new WaterMarkEvent<Integer>(631));
+
+        assertEquals(3, listener.allOnActivationEvents.size());
+        assertEquals(seq(1, 3), listener.allOnActivationEvents.get(0));
+        assertEquals(seq(2, 4), listener.allOnActivationEvents.get(1));
+        assertEquals(seq(3, 5), listener.allOnActivationEvents.get(2));
+
+        // add more events with a gap in ts
+        windowManager.add(7, 665);
+        windowManager.add(8, 666);
+        windowManager.add(9, 667);
+        windowManager.add(10, 679);
+
+        listener.clear();
+        windowManager.add(new WaterMarkEvent<Integer>(674));
+//        System.out.println(listener.allOnActivationEvents);
+        assertEquals(4, listener.allOnActivationEvents.size());
+        // same set of events part of three windows
+        assertEquals(seq(4, 6), listener.allOnActivationEvents.get(0));
+        assertEquals(seq(4, 6), listener.allOnActivationEvents.get(1));
+        assertEquals(seq(4, 6), listener.allOnActivationEvents.get(2));
+        assertEquals(seq(7, 9), listener.allOnActivationEvents.get(3));
+    }
+
+    @Test
+    public void testCountBasedTriggerWithEventTs() throws Exception {
+        EvictionPolicy<Integer> evictionPolicy = new WatermarkTimeEvictionPolicy<Integer>(20);
+        windowManager.setEvictionPolicy(evictionPolicy);
+        windowManager.setTriggerPolicy(new WatermarkCountTriggerPolicy<Integer>(3, windowManager,
+                                                                                evictionPolicy, windowManager));
+
+        windowManager.add(1, 603);
+        windowManager.add(2, 605);
+        windowManager.add(3, 607);
+        windowManager.add(4, 618);
+        windowManager.add(5, 625);
+        windowManager.add(6, 626);
+        windowManager.add(7, 629);
+        windowManager.add(8, 636);
+        // send a watermark event, which should trigger three windows.
+        windowManager.add(new WaterMarkEvent<Integer>(631));
+//        System.out.println(listener.allOnActivationEvents);
+
+        assertEquals(2, listener.allOnActivationEvents.size());
+        assertEquals(seq(1, 3), listener.allOnActivationEvents.get(0));
+        assertEquals(seq(3, 6), listener.allOnActivationEvents.get(1));
+
+        // add more events with a gap in ts
+        windowManager.add(9, 665);
+        windowManager.add(10, 666);
+        windowManager.add(11, 667);
+        windowManager.add(12, 669);
+        windowManager.add(12, 679);
+
+        listener.clear();
+        windowManager.add(new WaterMarkEvent<Integer>(674));
+//        System.out.println(listener.allOnActivationEvents);
+        assertEquals(2, listener.allOnActivationEvents.size());
+        // same set of events part of three windows
+        assertEquals(seq(9), listener.allOnActivationEvents.get(0));
+        assertEquals(seq(9, 12), listener.allOnActivationEvents.get(1));
+    }
+    @Test
+    public void testEventTimeLag() throws Exception {
+        EvictionPolicy<Integer> evictionPolicy = new WatermarkTimeEvictionPolicy<>(20, 5);
+        windowManager.setEvictionPolicy(evictionPolicy);
+        windowManager.setTriggerPolicy(new WatermarkTimeTriggerPolicy<Integer>(10, windowManager, evictionPolicy, windowManager));
+
+        windowManager.add(1, 603);
+        windowManager.add(2, 605);
+        windowManager.add(3, 607);
+        windowManager.add(4, 618);
+        windowManager.add(5, 626);
+        windowManager.add(6, 632);
+        windowManager.add(7, 629);
+        windowManager.add(8, 636);
+        // send a watermark event, which should trigger three windows.
+        windowManager.add(new WaterMarkEvent<Integer>(631));
+//        System.out.println(listener.allOnActivationEvents);
+        assertEquals(3, listener.allOnActivationEvents.size());
+        assertEquals(seq(1, 3), listener.allOnActivationEvents.get(0));
+        assertEquals(seq(1, 4), listener.allOnActivationEvents.get(1));
+        // out of order events should be processed upto the lag
+        assertEquals(Arrays.asList(4, 5, 7), listener.allOnActivationEvents.get(2));
+    }
+
+    @Test
+    public void testScanStop() throws Exception {
+        final Set<Integer> eventsScanned = new HashSet<>();
+        EvictionPolicy<Integer> evictionPolicy = new WatermarkTimeEvictionPolicy<Integer>(20, 5) {
+
+            @Override
+            public Action evict(Event<Integer> event) {
+                eventsScanned.add(event.get());
+                return super.evict(event);
+            }
+
+        };
+        windowManager.setEvictionPolicy(evictionPolicy);
+        windowManager.setTriggerPolicy(new WatermarkTimeTriggerPolicy<Integer>(10, windowManager, evictionPolicy, windowManager));
+
+        windowManager.add(1, 603);
+        windowManager.add(2, 605);
+        windowManager.add(3, 607);
+        windowManager.add(4, 618);
+        windowManager.add(5, 626);
+        windowManager.add(6, 629);
+        windowManager.add(7, 636);
+        windowManager.add(8, 637);
+        windowManager.add(9, 638);
+        windowManager.add(10, 639);
+
+        // send a watermark event, which should trigger three windows.
+        windowManager.add(new WaterMarkEvent<Integer>(631));
+
+        assertEquals(3, listener.allOnActivationEvents.size());
+        assertEquals(seq(1, 3), listener.allOnActivationEvents.get(0));
+        assertEquals(seq(1, 4), listener.allOnActivationEvents.get(1));
+
+        // out of order events should be processed upto the lag
+        assertEquals(Arrays.asList(4, 5, 6), listener.allOnActivationEvents.get(2));
+
+        // events 8, 9, 10 should not be scanned at all since TimeEvictionPolicy lag 5s should break
+        // the WindowManager scan loop early.
+        assertEquals(new HashSet<>(seq(1, 7)), eventsScanned);
+    }
+
     private List<Integer> seq(int start) {
         return seq(start, start);
     }


[3/7] storm git commit: Fix timing issues in unit tests

Posted by sr...@apache.org.
Fix timing issues in unit tests

Fix timing issues in WaterMarkEventGeneratorTest and WindowedBoltExecutorTest by triggering manually


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fa483838
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fa483838
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fa483838

Branch: refs/heads/master
Commit: fa4838381a121a2b83f0a01681296107400a198d
Parents: 06d4bb6
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Tue Dec 8 01:22:48 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Tue Dec 8 01:46:21 2015 +0530

----------------------------------------------------------------------
 .../backtype/storm/topology/WindowedBoltExecutor.java  |  3 ++-
 .../storm/topology/WindowedBoltExecutorTest.java       |  7 +++++--
 .../storm/windowing/WaterMarkEventGeneratorTest.java   | 13 +++++++------
 3 files changed, 14 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fa483838/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
index ff2278e..7b45cbc 100644
--- a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
@@ -59,7 +59,8 @@ public class WindowedBoltExecutor implements IRichBolt {
     private transient WindowManager<Tuple> windowManager;
     private transient int maxLagMs;
     private transient String tupleTsFieldName;
-    private transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
+    // package level for unit tests
+    transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
 
     public WindowedBoltExecutor(IWindowedBolt bolt) {
         this.bolt = bolt;

http://git-wip-us.apache.org/repos/asf/storm/blob/fa483838/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java b/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
index 1c59f3a..d856a99 100644
--- a/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
+++ b/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
@@ -29,6 +29,7 @@ import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.TupleImpl;
 import backtype.storm.tuple.Values;
 import backtype.storm.windowing.TupleWindow;
+import backtype.storm.windowing.WaterMarkEvent;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -104,7 +105,8 @@ public class WindowedBoltExecutorTest {
         conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10);
         conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, "ts");
         conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5);
-        conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 100);
+        // trigger manually to avoid timing issues
+        conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 100000);
         executor.prepare(conf, getTopologyContext(), getOutputCollector());
     }
 
@@ -119,7 +121,8 @@ public class WindowedBoltExecutorTest {
         for (long ts : timstamps) {
             executor.execute(getTuple("s1", new Fields("ts"), new Values(ts)));
         }
-        Thread.sleep(120);
+        //Thread.sleep(120);
+        executor.waterMarkEventGenerator.run();
         //System.out.println(testWindowedBolt.tupleWindows);
         assertEquals(3, testWindowedBolt.tupleWindows.size());
         TupleWindow first = testWindowedBolt.tupleWindows.get(0);

http://git-wip-us.apache.org/repos/asf/storm/blob/fa483838/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java b/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
index 00201be..2e52236 100644
--- a/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
+++ b/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
@@ -50,7 +50,8 @@ public class WaterMarkEventGeneratorTest {
                 eventList.add(event);
             }
         };
-        waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 50, 5,
+        // set watermark interval to a high value and trigger manually to fix timing issues
+        waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 100000, 5,
                                                                 Collections.singleton(streamId("s1")));
     }
 
@@ -58,7 +59,7 @@ public class WaterMarkEventGeneratorTest {
     public void testTrackSingleStream() throws Exception {
         waterMarkEventGenerator.track(streamId("s1"), 100);
         waterMarkEventGenerator.track(streamId("s1"), 110);
-        Thread.sleep(60);
+        waterMarkEventGenerator.run();
         assertTrue(eventList.get(0).isWatermark());
         assertEquals(105, eventList.get(0).getTimestamp());
     }
@@ -68,7 +69,7 @@ public class WaterMarkEventGeneratorTest {
         waterMarkEventGenerator.track(streamId("s1"), 100);
         waterMarkEventGenerator.track(streamId("s1"), 110);
         waterMarkEventGenerator.track(streamId("s1"), 104);
-        Thread.sleep(60);
+        waterMarkEventGenerator.run();
         assertTrue(eventList.get(0).isWatermark());
         assertEquals(105, eventList.get(0).getTimestamp());
     }
@@ -82,18 +83,18 @@ public class WaterMarkEventGeneratorTest {
 
         waterMarkEventGenerator.track(streamId("s1"), 100);
         waterMarkEventGenerator.track(streamId("s1"), 110);
-        Thread.sleep(60);
+        waterMarkEventGenerator.run();
         assertTrue(eventList.isEmpty());
         waterMarkEventGenerator.track(streamId("s2"), 95);
         waterMarkEventGenerator.track(streamId("s2"), 98);
-        Thread.sleep(60);
+        waterMarkEventGenerator.run();
         assertTrue(eventList.get(0).isWatermark());
         assertEquals(93, eventList.get(0).getTimestamp());
     }
 
     @Test
     public void testNoEvents() throws Exception {
-        Thread.sleep(60);
+        waterMarkEventGenerator.run();
         assertTrue(eventList.isEmpty());
     }
 }


[2/7] storm git commit: STORM-1187 Support windowing based on tuple ts

Posted by sr...@apache.org.
STORM-1187 Support windowing based on tuple ts

Support for doing window calculations based to tuple timestamps and handle
out of order events based on time lag.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/06d4bb6e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/06d4bb6e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/06d4bb6e

Branch: refs/heads/master
Commit: 06d4bb6ef10a11e19e1d718a76d7062ef5f69834
Parents: a8d253a
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Fri Nov 13 18:06:56 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Mon Dec 7 01:02:56 2015 +0530

----------------------------------------------------------------------
 docs/documentation/Windowing.md                 |  90 ++++++
 .../storm/starter/SlidingTupleTsTopology.java   |  62 ++++
 .../storm/starter/SlidingWindowTopology.java    |  81 +-----
 .../starter/bolt/SlidingWindowSumBolt.java      |  80 ++++++
 .../storm/starter/spout/RandomIntegerSpout.java |  55 ++++
 .../src/clj/backtype/storm/daemon/executor.clj  |   2 +
 storm-core/src/jvm/backtype/storm/Config.java   |  30 +-
 .../storm/topology/WindowedBoltExecutor.java    | 106 ++++++-
 .../storm/topology/base/BaseWindowedBolt.java   |  33 +++
 .../storm/windowing/CountEvictionPolicy.java    |  17 +-
 .../storm/windowing/CountTriggerPolicy.java     |  11 +-
 .../src/jvm/backtype/storm/windowing/Event.java |   8 +
 .../jvm/backtype/storm/windowing/EventImpl.java |  13 +
 .../storm/windowing/EvictionPolicy.java         |  38 ++-
 .../storm/windowing/TimeEvictionPolicy.java     |  39 ++-
 .../storm/windowing/TimeTriggerPolicy.java      |  13 +
 .../storm/windowing/TriggerHandler.java         |   6 +-
 .../storm/windowing/WaterMarkEvent.java         |  38 +++
 .../windowing/WaterMarkEventGenerator.java      | 110 ++++++++
 .../windowing/WatermarkCountEvictionPolicy.java |  65 +++++
 .../windowing/WatermarkCountTriggerPolicy.java  |  83 ++++++
 .../windowing/WatermarkTimeEvictionPolicy.java  |  77 +++++
 .../windowing/WatermarkTimeTriggerPolicy.java   | 109 ++++++++
 .../backtype/storm/windowing/WindowManager.java | 153 +++++++---
 .../topology/WindowedBoltExecutorTest.java      | 139 +++++++++
 .../windowing/WaterMarkEventGeneratorTest.java  |  99 +++++++
 .../storm/windowing/WindowManagerTest.java      | 280 +++++++++++++++++--
 27 files changed, 1664 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/docs/documentation/Windowing.md
----------------------------------------------------------------------
diff --git a/docs/documentation/Windowing.md b/docs/documentation/Windowing.md
index 8f9d758..54e5e2d 100644
--- a/docs/documentation/Windowing.md
+++ b/docs/documentation/Windowing.md
@@ -126,6 +126,96 @@ Time duration based tumbling window that tumbles after the specified time durati
 
 ```
 
+## Tuple timestamp and out of order tuples
+By default the timestamp tracked in the window is the time when the tuple is processed by the bolt. The window calculations
+are performed based on the processing timestamp. Storm has support for tracking windows based on the source generated timestamp.
+
+```java
+/**
+* Specify a field in the tuple that represents the timestamp as a long value. If this
+* field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
+*
+* @param fieldName the name of the field that contains the timestamp
+*/
+public BaseWindowedBolt withTimestampField(String fieldName)
+```
+
+The value for the above `fieldName` will be looked up from the incoming tuple and considered for windowing calculations. 
+If the field is not present in the tuple an exception will be thrown. Along with the timestamp field name, a time lag parameter 
+can also be specified which indicates the max time limit for tuples with out of order timestamps. 
+
+E.g. If the lag is 5 secs and a tuple `t1` arrived with timestamp `06:00:05` no tuples may arrive with tuple timestamp earlier than `06:00:00`. If a tuple
+arrives with timestamp 05:59:59 after `t1` and the window has moved past `t1`, it will be treated as a late tuple and not processed. 
+
+```java
+/**
+* Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps
+* cannot be out of order by more than this amount.
+*
+* @param duration the max lag duration
+*/
+public BaseWindowedBolt withLag(Duration duration)
+```
+
+### Watermarks
+For processing tuples with timestamp field, storm internally computes watermarks based on the incoming tuple timestamp. Watermark is 
+the minimum of the latest tuple timestamps (minus the lag) across all the input streams. At a higher level this is similar to the watermark concept
+used by Flink and Google's MillWheel for tracking event based timestamps.
+
+Periodically (default every sec), the watermark timestamps are emitted and this is considered as the clock tick for the window calculation if 
+tuple based timestamps are in use. The interval at which watermarks are emitted can be changed with the below api.
+ 
+```java
+/**
+* Specify the watermark event generation interval. For tuple based timestamps, watermark events
+* are used to track the progress of time
+*
+* @param interval the interval at which watermark events are generated
+*/
+public BaseWindowedBolt withWatermarkInterval(Duration interval)
+```
+
+
+When a watermark is received, all windows up to that timestamp will be evaluated.
+
+For example, consider tuple timestamp based processing with following window parameters,
+
+`Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s`
+
+```
+|-----|-----|-----|-----|-----|-----|-----|
+0     10    20    30    40    50    60    70
+````
+
+Current ts = `09:00:00`
+
+Tuples `e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36)` are received between `9:00:00` and `9:00:01`
+
+At time t = `09:00:01`, watermark w1 = `6:00:31` is emitted since no tuples earlier than `6:00:31` can arrive.
+
+Three windows will be evaluated. The first window end ts (06:00:10) is computed by taking the earliest event timestamp (06:00:03) 
+and computing the ceiling based on the sliding interval (10s).
+
+1. `5:59:50 - 06:00:10` with tuples e1, e2, e3
+2. `6:00:00 - 06:00:20` with tuples e1, e2, e3, e4
+3. `6:00:10 - 06:00:30` with tuples e4, e5
+
+e6 is not evaluated since watermark timestamp `6:00:31` is older than the tuple ts `6:00:36`.
+
+Tuples `e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39)` are received between `9:00:01` and `9:00:02`
+
+At time t = `09:00:02` another watermark w2 = `08:00:34` is emitted since no tuples earlier than `8:00:34` can arrive now.
+
+Three windows will be evaluated,
+
+1. `6:00:20 - 06:00:40` with tuples e5, e6 (from earlier batch)
+2. `6:00:30 - 06:00:50` with tuple e6 (from earlier batch)
+3. `8:00:10 - 08:00:30` with tuples e7, e8, e9
+
+e10 is not evaluated since the tuple ts `8:00:39` is beyond the watermark time `8:00:34`.
+
+The window calculation considers the time gaps and computes the windows based on the tuple timestamp.
+
 ## Guarentees
 The windowing functionality in storm core currently provides at-least once guarentee. The values emitted from the bolts
 `execute(TupleWindow inputWindow)` method are automatically anchored to all the tuples in the inputWindow. The downstream

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java b/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java
new file mode 100644
index 0000000..598335d
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseWindowedBolt;
+import backtype.storm.utils.Utils;
+import storm.starter.bolt.PrinterBolt;
+import storm.starter.bolt.SlidingWindowSumBolt;
+import storm.starter.spout.RandomIntegerSpout;
+
+import java.util.concurrent.TimeUnit;
+
+import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
+
+/**
+ * Windowing based on tuple timestamp (e.g. the time when tuple is generated
+ * rather than when its processed).
+ */
+public class SlidingTupleTsTopology {
+    public static void main(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
+        BaseWindowedBolt bolt = new SlidingWindowSumBolt()
+                .withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS))
+                .withTimestampField("ts")
+                .withLag(new Duration(5, TimeUnit.SECONDS));
+        builder.setSpout("integer", new RandomIntegerSpout(), 1);
+        builder.setBolt("slidingsum", bolt, 1).shuffleGrouping("integer");
+        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingsum");
+        Config conf = new Config();
+        conf.setDebug(true);
+
+        if (args != null && args.length > 0) {
+            conf.setNumWorkers(1);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+        } else {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", conf, builder.createTopology());
+            Utils.sleep(40000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java b/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
index beee8de..5031f8d 100644
--- a/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
@@ -20,12 +20,10 @@ package storm.starter;
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import backtype.storm.StormSubmitter;
-import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.topology.base.BaseWindowedBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
@@ -35,10 +33,11 @@ import backtype.storm.windowing.TupleWindow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.starter.bolt.PrinterBolt;
+import storm.starter.bolt.SlidingWindowSumBolt;
+import storm.starter.spout.RandomIntegerSpout;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 
 import static backtype.storm.topology.base.BaseWindowedBolt.Count;
 
@@ -51,79 +50,6 @@ public class SlidingWindowTopology {
     private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowTopology.class);
 
     /*
-     * emits a random integer every 100 ms
-     */
-
-    private static class RandomIntegerSpout extends BaseRichSpout {
-        SpoutOutputCollector collector;
-        Random rand;
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("value"));
-        }
-
-        @Override
-        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-            this.collector = collector;
-            this.rand = new Random();
-        }
-
-        @Override
-        public void nextTuple() {
-            Utils.sleep(100);
-            collector.emit(new Values(rand.nextInt(1000)));
-        }
-    }
-
-    /*
-     * Computes sliding window sum
-     */
-    private static class SlidingWindowSumBolt extends BaseWindowedBolt {
-        private int sum = 0;
-        private OutputCollector collector;
-
-        @Override
-        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-            this.collector = collector;
-        }
-
-        @Override
-        public void execute(TupleWindow inputWindow) {
-            /*
-             * The inputWindow gives a view of
-             * (a) all the events in the window
-             * (b) events that expired since last activation of the window
-             * (c) events that newly arrived since last activation of the window
-             */
-            List<Tuple> tuplesInWindow = inputWindow.get();
-            List<Tuple> newTuples = inputWindow.getNew();
-            List<Tuple> expiredTuples = inputWindow.getExpired();
-
-            LOG.debug("Events in current window: " + tuplesInWindow.size());
-            /*
-             * Instead of iterating over all the tuples in the window to compute
-             * the sum, the values for the new events are added and old events are
-             * subtracted. Similar optimizations might be possible in other
-             * windowing computations.
-             */
-            for (Tuple tuple : newTuples) {
-                sum += (int) tuple.getValue(0);
-            }
-            for (Tuple tuple : expiredTuples) {
-                sum -= (int) tuple.getValue(0);
-            }
-            collector.emit(new Values(sum));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("sum"));
-        }
-    }
-
-
-    /*
      * Computes tumbling window average
      */
     private static class TumblingWindowAvgBolt extends BaseWindowedBolt {
@@ -168,13 +94,10 @@ public class SlidingWindowTopology {
         builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");
         Config conf = new Config();
         conf.setDebug(true);
-
         if (args != null && args.length > 0) {
             conf.setNumWorkers(1);
-
             StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
         } else {
-
             LocalCluster cluster = new LocalCluster();
             cluster.submitTopology("test", conf, builder.createTopology());
             Utils.sleep(40000);

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java
new file mode 100644
index 0000000..ef3a0b8
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseWindowedBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Computes sliding window sum
+ */
+public class SlidingWindowSumBolt extends BaseWindowedBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowSumBolt.class);
+
+    private int sum = 0;
+    private OutputCollector collector;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(TupleWindow inputWindow) {
+            /*
+             * The inputWindow gives a view of
+             * (a) all the events in the window
+             * (b) events that expired since last activation of the window
+             * (c) events that newly arrived since last activation of the window
+             */
+        List<Tuple> tuplesInWindow = inputWindow.get();
+        List<Tuple> newTuples = inputWindow.getNew();
+        List<Tuple> expiredTuples = inputWindow.getExpired();
+
+        LOG.debug("Events in current window: " + tuplesInWindow.size());
+            /*
+             * Instead of iterating over all the tuples in the window to compute
+             * the sum, the values for the new events are added and old events are
+             * subtracted. Similar optimizations might be possible in other
+             * windowing computations.
+             */
+        for (Tuple tuple : newTuples) {
+            sum += (int) tuple.getValue(0);
+        }
+        for (Tuple tuple : expiredTuples) {
+            sum -= (int) tuple.getValue(0);
+        }
+        collector.emit(new Values(sum));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("sum"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java b/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java
new file mode 100644
index 0000000..5778c8e
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter.spout;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Emits a random integer and a timestamp value (offset by one day),
+ * every 100 ms. The ts field can be used in tuple time based windowing.
+ */
+public class RandomIntegerSpout extends BaseRichSpout {
+    private SpoutOutputCollector collector;
+    private Random rand;
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("value", "ts"));
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+        this.rand = new Random();
+    }
+
+    @Override
+    public void nextTuple() {
+        Utils.sleep(100);
+        collector.emit(new Values(rand.nextInt(1000), System.currentTimeMillis() - (24 * 60 * 60 * 1000)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 2aeb5e7..58b56cf 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -171,6 +171,8 @@
                         TOPOLOGY-BOLTS-WINDOW-LENGTH-DURATION-MS
                         TOPOLOGY-BOLTS-SLIDING-INTERVAL-COUNT
                         TOPOLOGY-BOLTS-SLIDING-INTERVAL-DURATION-MS
+                        TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-FIELD-NAME
+                        TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-MAX-LAG-MS
                         )
         spec-conf (-> general-context
                       (.getComponentCommon component-id)

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index a1da8fe..4f86786 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1526,19 +1526,45 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS = "topology.bolts.window.length.duration.ms";
 
     /*
-     * Bolt-specific configuration for windowed bolts to specifiy the sliding interval as a count of number of tuples.
+     * Bolt-specific configuration for windowed bolts to specify the sliding interval as a count of number of tuples.
      */
     @isInteger
     @isPositiveNumber
     public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT = "topology.bolts.window.sliding.interval.count";
 
     /*
-     * Bolt-specific configuration for windowed bolts to specifiy the sliding interval in time duration.
+     * Bolt-specific configuration for windowed bolts to specify the sliding interval in time duration.
      */
     @isInteger
     @isPositiveNumber
     public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS = "topology.bolts.window.sliding.interval.duration.ms";
 
+    /*
+     * Bolt-specific configuration for windowed bolts to specify the name of the field in the tuple that holds
+     * the timestamp (e.g. the ts when the tuple was actually generated). If this config is specified and the
+     * field is not present in the incoming tuple, a java.lang.IllegalArgumentException will be thrown.
+     */
+    @isString
+    public static final String TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME = "topology.bolts.tuple.timestamp.field.name";
+
+    /*
+     * Bolt-specific configuration for windowed bolts to specify the maximum time lag of the tuple timestamp
+     * in milliseconds. It means that the tuple timestamps cannot be out of order by more than this amount.
+     * This config will be effective only if the TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME is also specified.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS = "topology.bolts.tuple.timestamp.max.lag.ms";
+
+    /*
+     * Bolt-specific configuration for windowed bolts to specify the time interval for generating
+     * watermark events. Watermark event tracks the progress of time when tuple timestamp is used.
+     * This config is effective only if TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME is also specified.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS = "topology.bolts.watermark.event.interval.ms";
+
     /**
      * This config is available for TransactionalSpouts, and contains the id ( a String) for
      * the transactional topology. This id is used to store the state of the transactional

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
index c7d6f70..ff2278e 100644
--- a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
@@ -22,7 +22,18 @@ import backtype.storm.task.IOutputCollector;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.windowing.CountEvictionPolicy;
+import backtype.storm.windowing.CountTriggerPolicy;
+import backtype.storm.windowing.EvictionPolicy;
+import backtype.storm.windowing.TimeEvictionPolicy;
+import backtype.storm.windowing.TimeTriggerPolicy;
+import backtype.storm.windowing.TriggerPolicy;
 import backtype.storm.windowing.TupleWindowImpl;
+import backtype.storm.windowing.WaterMarkEventGenerator;
+import backtype.storm.windowing.WatermarkCountEvictionPolicy;
+import backtype.storm.windowing.WatermarkCountTriggerPolicy;
+import backtype.storm.windowing.WatermarkTimeEvictionPolicy;
+import backtype.storm.windowing.WatermarkTimeTriggerPolicy;
 import backtype.storm.windowing.WindowLifecycleListener;
 import backtype.storm.windowing.WindowManager;
 import org.slf4j.Logger;
@@ -40,11 +51,15 @@ import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
  */
 public class WindowedBoltExecutor implements IRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
-
-    private IWindowedBolt bolt;
+    private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
+    private static final int DEFAULT_MAX_LAG_MS = 0; // no lag
+    private final IWindowedBolt bolt;
     private transient WindowedOutputCollector windowedOutputCollector;
     private transient WindowLifecycleListener<Tuple> listener;
     private transient WindowManager<Tuple> windowManager;
+    private transient int maxLagMs;
+    private transient String tupleTsFieldName;
+    private transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
 
     public WindowedBoltExecutor(IWindowedBolt bolt) {
         this.bolt = bolt;
@@ -93,6 +108,10 @@ public class WindowedBoltExecutor implements IRichBolt {
 
         int topologyTimeout = getTopologyTimeoutMillis(stormConf);
         int maxSpoutPending = getMaxSpoutPending(stormConf);
+        if (windowLengthCount == null && windowLengthDuration == null) {
+            throw new IllegalArgumentException("Window length is not specified");
+        }
+
         if (windowLengthDuration != null && slidingIntervalDuration != null) {
             ensureDurationLessThanTimeout(windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout);
         } else if (windowLengthDuration != null) {
@@ -110,12 +129,14 @@ public class WindowedBoltExecutor implements IRichBolt {
         }
     }
 
-    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf) {
+    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf,
+                                                   TopologyContext context) {
         WindowManager<Tuple> manager = new WindowManager<>(lifecycleListener);
         Duration windowLengthDuration = null;
         Count windowLengthCount = null;
         Duration slidingIntervalDuration = null;
         Count slidingIntervalCount = null;
+        // window length
         if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
             windowLengthCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
         } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
@@ -123,7 +144,7 @@ public class WindowedBoltExecutor implements IRichBolt {
                     ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
                     TimeUnit.MILLISECONDS);
         }
-
+        // sliding interval
         if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
             slidingIntervalCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
         } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
@@ -132,20 +153,73 @@ public class WindowedBoltExecutor implements IRichBolt {
             // default is a sliding window of count 1
             slidingIntervalCount = new Count(1);
         }
+        // tuple ts
+        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME)) {
+            tupleTsFieldName = (String) stormConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME);
+            // max lag
+            if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
+                maxLagMs = ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
+            } else {
+                maxLagMs = DEFAULT_MAX_LAG_MS;
+            }
+            // watermark interval
+            int watermarkInterval;
+            if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) {
+                watermarkInterval = ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue();
+            } else {
+                watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
+            }
+            waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval,
+                                                                    maxLagMs, context.getThisSources().keySet());
+        }
         // validate
         validate(stormConf, windowLengthCount, windowLengthDuration,
                  slidingIntervalCount, slidingIntervalDuration);
-        if (windowLengthCount != null) {
-            manager.setWindowLength(windowLengthCount);
+        EvictionPolicy<Tuple> evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration,
+                                                                 manager);
+        TriggerPolicy<Tuple> triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
+                                                              manager, evictionPolicy);
+        manager.setEvictionPolicy(evictionPolicy);
+        manager.setTriggerPolicy(triggerPolicy);
+        return manager;
+    }
+
+    private boolean isTupleTs() {
+        return tupleTsFieldName != null;
+    }
+
+    private TriggerPolicy<Tuple> getTriggerPolicy(Count slidingIntervalCount, Duration slidingIntervalDuration,
+                                                  WindowManager<Tuple> manager, EvictionPolicy<Tuple> evictionPolicy) {
+        if (slidingIntervalCount != null) {
+            if (isTupleTs()) {
+                return new WatermarkCountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy, manager);
+            } else {
+                return new CountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy);
+            }
         } else {
-            manager.setWindowLength(windowLengthDuration);
+            if (isTupleTs()) {
+                return new WatermarkTimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy, manager);
+            } else {
+                return new TimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy);
+            }
         }
-        if (slidingIntervalCount != null) {
-            manager.setSlidingInterval(slidingIntervalCount);
+    }
+
+    private EvictionPolicy<Tuple> getEvictionPolicy(Count windowLengthCount, Duration windowLengthDuration,
+                                                    WindowManager<Tuple> manager) {
+        if (windowLengthCount != null) {
+            if (isTupleTs()) {
+                return new WatermarkCountEvictionPolicy<>(windowLengthCount.value, manager);
+            } else {
+                return new CountEvictionPolicy<>(windowLengthCount.value);
+            }
         } else {
-            manager.setSlidingInterval(slidingIntervalDuration);
+            if (isTupleTs()) {
+                return new WatermarkTimeEvictionPolicy<>(windowLengthDuration.value, maxLagMs);
+            } else {
+                return new TimeEvictionPolicy<>(windowLengthDuration.value);
+            }
         }
-        return manager;
     }
 
     @Override
@@ -153,13 +227,19 @@ public class WindowedBoltExecutor implements IRichBolt {
         this.windowedOutputCollector = new WindowedOutputCollector(collector);
         bolt.prepare(stormConf, context, windowedOutputCollector);
         this.listener = newWindowLifecycleListener();
-        this.windowManager = initWindowManager(listener, stormConf);
+        this.windowManager = initWindowManager(listener, stormConf, context);
         LOG.debug("Initialized window manager {} ", this.windowManager);
     }
 
     @Override
     public void execute(Tuple input) {
-        windowManager.add(input);
+        if (isTupleTs()) {
+            long ts = input.getLongByField(tupleTsFieldName);
+            windowManager.add(input, ts);
+            waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts);
+        } else {
+            windowManager.add(input);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java b/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
index fd4af90..2f49661 100644
--- a/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
+++ b/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
@@ -157,6 +157,39 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
         return withWindowLength(duration).withSlidingInterval(duration);
     }
 
+    /**
+     * Specify a field in the tuple that represents the timestamp as a long value. If this
+     * field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
+     *
+     * @param fieldName the name of the field that contains the timestamp
+     */
+    public BaseWindowedBolt withTimestampField(String fieldName) {
+        windowConfiguration.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, fieldName);
+        return this;
+    }
+
+    /**
+     * Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps
+     * cannot be out of order by more than this amount.
+     *
+     * @param duration the max lag duration
+     */
+    public BaseWindowedBolt withLag(Duration duration) {
+        windowConfiguration.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, duration.value);
+        return this;
+    }
+
+    /**
+     * Specify the watermark event generation interval. For tuple based timestamps, watermark events
+     * are used to track the progress of time
+     *
+     * @param interval the interval at which watermark events are generated
+     */
+    public BaseWindowedBolt withWatermarkInterval(Duration interval) {
+        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, interval.value);
+        return this;
+    }
+
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
         // NOOP

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
index a6779a8..a49702c 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class CountEvictionPolicy<T> implements EvictionPolicy<T> {
     private final int threshold;
-    private final AtomicInteger currentCount;
+    protected final AtomicInteger currentCount;
 
     public CountEvictionPolicy(int count) {
         this.threshold = count;
@@ -35,7 +35,7 @@ public class CountEvictionPolicy<T> implements EvictionPolicy<T> {
     }
 
     @Override
-    public boolean evict(Event<T> event) {
+    public Action evict(Event<T> event) {
         /*
          * atomically decrement the count if its greater than threshold and
          * return if the event should be evicted
@@ -44,18 +44,25 @@ public class CountEvictionPolicy<T> implements EvictionPolicy<T> {
             int curVal = currentCount.get();
             if (curVal > threshold) {
                 if (currentCount.compareAndSet(curVal, curVal - 1)) {
-                    return true;
+                    return Action.EXPIRE;
                 }
             } else {
                 break;
             }
         }
-        return false;
+        return Action.PROCESS;
     }
 
     @Override
     public void track(Event<T> event) {
-        currentCount.incrementAndGet();
+        if (!event.isWatermark()) {
+            currentCount.incrementAndGet();
+        }
+    }
+
+    @Override
+    public void setContext(Object context) {
+        // NOOP
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
index 3b1bf9f..ffe1b90 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
@@ -29,17 +29,22 @@ public class CountTriggerPolicy<T> implements TriggerPolicy<T> {
     private final int count;
     private final AtomicInteger currentCount;
     private final TriggerHandler handler;
+    private final EvictionPolicy<T> evictionPolicy;
 
-    public CountTriggerPolicy(int count, TriggerHandler handler) {
+    public CountTriggerPolicy(int count, TriggerHandler handler, EvictionPolicy<T> evictionPolicy) {
         this.count = count;
         this.currentCount = new AtomicInteger();
         this.handler = handler;
+        this.evictionPolicy = evictionPolicy;
     }
 
     @Override
     public void track(Event<T> event) {
-        if (currentCount.incrementAndGet() >= count) {
-            handler.onTrigger();
+        if (!event.isWatermark()) {
+            if (currentCount.incrementAndGet() >= count) {
+                evictionPolicy.setContext(System.currentTimeMillis());
+                handler.onTrigger();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/Event.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/Event.java b/storm-core/src/jvm/backtype/storm/windowing/Event.java
index 4855701..a7bcb9b 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/Event.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/Event.java
@@ -38,4 +38,12 @@ interface Event<T> {
      * @return the wrapped object.
      */
     T get();
+
+    /**
+     * If this is a watermark event or not. Watermark events are used
+     * for tracking time while processing event based ts.
+     *
+     * @return true if this is a watermark event
+     */
+    boolean isWatermark();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java b/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
index 09c974c..035f41b 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
@@ -35,4 +35,17 @@ class EventImpl<T> implements Event<T> {
     public T get() {
         return event;
     }
+
+    @Override
+    public boolean isWatermark() {
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "EventImpl{" +
+                "event=" + event +
+                ", ts=" + ts +
+                '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
index 8820e92..c3b3032 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
@@ -25,12 +25,36 @@ package backtype.storm.windowing;
  */
 public interface EvictionPolicy<T> {
     /**
-     * Decides if an event should be evicted from the window or not.
+     * The action to be taken when {@link EvictionPolicy#evict(Event)} is invoked.
+     */
+    enum Action {
+        /**
+         * expire the event and remove it from the queue
+         */
+        EXPIRE,
+        /**
+         * process the event in the current window of events
+         */
+        PROCESS,
+        /**
+         * don't include in the current window but keep the event
+         * in the queue for evaluating as a part of future windows
+         */
+        KEEP,
+        /**
+         * stop processing the queue, there cannot be anymore events
+         * satisfying the eviction policy
+         */
+        STOP
+    }
+    /**
+     * Decides if an event should be expired from the window, processed in the current
+     * window or kept for later processing.
      *
      * @param event the input event
-     * @return true if the event should be evicted, false otherwise
+     * @return the {@link backtype.storm.windowing.EvictionPolicy.Action} to be taken based on the input event
      */
-    boolean evict(Event<T> event);
+    Action evict(Event<T> event);
 
     /**
      * Tracks the event to later decide whether
@@ -39,4 +63,12 @@ public interface EvictionPolicy<T> {
      * @param event the input event to be tracked
      */
     void track(Event<T> event);
+
+    /**
+     * Sets a context in the eviction policy that can be used while evicting the events.
+     * E.g. For TimeEvictionPolicy, this could be used to set the reference timestamp.
+     *
+     * @param context
+     */
+    void setContext(Object context);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
index 16408f3..117df92 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
@@ -21,21 +21,34 @@ package backtype.storm.windowing;
  * Eviction policy that evicts events based on time duration.
  */
 public class TimeEvictionPolicy<T> implements EvictionPolicy<T> {
-    private final long duration;
+    private final int windowLength;
+    /**
+     * The reference time in millis for window calculations and
+     * expiring events. If not set it will default to System.currentTimeMillis()
+     */
+    protected Long referenceTime;
 
-    public TimeEvictionPolicy(long millis) {
-        this.duration = millis;
+    /**
+     * Constructs a TimeEvictionPolicy that evicts events older
+     * than the given window length in millis
+     *
+     * @param windowLength the duration in milliseconds
+     */
+    public TimeEvictionPolicy(int windowLength) {
+        this.windowLength = windowLength;
     }
 
     /**
-     * Returns true if the event falls out of the window based on the window duration
-     *
-     * @param event
-     * @return
+     * {@inheritDoc}
      */
     @Override
-    public boolean evict(Event<T> event) {
-        return (System.currentTimeMillis() - event.getTimestamp()) >= duration;
+    public Action evict(Event<T> event) {
+        long now = referenceTime == null ? System.currentTimeMillis() : referenceTime;
+        long diff = now - event.getTimestamp();
+        if (diff >= windowLength) {
+            return Action.EXPIRE;
+        }
+        return Action.PROCESS;
     }
 
     @Override
@@ -44,9 +57,15 @@ public class TimeEvictionPolicy<T> implements EvictionPolicy<T> {
     }
 
     @Override
+    public void setContext(Object context) {
+        referenceTime = ((Number) context).longValue();
+    }
+
+    @Override
     public String toString() {
         return "TimeEvictionPolicy{" +
-                "duration=" + duration +
+                "windowLength=" + windowLength +
+                ", referenceTime=" + referenceTime +
                 '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
index db1fbeb..a32cb4d 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
@@ -37,12 +37,18 @@ public class TimeTriggerPolicy<T> implements TriggerPolicy<T> {
     private final TriggerHandler handler;
     private final ScheduledExecutorService executor;
     private final ScheduledFuture<?> executorFuture;
+    private final EvictionPolicy<T> evictionPolicy;
 
     public TimeTriggerPolicy(long millis, TriggerHandler handler) {
+        this(millis, handler, null);
+    }
+
+    public TimeTriggerPolicy(long millis, TriggerHandler handler, EvictionPolicy evictionPolicy) {
         this.duration = millis;
         this.handler = handler;
         this.executor = Executors.newSingleThreadScheduledExecutor();
         this.executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
+        this.evictionPolicy = evictionPolicy;
     }
 
     @Override
@@ -100,6 +106,13 @@ public class TimeTriggerPolicy<T> implements TriggerPolicy<T> {
             @Override
             public void run() {
                 try {
+                    /*
+                     * set the current timestamp as the reference time for the eviction policy
+                     * to evict the events
+                     */
+                    if(evictionPolicy != null) {
+                        evictionPolicy.setContext(System.currentTimeMillis());
+                    }
                     handler.onTrigger();
                 } catch (Throwable th) {
                     LOG.error("handler.onTrigger failed ", th);

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java b/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java
index f947951..29fe90d 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java
@@ -23,7 +23,9 @@ package backtype.storm.windowing;
  */
 interface TriggerHandler {
     /**
-     * the code to execute when the {@link TriggerPolicy} condition is satisfied.
+     * The code to execute when the {@link TriggerPolicy} condition is satisfied.
+     *
+     * @return true if the window was evaluated with at least one event in the window, false otherwise
      */
-    void onTrigger();
+    boolean onTrigger();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEvent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEvent.java b/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEvent.java
new file mode 100644
index 0000000..bada76d
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+/**
+ * Watermark event used for tracking progress of time when
+ * processing event based ts.
+ */
+public class WaterMarkEvent<T> extends EventImpl<T> {
+    public WaterMarkEvent(long ts) {
+        super(null, ts);
+    }
+
+    @Override
+    public boolean isWatermark() {
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "WaterMarkEvent{} " + super.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java b/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java
new file mode 100644
index 0000000..9820da6
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.topology.FailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tracks tuples across input streams and periodically emits watermark events.
+ * Watermark event timestamp is the minimum of the latest tuple timestamps
+ * across all the input streams (minus the lag). Once a watermark event is emitted
+ * any tuple coming with an earlier timestamp can be considered as late events.
+ */
+public class WaterMarkEventGenerator<T> implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(WaterMarkEventGenerator.class);
+    private final WindowManager<T> windowManager;
+    private final int eventTsLag;
+    private final Set<GlobalStreamId> inputStreams;
+    private final Map<GlobalStreamId, Long> streamToTs;
+    private final ScheduledExecutorService executorService;
+    private final ScheduledFuture<?> executorFuture;
+    private long lastWaterMarkTs = 0;
+
+    public WaterMarkEventGenerator(WindowManager<T> windowManager, int interval,
+                                   int eventTsLag, Set<GlobalStreamId> inputStreams) {
+        this.windowManager = windowManager;
+        streamToTs = new ConcurrentHashMap<>();
+        executorService = Executors.newSingleThreadScheduledExecutor();
+        this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
+        this.eventTsLag = eventTsLag;
+        this.inputStreams = inputStreams;
+    }
+
+    public void track(GlobalStreamId stream, long ts) {
+        Long currentVal = streamToTs.get(stream);
+        if (currentVal == null || ts > currentVal) {
+            streamToTs.put(stream, ts);
+        }
+        checkFailures();
+    }
+
+    @Override
+    public void run() {
+        try {
+            long waterMarkTs = computeWaterMarkTs();
+            if (waterMarkTs > lastWaterMarkTs) {
+                this.windowManager.add(new WaterMarkEvent<T>(waterMarkTs - eventTsLag));
+                lastWaterMarkTs = waterMarkTs;
+            }
+        } catch (Throwable th) {
+            LOG.error("Failed while processing watermark event ", th);
+            throw th;
+        }
+    }
+
+    /**
+     * Computes the min ts across all streams.
+     */
+    private long computeWaterMarkTs() {
+        long ts = Long.MIN_VALUE;
+        // only if some data has arrived on each input stream
+        if(streamToTs.size() >= inputStreams.size()) {
+            ts = Long.MAX_VALUE;
+            for (Map.Entry<GlobalStreamId, Long> entry : streamToTs.entrySet()) {
+                ts = Math.min(ts, entry.getValue());
+            }
+        }
+        return ts;
+    }
+
+    private void checkFailures() {
+        if (executorFuture.isDone()) {
+            try {
+                executorFuture.get();
+            } catch (InterruptedException ex) {
+                LOG.error("Got exception ", ex);
+                throw new FailedException(ex);
+            } catch (ExecutionException ex) {
+                LOG.error("Got exception ", ex);
+                throw new FailedException(ex.getCause());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountEvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountEvictionPolicy.java
new file mode 100644
index 0000000..0aa1c6b
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountEvictionPolicy.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+/**
+ * An eviction policy that tracks count based on watermark ts and
+ * evicts events upto the watermark based on a threshold count.
+ *
+ * @param <T> the type of event tracked by this policy.
+ */
+public class WatermarkCountEvictionPolicy<T> extends CountEvictionPolicy<T> {
+    private final WindowManager<T> windowManager;
+    /*
+     * The reference time in millis for window calculations and
+     * expiring events. If not set it will default to System.currentTimeMillis()
+     */
+    private long referenceTime;
+
+    public WatermarkCountEvictionPolicy(int count, WindowManager<T> windowManager) {
+        super(count);
+        this.windowManager = windowManager;
+    }
+
+    @Override
+    public Action evict(Event<T> event) {
+        if (event.getTimestamp() <= referenceTime) {
+            return super.evict(event);
+        } else {
+            return Action.KEEP;
+        }
+    }
+
+    @Override
+    public void track(Event<T> event) {
+        // NOOP
+    }
+
+    @Override
+    public void setContext(Object context) {
+        referenceTime = (Long) context;
+        currentCount.set(windowManager.getEventCount(referenceTime));
+    }
+
+    @Override
+    public String toString() {
+        return "WatermarkCountEvictionPolicy{" +
+                "referenceTime=" + referenceTime +
+                "} " + super.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountTriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountTriggerPolicy.java
new file mode 100644
index 0000000..510d451
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountTriggerPolicy.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import java.util.List;
+
+/**
+ * A trigger policy that tracks event counts and sets the context for
+ * eviction policy to evict based on latest watermark time.
+ *
+ * @param <T> the type of event tracked by this policy.
+ */
+public class WatermarkCountTriggerPolicy<T> implements TriggerPolicy<T> {
+    private final int count;
+    private final TriggerHandler handler;
+    private final EvictionPolicy<T> evictionPolicy;
+    private final WindowManager<T> windowManager;
+    private long lastProcessedTs = 0;
+
+    public WatermarkCountTriggerPolicy(int count, TriggerHandler handler,
+                                       EvictionPolicy<T> evictionPolicy, WindowManager<T> windowManager) {
+        this.count = count;
+        this.handler = handler;
+        this.evictionPolicy = evictionPolicy;
+        this.windowManager = windowManager;
+    }
+
+    @Override
+    public void track(Event<T> event) {
+        if (event.isWatermark()) {
+            handleWaterMarkEvent(event);
+        }
+    }
+
+    @Override
+    public void reset() {
+        // NOOP
+    }
+
+    @Override
+    public void shutdown() {
+        // NOOP
+    }
+
+    /**
+     * Triggers all the pending windows up to the waterMarkEvent timestamp
+     * based on the sliding interval count.
+     *
+     * @param waterMarkEvent the watermark event
+     */
+    private void handleWaterMarkEvent(Event<T> waterMarkEvent) {
+        long watermarkTs = waterMarkEvent.getTimestamp();
+        List<Long> eventTs = windowManager.getSlidingCountTimestamps(lastProcessedTs, watermarkTs, count);
+        for (long ts : eventTs) {
+            evictionPolicy.setContext(ts);
+            handler.onTrigger();
+            lastProcessedTs = ts;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "WatermarkCountTriggerPolicy{" +
+                "count=" + count +
+                ", lastProcessedTs=" + lastProcessedTs +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeEvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeEvictionPolicy.java
new file mode 100644
index 0000000..202726e
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeEvictionPolicy.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+/**
+ * An eviction policy that evicts events based on time duration taking
+ * watermark time and event lag into account.
+ */
+public class WatermarkTimeEvictionPolicy<T> extends TimeEvictionPolicy<T> {
+    private final int lag;
+
+    /**
+     * Constructs a WatermarkTimeEvictionPolicy that evicts events older
+     * than the given window length in millis.
+     *
+     * @param windowLength the window length in milliseconds
+     */
+    public WatermarkTimeEvictionPolicy(int windowLength) {
+        this(windowLength, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Constructs a WatermarkTimeEvictionPolicy that evicts events older
+     * than the given window length in millis. The lag parameter
+     * can be used in the case of event based ts to break the queue
+     * scan early.
+     *
+     * @param windowLength the window length in milliseconds
+     * @param lag          the max event lag in milliseconds
+     */
+    public WatermarkTimeEvictionPolicy(int windowLength, int lag) {
+        super(windowLength);
+        referenceTime = 0L;
+        this.lag = lag;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Keeps events with future ts in the queue for processing in the next
+     * window. If the ts difference is more than the lag, stops scanning
+     * the queue for the current window.
+     */
+    @Override
+    public Action evict(Event<T> event) {
+        long diff = referenceTime - event.getTimestamp();
+        if (diff < -lag) {
+            return Action.STOP;
+        } else if (diff < 0) {
+            return Action.KEEP;
+        } else {
+            return super.evict(event);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "WatermarkTimeEvictionPolicy{" +
+                "lag=" + lag +
+                "} " + super.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeTriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeTriggerPolicy.java
new file mode 100644
index 0000000..8b5cd60
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeTriggerPolicy.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles watermark events and triggers {@link TriggerHandler#onTrigger()} for each window
+ * interval that has events to be processed up to the watermark ts.
+ */
+public class WatermarkTimeTriggerPolicy<T> implements TriggerPolicy<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(WatermarkTimeTriggerPolicy.class);
+    private final long slidingIntervalMs;
+    private final TriggerHandler handler;
+    private final EvictionPolicy<T> evictionPolicy;
+    private final WindowManager<T> windowManager;
+    private long nextWindowEndTs = 0;
+
+    public WatermarkTimeTriggerPolicy(long slidingIntervalMs, TriggerHandler handler, EvictionPolicy evictionPolicy,
+                                      WindowManager<T> windowManager) {
+        this.slidingIntervalMs = slidingIntervalMs;
+        this.handler = handler;
+        this.evictionPolicy = evictionPolicy;
+        this.windowManager = windowManager;
+    }
+
+    @Override
+    public void track(Event<T> event) {
+        if (event.isWatermark()) {
+            handleWaterMarkEvent(event);
+        }
+    }
+
+    @Override
+    public void reset() {
+        // NOOP
+    }
+
+    @Override
+    public void shutdown() {
+        // NOOP
+    }
+
+    /**
+     * Invokes the trigger all pending windows up to the
+     * watermark timestamp. The end ts of the window is set
+     * in the eviction policy context so that the events falling
+     * within that window can be processed.
+     */
+    private void handleWaterMarkEvent(Event<T> event) {
+        long watermarkTs = event.getTimestamp();
+        long windowEndTs = nextWindowEndTs;
+        LOG.debug("Window end ts {} Watermark ts {}", windowEndTs, watermarkTs);
+        while (windowEndTs <= watermarkTs) {
+            evictionPolicy.setContext(windowEndTs);
+            if (handler.onTrigger()) {
+                windowEndTs += slidingIntervalMs;
+            } else {
+                /*
+                 * No events were found in the previous window interval.
+                 * Scan through the events in the queue to find the next
+                 * window intervals based on event ts.
+                 */
+                long ts = getNextAlignedWindowTs(windowEndTs, watermarkTs);
+                LOG.debug("Next aligned window end ts {}", ts);
+                if (ts == Long.MAX_VALUE) {
+                    LOG.debug("No events to process between {} and watermark ts {}", windowEndTs, watermarkTs);
+                    break;
+                }
+                windowEndTs = ts;
+            }
+        }
+        nextWindowEndTs = windowEndTs;
+    }
+
+    /**
+     * Computes the next window by scanning the events in the window and
+     * finds the next aligned window between the startTs and endTs. Return the end ts
+     * of the next aligned window, i.e. the ts when the window should fire.
+     *
+     * @param startTs the start timestamp (excluding)
+     * @param endTs the end timestamp (including)
+     * @return the aligned window end ts for the next window or Long.MAX_VALUE if there
+     * are no more events to be processed.
+     */
+    private long getNextAlignedWindowTs(long startTs, long endTs) {
+        long nextTs = windowManager.getEarliestEventTs(startTs, endTs);
+        if (nextTs == Long.MAX_VALUE || (nextTs % slidingIntervalMs == 0)) {
+            return nextTs;
+        }
+        return nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java b/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
index 6603abf..b783eb0 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
@@ -17,11 +17,11 @@
  */
 package backtype.storm.windowing;
 
+import backtype.storm.windowing.EvictionPolicy.Action;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -30,8 +30,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static backtype.storm.topology.base.BaseWindowedBolt.Count;
-import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
+import static backtype.storm.windowing.EvictionPolicy.Action.EXPIRE;
+import static backtype.storm.windowing.EvictionPolicy.Action.PROCESS;
+import static backtype.storm.windowing.EvictionPolicy.Action.STOP;
 
 /**
  * Tracks a window of events and fires {@link WindowLifecycleListener} callbacks
@@ -49,7 +50,7 @@ public class WindowManager<T> implements TriggerHandler {
     public static final int EXPIRE_EVENTS_THRESHOLD = 100;
 
     private final WindowLifecycleListener<T> windowLifecycleListener;
-    private final ConcurrentLinkedQueue<Event<T>> window;
+    private final ConcurrentLinkedQueue<Event<T>> queue;
     private final List<T> expiredEvents;
     private final Set<Event<T>> prevWindowEvents;
     private final AtomicInteger eventsSinceLastExpiry;
@@ -59,27 +60,19 @@ public class WindowManager<T> implements TriggerHandler {
 
     public WindowManager(WindowLifecycleListener<T> lifecycleListener) {
         windowLifecycleListener = lifecycleListener;
-        window = new ConcurrentLinkedQueue<>();
+        queue = new ConcurrentLinkedQueue<>();
         expiredEvents = new ArrayList<>();
         prevWindowEvents = new HashSet<>();
         eventsSinceLastExpiry = new AtomicInteger();
         lock = new ReentrantLock(true);
     }
 
-    public void setWindowLength(Count count) {
-        this.evictionPolicy = new CountEvictionPolicy<>(count.value);
+    public void setEvictionPolicy(EvictionPolicy<T> evictionPolicy) {
+        this.evictionPolicy = evictionPolicy;
     }
 
-    public void setWindowLength(Duration duration) {
-        this.evictionPolicy = new TimeEvictionPolicy<>(duration.value);
-    }
-
-    public void setSlidingInterval(Count count) {
-        this.triggerPolicy = new CountTriggerPolicy<>(count.value, this);
-    }
-
-    public void setSlidingInterval(Duration duration) {
-        this.triggerPolicy = new TimeTriggerPolicy<>(duration.value, this);
+    public void setTriggerPolicy(TriggerPolicy<T> triggerPolicy) {
+        this.triggerPolicy = triggerPolicy;
     }
 
     /**
@@ -99,8 +92,21 @@ public class WindowManager<T> implements TriggerHandler {
      * @param ts    the timestamp
      */
     public void add(T event, long ts) {
-        Event<T> windowEvent = new EventImpl<T>(event, ts);
-        window.add(windowEvent);
+        add(new EventImpl<T>(event, ts));
+    }
+
+    /**
+     * Tracks a window event
+     *
+     * @param windowEvent the window event to track
+     */
+    public void add(Event<T> windowEvent) {
+        // watermark events are not added to the queue.
+        if (!windowEvent.isWatermark()) {
+            queue.add(windowEvent);
+        } else {
+            LOG.debug("Got watermark event with ts {}", windowEvent.getTimestamp());
+        }
         track(windowEvent);
         compactWindow();
     }
@@ -109,7 +115,7 @@ public class WindowManager<T> implements TriggerHandler {
      * The callback invoked by the trigger policy.
      */
     @Override
-    public void onTrigger() {
+    public boolean onTrigger() {
         List<Event<T>> windowEvents = null;
         List<T> expired = null;
         try {
@@ -118,7 +124,7 @@ public class WindowManager<T> implements TriggerHandler {
              * scan the entire window to handle out of order events in
              * the case of time based windows.
              */
-            windowEvents = expireEvents(true);
+            windowEvents = scanEvents(true);
             expired = new ArrayList<>(expiredEvents);
             expiredEvents.clear();
         } finally {
@@ -133,10 +139,15 @@ public class WindowManager<T> implements TriggerHandler {
             }
         }
         prevWindowEvents.clear();
-        prevWindowEvents.addAll(windowEvents);
-        LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", windowEvents.size());
-        windowLifecycleListener.onActivation(events, newEvents, expired);
+        if (!events.isEmpty()) {
+            prevWindowEvents.addAll(windowEvents);
+            LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
+            windowLifecycleListener.onActivation(events, newEvents, expired);
+        } else {
+            LOG.debug("No events in the window, skipping onActivation");
+        }
         triggerPolicy.reset();
+        return !events.isEmpty();
     }
 
     public void shutdown() {
@@ -153,7 +164,7 @@ public class WindowManager<T> implements TriggerHandler {
      */
     private void compactWindow() {
         if (eventsSinceLastExpiry.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) {
-            expireEvents(false);
+            scanEvents(false);
         }
     }
 
@@ -167,29 +178,30 @@ public class WindowManager<T> implements TriggerHandler {
     }
 
     /**
-     * Expire events from the window, using the expiration policy to check
+     * Scan events in the queue, using the expiration policy to check
      * if the event should be evicted or not.
      *
-     * @param fullScan if set, will scan the entire window; if not set, will stop
+     * @param fullScan if set, will scan the entire queue; if not set, will stop
      *                 as soon as an event not satisfying the expiration policy is found
-     * @return the list of remaining events in the window after expiry
+     * @return the list of events to be processed as a part of the current window
      */
-    private List<Event<T>> expireEvents(boolean fullScan) {
-        LOG.debug("Expire events, eviction policy {}", evictionPolicy);
+    private List<Event<T>> scanEvents(boolean fullScan) {
+        LOG.debug("Scan events, eviction policy {}", evictionPolicy);
         List<T> eventsToExpire = new ArrayList<>();
-        List<Event<T>> remaining = new ArrayList<>();
+        List<Event<T>> eventsToProcess = new ArrayList<>();
         try {
             lock.lock();
-            Iterator<Event<T>> it = window.iterator();
+            Iterator<Event<T>> it = queue.iterator();
             while (it.hasNext()) {
                 Event<T> windowEvent = it.next();
-                if (evictionPolicy.evict(windowEvent)) {
+                Action action = evictionPolicy.evict(windowEvent);
+                if (action == EXPIRE) {
                     eventsToExpire.add(windowEvent.get());
                     it.remove();
-                } else if (!fullScan) {
+                } else if (!fullScan || action == STOP) {
                     break;
-                } else {
-                    remaining.add(windowEvent);
+                } else if (action == PROCESS) {
+                    eventsToProcess.add(windowEvent);
                 }
             }
             expiredEvents.addAll(eventsToExpire);
@@ -198,8 +210,73 @@ public class WindowManager<T> implements TriggerHandler {
         }
         eventsSinceLastExpiry.set(0);
         LOG.debug("[{}] events expired from window.", eventsToExpire.size());
-        windowLifecycleListener.onExpiry(eventsToExpire);
-        return remaining;
+        if (!eventsToExpire.isEmpty()) {
+            LOG.debug("invoking windowLifecycleListener.onExpiry");
+            windowLifecycleListener.onExpiry(eventsToExpire);
+        }
+        return eventsToProcess;
+    }
+
+    /**
+     * Scans the event queue and returns the next earliest event ts
+     * between the startTs and endTs
+     *
+     * @param startTs the start ts (exclusive)
+     * @param endTs the end ts (inclusive)
+     * @return the earliest event ts between startTs and endTs
+     */
+    public long getEarliestEventTs(long startTs, long endTs) {
+        long minTs = Long.MAX_VALUE;
+        for (Event<T> event : queue) {
+            if (event.getTimestamp() > startTs && event.getTimestamp() <= endTs) {
+                minTs = Math.min(minTs, event.getTimestamp());
+            }
+        }
+        return minTs;
+    }
+
+    /**
+     * Scans the event queue and returns number of events having
+     * timestamp less than or equal to the reference time.
+     *
+     * @param referenceTime the reference timestamp in millis
+     * @return the count of events with timestamp less than or equal to referenceTime
+     */
+    public int getEventCount(long referenceTime) {
+        int count = 0;
+        for (Event<T> event : queue) {
+            if (event.getTimestamp() <= referenceTime) {
+                ++count;
+            }
+        }
+        return count;
+    }
+
+    /**
+     * Scans the event queue and returns the list of event ts
+     * falling between startTs (exclusive) and endTs (inclusive)
+     * at each sliding interval counts.
+     *
+     * @param startTs the start timestamp (exclusive)
+     * @param endTs the end timestamp (inclusive)
+     * @param slidingCount the sliding interval count
+     * @return the list of event ts
+     */
+    public List<Long> getSlidingCountTimestamps(long startTs, long endTs, int slidingCount) {
+        List<Long> timestamps = new ArrayList<>();
+        if (endTs > startTs) {
+            int count = 0;
+            long ts = Long.MIN_VALUE;
+            for (Event<T> event : queue) {
+                if (event.getTimestamp() > startTs && event.getTimestamp() <= endTs) {
+                    ts = Math.max(ts, event.getTimestamp());
+                    if (++count % slidingCount == 0) {
+                        timestamps.add(ts);
+                    }
+                }
+            }
+        }
+        return timestamps;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java b/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
new file mode 100644
index 0000000..1c59f3a
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.topology;
+
+import backtype.storm.Config;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseWindowedBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import backtype.storm.tuple.Values;
+import backtype.storm.windowing.TupleWindow;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for {@link WindowedBoltExecutor}
+ */
+public class WindowedBoltExecutorTest {
+
+    private WindowedBoltExecutor executor;
+    private TestWindowedBolt testWindowedBolt;
+
+    private static class TestWindowedBolt extends BaseWindowedBolt {
+        List<TupleWindow> tupleWindows = new ArrayList<>();
+
+        @Override
+        public void execute(TupleWindow input) {
+            //System.out.println(input);
+            tupleWindows.add(input);
+        }
+    }
+
+    private GeneralTopologyContext getContext(final Fields fields) {
+        TopologyBuilder builder = new TopologyBuilder();
+        return new GeneralTopologyContext(builder.createTopology(),
+                                          new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+            @Override
+            public Fields getComponentOutputFields(String componentId, String streamId) {
+                return fields;
+            }
+
+        };
+    }
+
+    private Tuple getTuple(String streamId, final Fields fields, Values values) {
+        return new TupleImpl(getContext(fields), values, 1, streamId) {
+            @Override
+            public GlobalStreamId getSourceGlobalStreamId() {
+                return new GlobalStreamId("s1", "default");
+            }
+        };
+    }
+
+    private OutputCollector getOutputCollector() {
+        return Mockito.mock(OutputCollector.class);
+    }
+
+    private TopologyContext getTopologyContext() {
+        TopologyContext context = Mockito.mock(TopologyContext.class);
+        Map<GlobalStreamId, Grouping> sources = Collections.singletonMap(
+                new GlobalStreamId("s1", "default"),
+                null
+        );
+        Mockito.when(context.getThisSources()).thenReturn(sources);
+        return context;
+    }
+
+    @Before
+    public void setUp() {
+        testWindowedBolt = new TestWindowedBolt();
+        executor = new WindowedBoltExecutor(testWindowedBolt);
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+        conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20);
+        conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10);
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, "ts");
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5);
+        conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 100);
+        executor.prepare(conf, getTopologyContext(), getOutputCollector());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testExecuteWithoutTs() throws Exception {
+        executor.execute(getTuple("s1", new Fields("a"), new Values(1)));
+    }
+
+    @Test
+    public void testExecuteWithTs() throws Exception {
+        long[] timstamps = {603, 605, 607, 618, 626, 636};
+        for (long ts : timstamps) {
+            executor.execute(getTuple("s1", new Fields("ts"), new Values(ts)));
+        }
+        Thread.sleep(120);
+        //System.out.println(testWindowedBolt.tupleWindows);
+        assertEquals(3, testWindowedBolt.tupleWindows.size());
+        TupleWindow first = testWindowedBolt.tupleWindows.get(0);
+        assertArrayEquals(new long[]{603, 605, 607},
+                          new long[]{(long) first.get().get(0).getValue(0), (long)first.get().get(1).getValue(0),
+                                  (long)first.get().get(2).getValue(0)});
+
+        TupleWindow second = testWindowedBolt.tupleWindows.get(1);
+        assertArrayEquals(new long[]{603, 605, 607, 618},
+                          new long[]{(long) second.get().get(0).getValue(0), (long) second.get().get(1).getValue(0),
+                                  (long) second.get().get(2).getValue(0), (long) second.get().get(3).getValue(0)});
+
+        TupleWindow third = testWindowedBolt.tupleWindows.get(2);
+        assertArrayEquals(new long[]{618, 626},
+                          new long[]{(long) third.get().get(0).getValue(0), (long)third.get().get(1).getValue(0)});
+    }
+}
\ No newline at end of file


[7/7] storm git commit: Added STORM-1187 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-1187 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8f9ed06d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8f9ed06d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8f9ed06d

Branch: refs/heads/master
Commit: 8f9ed06db57918b16b9e96f444585cc372fe6754
Parents: 8129a8b
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Sun Dec 20 20:02:57 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Sun Dec 20 20:02:57 2015 -0800

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8f9ed06d/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0323646..5213e44 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1187: Support windowing based on tuple ts.
  * STORM-1400: Netty Context removeClient() called after term() causes NullPointerException.
  * STORM-1383: Supervisors should not crash if nimbus is unavailable
  * STORM-1381: Client side topology submission hook.


[4/7] storm git commit: Merge remote-tracking branch 'upstream/master' into windowing-eventts

Posted by sr...@apache.org.
Merge remote-tracking branch 'upstream/master' into windowing-eventts


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/60ff46ea
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/60ff46ea
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/60ff46ea

Branch: refs/heads/master
Commit: 60ff46ea69bb170746f52e300f30ba931136513c
Parents: fa48383 ceb3a0c
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Wed Dec 9 15:24:42 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Wed Dec 9 15:24:42 2015 +0530

----------------------------------------------------------------------
 .travis.yml                                     |    13 +-
 CHANGELOG.md                                    |    23 +
 LICENSE                                         |     9 -
 README.markdown                                 |     9 +-
 bin/storm                                       |     2 +-
 bin/storm.py                                    |    78 +-
 conf/defaults.yaml                              |    27 +-
 dev-tools/travis/travis-install.sh              |    12 +-
 dev-tools/travis/travis-script.sh               |    23 +-
 docs/documentation/Pacemaker.md                 |   108 +
 .../starter/ResourceAwareExampleTopology.java   |     2 +-
 .../src/jvm/storm/starter/RollingTopWords.java  |     6 +-
 .../storm/starter/SkewedRollingTopWords.java    |     6 +-
 .../starter/trident/TridentKafkaWordCount.java  |    15 +-
 external/flux/README.md                         |     3 +
 .../java/org/apache/storm/flux/FluxBuilder.java |    48 +-
 .../org/apache/storm/flux/model/ObjectDef.java  |     2 +
 .../org/apache/storm/flux/test/TestBolt.java    |     4 +
 .../resources/configs/config-methods-test.yaml  |     1 +
 external/flux/flux-examples/README.md           |     9 +
 .../storm/flux/examples/TestPrintBolt.java      |    39 +
 .../storm/flux/examples/TestWindowBolt.java     |    47 +
 .../src/main/resources/simple_windowing.yaml    |    69 +
 external/sql/README.md                          |   117 +
 external/sql/pom.xml                            |    44 +
 external/sql/storm-sql-core/pom.xml             |   261 +
 .../sql/storm-sql-core/src/codegen/config.fmpp  |    23 +
 .../storm-sql-core/src/codegen/data/Parser.tdd  |    64 +
 .../src/codegen/includes/license.ftl            |    17 +
 .../src/codegen/includes/parserImpls.ftl        |    86 +
 .../src/jvm/org/apache/storm/sql/StormSql.java  |    54 +
 .../jvm/org/apache/storm/sql/StormSqlImpl.java  |   187 +
 .../org/apache/storm/sql/StormSqlRunner.java    |    44 +
 .../apache/storm/sql/compiler/CompilerUtil.java |   168 +
 .../apache/storm/sql/compiler/ExprCompiler.java |   471 +
 .../sql/compiler/PostOrderRelNodeVisitor.java   |   122 +
 .../backends/standalone/PlanCompiler.java       |   132 +
 .../backends/standalone/RelNodeCompiler.java    |   111 +
 .../compiler/backends/trident/PlanCompiler.java |   201 +
 .../backends/trident/RelNodeCompiler.java       |   116 +
 .../storm/sql/javac/CompilingClassLoader.java   |   225 +
 .../storm/sql/parser/ColumnConstraint.java      |    42 +
 .../storm/sql/parser/ColumnDefinition.java      |    44 +
 .../apache/storm/sql/parser/SqlCreateTable.java |   136 +
 .../apache/storm/sql/parser/SqlDDLKeywords.java |    27 +
 .../apache/storm/sql/parser/StormParser.java    |    42 +
 .../apache/storm/sql/parser/UnparseUtil.java    |    60 +
 .../test/org/apache/storm/sql/TestStormSql.java |    82 +
 .../storm/sql/compiler/TestCompilerUtils.java   |    64 +
 .../storm/sql/compiler/TestExprCompiler.java    |    93 +
 .../storm/sql/compiler/TestExprSemantic.java    |   140 +
 .../backends/standalone/TestPlanCompiler.java   |    69 +
 .../standalone/TestRelNodeCompiler.java         |    62 +
 .../backends/trident/TestPlanCompiler.java      |   116 +
 .../apache/storm/sql/parser/TestSqlParser.java  |    48 +
 external/sql/storm-sql-kafka/pom.xml            |   111 +
 .../org/apache/storm/sql/kafka/JsonScheme.java  |    58 +
 .../apache/storm/sql/kafka/JsonSerializer.java  |    56 +
 .../sql/kafka/KafkaDataSourcesProvider.java     |   205 +
 ...apache.storm.sql.runtime.DataSourcesProvider |    16 +
 .../storm/sql/kafka/TestJsonRepresentation.java |    50 +
 .../sql/kafka/TestKafkaDataSourcesProvider.java |   114 +
 external/sql/storm-sql-runtime/pom.xml          |    73 +
 .../sql/runtime/AbstractChannelHandler.java     |    44 +
 .../sql/runtime/AbstractValuesProcessor.java    |    49 +
 .../storm/sql/runtime/ChannelContext.java       |    30 +
 .../storm/sql/runtime/ChannelHandler.java       |    39 +
 .../org/apache/storm/sql/runtime/Channels.java  |    80 +
 .../apache/storm/sql/runtime/DataSource.java    |    29 +
 .../storm/sql/runtime/DataSourcesProvider.java  |    49 +
 .../storm/sql/runtime/DataSourcesRegistry.java  |    78 +
 .../org/apache/storm/sql/runtime/FieldInfo.java |    45 +
 .../storm/sql/runtime/IOutputSerializer.java    |    31 +
 .../sql/runtime/ISqlTridentDataSource.java      |    30 +
 .../storm/sql/runtime/StormSqlFunctions.java    |    36 +
 .../trident/AbstractTridentProcessor.java       |    43 +
 .../test/org/apache/storm/sql/TestUtils.java    |   163 +
 external/storm-cassandra/README.md              |   202 +
 external/storm-cassandra/pom.xml                |   124 +
 .../AbstractExecutionResultHandler.java         |    60 +
 .../cassandra/BaseExecutionResultHandler.java   |    85 +
 .../storm/cassandra/CassandraContext.java       |    92 +
 .../cassandra/DynamicStatementBuilder.java      |   199 +
 .../storm/cassandra/ExecutionResultHandler.java |    98 +
 .../storm/cassandra/Murmur3StreamGrouping.java  |   133 +
 .../storm/cassandra/bolt/BaseCassandraBolt.java |   194 +
 .../bolt/BatchCassandraWriterBolt.java          |   201 +
 .../cassandra/bolt/CassandraWriterBolt.java     |    72 +
 .../cassandra/bolt/GroupingBatchBuilder.java    |    68 +
 .../bolt/PairBatchStatementTuples.java          |    52 +
 .../cassandra/bolt/PairStatementTuple.java      |    50 +
 .../storm/cassandra/client/CassandraConf.java   |   202 +
 .../storm/cassandra/client/ClusterFactory.java  |    73 +
 .../storm/cassandra/client/SimpleClient.java    |    42 +
 .../cassandra/client/SimpleClientProvider.java  |    35 +
 .../cassandra/client/impl/DefaultClient.java    |   123 +
 .../cassandra/context/BaseBeanFactory.java      |    65 +
 .../storm/cassandra/context/BeanFactory.java    |    48 +
 .../storm/cassandra/context/WorkerCtx.java      |    89 +
 .../storm/cassandra/executor/AsyncExecutor.java |   153 +
 .../executor/AsyncExecutorProvider.java         |    40 +
 .../cassandra/executor/AsyncResultHandler.java  |    64 +
 .../executor/ExecutionResultCollector.java      |    99 +
 .../executor/impl/BatchAsyncResultHandler.java  |    73 +
 .../executor/impl/SingleAsyncResultHandler.java |    72 +
 .../query/BatchStatementTupleMapper.java        |    57 +
 .../cassandra/query/CQLClauseTupleMapper.java   |    36 +
 .../cassandra/query/CQLStatementBuilder.java    |    31 +
 .../query/CQLStatementTupleMapper.java          |    86 +
 .../cassandra/query/CQLTableTupleMapper.java    |    39 +
 .../cassandra/query/CQLValuesTupleMapper.java   |    74 +
 .../storm/cassandra/query/ContextQuery.java     |   101 +
 .../query/SimpleCQLStatementTupleMapper.java    |    51 +
 .../query/impl/BoundStatementMapperBuilder.java |   107 +
 .../query/impl/InsertStatementBuilder.java      |   153 +
 .../query/impl/UpdateStatementBuilder.java      |   118 +
 .../cassandra/query/selector/FieldSelector.java |    68 +
 .../cassandra/DynamicStatementBuilderTest.java  |   133 +
 .../apache/storm/cassandra/WeatherSpout.java    |    84 +
 .../storm/cassandra/bolt/BaseTopologyTest.java  |    60 +
 .../bolt/BatchCassandraWriterBoltTest.java      |    63 +
 .../cassandra/bolt/CassandraWriterBoltTest.java |    64 +
 .../src/test/resources/schema.cql               |    24 +
 external/storm-hdfs/pom.xml                     |    70 +
 .../storm/hdfs/blobstore/HdfsBlobStore.java     |   384 +
 .../storm/hdfs/blobstore/HdfsBlobStoreFile.java |   196 +
 .../storm/hdfs/blobstore/HdfsBlobStoreImpl.java |   312 +
 .../hdfs/blobstore/HdfsClientBlobStore.java     |   120 +
 .../ha/codedistributor/HDFSCodeDistributor.java |   118 -
 .../storm/hdfs/blobstore/BlobStoreTest.java     |   530 +
 .../hdfs/blobstore/HdfsBlobStoreImplTest.java   |   231 +
 external/storm-kafka/README.md                  |    37 +-
 .../jvm/storm/kafka/ByteBufferSerializer.java   |    41 +
 .../src/jvm/storm/kafka/IntSerializer.java      |    42 +
 .../src/jvm/storm/kafka/KafkaUtils.java         |     8 +-
 .../src/jvm/storm/kafka/KeyValueScheme.java     |     5 +-
 .../kafka/KeyValueSchemeAsMultiScheme.java      |     5 +-
 .../jvm/storm/kafka/MessageMetadataScheme.java  |     6 +-
 .../MessageMetadataSchemeAsMultiScheme.java     |     3 +-
 .../jvm/storm/kafka/StringKeyValueScheme.java   |     3 +-
 .../kafka/StringMessageAndMetadataScheme.java   |     7 +-
 .../storm/kafka/StringMultiSchemeWithTopic.java |    21 +-
 .../src/jvm/storm/kafka/StringScheme.java       |    20 +-
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     |    13 +-
 .../storm/kafka/trident/TridentKafkaState.java  |    10 +-
 .../kafka/trident/TridentKafkaStateFactory.java |    10 +-
 .../storm/kafka/StringKeyValueSchemeTest.java   |    17 +-
 .../src/test/storm/kafka/TestStringScheme.java  |    40 +
 .../src/test/storm/kafka/TestUtils.java         |     8 +-
 .../src/test/storm/kafka/TridentKafkaTest.java  |    13 +-
 .../test/storm/kafka/TridentKafkaTopology.java  |    33 +-
 .../test/storm/kafka/bolt/KafkaBoltTest.java    |     6 +-
 external/storm-metrics/pom.xml                  |    53 +-
 .../resources/libsigar-amd64-freebsd-6.so       |   Bin 210641 -> 0 bytes
 .../resources/resources/libsigar-amd64-linux.so |   Bin 246605 -> 0 bytes
 .../resources/libsigar-amd64-solaris.so         |   Bin 251360 -> 0 bytes
 .../resources/libsigar-ia64-hpux-11.sl          |   Bin 577452 -> 0 bytes
 .../resources/resources/libsigar-ia64-linux.so  |   Bin 494929 -> 0 bytes
 .../resources/resources/libsigar-pa-hpux-11.sl  |   Bin 516096 -> 0 bytes
 .../resources/resources/libsigar-ppc-aix-5.so   |   Bin 400925 -> 0 bytes
 .../resources/resources/libsigar-ppc-linux.so   |   Bin 258547 -> 0 bytes
 .../resources/resources/libsigar-ppc64-aix-5.so |   Bin 425077 -> 0 bytes
 .../resources/resources/libsigar-ppc64-linux.so |   Bin 330767 -> 0 bytes
 .../resources/resources/libsigar-s390x-linux.so |   Bin 269932 -> 0 bytes
 .../resources/libsigar-sparc-solaris.so         |   Bin 285004 -> 0 bytes
 .../resources/libsigar-sparc64-solaris.so       |   Bin 261896 -> 0 bytes
 .../resources/libsigar-universal-macosx.dylib   |   Bin 377668 -> 0 bytes
 .../resources/libsigar-universal64-macosx.dylib |   Bin 397440 -> 0 bytes
 .../resources/libsigar-x86-freebsd-5.so         |   Bin 179751 -> 0 bytes
 .../resources/libsigar-x86-freebsd-6.so         |   Bin 179379 -> 0 bytes
 .../resources/resources/libsigar-x86-linux.so   |   Bin 233385 -> 0 bytes
 .../resources/resources/libsigar-x86-solaris.so |   Bin 242880 -> 0 bytes
 .../resources/resources/sigar-amd64-winnt.dll   |   Bin 402432 -> 0 bytes
 .../resources/resources/sigar-x86-winnt.dll     |   Bin 266240 -> 0 bytes
 .../resources/resources/sigar-x86-winnt.lib     |   Bin 99584 -> 0 bytes
 pom.xml                                         |    58 +-
 storm-core/pom.xml                              |    43 +-
 storm-core/src/clj/backtype/storm/blobstore.clj |    28 +
 storm-core/src/clj/backtype/storm/cluster.clj   |   355 +-
 .../cluster_state/zookeeper_state_factory.clj   |   161 +
 .../clj/backtype/storm/command/blobstore.clj    |   162 +
 .../clj/backtype/storm/command/heartbeats.clj   |    52 +
 storm-core/src/clj/backtype/storm/config.clj    |    30 +-
 .../src/clj/backtype/storm/daemon/common.clj    |    30 +-
 .../src/clj/backtype/storm/daemon/drpc.clj      |     6 +-
 .../src/clj/backtype/storm/daemon/logviewer.clj |     6 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   684 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |   487 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |    32 +-
 storm-core/src/clj/backtype/storm/testing.clj   |     5 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |     8 +-
 storm-core/src/clj/backtype/storm/util.clj      |    37 +-
 storm-core/src/clj/backtype/storm/zookeeper.clj |    31 +-
 .../org/apache/storm/pacemaker/pacemaker.clj    |   241 +
 .../storm/pacemaker/pacemaker_state_factory.clj |   125 +
 storm-core/src/jvm/backtype/storm/Config.java   |   195 +-
 .../src/jvm/backtype/storm/StormSubmitter.java  |     2 +
 .../storm/blobstore/AtomicOutputStream.java     |    32 +
 .../storm/blobstore/BlobKeySequenceInfo.java    |    40 +
 .../jvm/backtype/storm/blobstore/BlobStore.java |   445 +
 .../storm/blobstore/BlobStoreAclHandler.java    |   399 +
 .../backtype/storm/blobstore/BlobStoreFile.java |    50 +
 .../storm/blobstore/BlobStoreUtils.java         |   257 +
 .../storm/blobstore/BlobSynchronizer.java       |   124 +
 .../storm/blobstore/ClientBlobStore.java        |   184 +
 .../storm/blobstore/FileBlobStoreImpl.java      |   248 +
 .../storm/blobstore/InputStreamWithMeta.java    |    26 +
 .../jvm/backtype/storm/blobstore/KeyFilter.java |    22 +
 .../storm/blobstore/KeySequenceNumber.java      |   229 +
 .../storm/blobstore/LocalFsBlobStore.java       |   323 +
 .../storm/blobstore/LocalFsBlobStoreFile.java   |   159 +
 .../storm/blobstore/NimbusBlobStore.java        |   420 +
 .../backtype/storm/cluster/ClusterState.java    |   217 +
 .../storm/cluster/ClusterStateContext.java      |    41 +
 .../storm/cluster/ClusterStateFactory.java      |    28 +
 .../storm/cluster/ClusterStateListener.java     |    22 +
 .../backtype/storm/cluster/ConnectionState.java |    24 +
 .../jvm/backtype/storm/cluster/DaemonType.java  |    27 +
 .../storm/codedistributor/ICodeDistributor.java |    73 -
 .../LocalFileSystemCodeDistributor.java         |   123 -
 .../backtype/storm/generated/AccessControl.java |   627 +
 .../storm/generated/AccessControlType.java      |    62 +
 .../backtype/storm/generated/Assignment.java    |   244 +-
 .../storm/generated/BeginDownloadResult.java    |   608 +
 .../jvm/backtype/storm/generated/BoltStats.java |   340 +-
 .../storm/generated/ClusterSummary.java         |   108 +-
 .../storm/generated/ClusterWorkerHeartbeat.java |    52 +-
 .../storm/generated/ComponentPageInfo.java      |   220 +-
 .../backtype/storm/generated/Credentials.java   |    44 +-
 .../backtype/storm/generated/ExecutorStats.java |   160 +-
 .../generated/HBAuthorizationException.java     |   406 +
 .../storm/generated/HBExecutionException.java   |   406 +
 .../jvm/backtype/storm/generated/HBMessage.java |   636 +
 .../backtype/storm/generated/HBMessageData.java |   640 +
 .../jvm/backtype/storm/generated/HBNodes.java   |   461 +
 .../jvm/backtype/storm/generated/HBPulse.java   |   522 +
 .../jvm/backtype/storm/generated/HBRecords.java |   466 +
 .../storm/generated/HBServerMessageType.java    |   113 +
 .../generated/KeyAlreadyExistsException.java    |   406 +
 .../storm/generated/KeyNotFoundException.java   |   406 +
 .../storm/generated/LSApprovedWorkers.java      |    44 +-
 .../generated/LSSupervisorAssignments.java      |    48 +-
 .../backtype/storm/generated/LSTopoHistory.java |    64 +-
 .../storm/generated/LSTopoHistoryList.java      |    36 +-
 .../storm/generated/LSWorkerHeartbeat.java      |    36 +-
 .../storm/generated/ListBlobsResult.java        |   556 +
 .../storm/generated/LocalAssignment.java        |    36 +-
 .../storm/generated/LocalStateData.java         |    48 +-
 .../jvm/backtype/storm/generated/LogConfig.java |    48 +-
 .../jvm/backtype/storm/generated/Nimbus.java    | 26917 +++++++++++++----
 .../jvm/backtype/storm/generated/NodeInfo.java  |    32 +-
 .../storm/generated/ReadableBlobMeta.java       |   510 +
 .../storm/generated/RebalanceOptions.java       |    44 +-
 .../storm/generated/SettableBlobMeta.java       |   567 +
 .../backtype/storm/generated/SpoutStats.java    |   224 +-
 .../jvm/backtype/storm/generated/StormBase.java |    92 +-
 .../backtype/storm/generated/StormTopology.java |   251 +-
 .../storm/generated/SupervisorInfo.java         |   152 +-
 .../storm/generated/SupervisorSummary.java      |    44 +-
 .../storm/generated/TopologyHistoryInfo.java    |    32 +-
 .../backtype/storm/generated/TopologyInfo.java  |   164 +-
 .../storm/generated/TopologyPageInfo.java       |    96 +-
 .../backtype/storm/generated/TopologyStats.java |   220 +-
 .../backtype/storm/hooks/BaseWorkerHook.java    |    51 +
 .../jvm/backtype/storm/hooks/IWorkerHook.java   |    44 +
 .../backtype/storm/localizer/LocalResource.java |    44 +
 .../storm/localizer/LocalizedResource.java      |   130 +
 .../LocalizedResourceRetentionSet.java          |   140 +
 .../storm/localizer/LocalizedResourceSet.java   |   101 +
 .../jvm/backtype/storm/localizer/Localizer.java |   695 +
 .../storm/messaging/netty/ControlMessage.java   |    17 +-
 .../messaging/netty/INettySerializable.java     |    26 +
 .../netty/KerberosSaslClientHandler.java        |   152 +
 .../netty/KerberosSaslNettyClient.java          |   203 +
 .../netty/KerberosSaslNettyClientState.java     |    31 +
 .../netty/KerberosSaslNettyServer.java          |   210 +
 .../netty/KerberosSaslNettyServerState.java     |    30 +
 .../netty/KerberosSaslServerHandler.java        |   133 +
 .../storm/messaging/netty/MessageDecoder.java   |     4 +-
 .../netty/NettyRenameThreadFactory.java         |    10 +-
 .../netty/NettyUncaughtExceptionHandler.java    |    35 +
 .../storm/messaging/netty/SaslMessageToken.java |    37 +-
 .../storm/messaging/netty/SaslNettyClient.java  |    22 +-
 .../storm/messaging/netty/SaslNettyServer.java  |   244 +-
 .../messaging/netty/SaslNettyServerState.java   |    13 +-
 .../messaging/netty/SaslStormServerHandler.java |    21 +-
 .../storm/messaging/netty/SaslUtils.java        |     1 +
 .../backtype/storm/messaging/netty/Server.java  |    50 +-
 .../messaging/netty/StormServerHandler.java     |    24 +-
 .../metric/HttpForwardingMetricsConsumer.java   |    11 +-
 .../storm/metric/LoggingMetricsConsumer.java    |     7 +-
 .../jvm/backtype/storm/multilang/BoltMsg.java   |     3 +-
 .../jvm/backtype/storm/multilang/ShellMsg.java  |     3 +-
 .../jvm/backtype/storm/multilang/SpoutMsg.java  |     3 +-
 .../AbstractDNSToSwitchMapping.java             |     7 +-
 .../networktopography/DNSToSwitchMapping.java   |     1 -
 .../jvm/backtype/storm/scheduler/INimbus.java   |    16 +-
 .../backtype/storm/scheduler/IScheduler.java    |     8 +-
 .../resource/ResourceAwareScheduler.java        |     2 +-
 .../strategies/ResourceAwareStrategy.java       |     2 +-
 .../backtype/storm/security/auth/AuthUtils.java |    69 +
 .../storm/security/auth/IAuthorizer.java        |     6 +
 .../storm/security/auth/NimbusPrincipal.java    |    29 +
 .../storm/security/auth/ReqContext.java         |     9 +-
 .../authorizer/SimpleWhitelistAuthorizer.java   |     2 +-
 .../serialization/BlowfishTupleSerializer.java  |     4 +-
 .../src/jvm/backtype/storm/spout/ISpout.java    |    18 +-
 .../storm/spout/ISpoutWaitStrategy.java         |     2 +-
 .../jvm/backtype/storm/spout/MultiScheme.java   |     3 +-
 .../backtype/storm/spout/RawMultiScheme.java    |     3 +-
 .../src/jvm/backtype/storm/spout/RawScheme.java |     9 +-
 .../src/jvm/backtype/storm/spout/Scheme.java    |     3 +-
 .../storm/spout/SchemeAsMultiScheme.java        |     3 +-
 .../jvm/backtype/storm/spout/ShellSpout.java    |     6 +-
 .../storm/spout/SpoutOutputCollector.java       |     8 +-
 .../src/jvm/backtype/storm/task/IBolt.java      |    28 +-
 .../backtype/storm/task/OutputCollector.java    |    18 +-
 .../src/jvm/backtype/storm/task/ShellBolt.java  |    31 +-
 .../backtype/storm/task/TopologyContext.java    |    35 +-
 .../storm/testing/CompleteTopologyParam.java    |     2 +-
 .../testing/ForwardingMetricsConsumer.java      |     5 +
 .../backtype/storm/testing/MkClusterParam.java  |     2 +-
 .../backtype/storm/testing/MockedSources.java   |     2 +-
 .../src/jvm/backtype/storm/testing/TestJob.java |     4 +-
 .../storm/topology/TopologyBuilder.java         |    59 +-
 .../storm/topology/base/BaseRichSpout.java      |     4 -
 .../src/jvm/backtype/storm/tuple/ITuple.java    |    20 +-
 .../backtype/storm/utils/BufferInputStream.java |    53 +
 .../storm/utils/ShellBoltMessageQueue.java      |   121 +
 .../jvm/backtype/storm/utils/ShellUtils.java    |     7 +
 .../StormBoundedExponentialBackoffRetry.java    |     5 +-
 .../storm/utils/ThriftTopologyUtils.java        |    36 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |   543 +-
 .../storm/validation/ConfigValidation.java      |    30 +-
 .../validation/ConfigValidationAnnotations.java |    13 +-
 .../storm/pacemaker/IServerMessageHandler.java  |    25 +
 .../apache/storm/pacemaker/PacemakerClient.java |   255 +
 .../storm/pacemaker/PacemakerClientHandler.java |    75 +
 .../apache/storm/pacemaker/PacemakerServer.java |   163 +
 .../storm/pacemaker/codec/ThriftDecoder.java    |    76 +
 .../storm/pacemaker/codec/ThriftEncoder.java    |   110 +
 .../pacemaker/codec/ThriftNettyClientCodec.java |    94 +
 .../pacemaker/codec/ThriftNettyServerCodec.java |    99 +
 .../src/jvm/storm/trident/operation/Filter.java |     1 -
 .../jvm/storm/trident/state/map/CachedMap.java  |     1 -
 .../jvm/storm/trident/util/TridentUtils.java    |    33 +-
 storm-core/src/py/storm/Nimbus-remote           |    98 +
 storm-core/src/py/storm/Nimbus.py               |  5991 +++-
 storm-core/src/py/storm/ttypes.py               |  4297 ++-
 storm-core/src/storm.thrift                     |   129 +-
 .../test/clj/backtype/storm/cluster_test.clj    |    27 +-
 .../test/clj/backtype/storm/multilang_test.clj  |     2 +-
 .../test/clj/backtype/storm/nimbus_test.clj     |    43 +-
 .../storm/security/auth/ReqContext_test.clj     |     1 +
 .../test/clj/backtype/storm/supervisor_test.clj |    18 +-
 .../storm/pacemaker_state_factory_test.clj      |   150 +
 .../clj/org/apache/storm/pacemaker_test.clj     |   242 +
 .../jvm/backtype/storm/TestConfigValidate.java  |    18 +
 .../backtype/storm/blobstore/BlobStoreTest.java |   461 +
 .../storm/blobstore/BlobSynchronizerTest.java   |   137 +
 .../storm/blobstore/ClientBlobStoreTest.java    |   179 +
 .../LocalizedResourceRetentionSetTest.java      |    85 +
 .../localizer/LocalizedResourceSetTest.java     |    74 +
 .../backtype/storm/localizer/LocalizerTest.java |   671 +
 .../jvm/backtype/storm/localizer/localtest.zip  |   Bin 0 -> 6378 bytes
 .../storm/localizer/localtestwithsymlink.jar    |   Bin 0 -> 6591 bytes
 .../storm/localizer/localtestwithsymlink.tar    |   Bin 0 -> 24576 bytes
 .../storm/localizer/localtestwithsymlink.tar.gz |   Bin 0 -> 6106 bytes
 .../storm/localizer/localtestwithsymlink.tgz    |   Bin 0 -> 6106 bytes
 .../storm/topology/TopologyBuilderTest.java     |     5 +
 .../storm/utils/ShellBoltMessageQueueTest.java  |    85 +
 .../storm/utils/ThriftTopologyUtilsTest.java    |    94 +
 storm-dist/binary/LICENSE                       |    27 +
 storm-dist/binary/pom.xml                       |    10 +
 storm-dist/binary/src/main/assembly/binary.xml  |    36 +-
 375 files changed, 60503 insertions(+), 12482 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/60ff46ea/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------


[6/7] storm git commit: Merge branch 'windowing-eventts' of https://github.com/arunmahadevan/storm into STORM-1187

Posted by sr...@apache.org.
Merge branch 'windowing-eventts' of https://github.com/arunmahadevan/storm into STORM-1187


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8129a8bf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8129a8bf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8129a8bf

Branch: refs/heads/master
Commit: 8129a8bf4f1f28b9f7d3e813f1e3844d9235086f
Parents: 3e76fd5 e764a2b
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Sun Dec 20 19:54:12 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Sun Dec 20 19:54:12 2015 -0800

----------------------------------------------------------------------
 docs/documentation/Windowing.md                 |  91 ++++++
 .../storm/starter/SlidingTupleTsTopology.java   |  62 ++++
 .../storm/starter/SlidingWindowTopology.java    |  81 +-----
 .../starter/bolt/SlidingWindowSumBolt.java      |  80 ++++++
 .../storm/starter/spout/RandomIntegerSpout.java |  55 ++++
 .../src/clj/backtype/storm/daemon/executor.clj  |   2 +
 storm-core/src/jvm/backtype/storm/Config.java   |  30 +-
 .../storm/topology/WindowedBoltExecutor.java    | 110 +++++++-
 .../storm/topology/base/BaseWindowedBolt.java   |  33 +++
 .../storm/windowing/CountEvictionPolicy.java    |  17 +-
 .../storm/windowing/CountTriggerPolicy.java     |  11 +-
 .../src/jvm/backtype/storm/windowing/Event.java |   8 +
 .../jvm/backtype/storm/windowing/EventImpl.java |  13 +
 .../storm/windowing/EvictionPolicy.java         |  38 ++-
 .../storm/windowing/TimeEvictionPolicy.java     |  39 ++-
 .../storm/windowing/TimeTriggerPolicy.java      |  13 +
 .../storm/windowing/TriggerHandler.java         |   6 +-
 .../storm/windowing/WaterMarkEvent.java         |  38 +++
 .../windowing/WaterMarkEventGenerator.java      | 116 ++++++++
 .../windowing/WatermarkCountEvictionPolicy.java |  65 +++++
 .../windowing/WatermarkCountTriggerPolicy.java  |  83 ++++++
 .../windowing/WatermarkTimeEvictionPolicy.java  |  77 +++++
 .../windowing/WatermarkTimeTriggerPolicy.java   | 109 ++++++++
 .../backtype/storm/windowing/WindowManager.java | 153 +++++++---
 .../topology/WindowedBoltExecutorTest.java      | 142 ++++++++++
 .../windowing/WaterMarkEventGeneratorTest.java  | 117 ++++++++
 .../storm/windowing/WindowManagerTest.java      | 280 +++++++++++++++++--
 27 files changed, 1691 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8129a8bf/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------


[5/7] storm git commit: Addressing review comments

Posted by sr...@apache.org.
Addressing review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e764a2bc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e764a2bc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e764a2bc

Branch: refs/heads/master
Commit: e764a2bcc93b0f39f1480b0bfbde7c44a74248f1
Parents: 60ff46e
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Tue Dec 15 14:16:43 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Tue Dec 15 14:16:43 2015 +0530

----------------------------------------------------------------------
 docs/documentation/Windowing.md                 |  3 ++-
 .../storm/topology/WindowedBoltExecutor.java    |  7 +++++--
 .../windowing/WaterMarkEventGenerator.java      | 14 +++++++++----
 .../windowing/WaterMarkEventGeneratorTest.java  | 21 ++++++++++++++++++--
 4 files changed, 36 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e764a2bc/docs/documentation/Windowing.md
----------------------------------------------------------------------
diff --git a/docs/documentation/Windowing.md b/docs/documentation/Windowing.md
index 54e5e2d..803e5ca 100644
--- a/docs/documentation/Windowing.md
+++ b/docs/documentation/Windowing.md
@@ -145,7 +145,8 @@ If the field is not present in the tuple an exception will be thrown. Along with
 can also be specified which indicates the max time limit for tuples with out of order timestamps. 
 
 E.g. If the lag is 5 secs and a tuple `t1` arrived with timestamp `06:00:05` no tuples may arrive with tuple timestamp earlier than `06:00:00`. If a tuple
-arrives with timestamp 05:59:59 after `t1` and the window has moved past `t1`, it will be treated as a late tuple and not processed. 
+arrives with timestamp 05:59:59 after `t1` and the window has moved past `t1`, it will be treated as a late tuple and not processed. Currently the late
+ tuples are just logged in the worker log files at INFO level.
 
 ```java
 /**

http://git-wip-us.apache.org/repos/asf/storm/blob/e764a2bc/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
index 7b45cbc..b753c2c 100644
--- a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
@@ -236,8 +236,11 @@ public class WindowedBoltExecutor implements IRichBolt {
     public void execute(Tuple input) {
         if (isTupleTs()) {
             long ts = input.getLongByField(tupleTsFieldName);
-            windowManager.add(input, ts);
-            waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts);
+            if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
+                windowManager.add(input, ts);
+            } else {
+                LOG.info("Received a late tuple {} with ts {}. This will not processed.", input, ts);
+            }
         } else {
             windowManager.add(input);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/e764a2bc/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java b/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java
index 9820da6..88781db 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java
@@ -57,12 +57,18 @@ public class WaterMarkEventGenerator<T> implements Runnable {
         this.inputStreams = inputStreams;
     }
 
-    public void track(GlobalStreamId stream, long ts) {
+    /**
+     * Tracks the timestamp of the event in the stream, returns
+     * true if the event can be considered for processing or
+     * false if its a late event.
+     */
+    public boolean track(GlobalStreamId stream, long ts) {
         Long currentVal = streamToTs.get(stream);
         if (currentVal == null || ts > currentVal) {
             streamToTs.put(stream, ts);
         }
         checkFailures();
+        return ts >= lastWaterMarkTs;
     }
 
     @Override
@@ -70,7 +76,7 @@ public class WaterMarkEventGenerator<T> implements Runnable {
         try {
             long waterMarkTs = computeWaterMarkTs();
             if (waterMarkTs > lastWaterMarkTs) {
-                this.windowManager.add(new WaterMarkEvent<T>(waterMarkTs - eventTsLag));
+                this.windowManager.add(new WaterMarkEvent<T>(waterMarkTs));
                 lastWaterMarkTs = waterMarkTs;
             }
         } catch (Throwable th) {
@@ -83,7 +89,7 @@ public class WaterMarkEventGenerator<T> implements Runnable {
      * Computes the min ts across all streams.
      */
     private long computeWaterMarkTs() {
-        long ts = Long.MIN_VALUE;
+        long ts = 0;
         // only if some data has arrived on each input stream
         if(streamToTs.size() >= inputStreams.size()) {
             ts = Long.MAX_VALUE;
@@ -91,7 +97,7 @@ public class WaterMarkEventGenerator<T> implements Runnable {
                 ts = Math.min(ts, entry.getValue());
             }
         }
-        return ts;
+        return ts - eventTsLag;
     }
 
     private void checkFailures() {

http://git-wip-us.apache.org/repos/asf/storm/blob/e764a2bc/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java b/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
index 2e52236..a517045 100644
--- a/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
+++ b/storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java
@@ -31,7 +31,7 @@ import java.util.Set;
 import static org.junit.Assert.*;
 
 /**
- * Unit tests for {@link WaterMarkEventGeneratorTest}
+ * Unit tests for {@link WaterMarkEventGenerator}
  */
 public class WaterMarkEventGeneratorTest {
     WaterMarkEventGenerator<Integer> waterMarkEventGenerator;
@@ -79,7 +79,7 @@ public class WaterMarkEventGeneratorTest {
         Set<GlobalStreamId> streams = new HashSet<>();
         streams.add(streamId("s1"));
         streams.add(streamId("s2"));
-        waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 50, 5, streams);
+        waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 100000, 5, streams);
 
         waterMarkEventGenerator.track(streamId("s1"), 100);
         waterMarkEventGenerator.track(streamId("s1"), 110);
@@ -97,4 +97,21 @@ public class WaterMarkEventGeneratorTest {
         waterMarkEventGenerator.run();
         assertTrue(eventList.isEmpty());
     }
+
+    @Test
+    public void testLateEvent() throws Exception {
+        assertTrue(waterMarkEventGenerator.track(streamId("s1"), 100));
+        assertTrue(waterMarkEventGenerator.track(streamId("s1"), 110));
+        waterMarkEventGenerator.run();
+        assertTrue(eventList.get(0).isWatermark());
+        assertEquals(105, eventList.get(0).getTimestamp());
+        eventList.clear();
+        assertTrue(waterMarkEventGenerator.track(streamId("s1"), 105));
+        assertTrue(waterMarkEventGenerator.track(streamId("s1"), 106));
+        assertTrue(waterMarkEventGenerator.track(streamId("s1"), 115));
+        assertFalse(waterMarkEventGenerator.track(streamId("s1"), 104));
+        waterMarkEventGenerator.run();
+        assertTrue(eventList.get(0).isWatermark());
+        assertEquals(110, eventList.get(0).getTimestamp());
+    }
 }