You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sa...@apache.org on 2017/04/13 04:23:35 UTC

[1/3] storm git commit: [STORM-2455] Expose the window start and end timestamp in TupleWindow

Repository: storm
Updated Branches:
  refs/heads/master 4c27ef9d3 -> 4df35c571


[STORM-2455] Expose the window start and end timestamp in TupleWindow


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bf0f9bbf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bf0f9bbf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bf0f9bbf

Branch: refs/heads/master
Commit: bf0f9bbf1d854643193c82b6a68f75b41cb6726c
Parents: 1850dd5
Author: Arun Mahadevan <ar...@apache.org>
Authored: Fri Apr 7 14:25:21 2017 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Tue Apr 11 14:30:05 2017 +0530

----------------------------------------------------------------------
 .../storm/streams/WindowedProcessorBolt.java    |  2 +-
 .../storm/topology/WindowedBoltExecutor.java    | 12 ++++++++++--
 .../apache/storm/windowing/TupleWindowImpl.java | 20 ++++++++++++++------
 .../windowing/WatermarkCountEvictionPolicy.java |  2 +-
 .../jvm/org/apache/storm/windowing/Window.java  | 14 +++++++++++---
 5 files changed, 37 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bf0f9bbf/storm-client/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java b/storm-client/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java
index 3971346..6fc4e69 100644
--- a/storm-client/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java
@@ -62,7 +62,7 @@ class WindowedProcessorBolt extends BaseWindowedBolt implements StreamBolt {
     public void execute(TupleWindow inputWindow) {
         LOG.trace("Window triggered at {}, inputWindow {}", new Date(), inputWindow);
         if (delegate.isEventTimestamp()) {
-            delegate.setEventTimestamp(inputWindow.getTimestamp());
+            delegate.setEventTimestamp(inputWindow.getEndTimestamp());
         }
         for (Tuple tuple : inputWindow.get()) {
             Pair<Object, String> valueAndStream = delegate.getValueAndStream(tuple);

http://git-wip-us.apache.org/repos/asf/storm/blob/bf0f9bbf/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
index b592e0b..bfbdb47 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
@@ -71,6 +71,7 @@ public class WindowedBoltExecutor implements IRichBolt {
     private transient String lateTupleStream;
     private transient TriggerPolicy<Tuple> triggerPolicy;
     private transient EvictionPolicy<Tuple> evictionPolicy;
+    private transient Duration windowLengthDuration;
     // package level for unit tests
     transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
 
@@ -146,7 +147,6 @@ public class WindowedBoltExecutor implements IRichBolt {
     private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf,
                                                    TopologyContext context) {
         WindowManager<Tuple> manager = new WindowManager<>(lifecycleListener);
-        Duration windowLengthDuration = null;
         Count windowLengthCount = null;
         Duration slidingIntervalDuration = null;
         Count slidingIntervalCount = null;
@@ -329,7 +329,15 @@ public class WindowedBoltExecutor implements IRichBolt {
             @Override
             public void onActivation(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
                 windowedOutputCollector.setContext(tuples);
-                bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples, timestamp));
+                bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples, getWindowStartTs(timestamp), timestamp));
+            }
+
+            private Long getWindowStartTs(Long endTs) {
+                Long res = null;
+                if (endTs != null && windowLengthDuration != null) {
+                    res = endTs - windowLengthDuration.value;
+                }
+                return res;
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/bf0f9bbf/storm-client/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java b/storm-client/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java
index 1e8b022..4b61c8e 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java
@@ -28,17 +28,20 @@ public class TupleWindowImpl implements TupleWindow {
     private final List<Tuple> tuples;
     private final List<Tuple> newTuples;
     private final List<Tuple> expiredTuples;
-    private final Long timestamp;
+    private final Long startTimestamp;
+    private final Long endTimestamp;
 
     public TupleWindowImpl(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples) {
-        this(tuples, newTuples, expiredTuples, null);
+        this(tuples, newTuples, expiredTuples, null, null);
     }
 
-    public TupleWindowImpl(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
+    public TupleWindowImpl(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples,
+                           Long startTimestamp, Long endTimestamp) {
         this.tuples = tuples;
         this.newTuples = newTuples;
         this.expiredTuples = expiredTuples;
-        this.timestamp = timestamp;
+        this.startTimestamp = startTimestamp;
+        this.endTimestamp = endTimestamp;
     }
 
     @Override
@@ -57,8 +60,13 @@ public class TupleWindowImpl implements TupleWindow {
     }
 
     @Override
-    public Long getTimestamp() {
-        return timestamp != null ? timestamp : System.currentTimeMillis();
+    public Long getStartTimestamp() {
+        return startTimestamp;
+    }
+
+    @Override
+    public Long getEndTimestamp() {
+        return endTimestamp;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/bf0f9bbf/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
index 7304366..c5d7b49 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
@@ -33,7 +33,7 @@ public class WatermarkCountEvictionPolicy<T> extends CountEvictionPolicy<T> {
     @Override
     public Action evict(Event<T> event) {
         Action action;
-        if (event.getTimestamp() <= super.getContext().getReferenceTime() && processed < currentCount.get()) {
+        if (event.getTimestamp() <= getContext().getReferenceTime() && processed < currentCount.get()) {
             action = super.evict(event);
             if (action == Action.PROCESS) {
                 ++processed;

http://git-wip-us.apache.org/repos/asf/storm/blob/bf0f9bbf/storm-client/src/jvm/org/apache/storm/windowing/Window.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/Window.java b/storm-client/src/jvm/org/apache/storm/windowing/Window.java
index 9a62eef..2e2973a 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/Window.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/Window.java
@@ -47,9 +47,17 @@ public interface Window<T> {
     List<T> getExpired();
 
     /**
-     * If processing based on event time, returns the watermark time otherwise the current timestamp.
+     * If processing based on event time, returns the window end time based on watermark otherwise
+     * returns the window end time based on processing time.
      *
-     * @return the window timestamp
+     * @return the window end timestamp
      */
-    Long getTimestamp();
+    Long getEndTimestamp();
+
+    /**
+     * Returns the window start timestamp. Will return null if the window length is not based on time duration.
+     *
+     * @return the window start timestamp or null if the window length is not time based
+     */
+    Long getStartTimestamp();
 }


[2/3] storm git commit: Merge branch 'STORM-2455' of https://github.com/arunmahadevan/storm into STORM-2455

Posted by sa...@apache.org.
Merge branch 'STORM-2455' of https://github.com/arunmahadevan/storm into STORM-2455


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/51e60185
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/51e60185
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/51e60185

Branch: refs/heads/master
Commit: 51e601852d9c3de3c9af722f5d2c5ea79b5e8b9f
Parents: 4c27ef9 bf0f9bb
Author: Satish Duggana <sa...@apache.org>
Authored: Thu Apr 13 09:35:56 2017 +0530
Committer: Satish Duggana <sa...@apache.org>
Committed: Thu Apr 13 09:35:56 2017 +0530

----------------------------------------------------------------------
 .../storm/streams/WindowedProcessorBolt.java    |  2 +-
 .../storm/topology/WindowedBoltExecutor.java    | 12 ++++++++++--
 .../apache/storm/windowing/TupleWindowImpl.java | 20 ++++++++++++++------
 .../windowing/WatermarkCountEvictionPolicy.java |  2 +-
 .../jvm/org/apache/storm/windowing/Window.java  | 14 +++++++++++---
 5 files changed, 37 insertions(+), 13 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: Added STORM-2455 to CHANGELOG.md

Posted by sa...@apache.org.
Added STORM-2455 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4df35c57
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4df35c57
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4df35c57

Branch: refs/heads/master
Commit: 4df35c571a1c8d549a57f6102d413b60b9c86458
Parents: 51e6018
Author: Satish Duggana <sa...@apache.org>
Authored: Thu Apr 13 09:53:06 2017 +0530
Committer: Satish Duggana <sa...@apache.org>
Committed: Thu Apr 13 09:53:06 2017 +0530

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4df35c57/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d626c6e..da21850 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-2455: Expose the window start and end timestamp in TupleWindow
  * STORM-2435: Logging levels and consistency with console.log etc
  * STORM-2465: modify storm-redis's READEME.md and update storm-redis.md
  * STORM-2464: update storm-mongodb.md