You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ar...@apache.org on 2017/05/30 09:18:28 UTC
[1/3] storm git commit: STORM-2489: Overlap and data loss on WindowedBolt based on Duration
Repository: storm
Updated Branches:
refs/heads/master adc3dd63b -> 349eb0283
STORM-2489: Overlap and data loss on WindowedBolt based on Duration
- Fixed time eviction so that events with timestamps beyond the current reference time are kept (neither evicted nor processed).
- Introduced a processing-time delta for dynamically adjusting window boundaries.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/918849fb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/918849fb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/918849fb
Branch: refs/heads/master
Commit: 918849fb44d9d9133801f561a608ed42f774b421
Parents: 465c9c6
Author: Arun Mahadevan <ar...@apache.org>
Authored: Tue Apr 25 12:37:43 2017 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Sun May 28 23:40:02 2017 +0530
----------------------------------------------------------------------
.../storm/topology/WindowedBoltExecutor.java | 2 +-
.../storm/windowing/DefaultEvictionContext.java | 11 ++++++++
.../apache/storm/windowing/EvictionContext.java | 8 ++++++
.../storm/windowing/TimeEvictionPolicy.java | 27 ++++++++++++++++++--
.../storm/windowing/TimeTriggerPolicy.java | 4 ++-
5 files changed, 48 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/918849fb/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
index 56c329e..0efe557 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
@@ -273,7 +273,7 @@ public class WindowedBoltExecutor implements IRichBolt {
this.listener = newWindowLifecycleListener();
this.windowManager = initWindowManager(listener, topoConf, context);
start();
- LOG.debug("Initialized window manager {} ", this.windowManager);
+ LOG.info("Initialized window manager {} ", windowManager);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/918849fb/storm-client/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java b/storm-client/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java
index ef65f66..cc16664 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java
@@ -21,6 +21,7 @@ public class DefaultEvictionContext implements EvictionContext {
private final Long referenceTime;
private final Long currentCount;
private final Long slidingCount;
+ private final Long slidingInterval;
public DefaultEvictionContext(Long referenceTime) {
this(referenceTime, null);
@@ -31,9 +32,14 @@ public class DefaultEvictionContext implements EvictionContext {
}
public DefaultEvictionContext(Long referenceTime, Long currentCount, Long slidingCount) {
+ this(referenceTime, currentCount, slidingCount, null);
+ }
+
+ public DefaultEvictionContext(Long referenceTime, Long currentCount, Long slidingCount, Long slidingInterval) {
this.referenceTime = referenceTime;
this.currentCount = currentCount;
this.slidingCount = slidingCount;
+ this.slidingInterval = slidingInterval;
}
@Override
@@ -50,4 +56,9 @@ public class DefaultEvictionContext implements EvictionContext {
public Long getSlidingCount() {
return slidingCount;
}
+
+ @Override
+ public Long getSlidingInterval() {
+ return slidingInterval;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/918849fb/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java b/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java
index ee5fdb9..7582e26 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java
@@ -37,6 +37,14 @@ public interface EvictionContext {
*/
Long getSlidingCount();
+
+ /**
+ * Returns the sliding interval for time based windows
+ *
+ * @return the sliding interval
+ */
+ Long getSlidingInterval();
+
/**
* Returns the current count of events in the queue up to the reference time
* based on which count based evictions can be performed.
http://git-wip-us.apache.org/repos/asf/storm/blob/918849fb/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
index 570b057..d484db3 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
@@ -17,12 +17,18 @@
*/
package org.apache.storm.windowing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Eviction policy that evicts events based on time duration.
*/
public class TimeEvictionPolicy<T> implements EvictionPolicy<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(TimeEvictionPolicy.class);
+
private final int windowLength;
protected EvictionContext evictionContext;
+ private long delta;
/**
* Constructs a TimeEvictionPolicy that evicts events older
@@ -41,8 +47,10 @@ public class TimeEvictionPolicy<T> implements EvictionPolicy<T> {
public Action evict(Event<T> event) {
long now = evictionContext == null ? System.currentTimeMillis() : evictionContext.getReferenceTime();
long diff = now - event.getTimestamp();
- if (diff >= windowLength) {
+ if (diff >= (windowLength + delta)) {
return Action.EXPIRE;
+ } else if (diff < 0) { // do not process events beyond current ts
+ return Action.KEEP;
}
return Action.PROCESS;
}
@@ -54,7 +62,22 @@ public class TimeEvictionPolicy<T> implements EvictionPolicy<T> {
@Override
public void setContext(EvictionContext context) {
- this.evictionContext = context;
+ EvictionContext prevContext = evictionContext;
+ evictionContext = context;
+ // compute window length adjustment (delta) to account for time drift
+ if (context.getSlidingInterval() != null) {
+ if (prevContext == null) {
+ delta = Integer.MAX_VALUE; // consider all events for the initial window
+ } else {
+ delta = context.getReferenceTime() - prevContext.getReferenceTime() - context.getSlidingInterval();
+ if (Math.abs(delta) > 100) {
+ LOG.warn("Possible clock drift or long running computation in window; " +
+ "Previous eviction time: {}, current eviction time: {}",
+ prevContext.getReferenceTime(),
+ context.getReferenceTime());
+ }
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/918849fb/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
index 6b6d9fa..e23c2e2 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
@@ -109,12 +109,14 @@ public class TimeTriggerPolicy<T> implements TriggerPolicy<T> {
return new Runnable() {
@Override
public void run() {
+ // do not process current timestamp since tuples might arrive while the trigger is executing
+ long now = System.currentTimeMillis() - 1;
try {
/*
* set the current timestamp as the reference time for the eviction policy
* to evict the events
*/
- evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis()));
+ evictionPolicy.setContext(new DefaultEvictionContext(now, null, null, duration));
handler.onTrigger();
} catch (Throwable th) {
LOG.error("handler.onTrigger failed ", th);
[2/3] storm git commit: Merge branch 'STORM-2489' of https://github.com/arunmahadevan/storm into STORM-2489-apache
Posted by ar...@apache.org.
Merge branch 'STORM-2489' of https://github.com/arunmahadevan/storm into STORM-2489-apache
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d4584c19
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d4584c19
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d4584c19
Branch: refs/heads/master
Commit: d4584c197932798817c8b14c8f63dc99df8917d9
Parents: adc3dd6 918849f
Author: Arun Mahadevan <ar...@apache.org>
Authored: Tue May 30 10:17:34 2017 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Tue May 30 10:17:34 2017 +0530
----------------------------------------------------------------------
.../storm/topology/WindowedBoltExecutor.java | 2 +-
.../storm/windowing/DefaultEvictionContext.java | 11 ++++++++
.../apache/storm/windowing/EvictionContext.java | 8 ++++++
.../storm/windowing/TimeEvictionPolicy.java | 27 ++++++++++++++++++--
.../storm/windowing/TimeTriggerPolicy.java | 4 ++-
5 files changed, 48 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: Added STORM-2489 to CHANGELOG.md
Posted by ar...@apache.org.
Added STORM-2489 to CHANGELOG.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/349eb028
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/349eb028
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/349eb028
Branch: refs/heads/master
Commit: 349eb02831ce89ac18f6294624218c0db25f8230
Parents: d4584c1
Author: Arun Mahadevan <ar...@apache.org>
Authored: Tue May 30 10:18:11 2017 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Tue May 30 10:18:11 2017 +0530
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/349eb028/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c5df0c8..299e65f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-2489: Overlap and data loss on WindowedBolt based on Duration
* STORM-2206: replacing visualization with viz.js
* STORM-2527: Initialize java.sql.DriverManager earlier to avoid deadlock
* STORM-2525: Fix flaky integration tests