You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sa...@apache.org on 2017/04/13 04:23:35 UTC
[1/3] storm git commit: [STORM-2455] Expose the window start and end timestamp in TupleWindow
Repository: storm
Updated Branches:
refs/heads/master 4c27ef9d3 -> 4df35c571
[STORM-2455] Expose the window start and end timestamp in TupleWindow
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bf0f9bbf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bf0f9bbf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bf0f9bbf
Branch: refs/heads/master
Commit: bf0f9bbf1d854643193c82b6a68f75b41cb6726c
Parents: 1850dd5
Author: Arun Mahadevan <ar...@apache.org>
Authored: Fri Apr 7 14:25:21 2017 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Tue Apr 11 14:30:05 2017 +0530
----------------------------------------------------------------------
.../storm/streams/WindowedProcessorBolt.java | 2 +-
.../storm/topology/WindowedBoltExecutor.java | 12 ++++++++++--
.../apache/storm/windowing/TupleWindowImpl.java | 20 ++++++++++++++------
.../windowing/WatermarkCountEvictionPolicy.java | 2 +-
.../jvm/org/apache/storm/windowing/Window.java | 14 +++++++++++---
5 files changed, 37 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bf0f9bbf/storm-client/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java b/storm-client/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java
index 3971346..6fc4e69 100644
--- a/storm-client/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java
@@ -62,7 +62,7 @@ class WindowedProcessorBolt extends BaseWindowedBolt implements StreamBolt {
public void execute(TupleWindow inputWindow) {
LOG.trace("Window triggered at {}, inputWindow {}", new Date(), inputWindow);
if (delegate.isEventTimestamp()) {
- delegate.setEventTimestamp(inputWindow.getTimestamp());
+ delegate.setEventTimestamp(inputWindow.getEndTimestamp());
}
for (Tuple tuple : inputWindow.get()) {
Pair<Object, String> valueAndStream = delegate.getValueAndStream(tuple);
http://git-wip-us.apache.org/repos/asf/storm/blob/bf0f9bbf/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
index b592e0b..bfbdb47 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
@@ -71,6 +71,7 @@ public class WindowedBoltExecutor implements IRichBolt {
private transient String lateTupleStream;
private transient TriggerPolicy<Tuple> triggerPolicy;
private transient EvictionPolicy<Tuple> evictionPolicy;
+ private transient Duration windowLengthDuration;
// package level for unit tests
transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
@@ -146,7 +147,6 @@ public class WindowedBoltExecutor implements IRichBolt {
private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf,
TopologyContext context) {
WindowManager<Tuple> manager = new WindowManager<>(lifecycleListener);
- Duration windowLengthDuration = null;
Count windowLengthCount = null;
Duration slidingIntervalDuration = null;
Count slidingIntervalCount = null;
@@ -329,7 +329,15 @@ public class WindowedBoltExecutor implements IRichBolt {
@Override
public void onActivation(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
windowedOutputCollector.setContext(tuples);
- bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples, timestamp));
+ bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples, getWindowStartTs(timestamp), timestamp));
+ }
+
+ private Long getWindowStartTs(Long endTs) {
+ Long res = null;
+ if (endTs != null && windowLengthDuration != null) {
+ res = endTs - windowLengthDuration.value;
+ }
+ return res;
}
};
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bf0f9bbf/storm-client/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java b/storm-client/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java
index 1e8b022..4b61c8e 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java
@@ -28,17 +28,20 @@ public class TupleWindowImpl implements TupleWindow {
private final List<Tuple> tuples;
private final List<Tuple> newTuples;
private final List<Tuple> expiredTuples;
- private final Long timestamp;
+ private final Long startTimestamp;
+ private final Long endTimestamp;
public TupleWindowImpl(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples) {
- this(tuples, newTuples, expiredTuples, null);
+ this(tuples, newTuples, expiredTuples, null, null);
}
- public TupleWindowImpl(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
+ public TupleWindowImpl(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples,
+ Long startTimestamp, Long endTimestamp) {
this.tuples = tuples;
this.newTuples = newTuples;
this.expiredTuples = expiredTuples;
- this.timestamp = timestamp;
+ this.startTimestamp = startTimestamp;
+ this.endTimestamp = endTimestamp;
}
@Override
@@ -57,8 +60,13 @@ public class TupleWindowImpl implements TupleWindow {
}
@Override
- public Long getTimestamp() {
- return timestamp != null ? timestamp : System.currentTimeMillis();
+ public Long getStartTimestamp() {
+ return startTimestamp;
+ }
+
+ @Override
+ public Long getEndTimestamp() {
+ return endTimestamp;
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/bf0f9bbf/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
index 7304366..c5d7b49 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
@@ -33,7 +33,7 @@ public class WatermarkCountEvictionPolicy<T> extends CountEvictionPolicy<T> {
@Override
public Action evict(Event<T> event) {
Action action;
- if (event.getTimestamp() <= super.getContext().getReferenceTime() && processed < currentCount.get()) {
+ if (event.getTimestamp() <= getContext().getReferenceTime() && processed < currentCount.get()) {
action = super.evict(event);
if (action == Action.PROCESS) {
++processed;
http://git-wip-us.apache.org/repos/asf/storm/blob/bf0f9bbf/storm-client/src/jvm/org/apache/storm/windowing/Window.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/Window.java b/storm-client/src/jvm/org/apache/storm/windowing/Window.java
index 9a62eef..2e2973a 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/Window.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/Window.java
@@ -47,9 +47,17 @@ public interface Window<T> {
List<T> getExpired();
/**
- * If processing based on event time, returns the watermark time otherwise the current timestamp.
+ * If processing based on event time, returns the window end time based on watermark otherwise
+ * returns the window end time based on processing time.
*
- * @return the window timestamp
+ * @return the window end timestamp
*/
- Long getTimestamp();
+ Long getEndTimestamp();
+
+ /**
+ * Returns the window start timestamp. Will return null if the window length is not based on time duration.
+ *
+ * @return the window start timestamp or null if the window length is not time based
+ */
+ Long getStartTimestamp();
}
[2/3] storm git commit: Merge branch 'STORM-2455' of https://github.com/arunmahadevan/storm into STORM-2455
Posted by sa...@apache.org.
Merge branch 'STORM-2455' of https://github.com/arunmahadevan/storm into STORM-2455
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/51e60185
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/51e60185
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/51e60185
Branch: refs/heads/master
Commit: 51e601852d9c3de3c9af722f5d2c5ea79b5e8b9f
Parents: 4c27ef9 bf0f9bb
Author: Satish Duggana <sa...@apache.org>
Authored: Thu Apr 13 09:35:56 2017 +0530
Committer: Satish Duggana <sa...@apache.org>
Committed: Thu Apr 13 09:35:56 2017 +0530
----------------------------------------------------------------------
.../storm/streams/WindowedProcessorBolt.java | 2 +-
.../storm/topology/WindowedBoltExecutor.java | 12 ++++++++++--
.../apache/storm/windowing/TupleWindowImpl.java | 20 ++++++++++++++------
.../windowing/WatermarkCountEvictionPolicy.java | 2 +-
.../jvm/org/apache/storm/windowing/Window.java | 14 +++++++++++---
5 files changed, 37 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: Added STORM-2455 to CHANGELOG.md
Posted by sa...@apache.org.
Added STORM-2455 to CHANGELOG.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4df35c57
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4df35c57
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4df35c57
Branch: refs/heads/master
Commit: 4df35c571a1c8d549a57f6102d413b60b9c86458
Parents: 51e6018
Author: Satish Duggana <sa...@apache.org>
Authored: Thu Apr 13 09:53:06 2017 +0530
Committer: Satish Duggana <sa...@apache.org>
Committed: Thu Apr 13 09:53:06 2017 +0530
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4df35c57/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d626c6e..da21850 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-2455: Expose the window start and end timestamp in TupleWindow
* STORM-2435: Logging levels and consistency with console.log etc
* STORM-2465: modify storm-redis's READEME.md and update storm-redis.md
* STORM-2464: update storm-mongodb.md