Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/21 05:06:50 UTC

[2/7] storm git commit: STORM-1187 Support windowing based on tuple ts

STORM-1187 Support windowing based on tuple ts

Support for doing window calculations based on tuple timestamps and handling
out-of-order events based on a time lag.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/06d4bb6e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/06d4bb6e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/06d4bb6e

Branch: refs/heads/master
Commit: 06d4bb6ef10a11e19e1d718a76d7062ef5f69834
Parents: a8d253a
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Fri Nov 13 18:06:56 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Mon Dec 7 01:02:56 2015 +0530

----------------------------------------------------------------------
 docs/documentation/Windowing.md                 |  90 ++++++
 .../storm/starter/SlidingTupleTsTopology.java   |  62 ++++
 .../storm/starter/SlidingWindowTopology.java    |  81 +-----
 .../starter/bolt/SlidingWindowSumBolt.java      |  80 ++++++
 .../storm/starter/spout/RandomIntegerSpout.java |  55 ++++
 .../src/clj/backtype/storm/daemon/executor.clj  |   2 +
 storm-core/src/jvm/backtype/storm/Config.java   |  30 +-
 .../storm/topology/WindowedBoltExecutor.java    | 106 ++++++-
 .../storm/topology/base/BaseWindowedBolt.java   |  33 +++
 .../storm/windowing/CountEvictionPolicy.java    |  17 +-
 .../storm/windowing/CountTriggerPolicy.java     |  11 +-
 .../src/jvm/backtype/storm/windowing/Event.java |   8 +
 .../jvm/backtype/storm/windowing/EventImpl.java |  13 +
 .../storm/windowing/EvictionPolicy.java         |  38 ++-
 .../storm/windowing/TimeEvictionPolicy.java     |  39 ++-
 .../storm/windowing/TimeTriggerPolicy.java      |  13 +
 .../storm/windowing/TriggerHandler.java         |   6 +-
 .../storm/windowing/WaterMarkEvent.java         |  38 +++
 .../windowing/WaterMarkEventGenerator.java      | 110 ++++++++
 .../windowing/WatermarkCountEvictionPolicy.java |  65 +++++
 .../windowing/WatermarkCountTriggerPolicy.java  |  83 ++++++
 .../windowing/WatermarkTimeEvictionPolicy.java  |  77 +++++
 .../windowing/WatermarkTimeTriggerPolicy.java   | 109 ++++++++
 .../backtype/storm/windowing/WindowManager.java | 153 +++++++---
 .../topology/WindowedBoltExecutorTest.java      | 139 +++++++++
 .../windowing/WaterMarkEventGeneratorTest.java  |  99 +++++++
 .../storm/windowing/WindowManagerTest.java      | 280 +++++++++++++++++--
 27 files changed, 1664 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/docs/documentation/Windowing.md
----------------------------------------------------------------------
diff --git a/docs/documentation/Windowing.md b/docs/documentation/Windowing.md
index 8f9d758..54e5e2d 100644
--- a/docs/documentation/Windowing.md
+++ b/docs/documentation/Windowing.md
@@ -126,6 +126,96 @@ Time duration based tumbling window that tumbles after the specified time durati
 
 ```
 
+## Tuple timestamp and out of order tuples
+By default, the timestamp tracked in the window is the time when the tuple is processed by the bolt, and the window calculations
+are performed based on this processing timestamp. Storm also supports tracking windows based on a source-generated timestamp.
+
+```java
+/**
+* Specify a field in the tuple that represents the timestamp as a long value. If this
+* field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
+*
+* @param fieldName the name of the field that contains the timestamp
+*/
+public BaseWindowedBolt withTimestampField(String fieldName)
+```
+
+The value for the above `fieldName` will be looked up from the incoming tuple and considered for windowing calculations. 
+If the field is not present in the tuple an exception will be thrown. Along with the timestamp field name, a time lag parameter 
+can also be specified which indicates the max time limit for tuples with out of order timestamps. 
+
+For example, if the lag is 5 seconds and a tuple `t1` arrives with timestamp `06:00:05`, no tuple is expected to arrive with a timestamp earlier than `06:00:00`. If a tuple
+arrives with timestamp `05:59:59` after `t1` and the window has moved past `t1`, it is treated as a late tuple and is not processed.
+
+```java
+/**
+* Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps
+* cannot be out of order by more than this amount.
+*
+* @param duration the max lag duration
+*/
+public BaseWindowedBolt withLag(Duration duration)
+```
+
+### Watermarks
+To process tuples with a timestamp field, Storm internally computes watermarks based on the incoming tuple timestamps. The watermark is
+the minimum of the latest tuple timestamps (minus the lag) across all the input streams. At a high level this is similar to the watermark concept
+used by Flink and Google's MillWheel for tracking event-based timestamps.
+
+Periodically (every second by default), a watermark timestamp is emitted, and it serves as the clock tick for the window calculation when
+tuple-based timestamps are in use. The interval at which watermarks are emitted can be changed with the API below.
+ 
+```java
+/**
+* Specify the watermark event generation interval. For tuple based timestamps, watermark events
+* are used to track the progress of time
+*
+* @param interval the interval at which watermark events are generated
+*/
+public BaseWindowedBolt withWatermarkInterval(Duration interval)
+```
+
+
+When a watermark is received, all windows up to that timestamp will be evaluated.
+
+For example, consider tuple timestamp based processing with following window parameters,
+
+`Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s`
+
+```
+|-----|-----|-----|-----|-----|-----|-----|
+0     10    20    30    40    50    60    70
+```
+
+Current ts = `09:00:00`
+
+Tuples `e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36)` are received between `9:00:00` and `9:00:01`
+
+At time t = `09:00:01`, watermark w1 = `6:00:31` is emitted since no tuples earlier than `6:00:31` can arrive.
+
+Three windows will be evaluated. The first window end ts (06:00:10) is computed by taking the earliest event timestamp (06:00:03) 
+and computing the ceiling based on the sliding interval (10s).
+
+1. `5:59:50 - 06:00:10` with tuples e1, e2, e3
+2. `6:00:00 - 06:00:20` with tuples e1, e2, e3, e4
+3. `6:00:10 - 06:00:30` with tuples e4, e5
+
+e6 is not evaluated since watermark timestamp `6:00:31` is older than the tuple ts `6:00:36`.
+
+Tuples `e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39)` are received between `9:00:01` and `9:00:02`
+
+At time t = `09:00:02` another watermark w2 = `08:00:34` is emitted since no tuples earlier than `8:00:34` can arrive now.
+
+Three windows will be evaluated,
+
+1. `6:00:20 - 06:00:40` with tuples e5, e6 (from earlier batch)
+2. `6:00:30 - 06:00:50` with tuple e6 (from earlier batch)
+3. `8:00:10 - 08:00:30` with tuples e7, e8, e9
+
+e10 is not evaluated since the tuple ts `8:00:39` is beyond the watermark time `8:00:34`.
+
+The window calculation considers the time gaps and computes the windows based on the tuple timestamp.
+
 ## Guarentees
 The windowing functionality in storm core currently provides at-least once guarentee. The values emitted from the bolts
 `execute(TupleWindow inputWindow)` method are automatically anchored to all the tuples in the inputWindow. The downstream
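
For illustration only (not part of the commit): the watermark rule and the first worked example from the Windowing.md text above can be reproduced with a small standalone sketch. The class and method names here are hypothetical and this is not Storm's `WindowManager` logic. It assumes timestamps are expressed as seconds after `06:00:00`, that a window covers the range `(end - length, end]`, and that windows containing no events are skipped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of the documented watermark and window arithmetic.
final class WatermarkWindowSketch {

    /** Watermark = minimum of the latest timestamp seen on each input stream, minus the max lag. */
    static long watermark(Collection<Long> latestTsPerStream, long maxLag) {
        return Collections.min(latestTsPerStream) - maxLag;
    }

    /** Rounds a non-negative timestamp up to the next multiple of the sliding interval. */
    static long firstWindowEnd(long earliestTs, long slidingInterval) {
        return ((earliestTs + slidingInterval - 1) / slidingInterval) * slidingInterval;
    }

    /** Returns the contents of every non-empty window whose end is at or before the watermark. */
    static List<List<Long>> evaluate(List<Long> eventTs, long windowLength,
                                     long slidingInterval, long watermark) {
        List<List<Long>> windows = new ArrayList<>();
        if (eventTs.isEmpty()) {
            return windows;
        }
        long end = firstWindowEnd(Collections.min(eventTs), slidingInterval);
        for (; end <= watermark; end += slidingInterval) {
            List<Long> window = new ArrayList<>();
            for (long ts : eventTs) {
                // Assumed boundary convention: start exclusive, end inclusive.
                if (ts > end - windowLength && ts <= end) {
                    window.add(ts);
                }
            }
            if (!window.isEmpty()) {
                windows.add(window);
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        // e1..e6 from the example, as seconds after 06:00:00.
        List<Long> batch = Arrays.asList(3L, 5L, 7L, 18L, 26L, 36L);
        long w1 = watermark(Collections.singletonList(36L), 5L);
        System.out.println("watermark = " + w1);            // prints "watermark = 31"
        System.out.println(evaluate(batch, 20L, 10L, w1));  // prints [[3, 5, 7], [3, 5, 7, 18], [18, 26]]
    }
}
```

With a 20s window, a 10s sliding interval and a 5s lag, the sketch yields the same three windows as the documented example (e1-e3, e1-e4, and e4-e5), and e6 stays pending because its timestamp is beyond the watermark.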

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java b/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java
new file mode 100644
index 0000000..598335d
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseWindowedBolt;
+import backtype.storm.utils.Utils;
+import storm.starter.bolt.PrinterBolt;
+import storm.starter.bolt.SlidingWindowSumBolt;
+import storm.starter.spout.RandomIntegerSpout;
+
+import java.util.concurrent.TimeUnit;
+
+import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
+
+/**
+ * Windowing based on tuple timestamp (e.g. the time when the tuple is generated
+ * rather than when it's processed).
+ */
+public class SlidingTupleTsTopology {
+    public static void main(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
+        BaseWindowedBolt bolt = new SlidingWindowSumBolt()
+                .withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS))
+                .withTimestampField("ts")
+                .withLag(new Duration(5, TimeUnit.SECONDS));
+        builder.setSpout("integer", new RandomIntegerSpout(), 1);
+        builder.setBolt("slidingsum", bolt, 1).shuffleGrouping("integer");
+        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingsum");
+        Config conf = new Config();
+        conf.setDebug(true);
+
+        if (args != null && args.length > 0) {
+            conf.setNumWorkers(1);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+        } else {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", conf, builder.createTopology());
+            Utils.sleep(40000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+        }
+    }
+}
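
For illustration only (not part of the commit): the topology above sets a 5 second lag with `withLag`. A minimal sketch of how a lag could translate into late-tuple rejection is shown below. It assumes a single input stream and a hypothetical `LateTupleSketch` class; it is not the `WaterMarkEventGenerator` added by this commit, only a model of the behaviour described in Windowing.md (a tuple is late once the emitted watermark has moved past its timestamp).

```java
// Hypothetical single-stream model of watermark tracking and late-tuple detection.
final class LateTupleSketch {
    private final long maxLagMs;
    private long latestTs = Long.MIN_VALUE;       // highest tuple timestamp seen so far
    private long lastWatermark = Long.MIN_VALUE;  // last watermark that was emitted

    LateTupleSketch(long maxLagMs) {
        this.maxLagMs = maxLagMs;
    }

    /** Returns true if the tuple is accepted, false if it is older than the last watermark. */
    boolean track(long ts) {
        if (ts < lastWatermark) {
            return false; // late tuple: the window has already moved past it
        }
        latestTs = Math.max(latestTs, ts);
        return true;
    }

    /** Stands in for the periodic watermark tick; the watermark never moves backwards. */
    long emitWatermark() {
        if (latestTs != Long.MIN_VALUE) {
            lastWatermark = Math.max(lastWatermark, latestTs - maxLagMs);
        }
        return lastWatermark;
    }

    public static void main(String[] args) {
        // Timestamps are milliseconds relative to 06:00:00.
        LateTupleSketch tracker = new LateTupleSketch(5000L);
        System.out.println(tracker.track(5000L));    // t1 at 06:00:05 is accepted
        System.out.println(tracker.emitWatermark()); // watermark advances to 06:00:00 (0)
        System.out.println(tracker.track(-1000L));   // 05:59:59 is now late
    }
}
```

Note that in this model a tuple with timestamp `05:59:59` would still be accepted if it arrived before the watermark tick, which matches the "window has moved past `t1`" condition in the documentation.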

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java b/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
index beee8de..5031f8d 100644
--- a/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
@@ -20,12 +20,10 @@ package storm.starter;
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import backtype.storm.StormSubmitter;
-import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.topology.base.BaseWindowedBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
@@ -35,10 +33,11 @@ import backtype.storm.windowing.TupleWindow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.starter.bolt.PrinterBolt;
+import storm.starter.bolt.SlidingWindowSumBolt;
+import storm.starter.spout.RandomIntegerSpout;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 
 import static backtype.storm.topology.base.BaseWindowedBolt.Count;
 
@@ -51,79 +50,6 @@ public class SlidingWindowTopology {
     private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowTopology.class);
 
     /*
-     * emits a random integer every 100 ms
-     */
-
-    private static class RandomIntegerSpout extends BaseRichSpout {
-        SpoutOutputCollector collector;
-        Random rand;
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("value"));
-        }
-
-        @Override
-        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-            this.collector = collector;
-            this.rand = new Random();
-        }
-
-        @Override
-        public void nextTuple() {
-            Utils.sleep(100);
-            collector.emit(new Values(rand.nextInt(1000)));
-        }
-    }
-
-    /*
-     * Computes sliding window sum
-     */
-    private static class SlidingWindowSumBolt extends BaseWindowedBolt {
-        private int sum = 0;
-        private OutputCollector collector;
-
-        @Override
-        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-            this.collector = collector;
-        }
-
-        @Override
-        public void execute(TupleWindow inputWindow) {
-            /*
-             * The inputWindow gives a view of
-             * (a) all the events in the window
-             * (b) events that expired since last activation of the window
-             * (c) events that newly arrived since last activation of the window
-             */
-            List<Tuple> tuplesInWindow = inputWindow.get();
-            List<Tuple> newTuples = inputWindow.getNew();
-            List<Tuple> expiredTuples = inputWindow.getExpired();
-
-            LOG.debug("Events in current window: " + tuplesInWindow.size());
-            /*
-             * Instead of iterating over all the tuples in the window to compute
-             * the sum, the values for the new events are added and old events are
-             * subtracted. Similar optimizations might be possible in other
-             * windowing computations.
-             */
-            for (Tuple tuple : newTuples) {
-                sum += (int) tuple.getValue(0);
-            }
-            for (Tuple tuple : expiredTuples) {
-                sum -= (int) tuple.getValue(0);
-            }
-            collector.emit(new Values(sum));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("sum"));
-        }
-    }
-
-
-    /*
      * Computes tumbling window average
      */
     private static class TumblingWindowAvgBolt extends BaseWindowedBolt {
@@ -168,13 +94,10 @@ public class SlidingWindowTopology {
         builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");
         Config conf = new Config();
         conf.setDebug(true);
-
         if (args != null && args.length > 0) {
             conf.setNumWorkers(1);
-
             StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
         } else {
-
             LocalCluster cluster = new LocalCluster();
             cluster.submitTopology("test", conf, builder.createTopology());
             Utils.sleep(40000);

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java
new file mode 100644
index 0000000..ef3a0b8
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseWindowedBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Computes sliding window sum
+ */
+public class SlidingWindowSumBolt extends BaseWindowedBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowSumBolt.class);
+
+    private int sum = 0;
+    private OutputCollector collector;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(TupleWindow inputWindow) {
+        /*
+         * The inputWindow gives a view of
+         * (a) all the events in the window
+         * (b) events that expired since last activation of the window
+         * (c) events that newly arrived since last activation of the window
+         */
+        List<Tuple> tuplesInWindow = inputWindow.get();
+        List<Tuple> newTuples = inputWindow.getNew();
+        List<Tuple> expiredTuples = inputWindow.getExpired();
+
+        LOG.debug("Events in current window: " + tuplesInWindow.size());
+        /*
+         * Instead of iterating over all the tuples in the window to compute
+         * the sum, the values for the new events are added and old events are
+         * subtracted. Similar optimizations might be possible in other
+         * windowing computations.
+         */
+        for (Tuple tuple : newTuples) {
+            sum += (int) tuple.getValue(0);
+        }
+        for (Tuple tuple : expiredTuples) {
+            sum -= (int) tuple.getValue(0);
+        }
+        collector.emit(new Values(sum));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("sum"));
+    }
+}
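
For illustration only (not part of the commit): the incremental update used in `SlidingWindowSumBolt.execute` can be tried without a Storm cluster. The sketch below uses a hypothetical `IncrementalSumSketch` class and plain integer lists in place of `TupleWindow.getNew()` and `TupleWindow.getExpired()`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the bolt's running sum, driven by new/expired value lists.
final class IncrementalSumSketch {
    private int sum = 0;

    /** Applies one window activation: add newly arrived values, subtract expired ones. */
    int onActivation(List<Integer> newValues, List<Integer> expiredValues) {
        for (int v : newValues) {
            sum += v;
        }
        for (int v : expiredValues) {
            sum -= v;
        }
        return sum;
    }

    public static void main(String[] args) {
        IncrementalSumSketch bolt = new IncrementalSumSketch();
        // First activation: window holds 1, 2, 3.
        System.out.println(bolt.onActivation(Arrays.asList(1, 2, 3), Collections.<Integer>emptyList())); // prints 6
        // Second activation: 4 arrives, 1 expires, so the window holds 2, 3, 4.
        System.out.println(bolt.onActivation(Arrays.asList(4), Arrays.asList(1)));                       // prints 9
    }
}
```

The cost per activation is proportional to the number of new and expired values rather than to the full window size, which is the optimization the bolt's comment describes.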

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java b/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java
new file mode 100644
index 0000000..5778c8e
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter.spout;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Emits a random integer and a timestamp value (offset by one day),
+ * every 100 ms. The ts field can be used in tuple time based windowing.
+ */
+public class RandomIntegerSpout extends BaseRichSpout {
+    private SpoutOutputCollector collector;
+    private Random rand;
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("value", "ts"));
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+        this.rand = new Random();
+    }
+
+    @Override
+    public void nextTuple() {
+        Utils.sleep(100);
+        collector.emit(new Values(rand.nextInt(1000), System.currentTimeMillis() - (24 * 60 * 60 * 1000)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 2aeb5e7..58b56cf 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -171,6 +171,8 @@
                         TOPOLOGY-BOLTS-WINDOW-LENGTH-DURATION-MS
                         TOPOLOGY-BOLTS-SLIDING-INTERVAL-COUNT
                         TOPOLOGY-BOLTS-SLIDING-INTERVAL-DURATION-MS
+                        TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-FIELD-NAME
+                        TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-MAX-LAG-MS
                         )
         spec-conf (-> general-context
                       (.getComponentCommon component-id)

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index a1da8fe..4f86786 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1526,19 +1526,45 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS = "topology.bolts.window.length.duration.ms";
 
     /*
-     * Bolt-specific configuration for windowed bolts to specifiy the sliding interval as a count of number of tuples.
+     * Bolt-specific configuration for windowed bolts to specify the sliding interval as a count of number of tuples.
      */
     @isInteger
     @isPositiveNumber
     public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT = "topology.bolts.window.sliding.interval.count";
 
     /*
-     * Bolt-specific configuration for windowed bolts to specifiy the sliding interval in time duration.
+     * Bolt-specific configuration for windowed bolts to specify the sliding interval in time duration.
      */
     @isInteger
     @isPositiveNumber
     public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS = "topology.bolts.window.sliding.interval.duration.ms";
 
+    /*
+     * Bolt-specific configuration for windowed bolts to specify the name of the field in the tuple that holds
+     * the timestamp (e.g. the ts when the tuple was actually generated). If this config is specified and the
+     * field is not present in the incoming tuple, a java.lang.IllegalArgumentException will be thrown.
+     */
+    @isString
+    public static final String TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME = "topology.bolts.tuple.timestamp.field.name";
+
+    /*
+     * Bolt-specific configuration for windowed bolts to specify the maximum time lag of the tuple timestamp
+     * in milliseconds. It means that the tuple timestamps cannot be out of order by more than this amount.
+     * This config will be effective only if the TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME is also specified.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS = "topology.bolts.tuple.timestamp.max.lag.ms";
+
+    /*
+     * Bolt-specific configuration for windowed bolts to specify the time interval for generating
+     * watermark events. Watermark events track the progress of time when tuple timestamps are used.
+     * This config is effective only if TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME is also specified.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS = "topology.bolts.watermark.event.interval.ms";
+
     /**
      * This config is available for TransactionalSpouts, and contains the id ( a String) for
      * the transactional topology. This id is used to store the state of the transactional
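
For illustration only (not part of the commit): the three new config keys are read from the topology configuration map, with the lag and watermark interval taking effect only when a timestamp field name is set. The sketch below shows one way such a lookup with defaults could look. The `TupleTsConfigSketch` class and its helper methods are hypothetical; the key strings are copied from the `Config.java` hunk above, and the defaults (no lag, 1000 ms watermark interval) are the ones declared in the `WindowedBoltExecutor` hunk of this commit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that resolves the tuple-timestamp settings from a config map.
final class TupleTsConfigSketch {
    static final String TS_FIELD = "topology.bolts.tuple.timestamp.field.name";
    static final String MAX_LAG_MS = "topology.bolts.tuple.timestamp.max.lag.ms";
    static final String WATERMARK_INTERVAL_MS = "topology.bolts.watermark.event.interval.ms";

    static final int DEFAULT_MAX_LAG_MS = 0;                 // no lag
    static final int DEFAULT_WATERMARK_INTERVAL_MS = 1000;   // 1s

    /** Reads a numeric setting, falling back to the default when the key is absent. */
    static int intOrDefault(Map<String, Object> conf, String key, int defaultValue) {
        Object value = conf.get(key);
        return value == null ? defaultValue : ((Number) value).intValue();
    }

    /** Tuple-timestamp windowing is active only when the field name is configured. */
    static boolean isTupleTs(Map<String, Object> conf) {
        return conf.get(TS_FIELD) != null;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(TS_FIELD, "ts");
        conf.put(MAX_LAG_MS, 5000);
        System.out.println(isTupleTs(conf));                                                          // prints true
        System.out.println(intOrDefault(conf, MAX_LAG_MS, DEFAULT_MAX_LAG_MS));                       // prints 5000
        System.out.println(intOrDefault(conf, WATERMARK_INTERVAL_MS, DEFAULT_WATERMARK_INTERVAL_MS)); // prints 1000
    }
}
```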

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
index c7d6f70..ff2278e 100644
--- a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
@@ -22,7 +22,18 @@ import backtype.storm.task.IOutputCollector;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.windowing.CountEvictionPolicy;
+import backtype.storm.windowing.CountTriggerPolicy;
+import backtype.storm.windowing.EvictionPolicy;
+import backtype.storm.windowing.TimeEvictionPolicy;
+import backtype.storm.windowing.TimeTriggerPolicy;
+import backtype.storm.windowing.TriggerPolicy;
 import backtype.storm.windowing.TupleWindowImpl;
+import backtype.storm.windowing.WaterMarkEventGenerator;
+import backtype.storm.windowing.WatermarkCountEvictionPolicy;
+import backtype.storm.windowing.WatermarkCountTriggerPolicy;
+import backtype.storm.windowing.WatermarkTimeEvictionPolicy;
+import backtype.storm.windowing.WatermarkTimeTriggerPolicy;
 import backtype.storm.windowing.WindowLifecycleListener;
 import backtype.storm.windowing.WindowManager;
 import org.slf4j.Logger;
@@ -40,11 +51,15 @@ import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
  */
 public class WindowedBoltExecutor implements IRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
-
-    private IWindowedBolt bolt;
+    private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
+    private static final int DEFAULT_MAX_LAG_MS = 0; // no lag
+    private final IWindowedBolt bolt;
     private transient WindowedOutputCollector windowedOutputCollector;
     private transient WindowLifecycleListener<Tuple> listener;
     private transient WindowManager<Tuple> windowManager;
+    private transient int maxLagMs;
+    private transient String tupleTsFieldName;
+    private transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
 
     public WindowedBoltExecutor(IWindowedBolt bolt) {
         this.bolt = bolt;
@@ -93,6 +108,10 @@ public class WindowedBoltExecutor implements IRichBolt {
 
         int topologyTimeout = getTopologyTimeoutMillis(stormConf);
         int maxSpoutPending = getMaxSpoutPending(stormConf);
+        if (windowLengthCount == null && windowLengthDuration == null) {
+            throw new IllegalArgumentException("Window length is not specified");
+        }
+
         if (windowLengthDuration != null && slidingIntervalDuration != null) {
             ensureDurationLessThanTimeout(windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout);
         } else if (windowLengthDuration != null) {
@@ -110,12 +129,14 @@ public class WindowedBoltExecutor implements IRichBolt {
         }
     }
 
-    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf) {
+    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf,
+                                                   TopologyContext context) {
         WindowManager<Tuple> manager = new WindowManager<>(lifecycleListener);
         Duration windowLengthDuration = null;
         Count windowLengthCount = null;
         Duration slidingIntervalDuration = null;
         Count slidingIntervalCount = null;
+        // window length
         if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
             windowLengthCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
         } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
@@ -123,7 +144,7 @@ public class WindowedBoltExecutor implements IRichBolt {
                     ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
                     TimeUnit.MILLISECONDS);
         }
-
+        // sliding interval
         if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
             slidingIntervalCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
         } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
@@ -132,20 +153,73 @@ public class WindowedBoltExecutor implements IRichBolt {
             // default is a sliding window of count 1
             slidingIntervalCount = new Count(1);
         }
+        // tuple ts
+        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME)) {
+            tupleTsFieldName = (String) stormConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME);
+            // max lag
+            if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
+                maxLagMs = ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
+            } else {
+                maxLagMs = DEFAULT_MAX_LAG_MS;
+            }
+            // watermark interval
+            int watermarkInterval;
+            if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) {
+                watermarkInterval = ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue();
+            } else {
+                watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
+            }
+            waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval,
+                                                                    maxLagMs, context.getThisSources().keySet());
+        }
         // validate
         validate(stormConf, windowLengthCount, windowLengthDuration,
                  slidingIntervalCount, slidingIntervalDuration);
-        if (windowLengthCount != null) {
-            manager.setWindowLength(windowLengthCount);
+        EvictionPolicy<Tuple> evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration,
+                                                                 manager);
+        TriggerPolicy<Tuple> triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
+                                                              manager, evictionPolicy);
+        manager.setEvictionPolicy(evictionPolicy);
+        manager.setTriggerPolicy(triggerPolicy);
+        return manager;
+    }
+
+    private boolean isTupleTs() {
+        return tupleTsFieldName != null;
+    }
+
+    private TriggerPolicy<Tuple> getTriggerPolicy(Count slidingIntervalCount, Duration slidingIntervalDuration,
+                                                  WindowManager<Tuple> manager, EvictionPolicy<Tuple> evictionPolicy) {
+        if (slidingIntervalCount != null) {
+            if (isTupleTs()) {
+                return new WatermarkCountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy, manager);
+            } else {
+                return new CountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy);
+            }
         } else {
-            manager.setWindowLength(windowLengthDuration);
+            if (isTupleTs()) {
+                return new WatermarkTimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy, manager);
+            } else {
+                return new TimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy);
+            }
         }
-        if (slidingIntervalCount != null) {
-            manager.setSlidingInterval(slidingIntervalCount);
+    }
+
+    private EvictionPolicy<Tuple> getEvictionPolicy(Count windowLengthCount, Duration windowLengthDuration,
+                                                    WindowManager<Tuple> manager) {
+        if (windowLengthCount != null) {
+            if (isTupleTs()) {
+                return new WatermarkCountEvictionPolicy<>(windowLengthCount.value, manager);
+            } else {
+                return new CountEvictionPolicy<>(windowLengthCount.value);
+            }
         } else {
-            manager.setSlidingInterval(slidingIntervalDuration);
+            if (isTupleTs()) {
+                return new WatermarkTimeEvictionPolicy<>(windowLengthDuration.value, maxLagMs);
+            } else {
+                return new TimeEvictionPolicy<>(windowLengthDuration.value);
+            }
         }
-        return manager;
     }
 
     @Override
@@ -153,13 +227,19 @@ public class WindowedBoltExecutor implements IRichBolt {
         this.windowedOutputCollector = new WindowedOutputCollector(collector);
         bolt.prepare(stormConf, context, windowedOutputCollector);
         this.listener = newWindowLifecycleListener();
-        this.windowManager = initWindowManager(listener, stormConf);
+        this.windowManager = initWindowManager(listener, stormConf, context);
         LOG.debug("Initialized window manager {} ", this.windowManager);
     }
 
     @Override
     public void execute(Tuple input) {
-        windowManager.add(input);
+        if (isTupleTs()) {
+            long ts = input.getLongByField(tupleTsFieldName);
+            windowManager.add(input, ts);
+            waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts);
+        } else {
+            windowManager.add(input);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java b/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
index fd4af90..2f49661 100644
--- a/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
+++ b/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
@@ -157,6 +157,39 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
         return withWindowLength(duration).withSlidingInterval(duration);
     }
 
+    /**
+     * Specify a field in the tuple that represents the timestamp as a long value. If this
+     * field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
+     *
+     * @param fieldName the name of the field that contains the timestamp
+     */
+    public BaseWindowedBolt withTimestampField(String fieldName) {
+        windowConfiguration.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, fieldName);
+        return this;
+    }
+
+    /**
+     * Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps
+     * cannot be out of order by more than this amount.
+     *
+     * @param duration the max lag duration
+     */
+    public BaseWindowedBolt withLag(Duration duration) {
+        windowConfiguration.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, duration.value);
+        return this;
+    }
+
+    /**
+     * Specify the watermark event generation interval. For tuple based timestamps, watermark events
+     * are used to track the progress of time.
+     *
+     * @param interval the interval at which watermark events are generated
+     */
+    public BaseWindowedBolt withWatermarkInterval(Duration interval) {
+        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, interval.value);
+        return this;
+    }
+
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
         // NOOP

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
index a6779a8..a49702c 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class CountEvictionPolicy<T> implements EvictionPolicy<T> {
     private final int threshold;
-    private final AtomicInteger currentCount;
+    protected final AtomicInteger currentCount;
 
     public CountEvictionPolicy(int count) {
         this.threshold = count;
@@ -35,7 +35,7 @@ public class CountEvictionPolicy<T> implements EvictionPolicy<T> {
     }
 
     @Override
-    public boolean evict(Event<T> event) {
+    public Action evict(Event<T> event) {
         /*
          * atomically decrement the count if its greater than threshold and
          * return if the event should be evicted
@@ -44,18 +44,25 @@ public class CountEvictionPolicy<T> implements EvictionPolicy<T> {
             int curVal = currentCount.get();
             if (curVal > threshold) {
                 if (currentCount.compareAndSet(curVal, curVal - 1)) {
-                    return true;
+                    return Action.EXPIRE;
                 }
             } else {
                 break;
             }
         }
-        return false;
+        return Action.PROCESS;
     }
 
     @Override
     public void track(Event<T> event) {
-        currentCount.incrementAndGet();
+        if (!event.isWatermark()) {
+            currentCount.incrementAndGet();
+        }
+    }
+
+    @Override
+    public void setContext(Object context) {
+        // NOOP
     }
 
     @Override
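
The compare-and-set loop in CountEvictionPolicy.evict() can be tried in isolation. The following is a minimal sketch only, not the committed class: CountEvictSketch and shouldExpire() are hypothetical names, and a boolean stands in for Action.EXPIRE / Action.PROCESS.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, stripped-down illustration of the count-based eviction loop.
class CountEvictSketch {
    private final int threshold;
    private final AtomicInteger currentCount = new AtomicInteger();

    CountEvictSketch(int threshold) {
        this.threshold = threshold;
    }

    // Called once per non-watermark event that enters the queue.
    void track() {
        currentCount.incrementAndGet();
    }

    // Returns true (expire) while the tracked count exceeds the threshold,
    // decrementing atomically; returns false (process) otherwise.
    boolean shouldExpire() {
        while (true) {
            int cur = currentCount.get();
            if (cur <= threshold) {
                return false;
            }
            // Retry if another thread changed the count in between.
            if (currentCount.compareAndSet(cur, cur - 1)) {
                return true;
            }
        }
    }
}
```

With a threshold of 3 and five tracked events, the first two calls report expiry and the third does not, which leaves exactly the window length in the queue.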

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
index 3b1bf9f..ffe1b90 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
@@ -29,17 +29,22 @@ public class CountTriggerPolicy<T> implements TriggerPolicy<T> {
     private final int count;
     private final AtomicInteger currentCount;
     private final TriggerHandler handler;
+    private final EvictionPolicy<T> evictionPolicy;
 
-    public CountTriggerPolicy(int count, TriggerHandler handler) {
+    public CountTriggerPolicy(int count, TriggerHandler handler, EvictionPolicy<T> evictionPolicy) {
         this.count = count;
         this.currentCount = new AtomicInteger();
         this.handler = handler;
+        this.evictionPolicy = evictionPolicy;
     }
 
     @Override
     public void track(Event<T> event) {
-        if (currentCount.incrementAndGet() >= count) {
-            handler.onTrigger();
+        if (!event.isWatermark()) {
+            if (currentCount.incrementAndGet() >= count) {
+                evictionPolicy.setContext(System.currentTimeMillis());
+                handler.onTrigger();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/Event.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/Event.java b/storm-core/src/jvm/backtype/storm/windowing/Event.java
index 4855701..a7bcb9b 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/Event.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/Event.java
@@ -38,4 +38,12 @@ interface Event<T> {
      * @return the wrapped object.
      */
     T get();
+
+    /**
+     * Whether this is a watermark event. Watermark events are used
+     * for tracking time while processing event based timestamps.
+     *
+     * @return true if this is a watermark event
+     */
+    boolean isWatermark();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java b/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
index 09c974c..035f41b 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
@@ -35,4 +35,17 @@ class EventImpl<T> implements Event<T> {
     public T get() {
         return event;
     }
+
+    @Override
+    public boolean isWatermark() {
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "EventImpl{" +
+                "event=" + event +
+                ", ts=" + ts +
+                '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
index 8820e92..c3b3032 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
@@ -25,12 +25,36 @@ package backtype.storm.windowing;
  */
 public interface EvictionPolicy<T> {
     /**
-     * Decides if an event should be evicted from the window or not.
+     * The action to be taken when {@link EvictionPolicy#evict(Event)} is invoked.
+     */
+    enum Action {
+        /**
+         * expire the event and remove it from the queue
+         */
+        EXPIRE,
+        /**
+         * process the event in the current window of events
+         */
+        PROCESS,
+        /**
+         * don't include in the current window but keep the event
+         * in the queue for evaluating as a part of future windows
+         */
+        KEEP,
+        /**
+         * stop processing the queue; there cannot be any more events
+         * satisfying the eviction policy
+         */
+        STOP
+    }
+    /**
+     * Decides if an event should be expired from the window, processed in the current
+     * window or kept for later processing.
      *
      * @param event the input event
-     * @return true if the event should be evicted, false otherwise
+     * @return the {@link backtype.storm.windowing.EvictionPolicy.Action} to be taken based on the input event
      */
-    boolean evict(Event<T> event);
+    Action evict(Event<T> event);
 
     /**
      * Tracks the event to later decide whether
@@ -39,4 +63,12 @@ public interface EvictionPolicy<T> {
      * @param event the input event to be tracked
      */
     void track(Event<T> event);
+
+    /**
+     * Sets a context in the eviction policy that can be used while evicting the events.
+     * E.g. For TimeEvictionPolicy, this could be used to set the reference timestamp.
+     *
+     * @param context
+     */
+    void setContext(Object context);
 }
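
To make the four Action values concrete, here is a rough sketch of how a caller might walk the event queue and act on each result. It is an assumption-laden illustration, not the WindowManager code from this commit: ActionScanSketch, scan() and timePolicy() are hypothetical names, events are reduced to bare timestamps, and the rule that maps the lag to STOP is only one plausible choice.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of interpreting EXPIRE / PROCESS / KEEP / STOP.
class ActionScanSketch {
    enum Action { EXPIRE, PROCESS, KEEP, STOP }

    // Walks the queue once and returns the events belonging to the current window.
    static <E> List<E> scan(LinkedList<E> queue, Function<E, Action> policy) {
        List<E> window = new ArrayList<>();
        Iterator<E> it = queue.iterator();
        while (it.hasNext()) {
            E event = it.next();
            Action action = policy.apply(event);
            if (action == Action.EXPIRE) {
                it.remove();              // drop the event from the queue
            } else if (action == Action.PROCESS) {
                window.add(event);        // part of the current window
            } else if (action == Action.STOP) {
                break;                    // nothing further can match
            }
            // KEEP: leave the event queued for a future window
        }
        return window;
    }

    // Assumed time-based rule: expire old events, keep future ones, and stop
    // once an event is further ahead of the reference time than the lag.
    static Action timePolicy(long ts, long referenceTime, long windowLength, long lag) {
        long diff = referenceTime - ts;
        if (diff >= windowLength) {
            return Action.EXPIRE;
        }
        if (diff >= 0) {
            return Action.PROCESS;
        }
        return (-diff > lag) ? Action.STOP : Action.KEEP;
    }
}
```

For a queue of timestamps 1, 5, 9, 12, 40 with reference time 10, window length 6 and lag 20, the scan expires 1, returns 5 and 9 as the window, keeps 12, and stops at 40.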

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
index 16408f3..117df92 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
@@ -21,21 +21,34 @@ package backtype.storm.windowing;
  * Eviction policy that evicts events based on time duration.
  */
 public class TimeEvictionPolicy<T> implements EvictionPolicy<T> {
-    private final long duration;
+    private final int windowLength;
+    /**
+     * The reference time in millis for window calculations and
+     * expiring events. If not set it will default to System.currentTimeMillis()
+     */
+    protected Long referenceTime;
 
-    public TimeEvictionPolicy(long millis) {
-        this.duration = millis;
+    /**
+     * Constructs a TimeEvictionPolicy that evicts events older
+     * than the given window length in millis
+     *
+     * @param windowLength the duration in milliseconds
+     */
+    public TimeEvictionPolicy(int windowLength) {
+        this.windowLength = windowLength;
     }
 
     /**
-     * Returns true if the event falls out of the window based on the window duration
-     *
-     * @param event
-     * @return
+     * {@inheritDoc}
      */
     @Override
-    public boolean evict(Event<T> event) {
-        return (System.currentTimeMillis() - event.getTimestamp()) >= duration;
+    public Action evict(Event<T> event) {
+        long now = referenceTime == null ? System.currentTimeMillis() : referenceTime;
+        long diff = now - event.getTimestamp();
+        if (diff >= windowLength) {
+            return Action.EXPIRE;
+        }
+        return Action.PROCESS;
     }
 
     @Override
@@ -44,9 +57,15 @@ public class TimeEvictionPolicy<T> implements EvictionPolicy<T> {
     }
 
     @Override
+    public void setContext(Object context) {
+        referenceTime = ((Number) context).longValue();
+    }
+
+    @Override
     public String toString() {
         return "TimeEvictionPolicy{" +
-                "duration=" + duration +
+                "windowLength=" + windowLength +
+                ", referenceTime=" + referenceTime +
                 '}';
     }
 }
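
The effect of pinning the reference time through setContext() can be seen in a small standalone sketch. This assumes a simplified class, ReferenceTimeSketch (a hypothetical name), that mirrors only the timestamp comparison above and returns a boolean in place of an Action.

```java
// Hypothetical, simplified mirror of the reference-time check.
class ReferenceTimeSketch {
    private final int windowLength;
    private Long referenceTime; // null until setContext is called

    ReferenceTimeSketch(int windowLength) {
        this.windowLength = windowLength;
    }

    void setContext(Object context) {
        referenceTime = ((Number) context).longValue();
    }

    // True if the event is at least windowLength millis older than the reference time.
    boolean expired(long eventTs) {
        long now = referenceTime == null ? System.currentTimeMillis() : referenceTime;
        return now - eventTs >= windowLength;
    }
}
```

Because every event in one trigger is compared against the same fixed reference time, the outcome no longer depends on how long the queue scan itself takes. Note that the boundary is inclusive: an event exactly windowLength millis old is expired.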

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
index db1fbeb..a32cb4d 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
@@ -37,12 +37,18 @@ public class TimeTriggerPolicy<T> implements TriggerPolicy<T> {
     private final TriggerHandler handler;
     private final ScheduledExecutorService executor;
     private final ScheduledFuture<?> executorFuture;
+    private final EvictionPolicy<T> evictionPolicy;
 
     public TimeTriggerPolicy(long millis, TriggerHandler handler) {
+        this(millis, handler, null);
+    }
+
+    public TimeTriggerPolicy(long millis, TriggerHandler handler, EvictionPolicy evictionPolicy) {
         this.duration = millis;
         this.handler = handler;
         this.executor = Executors.newSingleThreadScheduledExecutor();
         this.executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
+        this.evictionPolicy = evictionPolicy;
     }
 
     @Override
@@ -100,6 +106,13 @@ public class TimeTriggerPolicy<T> implements TriggerPolicy<T> {
             @Override
             public void run() {
                 try {
+                    /*
+                     * set the current timestamp as the reference time for the eviction policy
+                     * to evict the events
+                     */
+                    if (evictionPolicy != null) {
+                        evictionPolicy.setContext(System.currentTimeMillis());
+                    }
                     handler.onTrigger();
                 } catch (Throwable th) {
                     LOG.error("handler.onTrigger failed ", th);

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java b/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java
index f947951..29fe90d 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java
@@ -23,7 +23,9 @@ package backtype.storm.windowing;
  */
 interface TriggerHandler {
     /**
-     * the code to execute when the {@link TriggerPolicy} condition is satisfied.
+     * The code to execute when the {@link TriggerPolicy} condition is satisfied.
+     *
+     * @return true if the window was evaluated with at least one event in the window, false otherwise
      */
-    void onTrigger();
+    boolean onTrigger();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEvent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEvent.java b/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEvent.java
new file mode 100644
index 0000000..bada76d
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+/**
+ * Watermark event used for tracking progress of time when
+ * processing event based ts.
+ */
+public class WaterMarkEvent<T> extends EventImpl<T> {
+    public WaterMarkEvent(long ts) {
+        super(null, ts);
+    }
+
+    @Override
+    public boolean isWatermark() {
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "WaterMarkEvent{} " + super.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java b/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java
new file mode 100644
index 0000000..9820da6
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.topology.FailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tracks tuples across input streams and periodically emits watermark events.
+ * Watermark event timestamp is the minimum of the latest tuple timestamps
+ * across all the input streams (minus the lag). Once a watermark event is emitted
+ * any tuple arriving with an earlier timestamp can be considered a late event.
+ */
+public class WaterMarkEventGenerator<T> implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(WaterMarkEventGenerator.class);
+    private final WindowManager<T> windowManager;
+    private final int eventTsLag;
+    private final Set<GlobalStreamId> inputStreams;
+    private final Map<GlobalStreamId, Long> streamToTs;
+    private final ScheduledExecutorService executorService;
+    private final ScheduledFuture<?> executorFuture;
+    private long lastWaterMarkTs = 0;
+
+    public WaterMarkEventGenerator(WindowManager<T> windowManager, int interval,
+                                   int eventTsLag, Set<GlobalStreamId> inputStreams) {
+        this.windowManager = windowManager;
+        streamToTs = new ConcurrentHashMap<>();
+        executorService = Executors.newSingleThreadScheduledExecutor();
+        this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
+        this.eventTsLag = eventTsLag;
+        this.inputStreams = inputStreams;
+    }
+
+    public void track(GlobalStreamId stream, long ts) {
+        Long currentVal = streamToTs.get(stream);
+        if (currentVal == null || ts > currentVal) {
+            streamToTs.put(stream, ts);
+        }
+        checkFailures();
+    }
+
+    @Override
+    public void run() {
+        try {
+            long waterMarkTs = computeWaterMarkTs();
+            if (waterMarkTs > lastWaterMarkTs) {
+                this.windowManager.add(new WaterMarkEvent<T>(waterMarkTs - eventTsLag));
+                lastWaterMarkTs = waterMarkTs;
+            }
+        } catch (Throwable th) {
+            LOG.error("Failed while processing watermark event ", th);
+            throw th;
+        }
+    }
+
+    /**
+     * Computes the min ts across all streams.
+     */
+    private long computeWaterMarkTs() {
+        long ts = Long.MIN_VALUE;
+        // only if some data has arrived on each input stream
+        if (streamToTs.size() >= inputStreams.size()) {
+            ts = Long.MAX_VALUE;
+            for (Map.Entry<GlobalStreamId, Long> entry : streamToTs.entrySet()) {
+                ts = Math.min(ts, entry.getValue());
+            }
+        }
+        return ts;
+    }
+
+    private void checkFailures() {
+        if (executorFuture.isDone()) {
+            try {
+                executorFuture.get();
+            } catch (InterruptedException ex) {
+                LOG.error("Got exception ", ex);
+                throw new FailedException(ex);
+            } catch (ExecutionException ex) {
+                LOG.error("Got exception ", ex);
+                throw new FailedException(ex.getCause());
+            }
+        }
+    }
+}
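
The watermark arithmetic described in the class comment (minimum of the latest per-stream timestamps, minus the lag) can be sketched without the scheduler or the WindowManager. This is a simplified illustration under stated assumptions: WatermarkSketch and nextWatermark() are hypothetical names, streams are identified by plain strings instead of GlobalStreamId, and the method returns the value that would be emitted instead of adding an event.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, single-threaded illustration of the watermark bookkeeping.
class WatermarkSketch {
    private final Map<String, Long> latestTsByStream = new HashMap<>();
    private final int expectedStreams;
    private final long lagMs;
    private long lastWatermarkTs = 0;

    WatermarkSketch(int expectedStreams, long lagMs) {
        this.expectedStreams = expectedStreams;
        this.lagMs = lagMs;
    }

    // Remember the highest timestamp seen so far on each stream.
    void track(String stream, long ts) {
        latestTsByStream.merge(stream, ts, Math::max);
    }

    // Returns the next watermark timestamp, or null if none should be emitted yet.
    Long nextWatermark() {
        if (latestTsByStream.size() < expectedStreams) {
            return null; // wait until every input stream has delivered data
        }
        long min = Long.MAX_VALUE;
        for (long ts : latestTsByStream.values()) {
            min = Math.min(min, ts);
        }
        if (min <= lastWatermarkTs) {
            return null; // time has not advanced since the last watermark
        }
        lastWatermarkTs = min;
        return min - lagMs;
    }
}
```

With two streams and a lag of 100 ms, no watermark is produced until both streams have reported; latest timestamps of 1000 and 1500 then yield a watermark of 900, since the slower stream bounds the progress of time.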

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountEvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountEvictionPolicy.java
new file mode 100644
index 0000000..0aa1c6b
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountEvictionPolicy.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+/**
+ * An eviction policy that tracks count based on watermark ts and
+ * evicts events up to the watermark based on a threshold count.
+ *
+ * @param <T> the type of event tracked by this policy.
+ */
+public class WatermarkCountEvictionPolicy<T> extends CountEvictionPolicy<T> {
+    private final WindowManager<T> windowManager;
+    /*
+     * The reference time in millis for window calculations and
+     * expiring events. It is set via setContext(Object) and is 0 until then.
+     */
+    private long referenceTime;
+
+    public WatermarkCountEvictionPolicy(int count, WindowManager<T> windowManager) {
+        super(count);
+        this.windowManager = windowManager;
+    }
+
+    @Override
+    public Action evict(Event<T> event) {
+        if (event.getTimestamp() <= referenceTime) {
+            return super.evict(event);
+        } else {
+            return Action.KEEP;
+        }
+    }
+
+    @Override
+    public void track(Event<T> event) {
+        // NOOP
+    }
+
+    @Override
+    public void setContext(Object context) {
+        referenceTime = (Long) context;
+        currentCount.set(windowManager.getEventCount(referenceTime));
+    }
+
+    @Override
+    public String toString() {
+        return "WatermarkCountEvictionPolicy{" +
+                "referenceTime=" + referenceTime +
+                "} " + super.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountTriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountTriggerPolicy.java
new file mode 100644
index 0000000..510d451
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountTriggerPolicy.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import java.util.List;
+
+/**
+ * A trigger policy that tracks event counts and sets the context for
+ * eviction policy to evict based on latest watermark time.
+ *
+ * @param <T> the type of event tracked by this policy.
+ */
+public class WatermarkCountTriggerPolicy<T> implements TriggerPolicy<T> {
+    private final int count;
+    private final TriggerHandler handler;
+    private final EvictionPolicy<T> evictionPolicy;
+    private final WindowManager<T> windowManager;
+    private long lastProcessedTs = 0;
+
+    public WatermarkCountTriggerPolicy(int count, TriggerHandler handler,
+                                       EvictionPolicy<T> evictionPolicy, WindowManager<T> windowManager) {
+        this.count = count;
+        this.handler = handler;
+        this.evictionPolicy = evictionPolicy;
+        this.windowManager = windowManager;
+    }
+
+    @Override
+    public void track(Event<T> event) {
+        if (event.isWatermark()) {
+            handleWaterMarkEvent(event);
+        }
+    }
+
+    @Override
+    public void reset() {
+        // NOOP
+    }
+
+    @Override
+    public void shutdown() {
+        // NOOP
+    }
+
+    /**
+     * Triggers all the pending windows up to the waterMarkEvent timestamp
+     * based on the sliding interval count.
+     *
+     * @param waterMarkEvent the watermark event
+     */
+    private void handleWaterMarkEvent(Event<T> waterMarkEvent) {
+        long watermarkTs = waterMarkEvent.getTimestamp();
+        List<Long> eventTs = windowManager.getSlidingCountTimestamps(lastProcessedTs, watermarkTs, count);
+        for (long ts : eventTs) {
+            evictionPolicy.setContext(ts);
+            handler.onTrigger();
+            lastProcessedTs = ts;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "WatermarkCountTriggerPolicy{" +
+                "count=" + count +
+                ", lastProcessedTs=" + lastProcessedTs +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeEvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeEvictionPolicy.java
new file mode 100644
index 0000000..202726e
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeEvictionPolicy.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+/**
+ * An eviction policy that evicts events based on time duration, taking
+ * watermark time and event lag into account.
+ */
+public class WatermarkTimeEvictionPolicy<T> extends TimeEvictionPolicy<T> {
+    private final int lag;
+
+    /**
+     * Constructs a WatermarkTimeEvictionPolicy that evicts events older
+     * than the given window length in millis.
+     *
+     * @param windowLength the window length in milliseconds
+     */
+    public WatermarkTimeEvictionPolicy(int windowLength) {
+        this(windowLength, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Constructs a WatermarkTimeEvictionPolicy that evicts events older
+     * than the given window length in millis. The lag parameter
+     * can be used in the case of event based ts to break the queue
+     * scan early.
+     *
+     * @param windowLength the window length in milliseconds
+     * @param lag          the max event lag in milliseconds
+     */
+    public WatermarkTimeEvictionPolicy(int windowLength, int lag) {
+        super(windowLength);
+        referenceTime = 0L;
+        this.lag = lag;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Keeps events with future ts in the queue for processing in the next
+     * window. If the ts difference is more than the lag, stops scanning
+     * the queue for the current window.
+     */
+    @Override
+    public Action evict(Event<T> event) {
+        long diff = referenceTime - event.getTimestamp();
+        if (diff < -lag) {
+            return Action.STOP;
+        } else if (diff < 0) {
+            return Action.KEEP;
+        } else {
+            return super.evict(event);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "WatermarkTimeEvictionPolicy{" +
+                "lag=" + lag +
+                "} " + super.toString();
+    }
+}
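
Reviewer note: the decision order in evict() above can be restated as a small standalone function. This is a minimal sketch, not the committed code. The class name WatermarkEvictionSketch, the method classify, and the local Action enum are hypothetical. The final branch assumes the parent TimeEvictionPolicy expires an event once (referenceTime - ts) reaches the window length; that parent logic is not shown in this part of the diff.

```java
// Hypothetical standalone restatement of WatermarkTimeEvictionPolicy.evict().
final class WatermarkEvictionSketch {
    // Local stand-in for EvictionPolicy.Action (assumed to have these four values).
    enum Action { EXPIRE, PROCESS, KEEP, STOP }

    static Action classify(long referenceTime, long eventTs, long windowLengthMs, long lagMs) {
        long diff = referenceTime - eventTs;
        if (diff < -lagMs) {
            // Event is further in the future than the max lag: stop scanning the queue.
            return Action.STOP;
        }
        if (diff < 0) {
            // Event belongs to a future window: leave it in the queue.
            return Action.KEEP;
        }
        // ASSUMPTION: mirrors what TimeEvictionPolicy.evict() is expected to do.
        return diff >= windowLengthMs ? Action.EXPIRE : Action.PROCESS;
    }
}
```

With the values used in WindowedBoltExecutorTest (window length 20 ms, lag 5 ms) and a window ending at 620, an event at 618 would be processed, one at 622 kept for a later window, and one at 626 would end the scan.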

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeTriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeTriggerPolicy.java
new file mode 100644
index 0000000..8b5cd60
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeTriggerPolicy.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles watermark events and triggers {@link TriggerHandler#onTrigger()} for each window
+ * interval that has events to be processed up to the watermark ts.
+ */
+public class WatermarkTimeTriggerPolicy<T> implements TriggerPolicy<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(WatermarkTimeTriggerPolicy.class);
+    private final long slidingIntervalMs;
+    private final TriggerHandler handler;
+    private final EvictionPolicy<T> evictionPolicy;
+    private final WindowManager<T> windowManager;
+    private long nextWindowEndTs = 0;
+
+    public WatermarkTimeTriggerPolicy(long slidingIntervalMs, TriggerHandler handler, EvictionPolicy<T> evictionPolicy,
+                                      WindowManager<T> windowManager) {
+        this.slidingIntervalMs = slidingIntervalMs;
+        this.handler = handler;
+        this.evictionPolicy = evictionPolicy;
+        this.windowManager = windowManager;
+    }
+
+    @Override
+    public void track(Event<T> event) {
+        if (event.isWatermark()) {
+            handleWaterMarkEvent(event);
+        }
+    }
+
+    @Override
+    public void reset() {
+        // NOOP
+    }
+
+    @Override
+    public void shutdown() {
+        // NOOP
+    }
+
+    /**
+     * Invokes the trigger for all pending windows up to the
+     * watermark timestamp. The end ts of the window is set
+     * in the eviction policy context so that the events falling
+     * within that window can be processed.
+     */
+    private void handleWaterMarkEvent(Event<T> event) {
+        long watermarkTs = event.getTimestamp();
+        long windowEndTs = nextWindowEndTs;
+        LOG.debug("Window end ts {} Watermark ts {}", windowEndTs, watermarkTs);
+        while (windowEndTs <= watermarkTs) {
+            evictionPolicy.setContext(windowEndTs);
+            if (handler.onTrigger()) {
+                windowEndTs += slidingIntervalMs;
+            } else {
+                /*
+                 * No events were found in the previous window interval.
+                 * Scan through the events in the queue to find the next
+                 * window intervals based on event ts.
+                 */
+                long ts = getNextAlignedWindowTs(windowEndTs, watermarkTs);
+                LOG.debug("Next aligned window end ts {}", ts);
+                if (ts == Long.MAX_VALUE) {
+                    LOG.debug("No events to process between {} and watermark ts {}", windowEndTs, watermarkTs);
+                    break;
+                }
+                windowEndTs = ts;
+            }
+        }
+        nextWindowEndTs = windowEndTs;
+    }
+
+    /**
+     * Scans the events in the queue and computes the next aligned window
+     * between startTs and endTs. Returns the end ts of the next aligned
+     * window, i.e. the ts when the window should fire.
+     *
+     * @param startTs the start timestamp (exclusive)
+     * @param endTs the end timestamp (inclusive)
+     * @return the aligned window end ts for the next window or Long.MAX_VALUE if there
+     * are no more events to be processed.
+     */
+    private long getNextAlignedWindowTs(long startTs, long endTs) {
+        long nextTs = windowManager.getEarliestEventTs(startTs, endTs);
+        if (nextTs == Long.MAX_VALUE || (nextTs % slidingIntervalMs == 0)) {
+            return nextTs;
+        }
+        return nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs));
+    }
+}
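
Reviewer note: the rounding in getNextAlignedWindowTs() may be easier to follow in isolation. The sketch below copies only the arithmetic. The class name AlignedWindowSketch and the method alignUp are hypothetical, and the earliest event ts is passed in directly instead of being looked up through WindowManager.getEarliestEventTs().

```java
// Hypothetical standalone version of the alignment arithmetic.
final class AlignedWindowSketch {
    /**
     * Rounds ts up to the next multiple of slidingIntervalMs.
     * Long.MAX_VALUE is passed through unchanged, since the patch uses it
     * as the "no events found" sentinel.
     */
    static long alignUp(long ts, long slidingIntervalMs) {
        if (ts == Long.MAX_VALUE || ts % slidingIntervalMs == 0) {
            return ts;
        }
        return ts + (slidingIntervalMs - (ts % slidingIntervalMs));
    }
}
```

For a 10 ms sliding interval, an earliest event at 603 gives a window end of 610, and a ts that is already aligned (610) is returned as is. The explicit sentinel check matters because Long.MAX_VALUE is not a multiple of 10, so without it the addition would overflow.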

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java b/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
index 6603abf..b783eb0 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
@@ -17,11 +17,11 @@
  */
 package backtype.storm.windowing;
 
+import backtype.storm.windowing.EvictionPolicy.Action;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -30,8 +30,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static backtype.storm.topology.base.BaseWindowedBolt.Count;
-import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
+import static backtype.storm.windowing.EvictionPolicy.Action.EXPIRE;
+import static backtype.storm.windowing.EvictionPolicy.Action.PROCESS;
+import static backtype.storm.windowing.EvictionPolicy.Action.STOP;
 
 /**
  * Tracks a window of events and fires {@link WindowLifecycleListener} callbacks
@@ -49,7 +50,7 @@ public class WindowManager<T> implements TriggerHandler {
     public static final int EXPIRE_EVENTS_THRESHOLD = 100;
 
     private final WindowLifecycleListener<T> windowLifecycleListener;
-    private final ConcurrentLinkedQueue<Event<T>> window;
+    private final ConcurrentLinkedQueue<Event<T>> queue;
     private final List<T> expiredEvents;
     private final Set<Event<T>> prevWindowEvents;
     private final AtomicInteger eventsSinceLastExpiry;
@@ -59,27 +60,19 @@ public class WindowManager<T> implements TriggerHandler {
 
     public WindowManager(WindowLifecycleListener<T> lifecycleListener) {
         windowLifecycleListener = lifecycleListener;
-        window = new ConcurrentLinkedQueue<>();
+        queue = new ConcurrentLinkedQueue<>();
         expiredEvents = new ArrayList<>();
         prevWindowEvents = new HashSet<>();
         eventsSinceLastExpiry = new AtomicInteger();
         lock = new ReentrantLock(true);
     }
 
-    public void setWindowLength(Count count) {
-        this.evictionPolicy = new CountEvictionPolicy<>(count.value);
+    public void setEvictionPolicy(EvictionPolicy<T> evictionPolicy) {
+        this.evictionPolicy = evictionPolicy;
     }
 
-    public void setWindowLength(Duration duration) {
-        this.evictionPolicy = new TimeEvictionPolicy<>(duration.value);
-    }
-
-    public void setSlidingInterval(Count count) {
-        this.triggerPolicy = new CountTriggerPolicy<>(count.value, this);
-    }
-
-    public void setSlidingInterval(Duration duration) {
-        this.triggerPolicy = new TimeTriggerPolicy<>(duration.value, this);
+    public void setTriggerPolicy(TriggerPolicy<T> triggerPolicy) {
+        this.triggerPolicy = triggerPolicy;
     }
 
     /**
@@ -99,8 +92,21 @@ public class WindowManager<T> implements TriggerHandler {
      * @param ts    the timestamp
      */
     public void add(T event, long ts) {
-        Event<T> windowEvent = new EventImpl<T>(event, ts);
-        window.add(windowEvent);
+        add(new EventImpl<T>(event, ts));
+    }
+
+    /**
+     * Tracks a window event
+     *
+     * @param windowEvent the window event to track
+     */
+    public void add(Event<T> windowEvent) {
+        // watermark events are not added to the queue.
+        if (!windowEvent.isWatermark()) {
+            queue.add(windowEvent);
+        } else {
+            LOG.debug("Got watermark event with ts {}", windowEvent.getTimestamp());
+        }
         track(windowEvent);
         compactWindow();
     }
@@ -109,7 +115,7 @@ public class WindowManager<T> implements TriggerHandler {
      * The callback invoked by the trigger policy.
      */
     @Override
-    public void onTrigger() {
+    public boolean onTrigger() {
         List<Event<T>> windowEvents = null;
         List<T> expired = null;
         try {
@@ -118,7 +124,7 @@ public class WindowManager<T> implements TriggerHandler {
              * scan the entire window to handle out of order events in
              * the case of time based windows.
              */
-            windowEvents = expireEvents(true);
+            windowEvents = scanEvents(true);
             expired = new ArrayList<>(expiredEvents);
             expiredEvents.clear();
         } finally {
@@ -133,10 +139,15 @@ public class WindowManager<T> implements TriggerHandler {
             }
         }
         prevWindowEvents.clear();
-        prevWindowEvents.addAll(windowEvents);
-        LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", windowEvents.size());
-        windowLifecycleListener.onActivation(events, newEvents, expired);
+        if (!events.isEmpty()) {
+            prevWindowEvents.addAll(windowEvents);
+            LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
+            windowLifecycleListener.onActivation(events, newEvents, expired);
+        } else {
+            LOG.debug("No events in the window, skipping onActivation");
+        }
         triggerPolicy.reset();
+        return !events.isEmpty();
     }
 
     public void shutdown() {
@@ -153,7 +164,7 @@ public class WindowManager<T> implements TriggerHandler {
      */
     private void compactWindow() {
         if (eventsSinceLastExpiry.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) {
-            expireEvents(false);
+            scanEvents(false);
         }
     }
 
@@ -167,29 +178,30 @@ public class WindowManager<T> implements TriggerHandler {
     }
 
     /**
-     * Expire events from the window, using the expiration policy to check
+     * Scan events in the queue, using the expiration policy to check
      * if the event should be evicted or not.
      *
-     * @param fullScan if set, will scan the entire window; if not set, will stop
+     * @param fullScan if set, will scan the entire queue; if not set, will stop
      *                 as soon as an event not satisfying the expiration policy is found
-     * @return the list of remaining events in the window after expiry
+     * @return the list of events to be processed as a part of the current window
      */
-    private List<Event<T>> expireEvents(boolean fullScan) {
-        LOG.debug("Expire events, eviction policy {}", evictionPolicy);
+    private List<Event<T>> scanEvents(boolean fullScan) {
+        LOG.debug("Scan events, eviction policy {}", evictionPolicy);
         List<T> eventsToExpire = new ArrayList<>();
-        List<Event<T>> remaining = new ArrayList<>();
+        List<Event<T>> eventsToProcess = new ArrayList<>();
         try {
             lock.lock();
-            Iterator<Event<T>> it = window.iterator();
+            Iterator<Event<T>> it = queue.iterator();
             while (it.hasNext()) {
                 Event<T> windowEvent = it.next();
-                if (evictionPolicy.evict(windowEvent)) {
+                Action action = evictionPolicy.evict(windowEvent);
+                if (action == EXPIRE) {
                     eventsToExpire.add(windowEvent.get());
                     it.remove();
-                } else if (!fullScan) {
+                } else if (!fullScan || action == STOP) {
                     break;
-                } else {
-                    remaining.add(windowEvent);
+                } else if (action == PROCESS) {
+                    eventsToProcess.add(windowEvent);
                 }
             }
             expiredEvents.addAll(eventsToExpire);
@@ -198,8 +210,73 @@ public class WindowManager<T> implements TriggerHandler {
         }
         eventsSinceLastExpiry.set(0);
         LOG.debug("[{}] events expired from window.", eventsToExpire.size());
-        windowLifecycleListener.onExpiry(eventsToExpire);
-        return remaining;
+        if (!eventsToExpire.isEmpty()) {
+            LOG.debug("invoking windowLifecycleListener.onExpiry");
+            windowLifecycleListener.onExpiry(eventsToExpire);
+        }
+        return eventsToProcess;
+    }
+
+    /**
+     * Scans the event queue and returns the next earliest event ts
+     * between the startTs and endTs
+     *
+     * @param startTs the start ts (exclusive)
+     * @param endTs the end ts (inclusive)
+     * @return the earliest event ts between startTs and endTs
+     */
+    public long getEarliestEventTs(long startTs, long endTs) {
+        long minTs = Long.MAX_VALUE;
+        for (Event<T> event : queue) {
+            if (event.getTimestamp() > startTs && event.getTimestamp() <= endTs) {
+                minTs = Math.min(minTs, event.getTimestamp());
+            }
+        }
+        return minTs;
+    }
+
+    /**
+     * Scans the event queue and returns the number of events having a
+     * timestamp less than or equal to the reference time.
+     *
+     * @param referenceTime the reference timestamp in millis
+     * @return the count of events with timestamp less than or equal to referenceTime
+     */
+    public int getEventCount(long referenceTime) {
+        int count = 0;
+        for (Event<T> event : queue) {
+            if (event.getTimestamp() <= referenceTime) {
+                ++count;
+            }
+        }
+        return count;
+    }
+
+    /**
+     * Scans the event queue and returns the list of event ts
+     * falling between startTs (exclusive) and endTs (inclusive)
+     * at each sliding interval count.
+     *
+     * @param startTs the start timestamp (exclusive)
+     * @param endTs the end timestamp (inclusive)
+     * @param slidingCount the sliding interval count
+     * @return the list of event ts
+     */
+    public List<Long> getSlidingCountTimestamps(long startTs, long endTs, int slidingCount) {
+        List<Long> timestamps = new ArrayList<>();
+        if (endTs > startTs) {
+            int count = 0;
+            long ts = Long.MIN_VALUE;
+            for (Event<T> event : queue) {
+                if (event.getTimestamp() > startTs && event.getTimestamp() <= endTs) {
+                    ts = Math.max(ts, event.getTimestamp());
+                    if (++count % slidingCount == 0) {
+                        timestamps.add(ts);
+                    }
+                }
+            }
+        }
+        return timestamps;
     }
 
     @Override
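
Reviewer note: getSlidingCountTimestamps() is what WatermarkCountTriggerPolicy uses to decide where count-based windows fire. The sketch below reproduces the loop over a plain array of timestamps so its behaviour can be checked by hand. The class name SlidingCountSketch and the long[] parameter are hypothetical stand-ins for the WindowManager queue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone version of WindowManager.getSlidingCountTimestamps().
final class SlidingCountSketch {
    static List<Long> slidingCountTimestamps(long[] queueTs, long startTs, long endTs, int slidingCount) {
        List<Long> timestamps = new ArrayList<>();
        if (endTs > startTs) {
            int count = 0;
            long maxTs = Long.MIN_VALUE;
            for (long ts : queueTs) {
                // Only events in the range (startTs, endTs] are counted.
                if (ts > startTs && ts <= endTs) {
                    // Track the largest ts seen so far, since events may be out of order.
                    maxTs = Math.max(maxTs, ts);
                    if (++count % slidingCount == 0) {
                        timestamps.add(maxTs);
                    }
                }
            }
        }
        return timestamps;
    }
}
```

With the queue {603, 605, 607, 618, 626, 636}, the range (0, 630] and a sliding count of 2, the result is [605, 618]: the fifth in-range event (626) does not complete another pair. For an out-of-order queue {605, 603, 607} with the range (0, 610], the result is [605], because the recorded ts is the running maximum rather than the ts of the event that completed the count.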

http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java b/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
new file mode 100644
index 0000000..1c59f3a
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.topology;
+
+import backtype.storm.Config;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseWindowedBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import backtype.storm.tuple.Values;
+import backtype.storm.windowing.TupleWindow;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for {@link WindowedBoltExecutor}
+ */
+public class WindowedBoltExecutorTest {
+
+    private WindowedBoltExecutor executor;
+    private TestWindowedBolt testWindowedBolt;
+
+    private static class TestWindowedBolt extends BaseWindowedBolt {
+        List<TupleWindow> tupleWindows = new ArrayList<>();
+
+        @Override
+        public void execute(TupleWindow input) {
+            //System.out.println(input);
+            tupleWindows.add(input);
+        }
+    }
+
+    private GeneralTopologyContext getContext(final Fields fields) {
+        TopologyBuilder builder = new TopologyBuilder();
+        return new GeneralTopologyContext(builder.createTopology(),
+                                          new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+            @Override
+            public Fields getComponentOutputFields(String componentId, String streamId) {
+                return fields;
+            }
+
+        };
+    }
+
+    private Tuple getTuple(String streamId, final Fields fields, Values values) {
+        return new TupleImpl(getContext(fields), values, 1, streamId) {
+            @Override
+            public GlobalStreamId getSourceGlobalStreamId() {
+                return new GlobalStreamId("s1", "default");
+            }
+        };
+    }
+
+    private OutputCollector getOutputCollector() {
+        return Mockito.mock(OutputCollector.class);
+    }
+
+    private TopologyContext getTopologyContext() {
+        TopologyContext context = Mockito.mock(TopologyContext.class);
+        Map<GlobalStreamId, Grouping> sources = Collections.singletonMap(
+                new GlobalStreamId("s1", "default"),
+                null
+        );
+        Mockito.when(context.getThisSources()).thenReturn(sources);
+        return context;
+    }
+
+    @Before
+    public void setUp() {
+        testWindowedBolt = new TestWindowedBolt();
+        executor = new WindowedBoltExecutor(testWindowedBolt);
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+        conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20);
+        conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10);
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, "ts");
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5);
+        conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 100);
+        executor.prepare(conf, getTopologyContext(), getOutputCollector());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testExecuteWithoutTs() throws Exception {
+        executor.execute(getTuple("s1", new Fields("a"), new Values(1)));
+    }
+
+    @Test
+    public void testExecuteWithTs() throws Exception {
+        long[] timestamps = {603, 605, 607, 618, 626, 636};
+        for (long ts : timestamps) {
+            executor.execute(getTuple("s1", new Fields("ts"), new Values(ts)));
+        }
+        Thread.sleep(120);
+        //System.out.println(testWindowedBolt.tupleWindows);
+        assertEquals(3, testWindowedBolt.tupleWindows.size());
+        TupleWindow first = testWindowedBolt.tupleWindows.get(0);
+        assertArrayEquals(new long[]{603, 605, 607},
+                          new long[]{(long) first.get().get(0).getValue(0), (long)first.get().get(1).getValue(0),
+                                  (long)first.get().get(2).getValue(0)});
+
+        TupleWindow second = testWindowedBolt.tupleWindows.get(1);
+        assertArrayEquals(new long[]{603, 605, 607, 618},
+                          new long[]{(long) second.get().get(0).getValue(0), (long) second.get().get(1).getValue(0),
+                                  (long) second.get().get(2).getValue(0), (long) second.get().get(3).getValue(0)});
+
+        TupleWindow third = testWindowedBolt.tupleWindows.get(2);
+        assertArrayEquals(new long[]{618, 626},
+                          new long[]{(long) third.get().get(0).getValue(0), (long)third.get().get(1).getValue(0)});
+    }
+}
\ No newline at end of file