You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ar...@apache.org on 2017/05/30 09:18:28 UTC

[1/3] storm git commit: STORM-2489: Overlap and data loss on WindowedBolt based on Duration

Repository: storm
Updated Branches:
  refs/heads/master adc3dd63b -> 349eb0283


STORM-2489: Overlap and data loss on WindowedBolt based on Duration

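- Fixed time-based eviction so that events with timestamps beyond the current reference time are kept (not processed) instead of being included in the current window.
- Introduced a processing-time delta that dynamically adjusts window boundaries to account for time drift between trigger executions.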


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/918849fb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/918849fb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/918849fb

Branch: refs/heads/master
Commit: 918849fb44d9d9133801f561a608ed42f774b421
Parents: 465c9c6
Author: Arun Mahadevan <ar...@apache.org>
Authored: Tue Apr 25 12:37:43 2017 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Sun May 28 23:40:02 2017 +0530

----------------------------------------------------------------------
 .../storm/topology/WindowedBoltExecutor.java    |  2 +-
 .../storm/windowing/DefaultEvictionContext.java | 11 ++++++++
 .../apache/storm/windowing/EvictionContext.java |  8 ++++++
 .../storm/windowing/TimeEvictionPolicy.java     | 27 ++++++++++++++++++--
 .../storm/windowing/TimeTriggerPolicy.java      |  4 ++-
 5 files changed, 48 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/918849fb/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
index 56c329e..0efe557 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
@@ -273,7 +273,7 @@ public class WindowedBoltExecutor implements IRichBolt {
         this.listener = newWindowLifecycleListener();
         this.windowManager = initWindowManager(listener, topoConf, context);
         start();
-        LOG.debug("Initialized window manager {} ", this.windowManager);
+        LOG.info("Initialized window manager {} ", windowManager);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/918849fb/storm-client/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java b/storm-client/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java
index ef65f66..cc16664 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java
@@ -21,6 +21,7 @@ public class DefaultEvictionContext implements EvictionContext {
     private final Long referenceTime;
     private final Long currentCount;
     private final Long slidingCount;
+    private final Long slidingInterval;
 
     public DefaultEvictionContext(Long referenceTime) {
         this(referenceTime, null);
@@ -31,9 +32,14 @@ public class DefaultEvictionContext implements EvictionContext {
     }
 
     public DefaultEvictionContext(Long referenceTime, Long currentCount, Long slidingCount) {
+        this(referenceTime, currentCount, slidingCount, null);
+    }
+
+    public DefaultEvictionContext(Long referenceTime, Long currentCount, Long slidingCount, Long slidingInterval) {
         this.referenceTime = referenceTime;
         this.currentCount = currentCount;
         this.slidingCount = slidingCount;
+        this.slidingInterval = slidingInterval;
     }
 
     @Override
@@ -50,4 +56,9 @@ public class DefaultEvictionContext implements EvictionContext {
     public Long getSlidingCount() {
         return slidingCount;
     }
+
+    @Override
+    public Long getSlidingInterval() {
+        return slidingInterval;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/918849fb/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java b/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java
index ee5fdb9..7582e26 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java
@@ -37,6 +37,14 @@ public interface EvictionContext {
      */
     Long getSlidingCount();
 
+
+    /**
+     * Returns the sliding interval for time based windows
+     *
+     * @return the sliding interval
+     */
+    Long getSlidingInterval();
+
     /**
      * Returns the current count of events in the queue up to the reference time
      * based on which count based evictions can be performed.

http://git-wip-us.apache.org/repos/asf/storm/blob/918849fb/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
index 570b057..d484db3 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
@@ -17,12 +17,18 @@
  */
 package org.apache.storm.windowing;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Eviction policy that evicts events based on time duration.
  */
 public class TimeEvictionPolicy<T> implements EvictionPolicy<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(TimeEvictionPolicy.class);
+
     private final int windowLength;
     protected EvictionContext evictionContext;
+    private long delta;
 
     /**
      * Constructs a TimeEvictionPolicy that evicts events older
@@ -41,8 +47,10 @@ public class TimeEvictionPolicy<T> implements EvictionPolicy<T> {
     public Action evict(Event<T> event) {      
         long now = evictionContext == null ? System.currentTimeMillis() : evictionContext.getReferenceTime();
         long diff = now - event.getTimestamp();
-        if (diff >= windowLength) {
+        if (diff >= (windowLength + delta)) {
             return Action.EXPIRE;
+        } else if (diff < 0) { // do not process events beyond current ts
+            return Action.KEEP;
         }
         return Action.PROCESS;
     }
@@ -54,7 +62,22 @@ public class TimeEvictionPolicy<T> implements EvictionPolicy<T> {
 
     @Override
     public void setContext(EvictionContext context) {
-        this.evictionContext = context;
+        EvictionContext prevContext = evictionContext;
+        evictionContext = context;
+        // compute window length adjustment (delta) to account for time drift
+        if (context.getSlidingInterval() != null) {
+            if (prevContext == null) {
+                delta = Integer.MAX_VALUE; // consider all events for the initial window
+            } else {
+                delta = context.getReferenceTime() - prevContext.getReferenceTime() - context.getSlidingInterval();
+                if (Math.abs(delta) > 100) {
+                    LOG.warn("Possible clock drift or long running computation in window; " +
+                                    "Previous eviction time: {}, current eviction time: {}",
+                            prevContext.getReferenceTime(),
+                            context.getReferenceTime());
+                }
+            }
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/918849fb/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
index 6b6d9fa..e23c2e2 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
@@ -109,12 +109,14 @@ public class TimeTriggerPolicy<T> implements TriggerPolicy<T> {
         return new Runnable() {
             @Override
             public void run() {
+                // do not process current timestamp since tuples might arrive while the trigger is executing
+                long now = System.currentTimeMillis() - 1;
                 try {
                     /*
                      * set the current timestamp as the reference time for the eviction policy
                      * to evict the events
                      */
-                    evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis()));
+                    evictionPolicy.setContext(new DefaultEvictionContext(now, null, null, duration));
                     handler.onTrigger();
                 } catch (Throwable th) {
                     LOG.error("handler.onTrigger failed ", th);


[2/3] storm git commit: Merge branch 'STORM-2489' of https://github.com/arunmahadevan/storm into STORM-2489-apache

Posted by ar...@apache.org.
Merge branch 'STORM-2489' of https://github.com/arunmahadevan/storm into STORM-2489-apache


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d4584c19
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d4584c19
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d4584c19

Branch: refs/heads/master
Commit: d4584c197932798817c8b14c8f63dc99df8917d9
Parents: adc3dd6 918849f
Author: Arun Mahadevan <ar...@apache.org>
Authored: Tue May 30 10:17:34 2017 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Tue May 30 10:17:34 2017 +0530

----------------------------------------------------------------------
 .../storm/topology/WindowedBoltExecutor.java    |  2 +-
 .../storm/windowing/DefaultEvictionContext.java | 11 ++++++++
 .../apache/storm/windowing/EvictionContext.java |  8 ++++++
 .../storm/windowing/TimeEvictionPolicy.java     | 27 ++++++++++++++++++--
 .../storm/windowing/TimeTriggerPolicy.java      |  4 ++-
 5 files changed, 48 insertions(+), 4 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: Added STORM-2489 to CHANGELOG.md

Posted by ar...@apache.org.
Added STORM-2489 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/349eb028
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/349eb028
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/349eb028

Branch: refs/heads/master
Commit: 349eb02831ce89ac18f6294624218c0db25f8230
Parents: d4584c1
Author: Arun Mahadevan <ar...@apache.org>
Authored: Tue May 30 10:18:11 2017 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Tue May 30 10:18:11 2017 +0530

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/349eb028/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c5df0c8..299e65f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-2489: Overlap and data loss on WindowedBolt based on Duration
  * STORM-2206: replacing visualization with viz.js
  * STORM-2527: Initialize java.sql.DriverManager earlier to avoid deadlock
  * STORM-2525: Fix flaky integration tests