Posted to dev@storm.apache.org by arunmahadevan <gi...@git.apache.org> on 2017/04/25 07:16:03 UTC

[GitHub] storm pull request #2090: STORM-2489: Overlap and data loss on WindowedBolt ...

GitHub user arunmahadevan opened a pull request:

    https://github.com/apache/storm/pull/2090

    STORM-2489: Overlap and data loss on WindowedBolt based on Duration

    - Fixed time eviction to not evict events beyond current timestamp.
    - Modified time trigger to compute the reference time based on window duration.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/arunmahadevan/storm STORM-2489

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2090.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2090
    
----
commit 09cf77bb84ddcf5ab05757ac4f72ab1d20cf1e82
Author: Arun Mahadevan <ar...@apache.org>
Date:   2017-04-25T07:07:43Z

    STORM-2489: Overlap and data loss on WindowedBolt based on Duration
    
    - Fixed time eviction to not evict events beyond current timestamp.
    - Modified time trigger to compute the reference time based on window duration.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #2090: STORM-2489: Overlap and data loss on WindowedBolt ...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2090#discussion_r113368411
  
    --- Diff: storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java ---
    @@ -62,7 +63,9 @@ public void reset() {
     
         @Override
         public void start() {
    -        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
    +        // initial delay is slightly less than the duration so that the initial tuples wont't expire due to time drift
    +        long initialDelay = duration - Math.min((long) (duration * .1), 100);
    +        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), initialDelay, duration, TimeUnit.MILLISECONDS);
    --- End diff --
    
    Subsequent scheduling will happen after "duration", which would be the window sliding interval.



[GitHub] storm issue #2090: STORM-2489: Overlap and data loss on WindowedBolt based o...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/2090
  
    +1



[GitHub] storm pull request #2090: STORM-2489: Overlap and data loss on WindowedBolt ...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2090#discussion_r118846424
  
    --- Diff: storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java ---
    @@ -54,7 +62,22 @@ public void track(Event<T> event) {
     
         @Override
         public void setContext(EvictionContext context) {
    -        this.evictionContext = context;
    +        EvictionContext prevContext = evictionContext;
    +        evictionContext = context;
    +        // compute window length adjustment (delta) to account for time drift
    +        if (context != null && context.getSlidingInterval() != null) {
    --- End diff --
    
    Addressed



[GitHub] storm pull request #2090: STORM-2489: Overlap and data loss on WindowedBolt ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2090#discussion_r118845758
  
    --- Diff: storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java ---
    @@ -54,7 +62,22 @@ public void track(Event<T> event) {
     
         @Override
         public void setContext(EvictionContext context) {
    -        this.evictionContext = context;
    +        EvictionContext prevContext = evictionContext;
    +        evictionContext = context;
    +        // compute window length adjustment (delta) to account for time drift
    +        if (context != null && context.getSlidingInterval() != null) {
    --- End diff --
    
    Nitpick: I'd rather not check for null unless there's a good reason. As far as I can tell there's no reason to reset the EvictionContext to null, so I'd rather we get an NPE if it happens unexpectedly, so we can fix what is likely a bug.



[GitHub] storm pull request #2090: STORM-2489: Overlap and data loss on WindowedBolt ...

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2090#discussion_r113366037
  
    --- Diff: storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java ---
    @@ -109,13 +112,22 @@ private Runnable newTriggerTask() {
             return new Runnable() {
                 @Override
                 public void run() {
    +                // compute the current trigger timestamp based on prevTriggerTimestamp,
    +                // since the calculation based on System.currentTimeMillis might have a slight drift
    +                long now = System.currentTimeMillis();
    +                long triggerTs = prevTriggerTimestamp == 0 ? now : prevTriggerTimestamp + duration;
    +                prevTriggerTimestamp = triggerTs;
    +                if (Math.abs(triggerTs - now) > 1000) {
    --- End diff --
    
    Did we think about an option to execute a bolt in a different thread to avoid timing issues in trigger because of user code?



[GitHub] storm pull request #2090: STORM-2489: Overlap and data loss on WindowedBolt ...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2090#discussion_r113368417
  
    --- Diff: storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java ---
    @@ -109,13 +112,22 @@ private Runnable newTriggerTask() {
             return new Runnable() {
                 @Override
                 public void run() {
    +                // compute the current trigger timestamp based on prevTriggerTimestamp,
    +                // since the calculation based on System.currentTimeMillis might have a slight drift
    +                long now = System.currentTimeMillis();
    +                long triggerTs = prevTriggerTimestamp == 0 ? now : prevTriggerTimestamp + duration;
    +                prevTriggerTimestamp = triggerTs;
    +                if (Math.abs(triggerTs - now) > 1000) {
    --- End diff --
    
    The timing issues are due to the scheduled executor (which uses nanoTime), which does not trigger exactly after the duration when measured with System.currentTimeMillis. There's a mismatch of a few ms, which is addressed here. In general, user code is expected to return before the next trigger. If user code takes longer than the trigger interval, we still set the next trigger timestamp correctly and execute as soon as it returns from execute, but this should be very rare and users are not expected to do this, so we log a warning. I don't think we want multiple concurrent executes (i.e., triggering execute when the user code hasn't returned from the first execute).
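
A minimal sketch of the trigger-timestamp computation discussed in this comment, written as a standalone class so it can run outside Storm. The class name `TriggerClock`, the method names, and the constant name `DRIFT_WARN_MS` are illustrative assumptions; only the arithmetic follows the diff quoted above.

```java
// Hypothetical helper, not part of Storm: derives each trigger timestamp from
// the previous one instead of trusting the wall clock at firing time.
final class TriggerClock {
    // Drift threshold from the quoted diff (1000 ms); the constant name is an assumption.
    static final long DRIFT_WARN_MS = 1000;

    private final long duration;        // sliding interval in milliseconds
    private long prevTriggerTimestamp;  // 0 means "no trigger has fired yet"
    private boolean lastDriftExceeded;

    TriggerClock(long duration) {
        this.duration = duration;
    }

    // 'now' is passed in (rather than read from System.currentTimeMillis())
    // so the behaviour can be checked deterministically.
    long nextTriggerTs(long now) {
        long triggerTs = prevTriggerTimestamp == 0 ? now : prevTriggerTimestamp + duration;
        prevTriggerTimestamp = triggerTs;
        // The real code would log a warning here; this sketch only records the fact.
        lastDriftExceeded = Math.abs(triggerTs - now) > DRIFT_WARN_MS;
        return triggerTs;
    }

    boolean lastDriftExceeded() {
        return lastDriftExceeded;
    }
}
```

With a 1000 ms duration, a first firing at wall-clock time 5000 and a second at 6003 yield reference timestamps 5000 and 6000, so the 3 ms scheduler mismatch does not accumulate across windows.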



[GitHub] storm pull request #2090: STORM-2489: Overlap and data loss on WindowedBolt ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2090



[GitHub] storm pull request #2090: STORM-2489: Overlap and data loss on WindowedBolt ...

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2090#discussion_r113369572
  
    --- Diff: storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java ---
    @@ -109,13 +112,22 @@ private Runnable newTriggerTask() {
             return new Runnable() {
                 @Override
                 public void run() {
    +                // compute the current trigger timestamp based on prevTriggerTimestamp,
    +                // since the calculation based on System.currentTimeMillis might have a slight drift
    +                long now = System.currentTimeMillis();
    +                long triggerTs = prevTriggerTimestamp == 0 ? now : prevTriggerTimestamp + duration;
    +                prevTriggerTimestamp = triggerTs;
    +                if (Math.abs(triggerTs - now) > 1000) {
    --- End diff --
    
    OK, logging a warning in case of large drifts looks fine for now.
    
    Multiple concurrent executes will not happen as there is only one thread which executes the bolt and the subsequent bolt executions wait till the current execution is done.
    




[GitHub] storm pull request #2090: STORM-2489: Overlap and data loss on WindowedBolt ...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2090#discussion_r113383001
  
    --- Diff: storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java ---
    @@ -62,7 +63,9 @@ public void reset() {
     
         @Override
         public void start() {
    -        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
    +        // initial delay is slightly less than the duration so that the initial tuples wont't expire due to time drift
    +        long initialDelay = duration - Math.min((long) (duration * .1), 100);
    +        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), initialDelay, duration, TimeUnit.MILLISECONDS);
    --- End diff --
    
    We can't guarantee the absolute time at which the trigger will fire, since it depends on when and how long Storm takes to initialize the bolt. Only the relative time difference between two window firings can be guaranteed. The issue was that the first trigger was firing a few milliseconds after the duration, which was causing the initial tuples to expire immediately. I have refactored the logic so that it does not depend on the initial delay being slightly less than the duration.



[GitHub] storm issue #2090: STORM-2489: Overlap and data loss on WindowedBolt based o...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2090
  
    I'm wondering if you guys with some experience with windowing would mind taking a look at the question posted in https://github.com/apache/storm/pull/2127? It also has to do with tuples possibly getting expired before they're passed to the user's code, and affects the same policies as this PR.



[GitHub] storm pull request #2090: STORM-2489: Overlap and data loss on WindowedBolt ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2090#discussion_r117826059
  
    --- Diff: storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java ---
    @@ -62,7 +62,9 @@ public void reset() {
     
         @Override
         public void start() {
    -        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
    +        // initial delay is slightly less than the duration so that the initial tuples wont't expire due to time drift
    +        long initialDelay = duration - Math.min((long) (duration * .1), 10);
    +        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), initialDelay, duration, TimeUnit.MILLISECONDS);
    --- End diff --
    
    Also, since this is likely going to rely on in-bolt state to avoid expiring tuples before the user sees them, I think we should add to the documentation that this set of policies doesn't guarantee at-least-once processing.



[GitHub] storm pull request #2090: STORM-2489: Overlap and data loss on WindowedBolt ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2090#discussion_r118845794
  
    --- Diff: storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java ---
    @@ -41,8 +47,10 @@ public TimeEvictionPolicy(int windowLength) {
         public Action evict(Event<T> event) {      
             long now = evictionContext == null ? System.currentTimeMillis() : evictionContext.getReferenceTime();
             long diff = now - event.getTimestamp();
    -        if (diff >= windowLength) {
    +        if (diff >= (windowLength + delta)) {
                 return Action.EXPIRE;
    +        } else if (diff < 0) { // do not process events beyond current ts
    +            return Action.KEEP;
    --- End diff --
    
    Nitpick: I think the events are ordered when iterated over in WindowManager, so returning Action.STOP might be marginally faster.



[GitHub] storm issue #2090: STORM-2489: Overlap and data loss on WindowedBolt based o...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2090
  
    +1



[GitHub] storm pull request #2090: STORM-2489: Overlap and data loss on WindowedBolt ...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2090#discussion_r118845346
  
    --- Diff: storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java ---
    @@ -62,7 +62,9 @@ public void reset() {
     
         @Override
         public void start() {
    -        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
    +        // initial delay is slightly less than the duration so that the initial tuples wont't expire due to time drift
    +        long initialDelay = duration - Math.min((long) (duration * .1), 10);
    +        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), initialDelay, duration, TimeUnit.MILLISECONDS);
    --- End diff --
    
    @srdo, I removed the initial delay and changed the logic so that the initial events are processed in the first window. Subsequent windows will be adjusted so that even if the trigger is slightly off, the events are still considered.
    
    Processing-time windows collect events into windows based on the system timestamp, so the user may not see all the events. However, the processed events are still guaranteed at-least-once processing based on Storm's acking mechanism.
    
    With the proposed patch we make a best effort to slightly adjust the window sizes if the triggers are off, and we log a warning if the drift exceeds a threshold.
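
A rough sketch of the window-size adjustment described in this comment, assuming the adjustment (`delta`) is the difference between the observed gap between two trigger reference times and the configured sliding interval. The class `DriftAdjustedEviction` and its methods are hypothetical and stand in for the eviction policy only for illustration; the first window is handled with a flag here, which may differ from how the patch does it.

```java
// Hypothetical sketch, not the Storm source: widens or narrows the expiry
// threshold by the amount the trigger drifted from the sliding interval.
final class DriftAdjustedEviction {
    private final long windowLength;     // milliseconds
    private final long slidingInterval;  // milliseconds
    private long referenceTime;          // reference time of the latest trigger
    private boolean hasReference;        // false until the first trigger
    private boolean firstWindow = true;  // true until the second trigger
    private long delta;                  // drift adjustment in milliseconds

    DriftAdjustedEviction(long windowLength, long slidingInterval) {
        this.windowLength = windowLength;
        this.slidingInterval = slidingInterval;
    }

    // Called once per trigger with that trigger's reference timestamp.
    void onTrigger(long newReferenceTime) {
        if (hasReference) {
            // How far the actual gap between triggers deviates from the configured interval.
            delta = (newReferenceTime - referenceTime) - slidingInterval;
            firstWindow = false;
        }
        referenceTime = newReferenceTime;
        hasReference = true;
    }

    // Nothing expires in the first window, so the initial events are always processed.
    boolean isExpired(long eventTs) {
        if (firstWindow) {
            return false;
        }
        long diff = referenceTime - eventTs;
        return diff >= windowLength + delta;
    }
}
```

For a 1000 ms tumbling window with triggers at reference times 10000 and 11004, an event stamped 10002 has `diff = 1002`. Against the unadjusted threshold of 1000 it would expire without ever being seen; with `delta = 4` the threshold becomes 1004 and the event stays in the second window.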



[GitHub] storm pull request #2090: STORM-2489: Overlap and data loss on WindowedBolt ...

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2090#discussion_r113369191
  
    --- Diff: storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java ---
    @@ -62,7 +63,9 @@ public void reset() {
     
         @Override
         public void start() {
    -        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
    +        // initial delay is slightly less than the duration so that the initial tuples wont't expire due to time drift
    +        long initialDelay = duration - Math.min((long) (duration * .1), 100);
    +        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), initialDelay, duration, TimeUnit.MILLISECONDS);
    --- End diff --
    
    subsequentTrigger(x) = initialDelay + x*(delay)
    Right, but the subsequent triggers may happen earlier as the initialDelay is slightly reduced.
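
The arithmetic behind this formula can be sketched as below. `ScheduledExecutorService.scheduleAtFixedRate` starts executions after `initialDelay`, then `initialDelay + period`, then `initialDelay + 2 * period`, and so on. The helper class `FixedRateOffsets` is hypothetical; `initialDelay` reproduces the expression from the quoted diff (the variant with the 100 ms cap).

```java
// Hypothetical helper: nominal firing offsets (ms after start()) for
// scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS).
final class FixedRateOffsets {
    // Initial delay as computed in the quoted diff.
    static long initialDelay(long duration) {
        return duration - Math.min((long) (duration * .1), 100);
    }

    // Offset of the n-th firing, with n = 0 for the first one.
    static long firingOffset(long initialDelay, long period, int n) {
        return initialDelay + n * period;
    }
}
```

For `duration = 1000` the initial delay is 900 ms, so the nominal firings land at 900, 1900 and 2900 ms. The gap between consecutive firings is still the full duration, but every firing is 100 ms earlier than in a schedule whose initial delay equals the duration.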
    




[GitHub] storm pull request #2090: STORM-2489: Overlap and data loss on WindowedBolt ...

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2090#discussion_r113363343
  
    --- Diff: storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java ---
    @@ -62,7 +63,9 @@ public void reset() {
     
         @Override
         public void start() {
    -        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
    +        // initial delay is slightly less than the duration so that the initial tuples wont't expire due to time drift
    +        long initialDelay = duration - Math.min((long) (duration * .1), 100);
    +        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), initialDelay, duration, TimeUnit.MILLISECONDS);
    --- End diff --
    
    If the initial delay is slightly less than the given duration, then the subsequent scheduling will be offset by that shorter initialDelay, which can be earlier than expected. Isn't that so?



[GitHub] storm pull request #2090: STORM-2489: Overlap and data loss on WindowedBolt ...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2090#discussion_r118846419
  
    --- Diff: storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java ---
    @@ -41,8 +47,10 @@ public TimeEvictionPolicy(int windowLength) {
         public Action evict(Event<T> event) {      
             long now = evictionContext == null ? System.currentTimeMillis() : evictionContext.getReferenceTime();
             long diff = now - event.getTimestamp();
    -        if (diff >= windowLength) {
    +        if (diff >= (windowLength + delta)) {
                 return Action.EXPIRE;
    +        } else if (diff < 0) { // do not process events beyond current ts
    +            return Action.KEEP;
    --- End diff --
    
    Yes, ideally events should be ordered since it's processing time, unless `System.currentTimeMillis` goes backwards, so I assumed they could be out of order.
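
The trade-off between KEEP and STOP in this exchange can be illustrated with a small standalone scan. Everything here is a hypothetical sketch: the `EvictionScan` class, its own `Action` enum, the `PROCESS` fall-through, and the `stopOnFuture` flag are assumptions for illustration and are not taken from Storm's WindowManager.

```java
// Hypothetical sketch: shows how a scan over queued event timestamps behaves
// when an event ahead of the reference time is kept versus when it ends the scan.
final class EvictionScan {
    enum Action { EXPIRE, PROCESS, KEEP, STOP }

    // Decision logic following the quoted diff; 'delta' is the drift adjustment.
    // Returning PROCESS for the remaining case is an assumption of this sketch.
    static Action evict(long now, long eventTs, long windowLength, long delta) {
        long diff = now - eventTs;
        if (diff >= windowLength + delta) {
            return Action.EXPIRE;
        } else if (diff < 0) {
            return Action.KEEP; // event is ahead of the reference time
        }
        return Action.PROCESS;
    }

    // Counts how many events are examined before the scan ends.
    // With stopOnFuture = true, the first future event ends the scan (STOP-like).
    static int examined(long[] eventTs, long now, long windowLength, boolean stopOnFuture) {
        int count = 0;
        for (long ts : eventTs) {
            count++;
            Action action = evict(now, ts, windowLength, 0);
            if (action == Action.KEEP && stopOnFuture) {
                break;
            }
        }
        return count;
    }
}
```

With ordered timestamps `{900, 1500, 2100, 2200, 2300}`, a reference time of 2000 and a 1000 ms window, stopping at the first future event examines 3 entries instead of 5. If the clock has gone backwards and the queue is `{2100, 1500}`, stopping early examines only the first entry and never reaches the event at 1500, which is the case the KEEP choice guards against.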



[GitHub] storm pull request #2090: STORM-2489: Overlap and data loss on WindowedBolt ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2090#discussion_r117821993
  
    --- Diff: storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java ---
    @@ -62,7 +62,9 @@ public void reset() {
     
         @Override
         public void start() {
    -        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
    +        // initial delay is slightly less than the duration so that the initial tuples wont't expire due to time drift
    +        long initialDelay = duration - Math.min((long) (duration * .1), 10);
    +        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), initialDelay, duration, TimeUnit.MILLISECONDS);
    --- End diff --
    
    Won't this still be vulnerable to losing initial tuples if the first execution is delayed longer than expected for some reason?
    
    I'm wondering if we could make this more reliable by tracking the time we consider expired instead of/in addition to tracking what time it is now. Tuples between the prevExpired timestamp and the nowExpired timestamp are tuples we can't be sure have been processed, so they should be included in the window. Since the expired timestamps also move ~duration every onTrigger, we still get the tuples expired we expect. For the first onTrigger, we can just process all the tuples because we can tell it's the first call since the prevExpired timestamp isn't set yet.

