You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/11/20 08:10:32 UTC

[1/6] storm git commit: STORM-1167: Add windowing support for storm core

Repository: storm
Updated Branches:
  refs/heads/master ad0c4efd9 -> ab5b2f92c


STORM-1167: Add windowing support for storm core

1. Added new interface IWindowedBolt and wrapper classes for bolts that need windowing support
2. New constants for specifying the window length and sliding interval
3. WindowManager and related classes that handle the windowing logic


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/77429d07
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/77429d07
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/77429d07

Branch: refs/heads/master
Commit: 77429d07d7c080962b8c414cf02ee93839b25ca2
Parents: f3568d7
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Wed Nov 4 16:59:19 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Wed Nov 4 16:59:19 2015 +0530

----------------------------------------------------------------------
 .../storm/starter/SlidingWindowTopology.java    | 129 +++++++++++
 .../src/clj/backtype/storm/daemon/executor.clj  |   4 +
 storm-core/src/jvm/backtype/storm/Config.java   |  29 +++
 .../backtype/storm/task/OutputCollector.java    |   2 +-
 .../backtype/storm/topology/IWindowedBolt.java  |  40 ++++
 .../storm/topology/TopologyBuilder.java         |  17 ++
 .../storm/topology/WindowedBoltExecutor.java    | 200 +++++++++++++++++
 .../storm/topology/base/BaseWindowedBolt.java   | 184 +++++++++++++++
 .../storm/windowing/CountEvictionPolicy.java    |  68 ++++++
 .../storm/windowing/CountTriggerPolicy.java     |  62 +++++
 .../src/jvm/backtype/storm/windowing/Event.java |  41 ++++
 .../jvm/backtype/storm/windowing/EventImpl.java |  38 ++++
 .../storm/windowing/EvictionPolicy.java         |  37 +++
 .../storm/windowing/TimeEvictionPolicy.java     |  54 +++++
 .../storm/windowing/TimeTriggerPolicy.java      |  78 +++++++
 .../storm/windowing/TriggerHandler.java         |  29 +++
 .../backtype/storm/windowing/TriggerPolicy.java |  40 ++++
 .../backtype/storm/windowing/TupleWindow.java   |  26 +++
 .../storm/windowing/TupleWindowImpl.java        |  61 +++++
 .../jvm/backtype/storm/windowing/Window.java    |  48 ++++
 .../windowing/WindowLifecycleListener.java      |  42 ++++
 .../backtype/storm/windowing/WindowManager.java | 210 +++++++++++++++++
 .../storm/windowing/WindowManagerTest.java      | 224 +++++++++++++++++++
 23 files changed, 1662 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java b/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
new file mode 100644
index 0000000..e2ffbdd
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.topology.base.BaseWindowedBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+import backtype.storm.windowing.TupleWindow;
+import storm.starter.bolt.PrinterBolt;
+
+import java.util.Map;
+import java.util.Random;
+
+import static backtype.storm.topology.base.BaseWindowedBolt.Count;
+import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
+
+/**
+ * A sample topology that demonstrates the usage of {@link backtype.storm.topology.IWindowedBolt}
+ * to calculate sliding window sum.
+ */
+public class SlidingWindowTopology {
+
+    /*
+     * emits random integers every 100 ms
+     */
+    private static class RandomIntegerSpout extends BaseRichSpout {
+        SpoutOutputCollector collector;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("value"));
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void nextTuple() {
+            Utils.sleep(100);
+            Random rand = new Random();
+            Integer value = rand.nextInt(1000);
+            collector.emit(new Values(value));
+        }
+    }
+
+    /*
+     * Computes sliding window sum
+     */
+    private static class SlidingWindowSumBolt extends BaseWindowedBolt {
+        private int sum = 0;
+        private OutputCollector collector;
+
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(TupleWindow inputWindow) {
+            System.out.println("Events in current window: " + inputWindow.get().size());
+            for (Tuple tuple : inputWindow.getNew()) {
+                sum += (int) tuple.getValue(0);
+            }
+            for (Tuple tuple : inputWindow.getExpired()) {
+                sum -= (int) tuple.getValue(0);
+            }
+            collector.emit(new Values(sum));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("sum"));
+        }
+
+    }
+
+
+    public static void main(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("integer", new RandomIntegerSpout(), 1);
+        builder.setBolt("window", new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)),
+                        //new SlidingWindowSumBolt().withTumblingWindow(new Duration(20, TimeUnit.SECONDS))
+                        1).shuffleGrouping("integer");
+        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("window");
+        Config conf = new Config();
+        conf.setDebug(true);
+
+        if (args != null && args.length > 0) {
+            conf.setNumWorkers(1);
+
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+        } else {
+
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", conf, builder.createTopology());
+            Utils.sleep(40000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 1bc2db4..7e04c65 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -159,6 +159,10 @@
                         TOPOLOGY-TICK-TUPLE-FREQ-SECS
                         TOPOLOGY-SLEEP-SPOUT-WAIT-STRATEGY-TIME-MS
                         TOPOLOGY-SPOUT-WAIT-STRATEGY
+                        TOPOLOGY-BOLTS-WINDOW-LENGTH-COUNT
+                        TOPOLOGY-BOLTS-WINDOW-LENGTH-DURATION-MS
+                        TOPOLOGY-BOLTS-SLIDING-INTERVAL-COUNT
+                        TOPOLOGY-BOLTS-SLIDING-INTERVAL-DURATION-MS
                         )
         spec-conf (-> general-context
                       (.getComponentCommon component-id)

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 73e41cb..d782a10 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1449,6 +1449,35 @@ public class Config extends HashMap<String, Object> {
     @isBoolean
     public static final String TOPOLOGY_BOLTS_OUTGOING_OVERFLOW_BUFFER_ENABLE="topology.bolts.outgoing.overflow.buffer.enable";
 
+    /*
+     * Bolt-specific configuration for windowed bolts to specify the window length as a count of number of tuples
+     * in the window.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT = "topology.bolts.window.length.count";
+
+    /*
+     * Bolt-specific configuration for windowed bolts to specify the window length in time duration.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS = "topology.bolts.window.length.duration.ms";
+
+    /*
+     * Bolt-specific configuration for windowed bolts to specify the sliding interval as a count of number of tuples.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT = "topology.bolts.window.sliding.interval.count";
+
+    /*
+     * Bolt-specific configuration for windowed bolts to specify the sliding interval in time duration.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS = "topology.bolts.window.sliding.interval.duration.ms";
+
     /**
      * This config is available for TransactionalSpouts, and contains the id ( a String) for
      * the transactional topology. This id is used to store the state of the transactional

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/task/OutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/OutputCollector.java b/storm-core/src/jvm/backtype/storm/task/OutputCollector.java
index 620d33d..0214eaa 100644
--- a/storm-core/src/jvm/backtype/storm/task/OutputCollector.java
+++ b/storm-core/src/jvm/backtype/storm/task/OutputCollector.java
@@ -115,7 +115,7 @@ public class OutputCollector implements IOutputCollector {
      * @param tuple the new output tuple from this bolt
      */
     public void emitDirect(int taskId, String streamId, Tuple anchor, List<Object> tuple) {
-        emitDirect(taskId, streamId, Arrays.asList(anchor), tuple);
+        emitDirect(taskId, streamId, anchor == null ? (List) null : Arrays.asList(anchor), tuple);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/topology/IWindowedBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/IWindowedBolt.java b/storm-core/src/jvm/backtype/storm/topology/IWindowedBolt.java
new file mode 100644
index 0000000..620e7d9
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/topology/IWindowedBolt.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.topology;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.windowing.TupleWindow;
+
+import java.util.Map;
+
+/**
+ * A bolt abstraction for supporting time and count based sliding & tumbling windows.
+ */
+public interface IWindowedBolt extends IComponent {
+    /**
+     * This is similar to the {@link backtype.storm.task.IBolt#prepare(Map, TopologyContext, OutputCollector)} except
+     * that while emitting, the tuples are automatically anchored to the tuples in the inputWindow.
+     */
+    void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
+    /**
+     * Process the tuple window and optionally emit new tuples based on the tuples in the input window.
+     */
+    void execute(TupleWindow inputWindow);
+    void cleanup();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
index 38b30d7..be34a67 100644
--- a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
@@ -34,6 +34,8 @@ import backtype.storm.utils.Utils;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+
+import backtype.storm.windowing.TupleWindow;
 import org.json.simple.JSONValue;
 
 /**
@@ -177,6 +179,21 @@ public class TopologyBuilder {
     }
 
     /**
+     * Define a new bolt in this topology. This defines a windowed bolt, intended
+     * for windowing operations. The {@link IWindowedBolt#execute(TupleWindow)} method
+     * is triggered for each window interval with the list of current events in the window.
+     *
+     * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
+     * @param bolt the windowed bolt
+     * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
+     * @return use the returned object to declare the inputs to this component
+     * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
+     */
+    public BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
+        return setBolt(id, new WindowedBoltExecutor(bolt), parallelism_hint);
+    }
+
+    /**
      * Define a new spout in this topology.
      *
      * @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs.

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
new file mode 100644
index 0000000..cc6529f
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.topology;
+
+import backtype.storm.Config;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.windowing.TupleWindowImpl;
+import backtype.storm.windowing.WindowLifecycleListener;
+import backtype.storm.windowing.WindowManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static backtype.storm.topology.base.BaseWindowedBolt.Count;
+import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
+
+/**
+ * An {@link IWindowedBolt} wrapper that does the windowing of tuples.
+ */
+public class WindowedBoltExecutor implements IRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
+
+    private IWindowedBolt bolt;
+    private transient WindowedOutputCollector windowedOutputCollector;
+    private transient WindowLifecycleListener<Tuple> listener;
+    private transient WindowManager<Tuple> windowManager;
+
+    public WindowedBoltExecutor(IWindowedBolt bolt) {
+        this.bolt = bolt;
+    }
+
+    private int getTopologyTimeoutMillis(Map stormConf) {
+        if (stormConf.containsKey(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
+            boolean timeOutsEnabled = (boolean) stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
+            if (!timeOutsEnabled) {
+                return Integer.MAX_VALUE;
+            }
+        }
+        int timeout = 0;
+        if (stormConf.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
+            timeout = ((Number) stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
+        }
+        return timeout * 1000;
+    }
+
+    private void ensureDurationLessThanTimeout(int duration, int timeout) {
+        if (duration > timeout) {
+            throw new IllegalArgumentException("Window duration (length + sliding interval) value " + duration +
+                                                       " is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS +
+                                                       " value " + timeout);
+        }
+    }
+
+    // TODO: add more validation
+    private void validate(Map stormConf, Count windowLengthCount, Duration windowLengthDuration,
+                          Count slidingIntervalCount, Duration slidingIntervalDuration) {
+
+        int topologyTimeout = getTopologyTimeoutMillis(stormConf);
+        if (windowLengthDuration != null && slidingIntervalDuration != null) {
+            ensureDurationLessThanTimeout(windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout);
+        } else if (windowLengthDuration != null) {
+            ensureDurationLessThanTimeout(windowLengthDuration.value, topologyTimeout);
+        }
+    }
+
+    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf) {
+        WindowManager<Tuple> manager = new WindowManager<>(lifecycleListener);
+        Duration windowLengthDuration = null;
+        Count windowLengthCount = null;
+        Duration slidingIntervalDuration = null;
+        Count slidingIntervalCount = null;
+        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
+            windowLengthCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
+        } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
+            windowLengthDuration = new Duration(
+                    ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
+                    TimeUnit.MILLISECONDS);
+        }
+
+        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
+            slidingIntervalCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
+        } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
+            slidingIntervalDuration = new Duration(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS);
+        } else {
+            // default is a sliding window of count 1
+            slidingIntervalCount = new Count(1);
+        }
+        // validate
+        validate(stormConf, windowLengthCount, windowLengthDuration,
+                 slidingIntervalCount, slidingIntervalDuration);
+        if (windowLengthCount != null) {
+            manager.setWindowLength(windowLengthCount);
+        } else {
+            manager.setWindowLength(windowLengthDuration);
+        }
+        if (slidingIntervalCount != null) {
+            manager.setSlidingInterval(slidingIntervalCount);
+        } else {
+            manager.setSlidingInterval(slidingIntervalDuration);
+        }
+        return manager;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.windowedOutputCollector = new WindowedOutputCollector(collector);
+        bolt.prepare(stormConf, context, windowedOutputCollector);
+        this.listener = newWindowLifecycleListener();
+        this.windowManager = initWindowManager(listener, stormConf);
+        LOG.info("Initialized window manager {} ", this.windowManager);
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        windowManager.add(input);
+    }
+
+    @Override
+    public void cleanup() {
+        windowManager.shutdown();
+        bolt.cleanup();
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        bolt.declareOutputFields(declarer);
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return bolt.getComponentConfiguration();
+    }
+
+    private WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
+        return new WindowLifecycleListener<Tuple>() {
+            @Override
+            public void onExpiry(List<Tuple> tuples) {
+                for (Tuple tuple : tuples) {
+                    windowedOutputCollector.ack(tuple);
+                }
+            }
+
+            @Override
+            public void onActivation(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples) {
+                windowedOutputCollector.setContext(tuples);
+                bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples));
+                newTuples.clear();
+                expiredTuples.clear();
+            }
+        };
+    }
+
+    /**
+     * Creates an {@link OutputCollector} wrapper that automatically
+     * anchors the tuples to inputTuples while emitting.
+     */
+    private static class WindowedOutputCollector extends OutputCollector {
+        private List<Tuple> inputTuples;
+
+        WindowedOutputCollector(IOutputCollector delegate) {
+            super(delegate);
+        }
+
+        void setContext(List<Tuple> inputTuples) {
+            this.inputTuples = inputTuples;
+        }
+
+        @Override
+        public List<Integer> emit(String streamId, List<Object> tuple) {
+            return emit(streamId, inputTuples, tuple);
+        }
+
+        @Override
+        public void emitDirect(int taskId, String streamId, List<Object> tuple) {
+            emitDirect(taskId, streamId, inputTuples, tuple);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java b/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
new file mode 100644
index 0000000..abbfa26
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.topology.base;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IWindowedBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class BaseWindowedBolt implements IWindowedBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class);
+
+    private transient Map<String, Object> windowConfiguration;
+
+    /**
+     * Holds a count value for count based windows and sliding intervals.
+     */
+    public static class Count {
+        public final int value;
+
+        public Count(int value) {
+            this.value = value;
+        }
+    }
+
+    /**
+     * Holds a Time duration for time based windows and sliding intervals.
+     */
+    public static class Duration {
+        public final int value;
+
+        public Duration(int value, TimeUnit timeUnit) {
+            this.value = (int) timeUnit.toMillis(value);
+        }
+    }
+
+    protected BaseWindowedBolt() {
+        windowConfiguration = new HashMap<>();
+    }
+
+    private BaseWindowedBolt withWindowLength(Count count) {
+        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, count.value);
+        return this;
+    }
+
+    private BaseWindowedBolt withWindowLength(Duration duration) {
+        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, duration.value);
+        return this;
+    }
+
+    private BaseWindowedBolt withSlidingInterval(Count count) {
+        windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, count.value);
+        return this;
+    }
+
+    private BaseWindowedBolt withSlidingInterval(Duration duration) {
+        windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, duration.value);
+        return this;
+    }
+
+    /**
+     * Tuple count based sliding window configuration.
+     *
+     * @param windowLength    the number of tuples in the window
+     * @param slidingInterval the number of tuples after which the window slides
+     */
+    public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) {
+        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+    }
+
+    /**
+     * Tuple count and time duration based sliding window configuration.
+     *
+     * @param windowLength    the number of tuples in the window
+     * @param slidingInterval the time duration after which the window slides
+     */
+    public BaseWindowedBolt withWindow(Count windowLength, Duration slidingInterval) {
+        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+    }
+
+    /**
+     * Time duration and count based sliding window configuration.
+     *
+     * @param windowLength    the time duration of the window
+     * @param slidingInterval the number of tuples after which the window slides
+     */
+    public BaseWindowedBolt withWindow(Duration windowLength, Count slidingInterval) {
+        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+    }
+
+    /**
+     * Time duration based sliding window configuration.
+     *
+     * @param windowLength    the time duration of the window
+     * @param slidingInterval the time duration after which the window slides
+     */
+    public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) {
+        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+    }
+
+    /**
+     * A tuple count based window that slides with every incoming tuple.
+     *
+     * @param windowLength the number of tuples in the window
+     */
+    public BaseWindowedBolt withWindow(Count windowLength) {
+        return withWindowLength(windowLength).withSlidingInterval(new Count(1));
+    }
+
+
+    /**
+     * A time duration based window that slides with every incoming tuple.
+     *
+     * @param windowLength the time duration of the window
+     */
+    public BaseWindowedBolt withWindow(Duration windowLength) {
+        return withWindowLength(windowLength).withSlidingInterval(new Count(1));
+    }
+
+    /**
+     * A count based tumbling window.
+     *
+     * @param count the number of tuples after which the window tumbles
+     */
+    public BaseWindowedBolt withTumblingWindow(Count count) {
+        return withWindowLength(count).withSlidingInterval(count);
+    }
+
+    /**
+     * A time duration based tumbling window.
+     *
+     * @param duration the time duration after which the window tumbles
+     */
+    public BaseWindowedBolt withTumblingWindow(Duration duration) {
+        return withWindowLength(duration).withSlidingInterval(duration);
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    }
+
+    @Override
+    public void execute(TupleWindow inputWindow) {
+
+    }
+
+    @Override
+    public void cleanup() {
+
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return windowConfiguration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
new file mode 100644
index 0000000..97d4332
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An eviction policy that tracks event counts and can
+ * evict based on a threshold count.
+ *
+ * @param <T> the type of event tracked by this policy.
+ */
+public class CountEvictionPolicy<T> implements EvictionPolicy<T> {
+    private final int threshold;
+    private final AtomicInteger currentCount;
+
+    public CountEvictionPolicy(int count) {
+        this.threshold = count;
+        this.currentCount = new AtomicInteger();
+    }
+
+    /**
+     * Evicts while more than {@code threshold} events are tracked. The event
+     * argument itself is not inspected; only the tracked count matters.
+     *
+     * @return true if the count was above the threshold and has been decremented
+     */
+    @Override
+    public boolean evict(Event<T> event) {
+        // CAS loop: retried only when the count changed between get() and compareAndSet()
+        int curVal;
+        while ((curVal = currentCount.get()) > threshold) {
+            if (currentCount.compareAndSet(curVal, curVal - 1)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Counts the event; the event itself is not inspected. */
+    @Override
+    public void track(Event<T> event) {
+        currentCount.incrementAndGet();
+    }
+
+    @Override
+    public String toString() {
+        return "CountEvictionPolicy{" +
+                "threshold=" + threshold +
+                ", currentCount=" + currentCount +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
new file mode 100644
index 0000000..022eb9e
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A trigger that tracks event counts and calls back {@link TriggerHandler#onTrigger()}
+ * when the count threshold is hit.
+ *
+ * @param <T> the type of event tracked by this policy.
+ */
+public class CountTriggerPolicy<T> implements TriggerPolicy<T> {
+    private final int count;
+    private final AtomicInteger currentCount;
+    private final TriggerHandler handler;
+
+    public CountTriggerPolicy(int count, TriggerHandler handler) {
+        this.count = count;
+        this.currentCount = new AtomicInteger();
+        this.handler = handler;
+    }
+
+    @Override
+    public void track(Event<T> event) {
+        if (currentCount.incrementAndGet() >= count) { // keeps firing on every event until reset()
+            handler.onTrigger();
+        }
+    }
+
+    @Override
+    public void reset() {
+        currentCount.set(0); // start counting towards the next trigger
+    }
+
+    @Override
+    public void shutdown() {
+    } // nothing to release: this policy holds no threads or other resources
+
+    @Override
+    public String toString() {
+        return "CountTriggerPolicy{" +
+                "count=" + count +
+                ", currentCount=" + currentCount +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/windowing/Event.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/Event.java b/storm-core/src/jvm/backtype/storm/windowing/Event.java
new file mode 100644
index 0000000..e33ee71
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/Event.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+/**
+ * An event is a wrapper object that gets stored in the window.
+ *
+ * @param <T> the type of the object that is wrapped, e.g. Tuple
+ */
+interface Event<T> {
+    /**
+     * The event timestamp in millis. This could be the time
+     * when the source generated the tuple or, if not, the time
+     * when the tuple was received by a bolt.
+     *
+     * @return the event timestamp in milliseconds
+     */
+    long getTs();
+
+    /**
+     * Returns the wrapped object, e.g. a tuple.
+     *
+     * @return the wrapped object.
+     */
+    T get();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java b/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
new file mode 100644
index 0000000..1d43319
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+class EventImpl<T> implements Event<T> { // no equals/hashCode override: events compare by identity
+    private final T event; // the wrapped object returned by get()
+    private final long ts; // the timestamp returned by getTs()
+
+    EventImpl(T event, long ts) {
+        this.event = event;
+        this.ts = ts;
+    }
+
+    @Override
+    public long getTs() {
+        return ts;
+    }
+
+    @Override
+    public T get() {
+        return event;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
new file mode 100644
index 0000000..fcf1e53
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+/**
+ * Eviction policy tracks events and decides whether
+ * an event should be evicted from the window or not.
+ *
+ * @param <T> the type of event that is tracked.
+ */
+public interface EvictionPolicy<T> {
+    /**
+     * Returns true if the event should be evicted from the window.
+     */
+    boolean evict(Event<T> event);
+
+    /**
+     * Tracks the event so that a later call to
+     * {@link EvictionPolicy#evict(Event)} can decide whether to evict it.
+     */
+    void track(Event<T> event);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
new file mode 100644
index 0000000..4cda2d3
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Eviction policy that evicts events based on time duration.
+ */
+public class TimeEvictionPolicy<T> implements EvictionPolicy<T> {
+    private final long duration;
+
+    public TimeEvictionPolicy(long millis) {
+        this.duration = millis;
+    }
+
+    /**
+     * Returns true if the event falls out of the window based on the window duration.
+     *
+     * @param event the event whose timestamp is compared against the current system time
+     * @return true if at least {@code duration} millis have elapsed since the event timestamp
+     */
+    @Override
+    public boolean evict(Event<T> event) {
+        return (System.currentTimeMillis() - event.getTs()) >= duration;
+    }
+
+    @Override
+    public void track(Event<T> event) {
+        // no-op: eviction here depends only on the event timestamp
+    }
+
+    @Override
+    public String toString() {
+        return "TimeEvictionPolicy{" +
+                "duration=" + duration +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
new file mode 100644
index 0000000..9e8612a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Invokes {@link TriggerHandler#onTrigger()} after the duration.
+ */
+public class TimeTriggerPolicy<T> implements TriggerPolicy<T> {
+    private long duration;
+    private TriggerHandler handler;
+    private ScheduledExecutorService ex;
+
+    public TimeTriggerPolicy(long millis, TriggerHandler handler) {
+        this.duration = millis;
+        this.handler = handler;
+        start(); // the periodic trigger task is scheduled right here, from the constructor
+    }
+
+    @Override
+    public void track(Event<T> event) {
+        // no-op: this policy fires on a timer, not on incoming events
+    }
+
+    @Override
+    public void reset() {
+        // no-op: the fixed-rate schedule is left running
+    }
+
+    @Override
+    public void shutdown() {
+        ex.shutdown();
+        try {
+            if (!ex.awaitTermination(2, TimeUnit.SECONDS)) { // wait up to 2s before forcing shutdown
+                ex.shutdownNow();
+            }
+        } catch (InterruptedException ie) {
+            ex.shutdownNow();
+            Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
+        }
+    }
+
+    private void start() {
+        Runnable task = new Runnable() {
+            @Override
+            public void run() {
+                handler.onTrigger(); // NOTE(review): if this throws, scheduleAtFixedRate suppresses all later runs
+            }
+        };
+        ex = Executors.newSingleThreadScheduledExecutor();
+        ex.scheduleAtFixedRate(task, duration, duration, TimeUnit.MILLISECONDS); // first run after one full duration
+    }
+
+    @Override
+    public String toString() {
+        return "TimeTriggerPolicy{" +
+                "duration=" + duration +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java b/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java
new file mode 100644
index 0000000..f947951
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+/**
+ * The callback fired by {@link TriggerPolicy} when the trigger
+ * condition is satisfied.
+ */
+interface TriggerHandler {
+    /**
+     * Invoked when the {@link TriggerPolicy} condition is satisfied.
+     */
+    void onTrigger();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/windowing/TriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/TriggerPolicy.java
new file mode 100644
index 0000000..99eaaad
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/TriggerPolicy.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+/**
+ * Triggers the window calculations based on the policy.
+ *
+ * @param <T> the type of the event that is tracked
+ */
+public interface TriggerPolicy<T> {
+    /**
+     * Tracks the event; an implementation may invoke the trigger from here.
+     */
+    void track(Event<T> event);
+
+    /**
+     * Resets the trigger policy.
+     */
+    void reset();
+
+    /**
+     * Any clean up (e.g. stopping a scheduler thread) could be handled here.
+     */
+    void shutdown();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/windowing/TupleWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TupleWindow.java b/storm-core/src/jvm/backtype/storm/windowing/TupleWindow.java
new file mode 100644
index 0000000..cf7408b
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/TupleWindow.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import backtype.storm.tuple.Tuple;
+
+/**
+ * A {@link Window} that contains {@link Tuple} objects.
+ */
+public interface TupleWindow extends Window<Tuple> { // adds no methods of its own
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/windowing/TupleWindowImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TupleWindowImpl.java b/storm-core/src/jvm/backtype/storm/windowing/TupleWindowImpl.java
new file mode 100644
index 0000000..56a395f
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/TupleWindowImpl.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import backtype.storm.tuple.Tuple;
+
+import java.util.List;
+
+/**
+ * Holds the expired, new and current tuples in a window.
+ */
+public class TupleWindowImpl implements TupleWindow {
+    private final List<Tuple> tuples; // returned by get()
+    private final List<Tuple> newTuples; // returned by getNew()
+    private final List<Tuple> expiredTuples; // returned by getExpired()
+
+    public TupleWindowImpl(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples) {
+        this.tuples = tuples; // the three lists are kept by reference, not copied
+        this.newTuples = newTuples;
+        this.expiredTuples = expiredTuples;
+    }
+
+    @Override
+    public List<Tuple> get() {
+        return tuples;
+    }
+
+    @Override
+    public List<Tuple> getNew() {
+        return newTuples;
+    }
+
+    @Override
+    public List<Tuple> getExpired() {
+        return expiredTuples;
+    }
+
+    @Override
+    public String toString() {
+        return "TupleWindowImpl{" +
+                "tuples=" + tuples +
+                ", newTuples=" + newTuples +
+                ", expiredTuples=" + expiredTuples +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/windowing/Window.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/Window.java b/storm-core/src/jvm/backtype/storm/windowing/Window.java
new file mode 100644
index 0000000..9505e33
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/Window.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import java.util.List;
+
+/**
+ * A view of events in a sliding window.
+ *
+ * @param <T> the type of event that this window contains. E.g. {@link backtype.storm.tuple.Tuple}
+ */
+public interface Window<T> {
+    /**
+     * Gets the list of events in the window.
+     *
+     * @return the list of events in the window.
+     */
+    List<T> get();
+
+    /**
+     * Gets the list of newly added events in the window since the last time the window was generated.
+     *
+     * @return the list of newly added events in the window.
+     */
+    List<T> getNew();
+
+    /**
+     * Gets the list of events expired from the window since the last time the window was generated.
+     *
+     * @return the list of events expired from the window.
+     */
+    List<T> getExpired();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/windowing/WindowLifecycleListener.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WindowLifecycleListener.java b/storm-core/src/jvm/backtype/storm/windowing/WindowLifecycleListener.java
new file mode 100644
index 0000000..5954003
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WindowLifecycleListener.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import java.util.List;
+
+/**
+ * A callback for expiry, activation of events tracked by the {@link WindowManager}
+ *
+ * @param <T> The type of Event in the window (e.g. Tuple).
+ */
+public interface WindowLifecycleListener<T> {
+    /**
+     * Called on expiry of events from the window due to {@link EvictionPolicy}.
+     *
+     * @param events the expired events; {@link WindowManager} may pass an empty list
+     */
+    void onExpiry(List<T> events);
+
+    /**
+     * Called on activation of the window due to the {@link TriggerPolicy}.
+     * @param events the list of current events in the window.
+     * @param newEvents the newly added events since last activation.
+     * @param expired the expired events since last activation.
+     */
+    void onActivation(List<T> events, List<T> newEvents, List<T> expired);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java b/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
new file mode 100644
index 0000000..bf66c92
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static backtype.storm.topology.base.BaseWindowedBolt.Count;
+import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
+
+/**
+ * Tracks a window of events and fires {@link WindowLifecycleListener} callbacks
+ * on expiry of events or activation of the window due to {@link TriggerPolicy}.
+ *
+ * @param <T> the type of event in the window.
+ */
+public class WindowManager<T> implements TriggerHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(WindowManager.class);
+
+    /**
+     * Expire old events every EXPIRE_EVENTS_THRESHOLD to
+     * keep the window size in check.
+     */
+    public static final int EXPIRE_EVENTS_THRESHOLD = 100;
+
+    private WindowLifecycleListener<T> windowLifecycleListener;
+    private ConcurrentLinkedQueue<Event<T>> window;
+    private EvictionPolicy<T> evictionPolicy;
+    private TriggerPolicy<T> triggerPolicy;
+    private List<T> expiredEvents;
+    private Set<Event<T>> prevWindowEvents;
+    private AtomicInteger eventsSinceLastExpiry;
+    private ReentrantLock lock;
+
+    public WindowManager(WindowLifecycleListener<T> lifecycleListener) {
+        windowLifecycleListener = lifecycleListener;
+        window = new ConcurrentLinkedQueue<>();
+        expiredEvents = new ArrayList<>();
+        prevWindowEvents = new HashSet<>();
+        eventsSinceLastExpiry = new AtomicInteger();
+        lock = new ReentrantLock(true);
+    }
+
+    public void setWindowLength(Count count) {
+        this.evictionPolicy = new CountEvictionPolicy<>(count.value);
+    }
+
+    public void setWindowLength(Duration duration) {
+        this.evictionPolicy = new TimeEvictionPolicy<>(duration.value);
+    }
+
+    public void setSlidingInterval(Count count) {
+        this.triggerPolicy = new CountTriggerPolicy<>(count.value, this);
+    }
+
+    public void setSlidingInterval(Duration duration) {
+        this.triggerPolicy = new TimeTriggerPolicy<>(duration.value, this);
+    }
+
+    /**
+     * Add an event into the window, with {@link System#currentTimeMillis()} as
+     * the tracking ts.
+     *
+     * @param event the event to add
+     */
+    public void add(T event) {
+        add(event, System.currentTimeMillis());
+    }
+
+    /**
+     * Add an event into the window, with the given ts as the tracking ts.
+     *
+     * @param event the event to track
+     * @param ts    the timestamp
+     */
+    public void add(T event, long ts) {
+        Event<T> windowEvent = new EventImpl<T>(event, ts);
+        window.add(windowEvent);
+        track(windowEvent);
+        compactWindow();
+    }
+
+    /**
+     * The callback invoked by the trigger policy.
+     */
+    @Override
+    public void onTrigger() {
+        List<Event<T>> windowEvents = new ArrayList<>();
+        List<T> expired = null;
+        try {
+            lock.lock();
+            /*
+             * scan the entire window to handle out of order events in
+             * the case of time based windows.
+             */
+            expireEvents(true, windowEvents);
+            expired = new ArrayList<>(expiredEvents);
+            expiredEvents.clear();
+        } finally {
+            lock.unlock();
+        }
+        List<T> events = new ArrayList<>();
+        List<T> newEvents = new ArrayList<>();
+        for (Event<T> event : windowEvents) {
+            events.add(event.get());
+            if (!prevWindowEvents.contains(event)) {
+                newEvents.add(event.get());
+            }
+        }
+        prevWindowEvents.clear();
+        prevWindowEvents.addAll(windowEvents);
+        LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", windowEvents.size());
+        windowLifecycleListener.onActivation(events, newEvents, expired);
+        triggerPolicy.reset();
+    }
+
+    public void shutdown() {
+        LOG.debug("Shutting down WindowManager");
+        if (triggerPolicy != null) {
+            triggerPolicy.shutdown();
+        }
+    }
+
+    /**
+     * expires events that fall out of the window every
+     * EXPIRE_EVENTS_THRESHOLD so that the window does not grow
+     * too big.
+     */
+    private void compactWindow() {
+        if (eventsSinceLastExpiry.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) {
+            expireEvents(false, null);
+        }
+    }
+
+    /**
+     * feed the event to the eviction and trigger policies
+     * for bookkeeping and optionally firing the trigger.
+     */
+    private void track(Event<T> windowEvent) {
+        evictionPolicy.track(windowEvent);
+        triggerPolicy.track(windowEvent);
+    }
+
+    /**
+     * Expire events from the window, using the expiration policy to check
+     * if the event should be evicted or not.
+     *
+     * @param fullScan  if set, will scan the entire window. if not set, will stop
+     *                  as soon as an event not satisfying the expiration policy is found.
+     * @param remaining the list of remaining events in the window after expiry.
+     */
+    private void expireEvents(boolean fullScan, List<Event<T>> remaining) {
+        LOG.debug("Expire events, eviction policy {}", evictionPolicy);
+        List<T> eventsToExpire = new ArrayList<>();
+        try {
+            lock.lock();
+            Iterator<Event<T>> it = window.iterator();
+            while (it.hasNext()) {
+                Event<T> windowEvent = it.next();
+                if (evictionPolicy.evict(windowEvent)) {
+                    eventsToExpire.add(windowEvent.get());
+                    it.remove();
+                } else if (!fullScan) {
+                    break;
+                } else if (remaining != null) {
+                    remaining.add(windowEvent);
+                }
+            }
+            expiredEvents.addAll(eventsToExpire);
+        } finally {
+            lock.unlock();
+        }
+        eventsSinceLastExpiry.set(0);
+        LOG.debug("[{}] events expired from window.", eventsToExpire.size());
+        windowLifecycleListener.onExpiry(eventsToExpire);
+    }
+
+    @Override
+    public String toString() {
+        return "WindowManager{" +
+                "evictionPolicy=" + evictionPolicy +
+                ", triggerPolicy=" + triggerPolicy +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/77429d07/storm-core/test/jvm/backtype/storm/windowing/WindowManagerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/windowing/WindowManagerTest.java b/storm-core/test/jvm/backtype/storm/windowing/WindowManagerTest.java
new file mode 100644
index 0000000..9792ab3
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/windowing/WindowManagerTest.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static backtype.storm.topology.base.BaseWindowedBolt.Count;
+import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link WindowManager}
+ */
+public class WindowManagerTest {
+    private WindowManager<Integer> windowManager;
+    private Listener listener;
+
+    private static class Listener implements WindowLifecycleListener<Integer> {
+        List<Integer> onExpiryEvents = Collections.emptyList();
+        List<Integer> onActivationEvents = Collections.emptyList();
+        List<Integer> onActivationNewEvents = Collections.emptyList();
+        List<Integer> onActivationExpiredEvents = Collections.emptyList();
+
+        @Override
+        public void onExpiry(List<Integer> events) {
+            onExpiryEvents = events;
+        }
+
+        @Override
+        public void onActivation(List<Integer> events, List<Integer> newEvents, List<Integer> expired) {
+            onActivationEvents = events;
+            onActivationNewEvents = newEvents;
+            onActivationExpiredEvents = expired;
+        }
+
+        void clear() {
+            onExpiryEvents = Collections.emptyList();
+            onActivationEvents = Collections.emptyList();
+            onActivationNewEvents = Collections.emptyList();
+            onActivationExpiredEvents = Collections.emptyList();
+        }
+    }
+
+    @Before
+    public void setUp() {
+        listener = new Listener();
+        windowManager = new WindowManager<>(listener);
+    }
+
+    @After
+    public void tearDown() {
+        windowManager.shutdown();
+    }
+
+    @Test
+    public void testCountBasedWindow() throws Exception {
+        windowManager.setWindowLength(new Count(5));
+        windowManager.setSlidingInterval(new Count(2));
+        windowManager.add(1);
+        windowManager.add(2);
+        // nothing expired yet
+        assertTrue(listener.onExpiryEvents.isEmpty());
+        assertEquals(seq(1, 2), listener.onActivationEvents);
+        assertEquals(seq(1, 2), listener.onActivationNewEvents);
+        assertTrue(listener.onActivationExpiredEvents.isEmpty());
+        windowManager.add(3);
+        windowManager.add(4);
+        // nothing expired yet
+        assertTrue(listener.onExpiryEvents.isEmpty());
+        assertEquals(seq(1, 4), listener.onActivationEvents);
+        assertEquals(seq(3, 4), listener.onActivationNewEvents);
+        assertTrue(listener.onActivationExpiredEvents.isEmpty());
+        windowManager.add(5);
+        windowManager.add(6);
+        // 1 expired
+        assertEquals(seq(1), listener.onExpiryEvents);
+        assertEquals(seq(2, 6), listener.onActivationEvents);
+        assertEquals(seq(5, 6), listener.onActivationNewEvents);
+        assertEquals(seq(1), listener.onActivationExpiredEvents);
+        listener.clear();
+        windowManager.add(7);
+        // nothing expires until threshold is hit
+        assertTrue(listener.onExpiryEvents.isEmpty());
+        windowManager.add(8);
+        // 2 and 3 expired
+        assertEquals(seq(2, 3), listener.onExpiryEvents);
+        assertEquals(seq(4, 8), listener.onActivationEvents);
+        assertEquals(seq(7, 8), listener.onActivationNewEvents);
+        assertEquals(seq(2, 3), listener.onActivationExpiredEvents);
+    }
+
+    @Test
+    public void testExpireThreshold() throws Exception {
+        int threshold = WindowManager.EXPIRE_EVENTS_THRESHOLD;
+        int windowLength = 5;
+        windowManager.setWindowLength(new Count(5));
+        windowManager.setSlidingInterval(new Duration(1, TimeUnit.HOURS));
+        for (int i : seq(1, 5)) {
+            windowManager.add(i);
+        }
+        // nothing expired yet
+        assertTrue(listener.onExpiryEvents.isEmpty());
+        for (int i : seq(6, 10)) {
+            windowManager.add(i);
+        }
+        for (int i : seq(11, threshold)) {
+            windowManager.add(i);
+        }
+        // window should be compacted and events should be expired.
+        assertEquals(seq(1, threshold - windowLength), listener.onExpiryEvents);
+    }
+
+
+    @Test
+    public void testTimeBasedWindow() throws Exception {
+        windowManager.setWindowLength(new Duration(1, TimeUnit.SECONDS));
+        windowManager.setSlidingInterval(new Duration(100, TimeUnit.MILLISECONDS));
+        long now = System.currentTimeMillis();
+
+        // add with past ts
+        for (int i : seq(1, 50)) {
+            windowManager.add(i, now - 1000);
+        }
+
+        // add with current ts
+        for (int i : seq(51, WindowManager.EXPIRE_EVENTS_THRESHOLD)) {
+            windowManager.add(i, now);
+        }
+        // first 50 should have expired due to expire events threshold
+        assertEquals(50, listener.onExpiryEvents.size());
+
+        // add more events with past ts
+        for (int i : seq(WindowManager.EXPIRE_EVENTS_THRESHOLD + 1, WindowManager.EXPIRE_EVENTS_THRESHOLD + 100)) {
+            windowManager.add(i, now - 1000);
+        }
+        // wait for time trigger
+        Thread.sleep(120);
+
+        // 100 events with past ts should expire
+        assertEquals(100, listener.onExpiryEvents.size());
+        assertEquals(seq(WindowManager.EXPIRE_EVENTS_THRESHOLD + 1, WindowManager.EXPIRE_EVENTS_THRESHOLD + 100),
+                     listener.onExpiryEvents);
+        List<Integer> activationsEvents = seq(51, WindowManager.EXPIRE_EVENTS_THRESHOLD);
+        assertEquals(seq(51, WindowManager.EXPIRE_EVENTS_THRESHOLD), listener.onActivationEvents);
+        assertEquals(seq(51, WindowManager.EXPIRE_EVENTS_THRESHOLD), listener.onActivationNewEvents);
+        // activation expired list should contain even the ones expired due to EXPIRE_EVENTS_THRESHOLD
+        List<Integer> expiredList = seq(1, 50);
+        expiredList.addAll(seq(WindowManager.EXPIRE_EVENTS_THRESHOLD + 1, WindowManager.EXPIRE_EVENTS_THRESHOLD + 100));
+        assertEquals(expiredList, listener.onActivationExpiredEvents);
+
+        listener.clear();
+        // add more events with current ts
+        List<Integer> newEvents = seq(WindowManager.EXPIRE_EVENTS_THRESHOLD + 101, WindowManager.EXPIRE_EVENTS_THRESHOLD + 200);
+        for (int i : newEvents) {
+            windowManager.add(i, now);
+        }
+        activationsEvents.addAll(newEvents);
+        // wait for time trigger
+        Thread.sleep(120);
+        assertTrue(listener.onExpiryEvents.isEmpty());
+        assertEquals(activationsEvents, listener.onActivationEvents);
+        assertEquals(newEvents, listener.onActivationNewEvents);
+
+    }
+
+
+    @Test
+    public void testTimeBasedWindowExpiry() throws Exception {
+        windowManager.setWindowLength(new Duration(100, TimeUnit.MILLISECONDS));
+        windowManager.setSlidingInterval(new Duration(60, TimeUnit.MILLISECONDS));
+        // add 10 events
+        for (int i : seq(1, 10)) {
+            windowManager.add(i);
+        }
+        Thread.sleep(70);
+        assertEquals(seq(1, 10), listener.onActivationEvents);
+        assertTrue(listener.onActivationExpiredEvents.isEmpty());
+        listener.clear();
+        // wait so all events expire
+        Thread.sleep(70);
+        assertEquals(seq(1, 10), listener.onActivationExpiredEvents);
+        assertTrue(listener.onActivationEvents.isEmpty());
+        listener.clear();
+        Thread.sleep(70);
+        assertTrue(listener.onActivationExpiredEvents.isEmpty());
+        assertTrue(listener.onActivationEvents.isEmpty());
+
+    }
+
+    private List<Integer> seq(int start) {
+        return seq(start, start);
+    }
+
+    private List<Integer> seq(int start, int stop) {
+        List<Integer> ints = new ArrayList<>();
+        for (int i = start; i <= stop; i++) {
+            ints.add(i);
+        }
+        return ints;
+    }
+}
\ No newline at end of file


[4/6] storm git commit: Added documentation for windowing support in storm core

Posted by sr...@apache.org.
Added documentation for windowing support in storm core


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/194aeca7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/194aeca7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/194aeca7

Branch: refs/heads/master
Commit: 194aeca7ebe90a40318a1318c09113e31b02f86a
Parents: eb12941
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Mon Nov 9 10:30:42 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Mon Nov 9 10:30:54 2015 +0530

----------------------------------------------------------------------
 docs/documentation/Windowing.md | 144 +++++++++++++++++++++++++++++++++++
 1 file changed, 144 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/194aeca7/docs/documentation/Windowing.md
----------------------------------------------------------------------
diff --git a/docs/documentation/Windowing.md b/docs/documentation/Windowing.md
new file mode 100644
index 0000000..8f9d758
--- /dev/null
+++ b/docs/documentation/Windowing.md
@@ -0,0 +1,144 @@
+# Windowing support in core storm
+
+Storm core has support for processing a group of tuples that fall within a window. Windows are specified with the
+following two parameters:
+
+1. Window length - the length or duration of the window
+2. Sliding interval - the interval at which the window slides
+
+## Sliding Window
+
+Tuples are grouped in windows and the window slides every sliding interval. A tuple can belong to more than one window.
+
+For example, consider a time duration based sliding window with a length of 10 seconds and a sliding interval of 5 seconds.
+
+```
+| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
+0       5             10         15    -> time
+
+|<------- w1 -------->|
+        |<----------- w2 ------->|
+```
+
+The window is evaluated every 5 seconds and some of the tuples in the first window overlap with the second one.
+		
+
+## Tumbling Window
+
+Tuples are grouped in a single window based on time or count. Any tuple belongs to only one of the windows.
+
+For example, consider a time duration based tumbling window with a length of 5 seconds.
+
+```
+| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
+0       5             10         15    -> time
+   w1         w2            w3
+```
+
+The window is evaluated every five seconds and none of the windows overlap.
+
+Storm supports specifying the window length and sliding intervals as a count of the number of tuples or as a time duration.
+
+The bolt interface `IWindowedBolt` is implemented by bolts that need windowing support.
+
+```java
+public interface IWindowedBolt extends IComponent {
+    void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
+    /**
+     * Process tuples falling within the window and optionally emit 
+     * new tuples based on the tuples in the input window.
+     */
+    void execute(TupleWindow inputWindow);
+    void cleanup();
+}
+```
+
+Every time the window activates, the `execute` method is invoked. The `TupleWindow` parameter gives access to the current tuples
+in the window, the tuples that expired, and the new tuples that were added since the last window was computed, which is useful
+for efficient windowing computations.
+
+Bolts that need windowing support would typically extend `BaseWindowedBolt`, which has the APIs for specifying the
+window length and sliding intervals.
+
+E.g. 
+
+```java
+public class SlidingWindowBolt extends BaseWindowedBolt {
+	private OutputCollector collector;
+	
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    	this.collector = collector;
+    }
+	
+    @Override
+    public void execute(TupleWindow inputWindow) {
+	  for(Tuple tuple: inputWindow.get()) {
+	    // do the windowing computation
+		...
+	  }
+	  // emit the results
+	  collector.emit(new Values(computedValue));
+    }
+}
+
+public static void main(String[] args) {
+    TopologyBuilder builder = new TopologyBuilder();
+     builder.setSpout("spout", new RandomSentenceSpout(), 1);
+     builder.setBolt("slidingwindowbolt", 
+                     new SlidingWindowBolt().withWindow(new Count(30), new Count(10)),
+                     1).shuffleGrouping("spout");
+    Config conf = new Config();
+    conf.setDebug(true);
+    conf.setNumWorkers(1);
+
+    StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+	
+}
+```
+
+The following window configurations are supported.
+
+```java
+withWindow(Count windowLength, Count slidingInterval)
+Tuple count based sliding window that slides after `slidingInterval` number of tuples.
+
+withWindow(Count windowLength)
+Tuple count based window that slides with every incoming tuple.
+
+withWindow(Count windowLength, Duration slidingInterval)
+Tuple count based sliding window that slides after `slidingInterval` time duration.
+
+withWindow(Duration windowLength, Duration slidingInterval)
+Time duration based sliding window that slides after `slidingInterval` time duration.
+
+withWindow(Duration windowLength)
+Time duration based window that slides with every incoming tuple.
+
+withWindow(Duration windowLength, Count slidingInterval)
+Time duration based sliding window configuration that slides after `slidingInterval` number of tuples.
+
+withTumblingWindow(BaseWindowedBolt.Count count)
+Count based tumbling window that tumbles after the specified count of tuples.
+
+withTumblingWindow(BaseWindowedBolt.Duration duration)
+Time duration based tumbling window that tumbles after the specified time duration.
+
+```
+
+## Guarantees
+The windowing functionality in storm core currently provides an at-least-once guarantee. The values emitted from the bolt's
+`execute(TupleWindow inputWindow)` method are automatically anchored to all the tuples in the inputWindow. The downstream
+bolts are expected to ack the received tuple (i.e. the tuple emitted from the windowed bolt) to complete the tuple tree.
+If not, the tuples will be replayed and the windowing computation will be re-evaluated.
+
+The tuples in the window are automatically acked when they expire, i.e. when they fall out of the window after
+`windowLength + slidingInterval`. Note that the configuration `topology.message.timeout.secs` should be sufficiently more
+than `windowLength + slidingInterval` for time based windows; otherwise the tuples will time out and get replayed, which can result
+in duplicate evaluations. For count based windows, the configuration should be adjusted such that `windowLength + slidingInterval`
+tuples can be received within the timeout period.
+
+## Example topology
+An example topology `SlidingWindowTopology` shows how to use the APIs to compute a sliding window sum and a tumbling window
+average.
+


[5/6] storm git commit: Merge branch 'windowing' of https://github.com/arunmahadevan/storm into STORM-1167

Posted by sr...@apache.org.
Merge branch 'windowing' of https://github.com/arunmahadevan/storm into STORM-1167


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5cbad190
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5cbad190
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5cbad190

Branch: refs/heads/master
Commit: 5cbad190ebcc69faeb492cd45f7e39d584db807a
Parents: ad0c4ef 194aeca
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Thu Nov 19 22:55:59 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Thu Nov 19 22:55:59 2015 -0800

----------------------------------------------------------------------
 docs/documentation/Windowing.md                 | 144 +++++++++++
 .../storm/starter/SlidingWindowTopology.java    | 185 ++++++++++++++
 .../src/clj/backtype/storm/daemon/executor.clj  |   4 +
 storm-core/src/jvm/backtype/storm/Config.java   |  29 +++
 .../backtype/storm/task/OutputCollector.java    |   2 +-
 .../backtype/storm/topology/IWindowedBolt.java  |  40 +++
 .../storm/topology/TopologyBuilder.java         |  17 ++
 .../storm/topology/WindowedBoltExecutor.java    | 224 +++++++++++++++++
 .../storm/topology/base/BaseWindowedBolt.java   | 179 +++++++++++++
 .../storm/windowing/CountEvictionPolicy.java    |  68 +++++
 .../storm/windowing/CountTriggerPolicy.java     |  63 +++++
 .../src/jvm/backtype/storm/windowing/Event.java |  41 +++
 .../jvm/backtype/storm/windowing/EventImpl.java |  38 +++
 .../storm/windowing/EvictionPolicy.java         |  42 ++++
 .../storm/windowing/TimeEvictionPolicy.java     |  52 ++++
 .../storm/windowing/TimeTriggerPolicy.java      | 115 +++++++++
 .../storm/windowing/TriggerHandler.java         |  29 +++
 .../backtype/storm/windowing/TriggerPolicy.java |  42 ++++
 .../backtype/storm/windowing/TupleWindow.java   |  26 ++
 .../storm/windowing/TupleWindowImpl.java        |  61 +++++
 .../jvm/backtype/storm/windowing/Window.java    |  48 ++++
 .../windowing/WindowLifecycleListener.java      |  42 ++++
 .../backtype/storm/windowing/WindowManager.java | 212 ++++++++++++++++
 .../storm/windowing/WindowManagerTest.java      | 250 +++++++++++++++++++
 24 files changed, 1952 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5cbad190/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/5cbad190/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/5cbad190/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------


[6/6] storm git commit: Added STORM-1167 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-1167 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ab5b2f92
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ab5b2f92
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ab5b2f92

Branch: refs/heads/master
Commit: ab5b2f92cceee6ffbfeebb4181e06c1835e2edcf
Parents: 5cbad19
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Thu Nov 19 23:04:20 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Thu Nov 19 23:04:20 2015 -0800

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ab5b2f92/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d992da6..18a1a81 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1167: Add windowing support for storm core.
  * STORM-1215: Use Async Loggers to avoid locking  and logging overhead
  * STORM-1204: Logviewer should graceful report page-not-found instead of 500 for bad topo-id etc
  * STORM-831: Add BugTracker and Central Logging URL to UI


[2/6] storm git commit: Addressed review comments

Posted by sr...@apache.org.
Addressed review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0c2fce5b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0c2fce5b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0c2fce5b

Branch: refs/heads/master
Commit: 0c2fce5bf8fa41430ca09ce6a55afef607b031d7
Parents: 77429d0
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Thu Nov 5 15:01:38 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Thu Nov 5 19:08:48 2015 +0530

----------------------------------------------------------------------
 .../storm/starter/SlidingWindowTopology.java    | 84 ++++++++++++++++----
 .../backtype/storm/topology/IWindowedBolt.java  |  2 +-
 .../storm/topology/WindowedBoltExecutor.java    | 40 ++++++++--
 .../storm/topology/base/BaseWindowedBolt.java   | 18 ++---
 .../storm/windowing/CountEvictionPolicy.java    |  6 +-
 .../storm/windowing/CountTriggerPolicy.java     |  9 ++-
 .../src/jvm/backtype/storm/windowing/Event.java |  6 +-
 .../jvm/backtype/storm/windowing/EventImpl.java |  2 +-
 .../storm/windowing/EvictionPolicy.java         |  4 +-
 .../storm/windowing/TimeEvictionPolicy.java     | 10 +--
 .../storm/windowing/TimeTriggerPolicy.java      | 20 ++---
 .../backtype/storm/windowing/TriggerPolicy.java |  4 +-
 .../backtype/storm/windowing/WindowManager.java | 23 +++---
 .../storm/windowing/WindowManagerTest.java      | 30 ++++++-
 14 files changed, 181 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0c2fce5b/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java b/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
index e2ffbdd..beee8de 100644
--- a/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,13 +32,15 @@ import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;
 import backtype.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import storm.starter.bolt.PrinterBolt;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
 import static backtype.storm.topology.base.BaseWindowedBolt.Count;
-import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
 
 /**
  * A sample topology that demonstrates the usage of {@link backtype.storm.topology.IWindowedBolt}
@@ -46,11 +48,15 @@ import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
  */
 public class SlidingWindowTopology {
 
+    private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowTopology.class);
+
     /*
-     * emits random integers every 100 ms
+     * emits a random integer every 100 ms
      */
+
     private static class RandomIntegerSpout extends BaseRichSpout {
         SpoutOutputCollector collector;
+        Random rand;
 
         @Override
         public void declareOutputFields(OutputFieldsDeclarer declarer) {
@@ -60,14 +66,13 @@ public class SlidingWindowTopology {
         @Override
         public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
             this.collector = collector;
+            this.rand = new Random();
         }
 
         @Override
         public void nextTuple() {
             Utils.sleep(100);
-            Random rand = new Random();
-            Integer value = rand.nextInt(1000);
-            collector.emit(new Values(value));
+            collector.emit(new Values(rand.nextInt(1000)));
         }
     }
 
@@ -85,11 +90,27 @@ public class SlidingWindowTopology {
 
         @Override
         public void execute(TupleWindow inputWindow) {
-            System.out.println("Events in current window: " + inputWindow.get().size());
-            for (Tuple tuple : inputWindow.getNew()) {
+            /*
+             * The inputWindow gives a view of
+             * (a) all the events in the window
+             * (b) events that expired since last activation of the window
+             * (c) events that newly arrived since last activation of the window
+             */
+            List<Tuple> tuplesInWindow = inputWindow.get();
+            List<Tuple> newTuples = inputWindow.getNew();
+            List<Tuple> expiredTuples = inputWindow.getExpired();
+
+            LOG.debug("Events in current window: " + tuplesInWindow.size());
+            /*
+             * Instead of iterating over all the tuples in the window to compute
+             * the sum, the values for the new events are added and old events are
+             * subtracted. Similar optimizations might be possible in other
+             * windowing computations.
+             */
+            for (Tuple tuple : newTuples) {
                 sum += (int) tuple.getValue(0);
             }
-            for (Tuple tuple : inputWindow.getExpired()) {
+            for (Tuple tuple : expiredTuples) {
                 sum -= (int) tuple.getValue(0);
             }
             collector.emit(new Values(sum));
@@ -99,17 +120,52 @@ public class SlidingWindowTopology {
         public void declareOutputFields(OutputFieldsDeclarer declarer) {
             declarer.declare(new Fields("sum"));
         }
+    }
 
+
+    /*
+     * Computes tumbling window average
+     */
+    private static class TumblingWindowAvgBolt extends BaseWindowedBolt {
+        private OutputCollector collector;
+
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(TupleWindow inputWindow) {
+            int sum = 0;
+            List<Tuple> tuplesInWindow = inputWindow.get();
+            LOG.debug("Events in current window: " + tuplesInWindow.size());
+            if (tuplesInWindow.size() > 0) {
+                /*
+                * Since this is a tumbling window calculation,
+                * we use all the tuples in the window to compute the avg.
+                */
+                for (Tuple tuple : tuplesInWindow) {
+                    sum += (int) tuple.getValue(0);
+                }
+                collector.emit(new Values(sum / tuplesInWindow.size()));
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("avg"));
+        }
     }
 
 
     public static void main(String[] args) throws Exception {
         TopologyBuilder builder = new TopologyBuilder();
         builder.setSpout("integer", new RandomIntegerSpout(), 1);
-        builder.setBolt("window", new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)),
-                        //new SlidingWindowSumBolt().withTumblingWindow(new Duration(20, TimeUnit.SECONDS))
-                        1).shuffleGrouping("integer");
-        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("window");
+        builder.setBolt("slidingsum", new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)), 1)
+                .shuffleGrouping("integer");
+        builder.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(new Count(3)), 1)
+                .shuffleGrouping("slidingsum");
+        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");
         Config conf = new Config();
         conf.setDebug(true);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2fce5b/storm-core/src/jvm/backtype/storm/topology/IWindowedBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/IWindowedBolt.java b/storm-core/src/jvm/backtype/storm/topology/IWindowedBolt.java
index 620e7d9..8363717 100644
--- a/storm-core/src/jvm/backtype/storm/topology/IWindowedBolt.java
+++ b/storm-core/src/jvm/backtype/storm/topology/IWindowedBolt.java
@@ -29,7 +29,7 @@ import java.util.Map;
 public interface IWindowedBolt extends IComponent {
     /**
      * This is similar to the {@link backtype.storm.task.IBolt#prepare(Map, TopologyContext, OutputCollector)} except
-     * that while emitting the tuples are are automatically anchored to the tuples in the inputWindow.
+     * that while emitting, the tuples are automatically anchored to the tuples in the inputWindow.
      */
     void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2fce5b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
index cc6529f..c7d6f70 100644
--- a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -51,19 +51,27 @@ public class WindowedBoltExecutor implements IRichBolt {
     }
 
     private int getTopologyTimeoutMillis(Map stormConf) {
-        if (stormConf.containsKey(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
+        if (stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS) != null) {
             boolean timeOutsEnabled = (boolean) stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
             if (!timeOutsEnabled) {
                 return Integer.MAX_VALUE;
             }
         }
         int timeout = 0;
-        if (stormConf.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
+        if (stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS) != null) {
             timeout = ((Number) stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
         }
         return timeout * 1000;
     }
 
+    private int getMaxSpoutPending(Map stormConf) {
+        int maxPending = Integer.MAX_VALUE;
+        if (stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING) != null) {
+            maxPending = ((Number) stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)).intValue();
+        }
+        return maxPending;
+    }
+
     private void ensureDurationLessThanTimeout(int duration, int timeout) {
         if (duration > timeout) {
             throw new IllegalArgumentException("Window duration (length + sliding interval) value " + duration +
@@ -72,15 +80,33 @@ public class WindowedBoltExecutor implements IRichBolt {
         }
     }
 
-    // TODO: add more validation
+    private void ensureCountLessThanMaxPending(int count, int maxPending) {
+        if (count > maxPending) {
+            throw new IllegalArgumentException("Window count (length + sliding interval) value " + count +
+                                                       " is more than " + Config.TOPOLOGY_MAX_SPOUT_PENDING +
+                                                       " value " + maxPending);
+        }
+    }
+
     private void validate(Map stormConf, Count windowLengthCount, Duration windowLengthDuration,
                           Count slidingIntervalCount, Duration slidingIntervalDuration) {
 
         int topologyTimeout = getTopologyTimeoutMillis(stormConf);
+        int maxSpoutPending = getMaxSpoutPending(stormConf);
         if (windowLengthDuration != null && slidingIntervalDuration != null) {
             ensureDurationLessThanTimeout(windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout);
         } else if (windowLengthDuration != null) {
             ensureDurationLessThanTimeout(windowLengthDuration.value, topologyTimeout);
+        } else if (slidingIntervalDuration != null) {
+            ensureDurationLessThanTimeout(slidingIntervalDuration.value, topologyTimeout);
+        }
+
+        if (windowLengthCount != null && slidingIntervalCount != null) {
+            ensureCountLessThanMaxPending(windowLengthCount.value + slidingIntervalCount.value, maxSpoutPending);
+        } else if (windowLengthCount != null) {
+            ensureCountLessThanMaxPending(windowLengthCount.value, maxSpoutPending);
+        } else if (slidingIntervalCount != null) {
+            ensureCountLessThanMaxPending(slidingIntervalCount.value, maxSpoutPending);
         }
     }
 
@@ -128,7 +154,7 @@ public class WindowedBoltExecutor implements IRichBolt {
         bolt.prepare(stormConf, context, windowedOutputCollector);
         this.listener = newWindowLifecycleListener();
         this.windowManager = initWindowManager(listener, stormConf);
-        LOG.info("Initialized window manager {} ", this.windowManager);
+        LOG.debug("Initialized window manager {} ", this.windowManager);
     }
 
     @Override
@@ -165,8 +191,6 @@ public class WindowedBoltExecutor implements IRichBolt {
             public void onActivation(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples) {
                 windowedOutputCollector.setContext(tuples);
                 bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples));
-                newTuples.clear();
-                expiredTuples.clear();
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2fce5b/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java b/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
index abbfa26..5b20f5a 100644
--- a/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
+++ b/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -30,10 +30,10 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-public class BaseWindowedBolt implements IWindowedBolt {
+public abstract class BaseWindowedBolt implements IWindowedBolt {
     private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class);
 
-    private transient Map<String, Object> windowConfiguration;
+    private final transient Map<String, Object> windowConfiguration;
 
     /**
      * Holds a count value for count based windows and sliding intervals.
@@ -160,21 +160,17 @@ public class BaseWindowedBolt implements IWindowedBolt {
 
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-    }
-
-    @Override
-    public void execute(TupleWindow inputWindow) {
-
+        // NOOP
     }
 
     @Override
     public void cleanup() {
-
+        // NOOP
     }
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
+        // NOOP
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2fce5b/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
index 97d4332..a6779a8 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class CountEvictionPolicy<T> implements EvictionPolicy<T> {
     private final int threshold;
-    private AtomicInteger currentCount;
+    private final AtomicInteger currentCount;
 
     public CountEvictionPolicy(int count) {
         this.threshold = count;

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2fce5b/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
index 022eb9e..3b1bf9f 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,8 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class CountTriggerPolicy<T> implements TriggerPolicy<T> {
     private final int count;
-    private AtomicInteger currentCount;
-    private TriggerHandler handler;
+    private final AtomicInteger currentCount;
+    private final TriggerHandler handler;
 
     public CountTriggerPolicy(int count, TriggerHandler handler) {
         this.count = count;
@@ -50,6 +50,7 @@ public class CountTriggerPolicy<T> implements TriggerPolicy<T> {
 
     @Override
     public void shutdown() {
+        // NOOP
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2fce5b/storm-core/src/jvm/backtype/storm/windowing/Event.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/Event.java b/storm-core/src/jvm/backtype/storm/windowing/Event.java
index e33ee71..4855701 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/Event.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/Event.java
@@ -25,12 +25,12 @@ package backtype.storm.windowing;
 interface Event<T> {
     /**
      * The event timestamp in millis. This could be the time
-     * when the source generated the tuple or if not the time
+     * when the source generated the tuple or the time
      * when the tuple was received by a bolt.
      *
-     * @return
+     * @return the event timestamp in milliseconds.
      */
-    long getTs();
+    long getTimestamp();
 
     /**
      * Returns the wrapped object, E.g. a tuple

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2fce5b/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java b/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
index 1d43319..09c974c 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
@@ -27,7 +27,7 @@ class EventImpl<T> implements Event<T> {
     }
 
     @Override
-    public long getTs() {
+    public long getTimestamp() {
         return ts;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2fce5b/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
index fcf1e53..8458d48 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2fce5b/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
index 4cda2d3..16408f3 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,8 +17,6 @@
  */
 package backtype.storm.windowing;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * Eviction policy that evicts events based on time duration.
  */
@@ -37,12 +35,12 @@ public class TimeEvictionPolicy<T> implements EvictionPolicy<T> {
      */
     @Override
     public boolean evict(Event<T> event) {
-        return (System.currentTimeMillis() - event.getTs()) >= duration;
+        return (System.currentTimeMillis() - event.getTimestamp()) >= duration;
     }
 
     @Override
     public void track(Event<T> event) {
-        // no-op
+        // NOOP
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2fce5b/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
index 9e8612a..0d24c92 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
@@ -26,34 +26,35 @@ import java.util.concurrent.TimeUnit;
  */
 public class TimeTriggerPolicy<T> implements TriggerPolicy<T> {
     private long duration;
-    private TriggerHandler handler;
-    private ScheduledExecutorService ex;
+    private final TriggerHandler handler;
+    private final ScheduledExecutorService executor;
 
     public TimeTriggerPolicy(long millis, TriggerHandler handler) {
         this.duration = millis;
         this.handler = handler;
+        this.executor = Executors.newSingleThreadScheduledExecutor();
         start();
     }
 
     @Override
     public void track(Event<T> event) {
-        // no-op
+        // NOOP
     }
 
     @Override
     public void reset() {
-        // no-op
+        // NOOP
     }
 
     @Override
     public void shutdown() {
-        ex.shutdown();
+        executor.shutdown();
         try {
-            if (!ex.awaitTermination(2, TimeUnit.SECONDS)) {
-                ex.shutdownNow();
+            if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
+                executor.shutdownNow();
             }
         } catch (InterruptedException ie) {
-            ex.shutdownNow();
+            executor.shutdownNow();
             Thread.currentThread().interrupt();
         }
     }
@@ -65,8 +66,7 @@ public class TimeTriggerPolicy<T> implements TriggerPolicy<T> {
                 handler.onTrigger();
             }
         };
-        ex = Executors.newSingleThreadScheduledExecutor();
-        ex.scheduleAtFixedRate(task, duration, duration, TimeUnit.MILLISECONDS);
+        executor.scheduleAtFixedRate(task, duration, duration, TimeUnit.MILLISECONDS);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2fce5b/storm-core/src/jvm/backtype/storm/windowing/TriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/TriggerPolicy.java
index 99eaaad..bc3e777 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/TriggerPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/TriggerPolicy.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2fce5b/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java b/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
index bf66c92..38127c8 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -110,7 +111,7 @@ public class WindowManager<T> implements TriggerHandler {
      */
     @Override
     public void onTrigger() {
-        List<Event<T>> windowEvents = new ArrayList<>();
+        List<Event<T>> windowEvents = null;
         List<T> expired = null;
         try {
             lock.lock();
@@ -118,7 +119,7 @@ public class WindowManager<T> implements TriggerHandler {
              * scan the entire window to handle out of order events in
              * the case of time based windows.
              */
-            expireEvents(true, windowEvents);
+            windowEvents = expireEvents(true);
             expired = new ArrayList<>(expiredEvents);
             expiredEvents.clear();
         } finally {
@@ -153,7 +154,7 @@ public class WindowManager<T> implements TriggerHandler {
      */
     private void compactWindow() {
         if (eventsSinceLastExpiry.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) {
-            expireEvents(false, null);
+            expireEvents(false);
         }
     }
 
@@ -170,13 +171,14 @@ public class WindowManager<T> implements TriggerHandler {
      * Expire events from the window, using the expiration policy to check
      * if the event should be evicted or not.
      *
-     * @param fullScan  if set, will scan the entire window. if not set, will stop
-     *                  as soon as an event not satisfying the expiration policy is found.
-     * @param remaining the list of remaining events in the window after expiry.
+     * @param fullScan if set, will scan the entire window; if not set, will stop
+     *                 as soon as an event not satisfying the expiration policy is found
+     * @return the list of remaining events in the window after expiry
      */
-    private void expireEvents(boolean fullScan, List<Event<T>> remaining) {
+    private List<Event<T>> expireEvents(boolean fullScan) {
         LOG.debug("Expire events, eviction policy {}", evictionPolicy);
         List<T> eventsToExpire = new ArrayList<>();
+        List<Event<T>> remaining = new ArrayList<>();
         try {
             lock.lock();
             Iterator<Event<T>> it = window.iterator();
@@ -187,7 +189,7 @@ public class WindowManager<T> implements TriggerHandler {
                     it.remove();
                 } else if (!fullScan) {
                     break;
-                } else if (remaining != null) {
+                } else {
                     remaining.add(windowEvent);
                 }
             }
@@ -198,6 +200,7 @@ public class WindowManager<T> implements TriggerHandler {
         eventsSinceLastExpiry.set(0);
         LOG.debug("[{}] events expired from window.", eventsToExpire.size());
         windowLifecycleListener.onExpiry(eventsToExpire);
+        return remaining;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2fce5b/storm-core/test/jvm/backtype/storm/windowing/WindowManagerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/windowing/WindowManagerTest.java b/storm-core/test/jvm/backtype/storm/windowing/WindowManagerTest.java
index 9792ab3..6531817 100644
--- a/storm-core/test/jvm/backtype/storm/windowing/WindowManagerTest.java
+++ b/storm-core/test/jvm/backtype/storm/windowing/WindowManagerTest.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -210,6 +210,32 @@ public class WindowManagerTest {
 
     }
 
+    @Test
+    public void testTumblingWindow() throws Exception {
+        windowManager.setWindowLength(new Count(3));
+        windowManager.setSlidingInterval(new Count(3));
+        windowManager.add(1);
+        windowManager.add(2);
+        // nothing expired yet
+        assertTrue(listener.onExpiryEvents.isEmpty());
+        windowManager.add(3);
+        assertTrue(listener.onExpiryEvents.isEmpty());
+        assertEquals(seq(1, 3), listener.onActivationEvents);
+        assertTrue(listener.onActivationExpiredEvents.isEmpty());
+        assertEquals(seq(1, 3), listener.onActivationNewEvents);
+
+        listener.clear();
+        windowManager.add(4);
+        windowManager.add(5);
+        windowManager.add(6);
+
+        assertEquals(seq(1, 3), listener.onExpiryEvents);
+        assertEquals(seq(4, 6), listener.onActivationEvents);
+        assertEquals(seq(1, 3), listener.onActivationExpiredEvents);
+        assertEquals(seq(4, 6), listener.onActivationNewEvents);
+
+    }
+
     private List<Integer> seq(int start) {
         return seq(start, start);
     }


[3/6] storm git commit: Addressed review comments

Posted by sr...@apache.org.
Addressed review comments

1. Handle uncaught exceptions thrown by onTrigger in TimeTriggerPolicy and fail fast
2. Address other review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eb129410
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eb129410
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eb129410

Branch: refs/heads/master
Commit: eb129410dbd12ccdbc213e258312b23cc165557e
Parents: 0c2fce5
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Fri Nov 6 10:34:42 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Fri Nov 6 10:34:42 2015 +0530

----------------------------------------------------------------------
 .../storm/topology/base/BaseWindowedBolt.java   |  1 -
 .../storm/windowing/EvictionPolicy.java         |  9 ++-
 .../storm/windowing/TimeTriggerPolicy.java      | 63 ++++++++++++++++----
 .../backtype/storm/windowing/TriggerPolicy.java |  2 +
 .../backtype/storm/windowing/WindowManager.java | 13 ++--
 5 files changed, 65 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/eb129410/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java b/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
index 5b20f5a..fd4af90 100644
--- a/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
+++ b/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
@@ -130,7 +130,6 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
         return withWindowLength(windowLength).withSlidingInterval(new Count(1));
     }
 
-
     /**
      * A time duration based window that slides with every incoming tuple.
      *

http://git-wip-us.apache.org/repos/asf/storm/blob/eb129410/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
index 8458d48..8820e92 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
@@ -25,13 +25,18 @@ package backtype.storm.windowing;
  */
 public interface EvictionPolicy<T> {
     /**
-     * returns true if the event should be evicted from the window.
+     * Decides if an event should be evicted from the window or not.
+     *
+     * @param event the input event
+     * @return true if the event should be evicted, false otherwise
      */
     boolean evict(Event<T> event);
 
     /**
      * Tracks the event to later decide whether
-     * {@link EvictionPolicy#evict(Event)} should evict the event.
+     * {@link EvictionPolicy#evict(Event)} should evict it or not.
+     *
+     * @param event the input event to be tracked
      */
     void track(Event<T> event);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/eb129410/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
index 0d24c92..db1fbeb 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
@@ -17,33 +17,42 @@
  */
 package backtype.storm.windowing;
 
+import backtype.storm.topology.FailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
  * Invokes {@link TriggerHandler#onTrigger()} after the duration.
  */
 public class TimeTriggerPolicy<T> implements TriggerPolicy<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(TimeTriggerPolicy.class);
+
     private long duration;
     private final TriggerHandler handler;
     private final ScheduledExecutorService executor;
+    private final ScheduledFuture<?> executorFuture;
 
     public TimeTriggerPolicy(long millis, TriggerHandler handler) {
         this.duration = millis;
         this.handler = handler;
         this.executor = Executors.newSingleThreadScheduledExecutor();
-        start();
+        this.executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
     }
 
     @Override
     public void track(Event<T> event) {
-        // NOOP
+        checkFailures();
     }
 
     @Override
     public void reset() {
-        // NOOP
+        checkFailures();
     }
 
     @Override
@@ -59,20 +68,48 @@ public class TimeTriggerPolicy<T> implements TriggerPolicy<T> {
         }
     }
 
-    private void start() {
-        Runnable task = new Runnable() {
-            @Override
-            public void run() {
-                handler.onTrigger();
-            }
-        };
-        executor.scheduleAtFixedRate(task, duration, duration, TimeUnit.MILLISECONDS);
-    }
-
     @Override
     public String toString() {
         return "TimeTriggerPolicy{" +
                 "duration=" + duration +
                 '}';
     }
+
+    /*
+    * Check for uncaught exceptions during the execution
+    * of the trigger and fail fast.
+    * The uncaught exceptions will be wrapped in
+    * ExecutionException and thrown when future.get() is invoked.
+    */
+    private void checkFailures() {
+        if (executorFuture.isDone()) {
+            try {
+                executorFuture.get();
+            } catch (InterruptedException ex) {
+                LOG.error("Got exception ", ex);
+                throw new FailedException(ex);
+            } catch (ExecutionException ex) {
+                LOG.error("Got exception ", ex);
+                throw new FailedException(ex.getCause());
+            }
+        }
+    }
+
+    private Runnable newTriggerTask() {
+        return new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    handler.onTrigger();
+                } catch (Throwable th) {
+                    LOG.error("handler.onTrigger failed ", th);
+                    /*
+                     * propagate it so that task gets canceled and the exception
+                     * can be retrieved from executorFuture.get()
+                     */
+                    throw th;
+                }
+            }
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/eb129410/storm-core/src/jvm/backtype/storm/windowing/TriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/TriggerPolicy.java
index bc3e777..d27af76 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/TriggerPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/TriggerPolicy.java
@@ -25,6 +25,8 @@ package backtype.storm.windowing;
 public interface TriggerPolicy<T> {
     /**
      * Tracks the event and could use this to invoke the trigger.
+     *
+     * @param event the input event
      */
     void track(Event<T> event);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/eb129410/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java b/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
index 38127c8..6603abf 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
@@ -17,7 +17,6 @@
  */
 package backtype.storm.windowing;
 
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,14 +48,14 @@ public class WindowManager<T> implements TriggerHandler {
      */
     public static final int EXPIRE_EVENTS_THRESHOLD = 100;
 
-    private WindowLifecycleListener<T> windowLifecycleListener;
-    private ConcurrentLinkedQueue<Event<T>> window;
+    private final WindowLifecycleListener<T> windowLifecycleListener;
+    private final ConcurrentLinkedQueue<Event<T>> window;
+    private final List<T> expiredEvents;
+    private final Set<Event<T>> prevWindowEvents;
+    private final AtomicInteger eventsSinceLastExpiry;
+    private final ReentrantLock lock;
     private EvictionPolicy<T> evictionPolicy;
     private TriggerPolicy<T> triggerPolicy;
-    private List<T> expiredEvents;
-    private Set<Event<T>> prevWindowEvents;
-    private AtomicInteger eventsSinceLastExpiry;
-    private ReentrantLock lock;
 
     public WindowManager(WindowLifecycleListener<T> lifecycleListener) {
         windowLifecycleListener = lifecycleListener;