Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/21 05:06:50 UTC
[2/7] storm git commit: STORM-1187 Support windowing based on tuple ts
STORM-1187 Support windowing based on tuple ts
Support for doing window calculations based on tuple timestamps and for handling
out-of-order events based on a time lag.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/06d4bb6e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/06d4bb6e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/06d4bb6e
Branch: refs/heads/master
Commit: 06d4bb6ef10a11e19e1d718a76d7062ef5f69834
Parents: a8d253a
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Fri Nov 13 18:06:56 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Mon Dec 7 01:02:56 2015 +0530
----------------------------------------------------------------------
docs/documentation/Windowing.md | 90 ++++++
.../storm/starter/SlidingTupleTsTopology.java | 62 ++++
.../storm/starter/SlidingWindowTopology.java | 81 +-----
.../starter/bolt/SlidingWindowSumBolt.java | 80 ++++++
.../storm/starter/spout/RandomIntegerSpout.java | 55 ++++
.../src/clj/backtype/storm/daemon/executor.clj | 2 +
storm-core/src/jvm/backtype/storm/Config.java | 30 +-
.../storm/topology/WindowedBoltExecutor.java | 106 ++++++-
.../storm/topology/base/BaseWindowedBolt.java | 33 +++
.../storm/windowing/CountEvictionPolicy.java | 17 +-
.../storm/windowing/CountTriggerPolicy.java | 11 +-
.../src/jvm/backtype/storm/windowing/Event.java | 8 +
.../jvm/backtype/storm/windowing/EventImpl.java | 13 +
.../storm/windowing/EvictionPolicy.java | 38 ++-
.../storm/windowing/TimeEvictionPolicy.java | 39 ++-
.../storm/windowing/TimeTriggerPolicy.java | 13 +
.../storm/windowing/TriggerHandler.java | 6 +-
.../storm/windowing/WaterMarkEvent.java | 38 +++
.../windowing/WaterMarkEventGenerator.java | 110 ++++++++
.../windowing/WatermarkCountEvictionPolicy.java | 65 +++++
.../windowing/WatermarkCountTriggerPolicy.java | 83 ++++++
.../windowing/WatermarkTimeEvictionPolicy.java | 77 +++++
.../windowing/WatermarkTimeTriggerPolicy.java | 109 ++++++++
.../backtype/storm/windowing/WindowManager.java | 153 +++++++---
.../topology/WindowedBoltExecutorTest.java | 139 +++++++++
.../windowing/WaterMarkEventGeneratorTest.java | 99 +++++++
.../storm/windowing/WindowManagerTest.java | 280 +++++++++++++++++--
27 files changed, 1664 insertions(+), 173 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/docs/documentation/Windowing.md
----------------------------------------------------------------------
diff --git a/docs/documentation/Windowing.md b/docs/documentation/Windowing.md
index 8f9d758..54e5e2d 100644
--- a/docs/documentation/Windowing.md
+++ b/docs/documentation/Windowing.md
@@ -126,6 +126,96 @@ Time duration based tumbling window that tumbles after the specified time durati
```
+## Tuple timestamp and out of order tuples
+By default, the timestamp tracked in the window is the time at which the bolt processes the tuple, and the window calculations
+are based on this processing timestamp. Storm also supports tracking windows based on a timestamp generated at the source.
+
+```java
+/**
+* Specify a field in the tuple that represents the timestamp as a long value. If this
+* field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
+*
+* @param fieldName the name of the field that contains the timestamp
+*/
+public BaseWindowedBolt withTimestampField(String fieldName)
+```
+
+The value of the above `fieldName` is looked up in each incoming tuple and used for the windowing calculations.
+If the field is not present in the tuple, an exception is thrown. Along with the timestamp field name, a time lag parameter
+can also be specified, which indicates the maximum amount by which tuple timestamps may be out of order.
+
+For example, if the lag is 5 secs and a tuple `t1` arrives with timestamp `06:00:05`, no tuples may arrive with a tuple timestamp earlier than `06:00:00`. If a tuple
+with timestamp `05:59:59` arrives after `t1` and the window has moved past `t1`, it will be treated as a late tuple and not processed.
+
+```java
+/**
+* Specify the maximum time lag of the tuple timestamp. It means that the tuple timestamps
+* cannot be out of order by more than this amount.
+*
+* @param duration the max lag duration
+*/
+public BaseWindowedBolt withLag(Duration duration)
+```
+
+### Watermarks
+When processing tuples with a timestamp field, Storm internally computes watermarks based on the incoming tuple timestamps. A watermark is
+the minimum of the latest tuple timestamps (minus the lag) across all the input streams. At a high level this is similar to the watermark concept
+used by Flink and Google's MillWheel for tracking event-based timestamps.
+
+Periodically (by default every second), the watermark timestamps are emitted; when tuple-based timestamps are in use, these act as the clock ticks for the window
+calculation. The interval at which watermarks are emitted can be changed with the API below.
+
+```java
+/**
+* Specify the watermark event generation interval. For tuple based timestamps, watermark events
+* are used to track the progress of time.
+*
+* @param interval the interval at which watermark events are generated
+*/
+public BaseWindowedBolt withWatermarkInterval(Duration interval)
+```
+
+
+When a watermark is received, all windows up to that timestamp will be evaluated.
+
+For example, consider tuple timestamp based processing with the following window parameters:
+
+`Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s`
+
+```
+|-----|-----|-----|-----|-----|-----|-----|
+0     10    20    30    40    50    60    70
+```
+
+Current ts = `09:00:00`
+
+Tuples `e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36)` are received between `9:00:00` and `9:00:01`
+
+At time t = `09:00:01`, watermark w1 = `6:00:31` (the latest tuple ts `6:00:36` minus the 5s lag) is emitted, since no tuples earlier than `6:00:31` can arrive.
+
+Three windows will be evaluated. The first window end ts (06:00:10) is computed by taking the earliest event timestamp (06:00:03)
+and rounding it up to the next multiple of the sliding interval (10s).
+
+1. `5:59:50 - 06:00:10` with tuples e1, e2, e3
+2. `6:00:00 - 06:00:20` with tuples e1, e2, e3, e4
+3. `6:00:10 - 06:00:30` with tuples e4, e5
+
+e6 is not evaluated yet, since the watermark timestamp `6:00:31` is earlier than its tuple ts `6:00:36`.
+
+Tuples `e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39)` are received between `9:00:01` and `9:00:02`
+
+At time t = `09:00:02`, another watermark w2 = `08:00:34` (the latest tuple ts `8:00:39` minus the 5s lag) is emitted, since no tuples earlier than `8:00:34` can arrive now.
+
+Three windows will be evaluated:
+
+1. `6:00:20 - 06:00:40` with tuples e5, e6 (from earlier batch)
+2. `6:00:30 - 06:00:50` with tuple e6 (from earlier batch)
+3. `8:00:10 - 08:00:30` with tuples e7, e8, e9
+
+e10 is not evaluated since the tuple ts `8:00:39` is beyond the watermark time `8:00:34`.
+
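+The window calculation skips time gaps that contain no tuples (no windows are triggered between the one ending at `06:00:50` and the one ending at `08:00:30`) and computes the windows based on the tuple timestamps.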
+
## Guarantees
The windowing functionality in storm core currently provides an at-least-once guarantee. The values emitted from the bolts
`execute(TupleWindow inputWindow)` method are automatically anchored to all the tuples in the inputWindow. The downstream
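The tuple-timestamp walkthrough added to Windowing.md can be reproduced with a small model. This is a simplified sketch, not Storm's implementation. `WatermarkWindowSketch`, `onWatermark` and `hms` are hypothetical names, and timestamps are seconds since midnight. The sketch also assumes three things:

* Tuples arrive in timestamp order.
* Each window holds the tuples whose timestamps fall in `(end - length, end]`.
* An empty window makes evaluation jump ahead to the window containing the next tuple. This is how the example skips from `06:00:50` to `08:00:30`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of watermark-driven, tuple-timestamp windowing.
// Not Storm's API. Timestamps are seconds since midnight for readability.
class WatermarkWindowSketch {
    static final class Window {
        final long start; // exclusive
        final long end;   // inclusive
        final List<Long> events;

        Window(long start, long end, List<Long> events) {
            this.start = start;
            this.end = end;
            this.events = events;
        }

        @Override
        public String toString() {
            return "(" + start + ", " + end + "] " + events;
        }
    }

    private final long windowLength;
    private final long slidingInterval;
    private final List<Long> events = new ArrayList<>();
    private long nextWindowEnd = 0; // 0 = not yet aligned to any tuple

    WatermarkWindowSketch(long windowLength, long slidingInterval) {
        this.windowLength = windowLength;
        this.slidingInterval = slidingInterval;
    }

    static long hms(int h, int m, int s) {
        return h * 3600L + m * 60L + s;
    }

    void add(long ts) {
        events.add(ts);
    }

    // Evaluates every window whose end ts is at or before the watermark.
    List<Window> onWatermark(long watermark) {
        List<Window> fired = new ArrayList<>();
        long end = nextWindowEnd;
        while (end <= watermark) {
            List<Long> inWindow = eventsIn(end - windowLength, end);
            if (!inWindow.isEmpty()) {
                fired.add(new Window(end - windowLength, end, inWindow));
                end += slidingInterval;
            } else {
                // Empty window: skip the time gap by aligning the next window end
                // to the earliest tuple after 'end' (ceiling to the sliding interval).
                List<Long> later = eventsIn(end, watermark);
                if (later.isEmpty()) {
                    break;
                }
                end = roundUp(later.get(0));
            }
        }
        nextWindowEnd = end;
        return fired;
    }

    // Tuples with start < ts <= end, in arrival order (assumed to be ts order).
    private List<Long> eventsIn(long start, long end) {
        List<Long> result = new ArrayList<>();
        for (long ts : events) {
            if (ts > start && ts <= end) {
                result.add(ts);
            }
        }
        return result;
    }

    private long roundUp(long ts) {
        long rem = ts % slidingInterval;
        return rem == 0 ? ts : ts + (slidingInterval - rem);
    }

    public static void main(String[] args) {
        WatermarkWindowSketch w = new WatermarkWindowSketch(20, 10);
        // e1..e6, then watermark w1 = 6:00:36 minus the 5s lag
        for (int s : new int[] {3, 5, 7, 18, 26, 36}) {
            w.add(hms(6, 0, s));
        }
        System.out.println(w.onWatermark(hms(6, 0, 31)));
        // e7..e10, then watermark w2 = 8:00:39 minus the 5s lag
        for (int s : new int[] {25, 26, 27, 39}) {
            w.add(hms(8, 0, s));
        }
        System.out.println(w.onWatermark(hms(8, 0, 34)));
    }
}
```

The example uses a 20s window, a 10s slide and the watermarks `6:00:31` and `8:00:34`. The first call evaluates the windows ending at `06:00:10`, `06:00:20` and `06:00:30`. The second call evaluates the windows ending at `06:00:40`, `06:00:50` and `08:00:30`, matching the walkthrough. A real implementation also has to evict old tuples and deal with late ones, which this model leaves out.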
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java b/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java
new file mode 100644
index 0000000..598335d
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseWindowedBolt;
+import backtype.storm.utils.Utils;
+import storm.starter.bolt.PrinterBolt;
+import storm.starter.bolt.SlidingWindowSumBolt;
+import storm.starter.spout.RandomIntegerSpout;
+
+import java.util.concurrent.TimeUnit;
+
+import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
+
+/**
+ * Windowing based on tuple timestamp (i.e. the time when the tuple is generated
+ * rather than when it is processed).
+ */
+public class SlidingTupleTsTopology {
+ public static void main(String[] args) throws Exception {
+ TopologyBuilder builder = new TopologyBuilder();
+ BaseWindowedBolt bolt = new SlidingWindowSumBolt()
+ .withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS))
+ .withTimestampField("ts")
+ .withLag(new Duration(5, TimeUnit.SECONDS));
+ builder.setSpout("integer", new RandomIntegerSpout(), 1);
+ builder.setBolt("slidingsum", bolt, 1).shuffleGrouping("integer");
+ builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingsum");
+ Config conf = new Config();
+ conf.setDebug(true);
+
+ if (args != null && args.length > 0) {
+ conf.setNumWorkers(1);
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+ } else {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", conf, builder.createTopology());
+ Utils.sleep(40000);
+ cluster.killTopology("test");
+ cluster.shutdown();
+ }
+ }
+}
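The topology above configures a 5 second lag through `withLag`. The sketch below shows how a watermark could be derived from tuple timestamps, following the Windowing.md description: take the latest timestamp seen on each input stream, take the minimum across streams, and subtract the lag. It is a minimal sketch, not Storm's `WaterMarkEventGenerator`. The following are all assumptions made for illustration:

* the `WatermarkTracker` class and its methods;
* the use of plain `String` stream ids;
* the rule that no watermark is produced until every input stream has reported;
* the `isLate` check against the last watermark.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical watermark tracker; not Storm's WaterMarkEventGenerator.
class WatermarkTracker {
    private final Set<String> inputStreams;
    private final long lagMs;
    private final Map<String, Long> latestTs = new HashMap<>();
    private long lastWatermark = Long.MIN_VALUE;

    WatermarkTracker(Set<String> inputStreams, long lagMs) {
        this.inputStreams = inputStreams;
        this.lagMs = lagMs;
    }

    static long ms(int h, int m, int s) {
        return ((h * 60L + m) * 60L + s) * 1000L;
    }

    // Remembers the latest (largest) timestamp seen on each input stream.
    void track(String stream, long ts) {
        latestTs.merge(stream, ts, Long::max);
    }

    // Returns a new watermark, or null if it cannot advance yet.
    Long computeWatermark() {
        if (!latestTs.keySet().containsAll(inputStreams)) {
            return null; // assumption: wait until every input stream has reported
        }
        long min = Long.MAX_VALUE;
        for (String stream : inputStreams) {
            min = Math.min(min, latestTs.get(stream));
        }
        long watermark = min - lagMs;
        if (watermark <= lastWatermark) {
            return null; // watermarks only move forward
        }
        lastWatermark = watermark;
        return watermark;
    }

    // Assumption: a tuple older than the last emitted watermark is late.
    boolean isLate(long ts) {
        return ts < lastWatermark;
    }

    public static void main(String[] args) {
        // One input stream and a 5s lag, as in SlidingTupleTsTopology.
        WatermarkTracker tracker = new WatermarkTracker(new HashSet<>(Arrays.asList("integer")), 5000);
        tracker.track("integer", ms(6, 0, 5));      // t1 at 06:00:05
        Long watermark = tracker.computeWatermark(); // 06:00:00 in ms since midnight
        System.out.println("watermark = " + watermark);
        System.out.println("05:59:59 late? " + tracker.isLate(ms(5, 59, 59)));
    }
}
```

With the 5s lag, tracking `t1` at `06:00:05` yields a watermark of `06:00:00`, and a tuple stamped `05:59:59` that arrives afterwards is reported as late, as in the Windowing.md example. With several input streams, the slowest stream holds the watermark back, so a stalled stream delays window evaluation for the whole bolt.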
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java b/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
index beee8de..5031f8d 100644
--- a/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
@@ -20,12 +20,10 @@ package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
-import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.topology.base.BaseWindowedBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
@@ -35,10 +33,11 @@ import backtype.storm.windowing.TupleWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.starter.bolt.PrinterBolt;
+import storm.starter.bolt.SlidingWindowSumBolt;
+import storm.starter.spout.RandomIntegerSpout;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import static backtype.storm.topology.base.BaseWindowedBolt.Count;
@@ -51,79 +50,6 @@ public class SlidingWindowTopology {
private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowTopology.class);
/*
- * emits a random integer every 100 ms
- */
-
- private static class RandomIntegerSpout extends BaseRichSpout {
- SpoutOutputCollector collector;
- Random rand;
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("value"));
- }
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- this.collector = collector;
- this.rand = new Random();
- }
-
- @Override
- public void nextTuple() {
- Utils.sleep(100);
- collector.emit(new Values(rand.nextInt(1000)));
- }
- }
-
- /*
- * Computes sliding window sum
- */
- private static class SlidingWindowSumBolt extends BaseWindowedBolt {
- private int sum = 0;
- private OutputCollector collector;
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(TupleWindow inputWindow) {
- /*
- * The inputWindow gives a view of
- * (a) all the events in the window
- * (b) events that expired since last activation of the window
- * (c) events that newly arrived since last activation of the window
- */
- List<Tuple> tuplesInWindow = inputWindow.get();
- List<Tuple> newTuples = inputWindow.getNew();
- List<Tuple> expiredTuples = inputWindow.getExpired();
-
- LOG.debug("Events in current window: " + tuplesInWindow.size());
- /*
- * Instead of iterating over all the tuples in the window to compute
- * the sum, the values for the new events are added and old events are
- * subtracted. Similar optimizations might be possible in other
- * windowing computations.
- */
- for (Tuple tuple : newTuples) {
- sum += (int) tuple.getValue(0);
- }
- for (Tuple tuple : expiredTuples) {
- sum -= (int) tuple.getValue(0);
- }
- collector.emit(new Values(sum));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("sum"));
- }
- }
-
-
- /*
* Computes tumbling window average
*/
private static class TumblingWindowAvgBolt extends BaseWindowedBolt {
@@ -168,13 +94,10 @@ public class SlidingWindowTopology {
builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");
Config conf = new Config();
conf.setDebug(true);
-
if (args != null && args.length > 0) {
conf.setNumWorkers(1);
-
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
} else {
-
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(40000);
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java
new file mode 100644
index 0000000..ef3a0b8
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseWindowedBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Computes sliding window sum
+ */
+public class SlidingWindowSumBolt extends BaseWindowedBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowSumBolt.class);
+
+ private int sum = 0;
+ private OutputCollector collector;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(TupleWindow inputWindow) {
+ /*
+ * The inputWindow gives a view of
+ * (a) all the events in the window
+ * (b) events that expired since last activation of the window
+ * (c) events that newly arrived since last activation of the window
+ */
+ List<Tuple> tuplesInWindow = inputWindow.get();
+ List<Tuple> newTuples = inputWindow.getNew();
+ List<Tuple> expiredTuples = inputWindow.getExpired();
+
+ LOG.debug("Events in current window: " + tuplesInWindow.size());
+ /*
+ * Instead of iterating over all the tuples in the window to compute
+ * the sum, the values for the new events are added and old events are
+ * subtracted. Similar optimizations might be possible in other
+ * windowing computations.
+ */
+ for (Tuple tuple : newTuples) {
+ sum += (int) tuple.getValue(0);
+ }
+ for (Tuple tuple : expiredTuples) {
+ sum -= (int) tuple.getValue(0);
+ }
+ collector.emit(new Values(sum));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("sum"));
+ }
+}
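`SlidingWindowSumBolt` updates its sum incrementally from the new and expired tuples instead of re-summing the whole window. The framework-free sketch below checks that this incremental update matches a full recomputation for a count-based window sliding by one tuple. The `RunningWindowSum` class is hypothetical, and plain `int` values stand in for tuples. The sketch assumes every value is reported as new exactly once and as expired exactly once.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical, framework-free model of SlidingWindowSumBolt's bookkeeping.
class RunningWindowSum {
    private int sum = 0;

    // Mirrors execute(): add newly arrived values, subtract expired ones.
    int update(List<Integer> newValues, List<Integer> expiredValues) {
        for (int v : newValues) {
            sum += v;
        }
        for (int v : expiredValues) {
            sum -= v;
        }
        return sum;
    }

    // Drives a count window of 'length' values that slides by one value and
    // checks every incremental result against a full recomputation.
    static List<Integer> simulate(int[] values, int length) {
        RunningWindowSum bolt = new RunningWindowSum();
        Deque<Integer> window = new ArrayDeque<>();
        List<Integer> sums = new ArrayList<>();
        for (int v : values) {
            window.addLast(v);
            List<Integer> expired = new ArrayList<>();
            if (window.size() > length) {
                expired.add(window.removeFirst());
            }
            int incremental = bolt.update(Collections.singletonList(v), expired);
            int full = 0;
            for (int w : window) {
                full += w;
            }
            if (incremental != full) {
                throw new IllegalStateException(incremental + " != " + full);
            }
            sums.add(incremental);
        }
        return sums;
    }

    public static void main(String[] args) {
        System.out.println(simulate(new int[] {5, 1, 4, 2, 8}, 3));
    }
}
```

For the input `5, 1, 4, 2, 8` and a window of 3, this prints `[5, 6, 10, 7, 14]`. The trick costs work proportional to the new and expired tuples rather than to the window size, but it only applies to aggregates that can be undone, such as sums and counts. A running maximum, for example, cannot simply "subtract" an expired value.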
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java b/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java
new file mode 100644
index 0000000..5778c8e
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter.spout;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Emits a random integer and a timestamp value (offset by one day),
+ * every 100 ms. The ts field can be used in tuple time based windowing.
+ */
+public class RandomIntegerSpout extends BaseRichSpout {
+ private SpoutOutputCollector collector;
+ private Random rand;
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("value", "ts"));
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.collector = collector;
+ this.rand = new Random();
+ }
+
+ @Override
+ public void nextTuple() {
+ Utils.sleep(100);
+ collector.emit(new Values(rand.nextInt(1000), System.currentTimeMillis() - (24 * 60 * 60 * 1000)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 2aeb5e7..58b56cf 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -171,6 +171,8 @@
TOPOLOGY-BOLTS-WINDOW-LENGTH-DURATION-MS
TOPOLOGY-BOLTS-SLIDING-INTERVAL-COUNT
TOPOLOGY-BOLTS-SLIDING-INTERVAL-DURATION-MS
+ TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-FIELD-NAME
+ TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-MAX-LAG-MS
)
spec-conf (-> general-context
(.getComponentCommon component-id)
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index a1da8fe..4f86786 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1526,19 +1526,45 @@ public class Config extends HashMap<String, Object> {
public static final String TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS = "topology.bolts.window.length.duration.ms";
/*
- * Bolt-specific configuration for windowed bolts to specifiy the sliding interval as a count of number of tuples.
+ * Bolt-specific configuration for windowed bolts to specify the sliding interval as a count of number of tuples.
*/
@isInteger
@isPositiveNumber
public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT = "topology.bolts.window.sliding.interval.count";
/*
- * Bolt-specific configuration for windowed bolts to specifiy the sliding interval in time duration.
+ * Bolt-specific configuration for windowed bolts to specify the sliding interval in time duration.
*/
@isInteger
@isPositiveNumber
public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS = "topology.bolts.window.sliding.interval.duration.ms";
+ /*
+ * Bolt-specific configuration for windowed bolts to specify the name of the field in the tuple that holds
+ * the timestamp (e.g. the ts when the tuple was actually generated). If this config is specified and the
+ * field is not present in the incoming tuple, a java.lang.IllegalArgumentException will be thrown.
+ */
+ @isString
+ public static final String TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME = "topology.bolts.tuple.timestamp.field.name";
+
+ /*
+ * Bolt-specific configuration for windowed bolts to specify the maximum time lag of the tuple timestamp
+ * in milliseconds. It means that the tuple timestamps cannot be out of order by more than this amount.
+ * This config will be effective only if the TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME is also specified.
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS = "topology.bolts.tuple.timestamp.max.lag.ms";
+
+ /*
+ * Bolt-specific configuration for windowed bolts to specify the time interval for generating
+ * watermark events. Watermark events track the progress of time when tuple timestamps are used.
+ * This config is effective only if TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME is also specified.
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS = "topology.bolts.watermark.event.interval.ms";
+
/**
* This config is available for TransactionalSpouts, and contains the id ( a String) for
* the transactional topology. This id is used to store the state of the transactional
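The three new keys are ordinary entries in the bolt's configuration map; `withTimestampField` and `withLag` in `BaseWindowedBolt` store the first two. The sketch below resolves them using the same defaults as `WindowedBoltExecutor` in this commit: no lag, and a 1000 ms watermark interval. It also follows the documented rule that the lag and interval only take effect when the timestamp field name is set. The key strings are copied from `Config`. The `TupleTsSettings` class and its `resolve` method are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical resolver for the tuple-timestamp settings; not a Storm class.
final class TupleTsSettings {
    // Key strings as added to backtype.storm.Config in this commit.
    static final String FIELD_NAME = "topology.bolts.tuple.timestamp.field.name";
    static final String MAX_LAG_MS = "topology.bolts.tuple.timestamp.max.lag.ms";
    static final String WATERMARK_INTERVAL_MS = "topology.bolts.watermark.event.interval.ms";

    // Same defaults as WindowedBoltExecutor in this commit.
    static final int DEFAULT_MAX_LAG_MS = 0;
    static final int DEFAULT_WATERMARK_INTERVAL_MS = 1000;

    final String fieldName; // null means processing-time windowing
    final int maxLagMs;
    final int watermarkIntervalMs;

    private TupleTsSettings(String fieldName, int maxLagMs, int watermarkIntervalMs) {
        this.fieldName = fieldName;
        this.maxLagMs = maxLagMs;
        this.watermarkIntervalMs = watermarkIntervalMs;
    }

    static TupleTsSettings resolve(Map<String, Object> conf) {
        String fieldName = (String) conf.get(FIELD_NAME);
        if (fieldName == null) {
            // The lag and interval only take effect when a timestamp field is set.
            return new TupleTsSettings(null, DEFAULT_MAX_LAG_MS, DEFAULT_WATERMARK_INTERVAL_MS);
        }
        return new TupleTsSettings(fieldName,
                intOrDefault(conf, MAX_LAG_MS, DEFAULT_MAX_LAG_MS),
                intOrDefault(conf, WATERMARK_INTERVAL_MS, DEFAULT_WATERMARK_INTERVAL_MS));
    }

    // Values may be Integer or Long, so read them as Number like the executor does.
    private static int intOrDefault(Map<String, Object> conf, String key, int defaultValue) {
        Object value = conf.get(key);
        return value == null ? defaultValue : ((Number) value).intValue();
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(FIELD_NAME, "ts");
        conf.put(MAX_LAG_MS, 5000L);
        TupleTsSettings s = resolve(conf);
        System.out.println(s.fieldName + " lag=" + s.maxLagMs + "ms interval=" + s.watermarkIntervalMs + "ms");
    }
}
```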
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
index c7d6f70..ff2278e 100644
--- a/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java
@@ -22,7 +22,18 @@ import backtype.storm.task.IOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
+import backtype.storm.windowing.CountEvictionPolicy;
+import backtype.storm.windowing.CountTriggerPolicy;
+import backtype.storm.windowing.EvictionPolicy;
+import backtype.storm.windowing.TimeEvictionPolicy;
+import backtype.storm.windowing.TimeTriggerPolicy;
+import backtype.storm.windowing.TriggerPolicy;
import backtype.storm.windowing.TupleWindowImpl;
+import backtype.storm.windowing.WaterMarkEventGenerator;
+import backtype.storm.windowing.WatermarkCountEvictionPolicy;
+import backtype.storm.windowing.WatermarkCountTriggerPolicy;
+import backtype.storm.windowing.WatermarkTimeEvictionPolicy;
+import backtype.storm.windowing.WatermarkTimeTriggerPolicy;
import backtype.storm.windowing.WindowLifecycleListener;
import backtype.storm.windowing.WindowManager;
import org.slf4j.Logger;
@@ -40,11 +51,15 @@ import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
*/
public class WindowedBoltExecutor implements IRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
-
- private IWindowedBolt bolt;
+ private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
+ private static final int DEFAULT_MAX_LAG_MS = 0; // no lag
+ private final IWindowedBolt bolt;
private transient WindowedOutputCollector windowedOutputCollector;
private transient WindowLifecycleListener<Tuple> listener;
private transient WindowManager<Tuple> windowManager;
+ private transient int maxLagMs;
+ private transient String tupleTsFieldName;
+ private transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
public WindowedBoltExecutor(IWindowedBolt bolt) {
this.bolt = bolt;
@@ -93,6 +108,10 @@ public class WindowedBoltExecutor implements IRichBolt {
int topologyTimeout = getTopologyTimeoutMillis(stormConf);
int maxSpoutPending = getMaxSpoutPending(stormConf);
+ if (windowLengthCount == null && windowLengthDuration == null) {
+ throw new IllegalArgumentException("Window length is not specified");
+ }
+
if (windowLengthDuration != null && slidingIntervalDuration != null) {
ensureDurationLessThanTimeout(windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout);
} else if (windowLengthDuration != null) {
@@ -110,12 +129,14 @@ public class WindowedBoltExecutor implements IRichBolt {
}
}
- private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf) {
+ private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf,
+ TopologyContext context) {
WindowManager<Tuple> manager = new WindowManager<>(lifecycleListener);
Duration windowLengthDuration = null;
Count windowLengthCount = null;
Duration slidingIntervalDuration = null;
Count slidingIntervalCount = null;
+ // window length
if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
windowLengthCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
} else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
@@ -123,7 +144,7 @@ public class WindowedBoltExecutor implements IRichBolt {
((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
TimeUnit.MILLISECONDS);
}
-
+ // sliding interval
if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
slidingIntervalCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
} else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
@@ -132,20 +153,73 @@ public class WindowedBoltExecutor implements IRichBolt {
// default is a sliding window of count 1
slidingIntervalCount = new Count(1);
}
+ // tuple ts
+ if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME)) {
+ tupleTsFieldName = (String) stormConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME);
+ // max lag
+ if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
+ maxLagMs = ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
+ } else {
+ maxLagMs = DEFAULT_MAX_LAG_MS;
+ }
+ // watermark interval
+ int watermarkInterval;
+ if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) {
+ watermarkInterval = ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue();
+ } else {
+ watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
+ }
+ waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval,
+ maxLagMs, context.getThisSources().keySet());
+ }
// validate
validate(stormConf, windowLengthCount, windowLengthDuration,
slidingIntervalCount, slidingIntervalDuration);
- if (windowLengthCount != null) {
- manager.setWindowLength(windowLengthCount);
+ EvictionPolicy<Tuple> evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration,
+ manager);
+ TriggerPolicy<Tuple> triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
+ manager, evictionPolicy);
+ manager.setEvictionPolicy(evictionPolicy);
+ manager.setTriggerPolicy(triggerPolicy);
+ return manager;
+ }
+
+ private boolean isTupleTs() {
+ return tupleTsFieldName != null;
+ }
+
+ private TriggerPolicy<Tuple> getTriggerPolicy(Count slidingIntervalCount, Duration slidingIntervalDuration,
+ WindowManager<Tuple> manager, EvictionPolicy<Tuple> evictionPolicy) {
+ if (slidingIntervalCount != null) {
+ if (isTupleTs()) {
+ return new WatermarkCountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy, manager);
+ } else {
+ return new CountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy);
+ }
} else {
- manager.setWindowLength(windowLengthDuration);
+ if (isTupleTs()) {
+ return new WatermarkTimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy, manager);
+ } else {
+ return new TimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy);
+ }
}
- if (slidingIntervalCount != null) {
- manager.setSlidingInterval(slidingIntervalCount);
+ }
+
+ private EvictionPolicy<Tuple> getEvictionPolicy(Count windowLengthCount, Duration windowLengthDuration,
+ WindowManager<Tuple> manager) {
+ if (windowLengthCount != null) {
+ if (isTupleTs()) {
+ return new WatermarkCountEvictionPolicy<>(windowLengthCount.value, manager);
+ } else {
+ return new CountEvictionPolicy<>(windowLengthCount.value);
+ }
} else {
- manager.setSlidingInterval(slidingIntervalDuration);
+ if (isTupleTs()) {
+ return new WatermarkTimeEvictionPolicy<>(windowLengthDuration.value, maxLagMs);
+ } else {
+ return new TimeEvictionPolicy<>(windowLengthDuration.value);
+ }
}
- return manager;
}
@Override
@@ -153,13 +227,19 @@ public class WindowedBoltExecutor implements IRichBolt {
this.windowedOutputCollector = new WindowedOutputCollector(collector);
bolt.prepare(stormConf, context, windowedOutputCollector);
this.listener = newWindowLifecycleListener();
- this.windowManager = initWindowManager(listener, stormConf);
+ this.windowManager = initWindowManager(listener, stormConf, context);
LOG.debug("Initialized window manager {} ", this.windowManager);
}
@Override
public void execute(Tuple input) {
- windowManager.add(input);
+ if (isTupleTs()) {
+ long ts = input.getLongByField(tupleTsFieldName);
+ windowManager.add(input, ts);
+ waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts);
+ } else {
+ windowManager.add(input);
+ }
}
@Override
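The executor above picks the eviction policy from the window length and the trigger policy from the sliding interval. Each choice depends on whether the value is a count or a duration, and on whether a tuple timestamp field is configured. Below is a minimal standalone sketch of that decision table. The class `PolicySelection` is hypothetical. It returns class names as strings so it runs without Storm on the classpath.

```java
// Hypothetical sketch of the policy choice made in getEvictionPolicy/getTriggerPolicy.
// Class names are returned as strings so the example runs without Storm.
final class PolicySelection {
    private PolicySelection() {
    }

    // The window length (count or duration) decides the eviction policy.
    static String evictionPolicy(boolean countBasedLength, boolean tupleTs) {
        if (countBasedLength) {
            return tupleTs ? "WatermarkCountEvictionPolicy" : "CountEvictionPolicy";
        }
        return tupleTs ? "WatermarkTimeEvictionPolicy" : "TimeEvictionPolicy";
    }

    // The sliding interval (count or duration) decides the trigger policy.
    static String triggerPolicy(boolean countBasedInterval, boolean tupleTs) {
        if (countBasedInterval) {
            return tupleTs ? "WatermarkCountTriggerPolicy" : "CountTriggerPolicy";
        }
        return tupleTs ? "WatermarkTimeTriggerPolicy" : "TimeTriggerPolicy";
    }
}
```

In the code above, the two choices are made independently of each other.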
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java b/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
index fd4af90..2f49661 100644
--- a/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
+++ b/storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java
@@ -157,6 +157,39 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
return withWindowLength(duration).withSlidingInterval(duration);
}
+ /**
+ * Specify a field in the tuple that represents the timestamp as a long value. If this
+ * field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
+ *
+ * @param fieldName the name of the field that contains the timestamp
+ */
+ public BaseWindowedBolt withTimestampField(String fieldName) {
+ windowConfiguration.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, fieldName);
+ return this;
+ }
+
+ /**
+ * Specify the maximum time lag of the tuple timestamps, that is, the maximum amount by which
+ * tuple timestamps can be out of order. The lag is stored in milliseconds.
+ *
+ * @param duration the max lag duration
+ */
+ public BaseWindowedBolt withLag(Duration duration) {
+ windowConfiguration.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, duration.value);
+ return this;
+ }
+
+ /**
+ * Specify the watermark event generation interval. For tuple-based timestamps, watermark events
+ * are used to track the progress of time.
+ *
+ * @param interval the interval at which watermark events are generated
+ */
+ public BaseWindowedBolt withWatermarkInterval(Duration interval) {
+ windowConfiguration.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, interval.value);
+ return this;
+ }
+
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
// NOOP
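The three new setters only record values in the bolt's window configuration map and return `this`, so calls can be chained. Below is a standalone sketch of that pattern. `WindowConfigSketch` and its key strings are placeholders, not the values of the real `Config` constants. Plain millisecond ints stand in for `Duration`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the fluent setters on BaseWindowedBolt.
// The key strings are placeholders, not the real Config constant values.
class WindowConfigSketch {
    static final String TS_FIELD_KEY = "example.tuple.timestamp.field";
    static final String MAX_LAG_MS_KEY = "example.tuple.timestamp.max.lag.ms";
    static final String WATERMARK_INTERVAL_MS_KEY = "example.watermark.interval.ms";

    final Map<String, Object> windowConfiguration = new HashMap<>();

    WindowConfigSketch withTimestampField(String fieldName) {
        windowConfiguration.put(TS_FIELD_KEY, fieldName);
        return this; // returning this allows the calls to be chained
    }

    WindowConfigSketch withLag(int lagMs) {
        windowConfiguration.put(MAX_LAG_MS_KEY, lagMs);
        return this;
    }

    WindowConfigSketch withWatermarkInterval(int intervalMs) {
        windowConfiguration.put(WATERMARK_INTERVAL_MS_KEY, intervalMs);
        return this;
    }
}
```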
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
index a6779a8..a49702c 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class CountEvictionPolicy<T> implements EvictionPolicy<T> {
private final int threshold;
- private final AtomicInteger currentCount;
+ protected final AtomicInteger currentCount;
public CountEvictionPolicy(int count) {
this.threshold = count;
@@ -35,7 +35,7 @@ public class CountEvictionPolicy<T> implements EvictionPolicy<T> {
}
@Override
- public boolean evict(Event<T> event) {
+ public Action evict(Event<T> event) {
/*
* atomically decrement the count if it is greater than the threshold and
* return whether the event should be evicted
@@ -44,18 +44,25 @@ public class CountEvictionPolicy<T> implements EvictionPolicy<T> {
int curVal = currentCount.get();
if (curVal > threshold) {
if (currentCount.compareAndSet(curVal, curVal - 1)) {
- return true;
+ return Action.EXPIRE;
}
} else {
break;
}
}
- return false;
+ return Action.PROCESS;
}
@Override
public void track(Event<T> event) {
- currentCount.incrementAndGet();
+ if (!event.isWatermark()) {
+ currentCount.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void setContext(Object context) {
+ // NOOP
}
@Override
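With this change, `CountEvictionPolicy` counts only real events, so watermark events do not inflate the window size. `evict` keeps its lock-free compare-and-set loop. Below is a standalone sketch of that logic. A local `Action` enum stands in for `EvictionPolicy.Action`, and the class name is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Standalone sketch of the CountEvictionPolicy logic in the diff above.
class CountEvictionSketch {
    enum Action { EXPIRE, PROCESS }

    private final int threshold;
    private final AtomicInteger currentCount = new AtomicInteger();

    CountEvictionSketch(int threshold) {
        this.threshold = threshold;
    }

    // Watermark events are not counted towards the window size.
    void track(boolean isWatermark) {
        if (!isWatermark) {
            currentCount.incrementAndGet();
        }
    }

    Action evict() {
        while (true) {
            int curVal = currentCount.get();
            if (curVal <= threshold) {
                return Action.PROCESS; // window is within its size limit
            }
            // Decrement atomically; retry if another thread changed the count meanwhile.
            if (currentCount.compareAndSet(curVal, curVal - 1)) {
                return Action.EXPIRE;
            }
        }
    }

    int count() {
        return currentCount.get();
    }
}
```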
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
index 3b1bf9f..ffe1b90 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java
@@ -29,17 +29,22 @@ public class CountTriggerPolicy<T> implements TriggerPolicy<T> {
private final int count;
private final AtomicInteger currentCount;
private final TriggerHandler handler;
+ private final EvictionPolicy<T> evictionPolicy;
- public CountTriggerPolicy(int count, TriggerHandler handler) {
+ public CountTriggerPolicy(int count, TriggerHandler handler, EvictionPolicy<T> evictionPolicy) {
this.count = count;
this.currentCount = new AtomicInteger();
this.handler = handler;
+ this.evictionPolicy = evictionPolicy;
}
@Override
public void track(Event<T> event) {
- if (currentCount.incrementAndGet() >= count) {
- handler.onTrigger();
+ if (!event.isWatermark()) {
+ if (currentCount.incrementAndGet() >= count) {
+ evictionPolicy.setContext(System.currentTimeMillis());
+ handler.onTrigger();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/Event.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/Event.java b/storm-core/src/jvm/backtype/storm/windowing/Event.java
index 4855701..a7bcb9b 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/Event.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/Event.java
@@ -38,4 +38,12 @@ interface Event<T> {
* @return the wrapped object.
*/
T get();
+
+ /**
+ * Returns whether this is a watermark event. Watermark events are used
+ * to track the progress of time when processing events by their timestamps.
+ *
+ * @return true if this is a watermark event
+ */
+ boolean isWatermark();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java b/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
index 09c974c..035f41b 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/EventImpl.java
@@ -35,4 +35,17 @@ class EventImpl<T> implements Event<T> {
public T get() {
return event;
}
+
+ @Override
+ public boolean isWatermark() {
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "EventImpl{" +
+ "event=" + event +
+ ", ts=" + ts +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
index 8820e92..c3b3032 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java
@@ -25,12 +25,36 @@ package backtype.storm.windowing;
*/
public interface EvictionPolicy<T> {
/**
- * Decides if an event should be evicted from the window or not.
+ * The action to be taken when {@link EvictionPolicy#evict(Event)} is invoked.
+ */
+ enum Action {
+ /**
+ * expire the event and remove it from the queue
+ */
+ EXPIRE,
+ /**
+ * process the event in the current window of events
+ */
+ PROCESS,
+ /**
+ * do not include the event in the current window, but keep it
+ * in the queue so it can be evaluated as part of future windows
+ */
+ KEEP,
+ /**
+ * stop processing the queue; there cannot be any more events
+ * satisfying the eviction policy
+ */
+ STOP
+ }
+ /**
+ * Decides if an event should be expired from the window, processed in the current
+ * window or kept for later processing.
*
* @param event the input event
- * @return true if the event should be evicted, false otherwise
+ * @return the {@link backtype.storm.windowing.EvictionPolicy.Action} to be taken based on the input event
*/
- boolean evict(Event<T> event);
+ Action evict(Event<T> event);
/**
* Tracks the event to later decide whether
@@ -39,4 +63,12 @@ public interface EvictionPolicy<T> {
* @param event the input event to be tracked
*/
void track(Event<T> event);
+
+ /**
+ * Sets a context in the eviction policy that can be used while evicting the events.
+ * For example, for TimeEvictionPolicy this could be used to set the reference timestamp.
+ *
+ * @param context the context object, e.g. the reference timestamp
+ */
+ void setContext(Object context);
}
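The `Action` values only make sense together with a component that scans the event queue. The `WindowManager` scan is not part of this excerpt. Below is a hypothetical sketch of how the four actions could be interpreted, using plain `long` timestamps as events and a caller-supplied policy function.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of a queue scan driven by the four Action values;
// it is not the WindowManager code from this commit.
class ActionScanSketch {
    enum Action { EXPIRE, PROCESS, KEEP, STOP }

    // Returns the events of the current window; expired events are removed from the queue.
    static List<Long> scan(List<Long> queue, Function<Long, Action> policy) {
        List<Long> window = new ArrayList<>();
        Iterator<Long> it = queue.iterator();
        while (it.hasNext()) {
            Long event = it.next();
            Action action = policy.apply(event);
            if (action == Action.EXPIRE) {
                it.remove();         // drop the event for good
            } else if (action == Action.PROCESS) {
                window.add(event);   // part of the current window
            } else if (action == Action.STOP) {
                break;               // no later event can qualify
            }
            // KEEP: leave the event in the queue for a future window
        }
        return window;
    }
}
```

`STOP` lets a policy end the scan early when the queue is roughly ordered by timestamp, which is how `WatermarkTimeEvictionPolicy` uses its lag.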
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
index 16408f3..117df92 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java
@@ -21,21 +21,34 @@ package backtype.storm.windowing;
* Eviction policy that evicts events based on time duration.
*/
public class TimeEvictionPolicy<T> implements EvictionPolicy<T> {
- private final long duration;
+ private final int windowLength;
+ /**
+ * The reference time in millis for window calculations and
+ * expiring events. If not set, it defaults to System.currentTimeMillis().
+ */
+ protected Long referenceTime;
- public TimeEvictionPolicy(long millis) {
- this.duration = millis;
+ /**
+ * Constructs a TimeEvictionPolicy that evicts events older
+ * than the given window length in millis.
+ *
+ * @param windowLength the duration in milliseconds
+ */
+ public TimeEvictionPolicy(int windowLength) {
+ this.windowLength = windowLength;
}
/**
- * Returns true if the event falls out of the window based on the window duration
- *
- * @param event
- * @return
+ * {@inheritDoc}
*/
@Override
- public boolean evict(Event<T> event) {
- return (System.currentTimeMillis() - event.getTimestamp()) >= duration;
+ public Action evict(Event<T> event) {
+ long now = referenceTime == null ? System.currentTimeMillis() : referenceTime;
+ long diff = now - event.getTimestamp();
+ if (diff >= windowLength) {
+ return Action.EXPIRE;
+ }
+ return Action.PROCESS;
}
@Override
@@ -44,9 +57,15 @@ public class TimeEvictionPolicy<T> implements EvictionPolicy<T> {
}
@Override
+ public void setContext(Object context) {
+ referenceTime = ((Number) context).longValue();
+ }
+
+ @Override
public String toString() {
return "TimeEvictionPolicy{" +
- "duration=" + duration +
+ "windowLength=" + windowLength +
+ ", referenceTime=" + referenceTime +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
index db1fbeb..a32cb4d 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java
@@ -37,12 +37,18 @@ public class TimeTriggerPolicy<T> implements TriggerPolicy<T> {
private final TriggerHandler handler;
private final ScheduledExecutorService executor;
private final ScheduledFuture<?> executorFuture;
+ private final EvictionPolicy<T> evictionPolicy;
public TimeTriggerPolicy(long millis, TriggerHandler handler) {
+ this(millis, handler, null);
+ }
+
+ public TimeTriggerPolicy(long millis, TriggerHandler handler, EvictionPolicy<T> evictionPolicy) {
this.duration = millis;
this.handler = handler;
this.executor = Executors.newSingleThreadScheduledExecutor();
+ this.evictionPolicy = evictionPolicy;
this.executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
}
@Override
@@ -100,6 +106,13 @@ public class TimeTriggerPolicy<T> implements TriggerPolicy<T> {
@Override
public void run() {
try {
+ /*
+ * set the current timestamp as the reference time for the eviction policy
+ * to evict the events
+ */
+ if (evictionPolicy != null) {
+ evictionPolicy.setContext(System.currentTimeMillis());
+ }
handler.onTrigger();
} catch (Throwable th) {
LOG.error("handler.onTrigger failed ", th);
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java b/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java
index f947951..29fe90d 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/TriggerHandler.java
@@ -23,7 +23,9 @@ package backtype.storm.windowing;
*/
interface TriggerHandler {
/**
- * the code to execute when the {@link TriggerPolicy} condition is satisfied.
+ * The code to execute when the {@link TriggerPolicy} condition is satisfied.
+ *
+ * @return true if the window was evaluated with at least one event in the window, false otherwise
*/
- void onTrigger();
+ boolean onTrigger();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEvent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEvent.java b/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEvent.java
new file mode 100644
index 0000000..bada76d
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+/**
+ * Watermark event used for tracking the progress of time when
+ * processing events based on their timestamps.
+ */
+public class WaterMarkEvent<T> extends EventImpl<T> {
+ public WaterMarkEvent(long ts) {
+ super(null, ts);
+ }
+
+ @Override
+ public boolean isWatermark() {
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "WaterMarkEvent{} " + super.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java b/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java
new file mode 100644
index 0000000..9820da6
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.topology.FailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tracks tuples across input streams and periodically emits watermark events.
+ * The watermark event timestamp is the minimum of the latest tuple timestamps
+ * across all the input streams (minus the lag). Once a watermark event is emitted,
+ * any tuple arriving with an earlier timestamp can be considered a late event.
+ */
+public class WaterMarkEventGenerator<T> implements Runnable {
+ private static final Logger LOG = LoggerFactory.getLogger(WaterMarkEventGenerator.class);
+ private final WindowManager<T> windowManager;
+ private final int eventTsLag;
+ private final Set<GlobalStreamId> inputStreams;
+ private final Map<GlobalStreamId, Long> streamToTs;
+ private final ScheduledExecutorService executorService;
+ private final ScheduledFuture<?> executorFuture;
+ private long lastWaterMarkTs = 0;
+
+ public WaterMarkEventGenerator(WindowManager<T> windowManager, int interval,
+ int eventTsLag, Set<GlobalStreamId> inputStreams) {
+ this.windowManager = windowManager;
+ this.eventTsLag = eventTsLag;
+ this.inputStreams = inputStreams;
+ streamToTs = new ConcurrentHashMap<>();
+ executorService = Executors.newSingleThreadScheduledExecutor();
+ this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
+ }
+
+ public void track(GlobalStreamId stream, long ts) {
+ Long currentVal = streamToTs.get(stream);
+ if (currentVal == null || ts > currentVal) {
+ streamToTs.put(stream, ts);
+ }
+ checkFailures();
+ }
+
+ @Override
+ public void run() {
+ try {
+ long waterMarkTs = computeWaterMarkTs();
+ if (waterMarkTs > lastWaterMarkTs) {
+ this.windowManager.add(new WaterMarkEvent<T>(waterMarkTs - eventTsLag));
+ lastWaterMarkTs = waterMarkTs;
+ }
+ } catch (Throwable th) {
+ LOG.error("Failed while processing watermark event ", th);
+ throw th;
+ }
+ }
+
+ /**
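+ * Computes the minimum of the latest timestamps across all input streams, or Long.MIN_VALUE if not every input stream has received a tuple yet.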
+ */
+ private long computeWaterMarkTs() {
+ long ts = Long.MIN_VALUE;
+ // only if some data has arrived on each input stream
+ if (streamToTs.size() >= inputStreams.size()) {
+ ts = Long.MAX_VALUE;
+ for (Map.Entry<GlobalStreamId, Long> entry : streamToTs.entrySet()) {
+ ts = Math.min(ts, entry.getValue());
+ }
+ }
+ return ts;
+ }
+
+ private void checkFailures() {
+ if (executorFuture.isDone()) {
+ try {
+ executorFuture.get();
+ } catch (InterruptedException ex) {
+ LOG.error("Got exception ", ex);
+ throw new FailedException(ex);
+ } catch (ExecutionException ex) {
+ LOG.error("Got exception ", ex);
+ throw new FailedException(ex.getCause());
+ }
+ }
+ }
+}
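The generator's core bookkeeping can be exercised without the scheduled executor. Below is a standalone sketch that makes two assumptions. Plain string stream ids replace `GlobalStreamId`. A hypothetical `nextWatermark()` method plays the role of `run()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Standalone sketch of WaterMarkEventGenerator's bookkeeping without the scheduler.
// Stream ids are plain strings; nextWatermark() is a hypothetical stand-in for run().
class WatermarkSketch {
    private final Set<String> inputStreams;
    private final long lagMs;
    private final Map<String, Long> streamToTs = new HashMap<>();
    private long lastWaterMarkTs = 0;

    WatermarkSketch(Set<String> inputStreams, long lagMs) {
        this.inputStreams = inputStreams;
        this.lagMs = lagMs;
    }

    // Remember only the latest (largest) timestamp seen on each stream.
    void track(String stream, long ts) {
        Long currentVal = streamToTs.get(stream);
        if (currentVal == null || ts > currentVal) {
            streamToTs.put(stream, ts);
        }
    }

    // Returns the next watermark timestamp, or null if none should be emitted.
    Long nextWatermark() {
        if (streamToTs.size() < inputStreams.size()) {
            return null; // some input stream has not delivered a tuple yet
        }
        long minTs = Long.MAX_VALUE;
        for (long ts : streamToTs.values()) {
            minTs = Math.min(minTs, ts); // the slowest stream bounds the watermark
        }
        if (minTs <= lastWaterMarkTs) {
            return null; // time has not advanced since the last watermark
        }
        lastWaterMarkTs = minTs;
        return minTs - lagMs; // leave room for tuples up to lagMs out of order
    }
}
```

The minimum is taken over the latest timestamp of every stream. As a result, a stream that stops receiving tuples keeps holding the watermark back.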
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountEvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountEvictionPolicy.java
new file mode 100644
index 0000000..0aa1c6b
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountEvictionPolicy.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+/**
+ * An eviction policy that tracks count based on watermark ts and
+ * evicts events up to the watermark based on a threshold count.
+ *
+ * @param <T> the type of event tracked by this policy.
+ */
+public class WatermarkCountEvictionPolicy<T> extends CountEvictionPolicy<T> {
+ private final WindowManager<T> windowManager;
+ /*
+ * The reference time in millis for window calculations and
+ * expiring events. Defaults to 0 until set via setContext.
+ */
+ private long referenceTime;
+
+ public WatermarkCountEvictionPolicy(int count, WindowManager<T> windowManager) {
+ super(count);
+ this.windowManager = windowManager;
+ }
+
+ @Override
+ public Action evict(Event<T> event) {
+ if (event.getTimestamp() <= referenceTime) {
+ return super.evict(event);
+ } else {
+ return Action.KEEP;
+ }
+ }
+
+ @Override
+ public void track(Event<T> event) {
+ // NOOP
+ }
+
+ @Override
+ public void setContext(Object context) {
+ referenceTime = (Long) context;
+ currentCount.set(windowManager.getEventCount(referenceTime));
+ }
+
+ @Override
+ public String toString() {
+ return "WatermarkCountEvictionPolicy{" +
+ "referenceTime=" + referenceTime +
+ "} " + super.toString();
+ }
+}
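`WatermarkCountEvictionPolicy` counts only events at or before the reference time. The definition of `WindowManager.getEventCount` is not shown in this excerpt. The sketch below therefore assumes it returns the number of queued events with a timestamp less than or equal to the reference time. The class name is hypothetical. The counter is a plain `int` because the sketch is single-threaded, whereas the real policy uses an `AtomicInteger`.

```java
import java.util.List;

// Hypothetical, single-threaded sketch of WatermarkCountEvictionPolicy.
class WatermarkCountEvictionSketch {
    enum Action { EXPIRE, PROCESS, KEEP }

    private final int threshold;
    private long referenceTime; // defaults to 0 until setContext is called
    private int currentCount;

    WatermarkCountEvictionSketch(int threshold) {
        this.threshold = threshold;
    }

    // Assumed semantics of getEventCount: number of events with ts <= referenceTime.
    void setContext(long referenceTime, List<Long> queuedTs) {
        this.referenceTime = referenceTime;
        int n = 0;
        for (long ts : queuedTs) {
            if (ts <= referenceTime) {
                n++;
            }
        }
        currentCount = n;
    }

    Action evict(long eventTs) {
        if (eventTs > referenceTime) {
            return Action.KEEP;     // beyond the watermark: keep for a later window
        }
        if (currentCount > threshold) {
            currentCount--;         // one fewer event left in the window
            return Action.EXPIRE;   // oldest events beyond the threshold are dropped
        }
        return Action.PROCESS;
    }
}
```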
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountTriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountTriggerPolicy.java
new file mode 100644
index 0000000..510d451
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WatermarkCountTriggerPolicy.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import java.util.List;
+
+/**
+ * A trigger policy that tracks event counts and sets the context for
+ * the eviction policy to evict based on the latest watermark time.
+ *
+ * @param <T> the type of event tracked by this policy.
+ */
+public class WatermarkCountTriggerPolicy<T> implements TriggerPolicy<T> {
+ private final int count;
+ private final TriggerHandler handler;
+ private final EvictionPolicy<T> evictionPolicy;
+ private final WindowManager<T> windowManager;
+ private long lastProcessedTs = 0;
+
+ public WatermarkCountTriggerPolicy(int count, TriggerHandler handler,
+ EvictionPolicy<T> evictionPolicy, WindowManager<T> windowManager) {
+ this.count = count;
+ this.handler = handler;
+ this.evictionPolicy = evictionPolicy;
+ this.windowManager = windowManager;
+ }
+
+ @Override
+ public void track(Event<T> event) {
+ if (event.isWatermark()) {
+ handleWaterMarkEvent(event);
+ }
+ }
+
+ @Override
+ public void reset() {
+ // NOOP
+ }
+
+ @Override
+ public void shutdown() {
+ // NOOP
+ }
+
+ /**
+ * Triggers all the pending windows up to the waterMarkEvent timestamp
+ * based on the sliding interval count.
+ *
+ * @param waterMarkEvent the watermark event
+ */
+ private void handleWaterMarkEvent(Event<T> waterMarkEvent) {
+ long watermarkTs = waterMarkEvent.getTimestamp();
+ List<Long> eventTs = windowManager.getSlidingCountTimestamps(lastProcessedTs, watermarkTs, count);
+ for (long ts : eventTs) {
+ evictionPolicy.setContext(ts);
+ handler.onTrigger();
+ lastProcessedTs = ts;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "WatermarkCountTriggerPolicy{" +
+ "count=" + count +
+ ", lastProcessedTs=" + lastProcessedTs +
+ '}';
+ }
+}
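`getSlidingCountTimestamps` belongs to the `WindowManager` changes, which are cut off in this excerpt. Below is a hypothetical sketch of what the trigger policy relies on. It assumes the method returns the timestamp of every `count`-th event after `lastProcessedTs`, up to and including the watermark, scanning events in ascending timestamp order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the assumed getSlidingCountTimestamps behaviour.
class SlidingCountTimestampsSketch {
    // Returns the ts of every count-th event with startTs < ts <= endTs.
    // sortedTs must be in ascending order.
    static List<Long> slidingCountTimestamps(List<Long> sortedTs, long startTs, long endTs, int count) {
        List<Long> result = new ArrayList<>();
        int seen = 0;
        for (long ts : sortedTs) {
            if (ts <= startTs) {
                continue;            // already covered by an earlier trigger
            }
            if (ts > endTs) {
                break;               // beyond the watermark
            }
            if (++seen % count == 0) {
                result.add(ts);      // a window fires at this event
            }
        }
        return result;
    }
}
```

The next call starts from the last returned timestamp. Any events after it that did not complete a full count are therefore counted again when the next watermark arrives.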
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeEvictionPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeEvictionPolicy.java
new file mode 100644
index 0000000..202726e
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeEvictionPolicy.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+/**
+ * An eviction policy that evicts events based on time duration taking
+ * watermark time and event lag into account.
+ */
+public class WatermarkTimeEvictionPolicy<T> extends TimeEvictionPolicy<T> {
+ private final int lag;
+
+ /**
+ * Constructs a WatermarkTimeEvictionPolicy that evicts events older
+ * than the given window length in millis.
+ *
+ * @param windowLength the window length in milliseconds
+ */
+ public WatermarkTimeEvictionPolicy(int windowLength) {
+ this(windowLength, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Constructs a WatermarkTimeEvictionPolicy that evicts events older
+ * than the given window length in millis. When processing event-based
+ * timestamps, the lag parameter can be used to stop scanning the queue
+ * early.
+ *
+ * @param windowLength the window length in milliseconds
+ * @param lag the max event lag in milliseconds
+ */
+ public WatermarkTimeEvictionPolicy(int windowLength, int lag) {
+ super(windowLength);
+ referenceTime = 0L;
+ this.lag = lag;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p/>
+ * Keeps events with future ts in the queue for processing in the next
+ * window. If the ts difference is more than the lag, stops scanning
+ * the queue for the current window.
+ */
+ @Override
+ public Action evict(Event<T> event) {
+ long diff = referenceTime - event.getTimestamp();
+ if (diff < -lag) {
+ return Action.STOP;
+ } else if (diff < 0) {
+ return Action.KEEP;
+ } else {
+ return super.evict(event);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "WatermarkTimeEvictionPolicy{" +
+ "lag=" + lag +
+ "} " + super.toString();
+ }
+}
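The decision in `WatermarkTimeEvictionPolicy.evict` combines its own lag checks with the inherited window-length check. Below is a standalone sketch of the combined rule. The class name is hypothetical, and `long` timestamps replace events.

```java
// Standalone sketch of WatermarkTimeEvictionPolicy.evict combined with the
// inherited TimeEvictionPolicy check; timestamps are plain longs.
class WatermarkTimeEvictionSketch {
    enum Action { EXPIRE, PROCESS, KEEP, STOP }

    private final long windowLength;
    private final long lag;
    private long referenceTime = 0L; // end ts of the window being evaluated

    WatermarkTimeEvictionSketch(long windowLength, long lag) {
        this.windowLength = windowLength;
        this.lag = lag;
    }

    void setContext(long referenceTime) {
        this.referenceTime = referenceTime;
    }

    Action evict(long eventTs) {
        long diff = referenceTime - eventTs;
        if (diff < -lag) {
            return Action.STOP;     // more than lag past the window end: stop scanning
        } else if (diff < 0) {
            return Action.KEEP;     // after the window end: keep for a future window
        } else if (diff >= windowLength) {
            return Action.EXPIRE;   // older than the window length
        }
        return Action.PROCESS;      // inside (referenceTime - windowLength, referenceTime]
    }
}
```

With this rule, the current window covers the half-open interval (referenceTime - windowLength, referenceTime]. An event exactly `windowLength` older than the reference time is expired.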
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeTriggerPolicy.java b/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeTriggerPolicy.java
new file mode 100644
index 0000000..8b5cd60
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/windowing/WatermarkTimeTriggerPolicy.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles watermark events and triggers {@link TriggerHandler#onTrigger()} for each window
+ * interval that has events to be processed up to the watermark ts.
+ */
+public class WatermarkTimeTriggerPolicy<T> implements TriggerPolicy<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(WatermarkTimeTriggerPolicy.class);
+ private final long slidingIntervalMs;
+ private final TriggerHandler handler;
+ private final EvictionPolicy<T> evictionPolicy;
+ private final WindowManager<T> windowManager;
+ private long nextWindowEndTs = 0;
+
+ public WatermarkTimeTriggerPolicy(long slidingIntervalMs, TriggerHandler handler, EvictionPolicy<T> evictionPolicy,
+ WindowManager<T> windowManager) {
+ this.slidingIntervalMs = slidingIntervalMs;
+ this.handler = handler;
+ this.evictionPolicy = evictionPolicy;
+ this.windowManager = windowManager;
+ }
+
+ @Override
+ public void track(Event<T> event) {
+ if (event.isWatermark()) {
+ handleWaterMarkEvent(event);
+ }
+ }
+
+ @Override
+ public void reset() {
+ // NOOP
+ }
+
+ @Override
+ public void shutdown() {
+ // NOOP
+ }
+
+ /**
+ * Invokes the trigger for all pending windows up to the
+ * watermark timestamp. The end ts of each window is set
+ * in the eviction policy context so that the events falling
+ * within that window can be processed.
+ */
+ private void handleWaterMarkEvent(Event<T> event) {
+ long watermarkTs = event.getTimestamp();
+ long windowEndTs = nextWindowEndTs;
+ LOG.debug("Window end ts {} Watermark ts {}", windowEndTs, watermarkTs);
+ while (windowEndTs <= watermarkTs) {
+ evictionPolicy.setContext(windowEndTs);
+ if (handler.onTrigger()) {
+ windowEndTs += slidingIntervalMs;
+ } else {
+ /*
+ * No events were found in the previous window interval.
+ * Scan through the events in the queue to find the next
+ * window intervals based on event ts.
+ */
+ long ts = getNextAlignedWindowTs(windowEndTs, watermarkTs);
+ LOG.debug("Next aligned window end ts {}", ts);
+ if (ts == Long.MAX_VALUE) {
+ LOG.debug("No events to process between {} and watermark ts {}", windowEndTs, watermarkTs);
+ break;
+ }
+ windowEndTs = ts;
+ }
+ }
+ nextWindowEndTs = windowEndTs;
+ }
+
+ /**
+ * Computes the next window by scanning the events in the queue and
+ * finding the next aligned window between startTs and endTs. Returns the end ts
+ * of the next aligned window, i.e. the ts when the window should fire.
+ *
+ * @param startTs the start timestamp (exclusive)
+ * @param endTs the end timestamp (inclusive)
+ * @return the aligned window end ts for the next window or Long.MAX_VALUE if there
+ * are no more events to be processed.
+ */
+ private long getNextAlignedWindowTs(long startTs, long endTs) {
+ long nextTs = windowManager.getEarliestEventTs(startTs, endTs);
+ if (nextTs == Long.MAX_VALUE || (nextTs % slidingIntervalMs == 0)) {
+ return nextTs;
+ }
+ return nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs));
+ }
+}
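The rounding in getNextAlignedWindowTs moves the earliest pending event ts up to the next multiple of the sliding interval, so that a window skipped for being empty is followed by one that still ends on an interval boundary. Below is a minimal standalone sketch of that arithmetic. The class name AlignedWindowSketch is hypothetical and not part of Storm, and the sketch assumes non-negative (epoch millis) timestamps.

```java
/**
 * Hypothetical sketch (not Storm code) of the rounding performed by
 * WatermarkTimeTriggerPolicy.getNextAlignedWindowTs.
 */
class AlignedWindowSketch {

    /**
     * Rounds ts up to the next multiple of slidingIntervalMs. An already aligned ts,
     * or the Long.MAX_VALUE "no pending events" sentinel, is returned unchanged.
     * Assumes ts >= 0: Java's % yields a negative remainder for a negative ts.
     */
    static long nextAlignedWindowTs(long ts, long slidingIntervalMs) {
        if (ts == Long.MAX_VALUE || ts % slidingIntervalMs == 0) {
            return ts;
        }
        return ts + (slidingIntervalMs - (ts % slidingIntervalMs));
    }
}
```

With a 10 ms sliding interval, an earliest event at ts 603 gives a window end of 610, and an event at exactly 610 is left unchanged.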
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java b/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
index 6603abf..b783eb0 100644
--- a/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
+++ b/storm-core/src/jvm/backtype/storm/windowing/WindowManager.java
@@ -17,11 +17,11 @@
*/
package backtype.storm.windowing;
+import backtype.storm.windowing.EvictionPolicy.Action;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -30,8 +30,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
-import static backtype.storm.topology.base.BaseWindowedBolt.Count;
-import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
+import static backtype.storm.windowing.EvictionPolicy.Action.EXPIRE;
+import static backtype.storm.windowing.EvictionPolicy.Action.PROCESS;
+import static backtype.storm.windowing.EvictionPolicy.Action.STOP;
/**
* Tracks a window of events and fires {@link WindowLifecycleListener} callbacks
@@ -49,7 +50,7 @@ public class WindowManager<T> implements TriggerHandler {
public static final int EXPIRE_EVENTS_THRESHOLD = 100;
private final WindowLifecycleListener<T> windowLifecycleListener;
- private final ConcurrentLinkedQueue<Event<T>> window;
+ private final ConcurrentLinkedQueue<Event<T>> queue;
private final List<T> expiredEvents;
private final Set<Event<T>> prevWindowEvents;
private final AtomicInteger eventsSinceLastExpiry;
@@ -59,27 +60,19 @@ public class WindowManager<T> implements TriggerHandler {
public WindowManager(WindowLifecycleListener<T> lifecycleListener) {
windowLifecycleListener = lifecycleListener;
- window = new ConcurrentLinkedQueue<>();
+ queue = new ConcurrentLinkedQueue<>();
expiredEvents = new ArrayList<>();
prevWindowEvents = new HashSet<>();
eventsSinceLastExpiry = new AtomicInteger();
lock = new ReentrantLock(true);
}
- public void setWindowLength(Count count) {
- this.evictionPolicy = new CountEvictionPolicy<>(count.value);
+ public void setEvictionPolicy(EvictionPolicy<T> evictionPolicy) {
+ this.evictionPolicy = evictionPolicy;
}
- public void setWindowLength(Duration duration) {
- this.evictionPolicy = new TimeEvictionPolicy<>(duration.value);
- }
-
- public void setSlidingInterval(Count count) {
- this.triggerPolicy = new CountTriggerPolicy<>(count.value, this);
- }
-
- public void setSlidingInterval(Duration duration) {
- this.triggerPolicy = new TimeTriggerPolicy<>(duration.value, this);
+ public void setTriggerPolicy(TriggerPolicy<T> triggerPolicy) {
+ this.triggerPolicy = triggerPolicy;
}
/**
@@ -99,8 +92,21 @@ public class WindowManager<T> implements TriggerHandler {
* @param ts the timestamp
*/
public void add(T event, long ts) {
- Event<T> windowEvent = new EventImpl<T>(event, ts);
- window.add(windowEvent);
+ add(new EventImpl<T>(event, ts));
+ }
+
+ /**
+ * Tracks a window event.
+ *
+ * @param windowEvent the window event to track
+ */
+ public void add(Event<T> windowEvent) {
+ // watermark events are not added to the queue.
+ if (!windowEvent.isWatermark()) {
+ queue.add(windowEvent);
+ } else {
+ LOG.debug("Got watermark event with ts {}", windowEvent.getTimestamp());
+ }
track(windowEvent);
compactWindow();
}
@@ -109,7 +115,7 @@ public class WindowManager<T> implements TriggerHandler {
* The callback invoked by the trigger policy.
*/
@Override
- public void onTrigger() {
+ public boolean onTrigger() {
List<Event<T>> windowEvents = null;
List<T> expired = null;
try {
@@ -118,7 +124,7 @@ public class WindowManager<T> implements TriggerHandler {
* scan the entire window to handle out of order events in
* the case of time based windows.
*/
- windowEvents = expireEvents(true);
+ windowEvents = scanEvents(true);
expired = new ArrayList<>(expiredEvents);
expiredEvents.clear();
} finally {
@@ -133,10 +139,15 @@ public class WindowManager<T> implements TriggerHandler {
}
}
prevWindowEvents.clear();
- prevWindowEvents.addAll(windowEvents);
- LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", windowEvents.size());
- windowLifecycleListener.onActivation(events, newEvents, expired);
+ if (!events.isEmpty()) {
+ prevWindowEvents.addAll(windowEvents);
+ LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
+ windowLifecycleListener.onActivation(events, newEvents, expired);
+ } else {
+ LOG.debug("No events in the window, skipping onActivation");
+ }
triggerPolicy.reset();
+ return !events.isEmpty();
}
public void shutdown() {
@@ -153,7 +164,7 @@ public class WindowManager<T> implements TriggerHandler {
*/
private void compactWindow() {
if (eventsSinceLastExpiry.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) {
- expireEvents(false);
+ scanEvents(false);
}
}
@@ -167,29 +178,30 @@ public class WindowManager<T> implements TriggerHandler {
}
/**
- * Expire events from the window, using the expiration policy to check
+ * Scans the events in the queue, using the eviction policy to check
* if the event should be evicted or not.
*
- * @param fullScan if set, will scan the entire window; if not set, will stop
+ * @param fullScan if set, will scan the entire queue; if not set, will stop
* as soon as an event not satisfying the expiration policy is found
- * @return the list of remaining events in the window after expiry
+ * @return the list of events to be processed as a part of the current window
*/
- private List<Event<T>> expireEvents(boolean fullScan) {
- LOG.debug("Expire events, eviction policy {}", evictionPolicy);
+ private List<Event<T>> scanEvents(boolean fullScan) {
+ LOG.debug("Scan events, eviction policy {}", evictionPolicy);
List<T> eventsToExpire = new ArrayList<>();
- List<Event<T>> remaining = new ArrayList<>();
+ List<Event<T>> eventsToProcess = new ArrayList<>();
try {
lock.lock();
- Iterator<Event<T>> it = window.iterator();
+ Iterator<Event<T>> it = queue.iterator();
while (it.hasNext()) {
Event<T> windowEvent = it.next();
- if (evictionPolicy.evict(windowEvent)) {
+ Action action = evictionPolicy.evict(windowEvent);
+ if (action == EXPIRE) {
eventsToExpire.add(windowEvent.get());
it.remove();
- } else if (!fullScan) {
+ } else if (!fullScan || action == STOP) {
break;
- } else {
- remaining.add(windowEvent);
+ } else if (action == PROCESS) {
+ eventsToProcess.add(windowEvent);
}
}
expiredEvents.addAll(eventsToExpire);
@@ -198,8 +210,73 @@ public class WindowManager<T> implements TriggerHandler {
}
eventsSinceLastExpiry.set(0);
LOG.debug("[{}] events expired from window.", eventsToExpire.size());
- windowLifecycleListener.onExpiry(eventsToExpire);
- return remaining;
+ if (!eventsToExpire.isEmpty()) {
+ LOG.debug("invoking windowLifecycleListener.onExpiry");
+ windowLifecycleListener.onExpiry(eventsToExpire);
+ }
+ return eventsToProcess;
+ }
+
+ /**
+ * Scans the event queue and returns the earliest event ts
+ * between startTs (exclusive) and endTs (inclusive).
+ *
+ * @param startTs the start ts (exclusive)
+ * @param endTs the end ts (inclusive)
+ * @return the earliest event ts between startTs and endTs, or Long.MAX_VALUE if there is none
+ */
+ public long getEarliestEventTs(long startTs, long endTs) {
+ long minTs = Long.MAX_VALUE;
+ for (Event<T> event : queue) {
+ if (event.getTimestamp() > startTs && event.getTimestamp() <= endTs) {
+ minTs = Math.min(minTs, event.getTimestamp());
+ }
+ }
+ return minTs;
+ }
+
+ /**
+ * Scans the event queue and returns the number of events with a
+ * timestamp less than or equal to the reference time.
+ *
+ * @param referenceTime the reference timestamp in millis
+ * @return the count of events with timestamp less than or equal to referenceTime
+ */
+ public int getEventCount(long referenceTime) {
+ int count = 0;
+ for (Event<T> event : queue) {
+ if (event.getTimestamp() <= referenceTime) {
+ ++count;
+ }
+ }
+ return count;
+ }
+
+ /**
+ * Scans the event queue and, for every slidingCount-th event whose ts
+ * falls between startTs (exclusive) and endTs (inclusive), returns the
+ * largest event ts seen so far.
+ *
+ * @param startTs the start timestamp (exclusive)
+ * @param endTs the end timestamp (inclusive)
+ * @param slidingCount the sliding interval count
+ * @return the list of event ts
+ */
+ public List<Long> getSlidingCountTimestamps(long startTs, long endTs, int slidingCount) {
+ List<Long> timestamps = new ArrayList<>();
+ if (endTs > startTs) {
+ int count = 0;
+ long ts = Long.MIN_VALUE;
+ for (Event<T> event : queue) {
+ if (event.getTimestamp() > startTs && event.getTimestamp() <= endTs) {
+ ts = Math.max(ts, event.getTimestamp());
+ if (++count % slidingCount == 0) {
+ timestamps.add(ts);
+ }
+ }
+ }
+ }
+ return timestamps;
}
@Override
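The three scans added to WindowManager can be understood independently of the Event and queue types. The sketch below restates them, single-threaded, over a plain List<Long> of event timestamps in arrival order. The class QueueScanSketch and its method names are hypothetical and not part of Storm. The sketch keeps the same (startTs, endTs] bounds and the Long.MAX_VALUE sentinel as the diff.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical restatement (not Storm code) of the WindowManager queue scans. */
class QueueScanSketch {

    /** Earliest ts in (startTs, endTs], or Long.MAX_VALUE if no event falls in that range. */
    static long earliestEventTs(List<Long> queue, long startTs, long endTs) {
        long minTs = Long.MAX_VALUE;
        for (long ts : queue) {
            if (ts > startTs && ts <= endTs) {
                minTs = Math.min(minTs, ts);
            }
        }
        return minTs;
    }

    /** Number of events whose ts is less than or equal to referenceTime. */
    static int eventCount(List<Long> queue, long referenceTime) {
        int count = 0;
        for (long ts : queue) {
            if (ts <= referenceTime) {
                ++count;
            }
        }
        return count;
    }

    /**
     * For every slidingCount-th event in (startTs, endTs], in queue order, records the
     * largest ts seen so far, so out-of-order arrivals still give non-decreasing results.
     */
    static List<Long> slidingCountTimestamps(List<Long> queue, long startTs, long endTs, int slidingCount) {
        List<Long> timestamps = new ArrayList<>();
        if (endTs > startTs) {
            int count = 0;
            long maxTs = Long.MIN_VALUE;
            for (long ts : queue) {
                if (ts > startTs && ts <= endTs) {
                    maxTs = Math.max(maxTs, ts);
                    if (++count % slidingCount == 0) {
                        timestamps.add(maxTs);
                    }
                }
            }
        }
        return timestamps;
    }
}
```

For the queue [605, 603, 618, 607, 626, 636] with startTs 600 and endTs 631, earliestEventTs returns 603, eventCount with reference time 610 returns 3, and slidingCountTimestamps with slidingCount 2 returns [605, 618]. The second event (603) arrives out of order, yet the running maximum still records 605.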
http://git-wip-us.apache.org/repos/asf/storm/blob/06d4bb6e/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java b/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
new file mode 100644
index 0000000..1c59f3a
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/topology/WindowedBoltExecutorTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.topology;
+
+import backtype.storm.Config;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseWindowedBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import backtype.storm.tuple.Values;
+import backtype.storm.windowing.TupleWindow;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for {@link WindowedBoltExecutor}
+ */
+public class WindowedBoltExecutorTest {
+
+ private WindowedBoltExecutor executor;
+ private TestWindowedBolt testWindowedBolt;
+
+ private static class TestWindowedBolt extends BaseWindowedBolt {
+ List<TupleWindow> tupleWindows = new ArrayList<>();
+
+ @Override
+ public void execute(TupleWindow input) {
+ //System.out.println(input);
+ tupleWindows.add(input);
+ }
+ }
+
+ private GeneralTopologyContext getContext(final Fields fields) {
+ TopologyBuilder builder = new TopologyBuilder();
+ return new GeneralTopologyContext(builder.createTopology(),
+ new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+ @Override
+ public Fields getComponentOutputFields(String componentId, String streamId) {
+ return fields;
+ }
+
+ };
+ }
+
+ private Tuple getTuple(String streamId, final Fields fields, Values values) {
+ return new TupleImpl(getContext(fields), values, 1, streamId) {
+ @Override
+ public GlobalStreamId getSourceGlobalStreamId() {
+ return new GlobalStreamId("s1", "default");
+ }
+ };
+ }
+
+ private OutputCollector getOutputCollector() {
+ return Mockito.mock(OutputCollector.class);
+ }
+
+ private TopologyContext getTopologyContext() {
+ TopologyContext context = Mockito.mock(TopologyContext.class);
+ Map<GlobalStreamId, Grouping> sources = Collections.singletonMap(
+ new GlobalStreamId("s1", "default"),
+ null
+ );
+ Mockito.when(context.getThisSources()).thenReturn(sources);
+ return context;
+ }
+
+ @Before
+ public void setUp() {
+ testWindowedBolt = new TestWindowedBolt();
+ executor = new WindowedBoltExecutor(testWindowedBolt);
+ Map<String, Object> conf = new HashMap<>();
+ conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+ conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20);
+ conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10);
+ conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, "ts");
+ conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5);
+ conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 100);
+ executor.prepare(conf, getTopologyContext(), getOutputCollector());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testExecuteWithoutTs() throws Exception {
+ executor.execute(getTuple("s1", new Fields("a"), new Values(1)));
+ }
+
+ @Test
+ public void testExecuteWithTs() throws Exception {
+ long[] timestamps = {603, 605, 607, 618, 626, 636};
+ for (long ts : timestamps) {
+ executor.execute(getTuple("s1", new Fields("ts"), new Values(ts)));
+ }
+ Thread.sleep(120);
+ //System.out.println(testWindowedBolt.tupleWindows);
+ assertEquals(3, testWindowedBolt.tupleWindows.size());
+ TupleWindow first = testWindowedBolt.tupleWindows.get(0);
+ assertArrayEquals(new long[]{603, 605, 607},
+ new long[]{(long) first.get().get(0).getValue(0), (long)first.get().get(1).getValue(0),
+ (long)first.get().get(2).getValue(0)});
+
+ TupleWindow second = testWindowedBolt.tupleWindows.get(1);
+ assertArrayEquals(new long[]{603, 605, 607, 618},
+ new long[]{(long) second.get().get(0).getValue(0), (long) second.get().get(1).getValue(0),
+ (long) second.get().get(2).getValue(0), (long) second.get().get(3).getValue(0)});
+
+ TupleWindow third = testWindowedBolt.tupleWindows.get(2);
+ assertArrayEquals(new long[]{618, 626},
+ new long[]{(long) third.get().get(0).getValue(0), (long)third.get().get(1).getValue(0)});
+ }
+}
\ No newline at end of file
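The following sketch shows why testExecuteWithTs expects exactly three windows. It replays the test's configuration (20 ms window, 10 ms sliding interval, 5 ms max lag) without threads or the watermark timer. It is a simplification under stated assumptions, not the executor's code. The hypothetical class EventTimeWindowSimulation assumes a single watermark equal to the largest timestamp seen minus the lag (636 - 5 = 631). It also assumes that a window ending at endTs holds the events with endTs - windowLength < ts <= endTs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical single-threaded simulation (not Storm code) of watermark-driven,
 * event-time sliding windows. Assumes one watermark = max ts - maxLagMs and
 * window membership endTs - windowLengthMs < ts <= endTs.
 */
class EventTimeWindowSimulation {

    static List<List<Long>> simulate(List<Long> eventTs, long windowLengthMs,
                                     long slidingIntervalMs, long maxLagMs) {
        long watermarkTs = Collections.max(eventTs) - maxLagMs;
        List<List<Long>> windows = new ArrayList<>();
        long windowEndTs = 0; // same initial value as nextWindowEndTs in the trigger policy
        while (windowEndTs <= watermarkTs) {
            List<Long> window = new ArrayList<>();
            for (long ts : eventTs) {
                if (ts > windowEndTs - windowLengthMs && ts <= windowEndTs) {
                    window.add(ts);
                }
            }
            if (!window.isEmpty()) {
                // Non-empty window fires; advance by one sliding interval.
                windows.add(window);
                windowEndTs += slidingIntervalMs;
            } else {
                // Empty window: jump to the aligned end of the earliest pending event.
                long next = Long.MAX_VALUE;
                for (long ts : eventTs) {
                    if (ts > windowEndTs && ts <= watermarkTs) {
                        next = Math.min(next, ts);
                    }
                }
                if (next == Long.MAX_VALUE) {
                    break; // nothing left to process up to the watermark
                }
                long rem = next % slidingIntervalMs;
                windowEndTs = (rem == 0) ? next : next + (slidingIntervalMs - rem);
            }
        }
        return windows;
    }
}
```

The first window, which ends at 0, is empty, so the loop jumps to 610 (603 rounded up). The windows ending at 610, 620 and 630 then contain [603, 605, 607], [603, 605, 607, 618] and [618, 626], which match the three assertions. The next end, 640, lies beyond the watermark of 631, so the tuple with ts 636 is not yet processed.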