You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/15 09:07:28 UTC
flink git commit: [FLINK-4174] Enhance evictor functionality
Repository: flink
Updated Branches:
refs/heads/master 62192c783 -> 74bb7bb63
[FLINK-4174] Enhance evictor functionality
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74bb7bb6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74bb7bb6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74bb7bb6
Branch: refs/heads/master
Commit: 74bb7bb63919ce6de5736d52e4e5a254cf9b6509
Parents: 62192c7
Author: Vishnu Viswanath <vi...@gmail.com>
Authored: Mon Oct 31 18:21:04 2016 -0500
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Nov 15 10:05:24 2016 +0100
----------------------------------------------------------------------
.../api/windowing/evictors/CountEvictor.java | 54 ++-
.../api/windowing/evictors/DeltaEvictor.java | 56 ++-
.../api/windowing/evictors/Evictor.java | 52 ++-
.../api/windowing/evictors/TimeEvictor.java | 84 +++-
.../windowing/EvictingWindowOperator.java | 112 ++++-
.../operators/windowing/TimestampedValue.java | 112 +++++
.../operators/windowing/WindowOperator.java | 4 +-
.../windowing/EvictingWindowOperatorTest.java | 462 +++++++++++++++++++
8 files changed, 886 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
index dc82521..8f5b2d9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
@@ -19,7 +19,9 @@ package org.apache.flink.streaming.api.windowing.evictors;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import java.util.Iterator;
/**
* An {@link Evictor} that keeps up to a certain amount of elements.
@@ -31,26 +33,68 @@ public class CountEvictor<W extends Window> implements Evictor<Object, W> {
private static final long serialVersionUID = 1L;
private final long maxCount;
+ private final boolean doEvictAfter;
+
+ private CountEvictor(long count,boolean doEvictAfter) {
+ this.maxCount = count;
+ this.doEvictAfter = doEvictAfter;
+ }
private CountEvictor(long count) {
this.maxCount = count;
+ this.doEvictAfter = false;
}
@Override
- public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
- if (size > maxCount) {
- return (int) (size - maxCount);
+ public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
+ if (!doEvictAfter) {
+ evict(elements, size, ctx);
+ }
+ }
+
+
+ @Override
+ public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size,W window, EvictorContext ctx) {
+ if (doEvictAfter) {
+ evict(elements, size, ctx);
+ }
+ }
+
+ private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
+ if (size <= maxCount) {
+ return;
} else {
- return 0;
+ int evictedCount = 0;
+ for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
+ iterator.next();
+ evictedCount++;
+ if (evictedCount > size - maxCount) {
+ break;
+ } else {
+ iterator.remove();
+ }
+ }
}
}
/**
* Creates a {@code CountEvictor} that keeps the given number of elements.
+ * Eviction is done before the window function.
*
* @param maxCount The number of elements to keep in the pane.
*/
public static <W extends Window> CountEvictor<W> of(long maxCount) {
return new CountEvictor<>(maxCount);
}
+
+ /**
+ * Creates a {@code CountEvictor} that keeps the given number of elements in the pane.
+ * Eviction is done before/after the window function based on the value of doEvictAfter.
+ *
+ * @param maxCount The number of elements to keep in the pane.
+ * @param doEvictAfter Whether to do eviction after the window function.
+ */
+ public static <W extends Window> CountEvictor<W> of(long maxCount, boolean doEvictAfter) {
+ return new CountEvictor<>(maxCount,doEvictAfter);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
index ef4dad6..7ae33b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
@@ -21,15 +21,16 @@ import com.google.common.collect.Iterables;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import java.util.Iterator;
/**
* An {@link Evictor} that keeps elements based on a {@link DeltaFunction} and a threshold.
*
* <p>
* Eviction starts from the first element of the buffer and removes all elements from the buffer
- * which have a higher delta then the threshold. As soon as there is an element with a lower delta,
- * the eviction stops.
+ * whose delta to the last element is greater than or equal to the threshold.
*
* @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
*/
@@ -39,24 +40,42 @@ public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
DeltaFunction<T> deltaFunction;
private double threshold;
+ private final boolean doEvictAfter;
private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction) {
this.deltaFunction = deltaFunction;
this.threshold = threshold;
+ this.doEvictAfter = false;
+ }
+
+ private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction, boolean doEvictAfter) {
+ this.deltaFunction = deltaFunction;
+ this.threshold = threshold;
+ this.doEvictAfter = doEvictAfter;
}
@Override
- public int evict(Iterable<StreamRecord<T>> elements, int size, W window) {
- StreamRecord<T> lastElement = Iterables.getLast(elements);
- int toEvict = 0;
- for (StreamRecord<T> element : elements) {
- if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) < this.threshold) {
- break;
- }
- toEvict++;
+ public void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext ctx) {
+ if (!doEvictAfter) {
+ evict(elements, size, ctx);
}
+ }
- return toEvict;
+ @Override
+ public void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext ctx) {
+ if (doEvictAfter) {
+ evict(elements, size, ctx);
+ }
+ }
+
+ private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
+ TimestampedValue<T> lastElement = Iterables.getLast(elements);
+ for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){
+ TimestampedValue<T> element = iterator.next();
+ if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {
+ iterator.remove();
+ }
+ }
}
@Override
@@ -66,6 +85,7 @@ public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
/**
* Creates a {@code DeltaEvictor} from the given threshold and {@code DeltaFunction}.
+ * Eviction is done before the window function.
*
* @param threshold The threshold
* @param deltaFunction The {@code DeltaFunction}
@@ -73,4 +93,16 @@ public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
return new DeltaEvictor<>(threshold, deltaFunction);
}
+
+ /**
+ * Creates a {@code DeltaEvictor} from the given threshold and {@code DeltaFunction}.
+ * Eviction is done before/after the window function based on the value of doEvictAfter.
+ *
+ * @param threshold The threshold
+ * @param deltaFunction The {@code DeltaFunction}
+ * @param doEvictAfter Whether eviction should be done after window function
+ */
+ public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction, boolean doEvictAfter) {
+ return new DeltaEvictor<>(threshold, deltaFunction, doEvictAfter);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
index d8e0daa..02e93eb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
@@ -18,14 +18,17 @@
package org.apache.flink.streaming.api.windowing.evictors;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
import java.io.Serializable;
/**
- * An {@code Evictor} can remove elements from a pane before it is being processed and after
- * window evaluation was triggered by a
- * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
+ * An {@code Evictor} can remove elements from a pane before and/or after the WindowFunction is evaluated,
+ * once window evaluation has been triggered by a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
*
* <p>
* A pane is the bucket of elements that have the same key (assigned by the
@@ -41,13 +44,48 @@ import java.io.Serializable;
public interface Evictor<T, W extends Window> extends Serializable {
/**
- * Computes how many elements should be removed from the pane. The result specifies how
- * many elements should be removed from the beginning.
+ * Optionally evicts elements. Called before windowing function.
+ *
+ * @param elements The elements currently in the pane.
+ * @param size The current number of elements in the pane.
+ * @param window The {@link Window}
+ * @param evictorContext The context for the Evictor
+ */
+ void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
+
+ /**
+ * Optionally evicts elements. Called after windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
+ * @param evictorContext The context for the Evictor
*/
- int evict(Iterable<StreamRecord<T>> elements, int size, W window);
+ void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
+
+
+ /**
+ * A context object that is given to {@link Evictor} methods
+ */
+ interface EvictorContext {
+
+ /**
+ * Returns the current processing time, as returned by
+ * the {@link ProcessingTimeService#getCurrentProcessingTime}.
+ */
+ long getCurrentProcessingTime();
+
+ /**
+ * Returns the metric group for this {@link Evictor}. This is the same metric
+ * group that would be returned from {@link RuntimeContext#getMetricGroup()} in a user
+ * function.
+ *
+ * <p>You must not call methods that create metric objects
+ * (such as {@link MetricGroup#counter(int)}) multiple times but instead call once
+ * and store the metric object in a field.
+ */
+ MetricGroup getMetricGroup();
+
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
index 369a7ae..33d1cb5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -17,16 +17,18 @@
*/
package org.apache.flink.streaming.api.windowing.evictors;
-import com.google.common.collect.Iterables;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import java.util.Iterator;
/**
* An {@link Evictor} that keeps elements for a certain amount of time. Elements older
- * than {@code current_time - keep_time} are evicted.
+ * than {@code current_time - keep_time} are evicted. The current_time is the maximum timestamp
+ * among the {@link TimestampedValue}s currently in the pane.
*
* @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
*/
@@ -35,23 +37,71 @@ public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
private static final long serialVersionUID = 1L;
private final long windowSize;
+ private final boolean doEvictAfter;
public TimeEvictor(long windowSize) {
this.windowSize = windowSize;
+ this.doEvictAfter = false;
+ }
+
+ public TimeEvictor(long windowSize, boolean doEvictAfter) {
+ this.windowSize = windowSize;
+ this.doEvictAfter = doEvictAfter;
}
+
@Override
- public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
- int toEvict = 0;
- long currentTime = Iterables.getLast(elements).getTimestamp();
+ public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
+ if(!doEvictAfter) {
+ evict(elements,size,ctx);
+ }
+ }
+
+ @Override
+ public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
+ if(doEvictAfter) {
+ evict(elements,size,ctx);
+ }
+ }
+
+ private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
+ if (!hasTimestamp(elements)) {
+ return;
+ }
+
+ long currentTime = getMaxTimestamp(elements);
long evictCutoff = currentTime - windowSize;
- for (StreamRecord<Object> record: elements) {
- if (record.getTimestamp() > evictCutoff) {
- break;
+
+ for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
+ TimestampedValue<Object> record = iterator.next();
+ if (record.getTimestamp() <= evictCutoff) {
+ iterator.remove();
}
- toEvict++;
}
- return toEvict;
+ }
+
+ /**
+ * Returns true if the first element in the Iterable of {@link TimestampedValue} has a timestamp.
+ */
+ private boolean hasTimestamp(Iterable<TimestampedValue<Object>> elements) {
+ Iterator<TimestampedValue<Object>> it = elements.iterator();
+ if (it.hasNext()) {
+ return it.next().hasTimestamp();
+ }
+ return false;
+ }
+
+ /**
+ * @param elements The elements currently in the pane.
+ * @return The maximum value of timestamp among the elements.
+ */
+ private long getMaxTimestamp(Iterable<TimestampedValue<Object>> elements) {
+ long currentTime = Long.MIN_VALUE;
+ for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
+ TimestampedValue<Object> record = iterator.next();
+ currentTime = Math.max(currentTime, record.getTimestamp());
+ }
+ return currentTime;
}
@Override
@@ -66,10 +116,22 @@ public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
/**
* Creates a {@code TimeEvictor} that keeps the given number of elements.
+ * Eviction is done before the window function.
*
* @param windowSize The amount of time for which to keep elements.
*/
public static <W extends Window> TimeEvictor<W> of(Time windowSize) {
return new TimeEvictor<>(windowSize.toMilliseconds());
}
+
+ /**
+ * Creates a {@code TimeEvictor} that keeps elements for the given amount of time.
+ * Eviction is done before/after the window function based on the value of doEvictAfter.
+ *
+ * @param windowSize The amount of time for which to keep elements.
+ * @param doEvictAfter Whether eviction is done after window function.
+ */
+ public static <W extends Window> TimeEvictor<W> of(Time windowSize, boolean doEvictAfter) {
+ return new TimeEvictor<>(windowSize.toMilliseconds(),doEvictAfter);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index f9b409e..3be3f5a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -47,8 +48,8 @@ import static java.util.Objects.requireNonNull;
* A {@link WindowOperator} that also allows an {@link Evictor} to be used.
*
* <p>
- * The {@code Evictor} is used to evict elements from panes before processing a window and after
- * a {@link Trigger} has fired.
+ * The {@code Evictor} is used to remove elements from a pane before and/or after the WindowFunction is evaluated,
+ * once window evaluation has been triggered by a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
*
* @param <K> The type of key returned by the {@code KeySelector}.
* @param <IN> The type of the incoming elements.
@@ -62,6 +63,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
private final Evictor<? super IN, ? super W> evictor;
+ protected transient EvictorContext evictorContext = new EvictorContext(null, null);
+
private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor;
public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
@@ -146,6 +149,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
context.key = key;
context.window = actualWindow;
+ evictorContext.key = key;
+ evictorContext.window = actualWindow;
// we might have already fired because of a merge but still call onElement
// on the (possibly merged) window
@@ -158,7 +163,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
// if we have no state, there is nothing to do
continue;
}
- fire(actualWindow, contents);
+ fire(actualWindow, contents, windowState);
}
if (combinedTriggerResult.isPurge()) {
@@ -183,6 +188,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
context.key = key;
context.window = window;
+ evictorContext.key = key;
+ evictorContext.window = window;
TriggerResult triggerResult = context.onElement(element);
@@ -192,7 +199,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
// if we have no state, there is nothing to do
continue;
}
- fire(window, contents);
+ fire(window, contents, windowState);
}
if (triggerResult.isPurge()) {
@@ -209,6 +216,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
context.key = timer.getKey();
context.window = timer.getNamespace();
+ evictorContext.key = timer.getKey();
+ evictorContext.window = timer.getNamespace();
ListState<StreamRecord<IN>> windowState;
MergingWindowSet<W> mergingWindows = null;
@@ -238,7 +247,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
if (triggerResult.isFire()) {
- fire(context.window, contents);
+ fire(context.window, contents, windowState);
}
if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
@@ -250,6 +259,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
context.key = timer.getKey();
context.window = timer.getNamespace();
+ evictorContext.key = timer.getKey();
+ evictorContext.window = timer.getNamespace();
ListState<StreamRecord<IN>> windowState;
MergingWindowSet<W> mergingWindows = null;
@@ -276,7 +287,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
if (triggerResult.isFire()) {
- fire(context.window, contents);
+ fire(context.window, contents, windowState);
}
if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
@@ -284,22 +295,79 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
}
}
- private void fire(W window, Iterable<StreamRecord<IN>> contents) throws Exception {
+ private void fire(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
// Work around type system restrictions...
- int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);
-
- FluentIterable<IN> projectedContents = FluentIterable
+ FluentIterable<TimestampedValue<IN>> recordsWithTimestamp = FluentIterable
.from(contents)
- .skip(toEvict)
- .transform(new Function<StreamRecord<IN>, IN>() {
+ .transform(new Function<StreamRecord<IN>, TimestampedValue<IN>>() {
+ @Override
+ public TimestampedValue<IN> apply(StreamRecord<IN> input) {
+ return TimestampedValue.from(input);
+ }
+ });
+ evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
+
+ FluentIterable<IN> projectedContents = recordsWithTimestamp
+ .transform(new Function<TimestampedValue<IN>, IN>() {
@Override
- public IN apply(StreamRecord<IN> input) {
+ public IN apply(TimestampedValue<IN> input) {
return input.getValue();
}
});
+
userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
+ evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
+
+
+ //work around to fix FLINK-4369, remove the evicted elements from the windowState.
+ //this is inefficient, but there is no other way to remove elements from ListState, which is an AppendingState.
+ windowState.clear();
+ for(TimestampedValue<IN> record : recordsWithTimestamp) {
+ windowState.add(record.getStreamRecord());
+ }
+ }
+
+
+ /**
+ * {@code EvictorContext} is a utility for handling {@code Evictor} invocations. It can be reused
+ * by setting the {@code key} and {@code window} fields. No internal state must be kept in
+ * the {@code EvictorContext}.
+ */
+
+ class EvictorContext implements Evictor.EvictorContext {
+
+ protected K key;
+ protected W window;
+
+ public EvictorContext(K key, W window) {
+ this.key = key;
+ this.window = window;
+ }
+
+ @Override
+ public long getCurrentProcessingTime() {
+ return EvictingWindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
+ }
+
+ @Override
+ public MetricGroup getMetricGroup() {
+ return EvictingWindowOperator.this.getMetricGroup();
+ }
+
+
+ public K getKey() {
+ return key;
+ }
+
+ void evictBefore(Iterable<TimestampedValue<IN>> elements, int size) {
+ evictor.evictBefore((Iterable)elements, size, window, this);
+ }
+
+ void evictAfter(Iterable<TimestampedValue<IN>> elements, int size) {
+ evictor.evictAfter((Iterable)elements, size, window, this);
+ }
}
private void cleanup(W window,
@@ -314,6 +382,24 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
context.clear();
}
+ @Override
+ public void open() throws Exception {
+ super.open();
+ evictorContext = new EvictorContext(null,null);
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ evictorContext = null;
+ }
+
+ @Override
+ public void dispose() throws Exception{
+ super.dispose();
+ evictorContext = null;
+ }
+
// ------------------------------------------------------------------------
// Getters for testing
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java
new file mode 100644
index 0000000..9c63a69
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Stores the value and the timestamp of the record.
+ *
+ * @param <T> The type of the encapsulated value
+ */
+@PublicEvolving
+public class TimestampedValue<T> {
+
+ /** The actual value held by this record */
+ private T value;
+
+ /** The timestamp of the record */
+ private long timestamp;
+
+ /** Flag whether the timestamp is actually set */
+ private boolean hasTimestamp;
+
+ /**
+ * Creates a new TimestampedValue. The record does not have a timestamp.
+ */
+ public TimestampedValue(T value) {
+ this.value = value;
+ }
+
+ /**
+ * Creates a new TimestampedValue wrapping the given value. The timestamp is set to the
+ * given timestamp.
+ *
+ * @param value The value to wrap in this {@link TimestampedValue}
+ * @param timestamp The timestamp in milliseconds
+ */
+ public TimestampedValue(T value, long timestamp) {
+ this.value = value;
+ this.timestamp = timestamp;
+ this.hasTimestamp = true;
+ }
+
+ /**
+ * @return The value wrapped in this {@link TimestampedValue}.
+ */
+ public T getValue() {
+ return value;
+ }
+
+ /**
+ * @return The timestamp associated with this stream value in milliseconds.
+ */
+ public long getTimestamp() {
+ if (hasTimestamp) {
+ return timestamp;
+ } else {
+ throw new IllegalStateException(
+ "Record has no timestamp. Is the time characteristic set to 'ProcessingTime', or " +
+ "did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
+ }
+ }
+
+ /**
+ * Checks whether this record has a timestamp.
+ *
+ * @return True if the record has a timestamp, false if not.
+ */
+ public boolean hasTimestamp() {
+ return hasTimestamp;
+ }
+
+ /**
+ * Creates a {@link StreamRecord} from this TimestampedValue.
+ */
+ public StreamRecord<T> getStreamRecord() {
+ StreamRecord<T> streamRecord = new StreamRecord<>(value);
+ if (hasTimestamp) {
+ streamRecord.setTimestamp(timestamp);
+ }
+ return streamRecord;
+ }
+
+ /**
+ * Creates a TimestampedValue from given {@link StreamRecord}.
+ *
+ * @param streamRecord The StreamRecord object from which TimestampedValue is to be created.
+ */
+ public static <T> TimestampedValue<T> from(StreamRecord<T> streamRecord) {
+ if (streamRecord.hasTimestamp()) {
+ return new TimestampedValue<>(streamRecord.getValue(), streamRecord.getTimestamp());
+ } else {
+ return new TimestampedValue<>(streamRecord.getValue());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 229d97d..6ff3999 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -181,7 +181,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
@Override
- public final void open() throws Exception {
+ public void open() throws Exception {
super.open();
timestampedCollector = new TimestampedCollector<>(output);
@@ -200,7 +200,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
@Override
- public final void close() throws Exception {
+ public void close() throws Exception {
super.close();
timestampedCollector = null;
context = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 2e3d090..46495b0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -29,10 +29,13 @@ import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ReduceIterableWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.DeltaEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
@@ -58,6 +61,464 @@ public class EvictingWindowOperatorTest {
// For counting if close() is called the correct number of times on the SumReducer
+ /**
+ * Tests CountEvictor evictAfter behavior: when the trigger fires, the window function
+ * sums every buffered element, and eviction down to WINDOW_SIZE elements happens
+ * only after evaluation.
+ *
+ * <p>Expected sums assume CountTrigger fires every TRIGGER_COUNT elements per key and
+ * CountEvictor keeps the most recent WINDOW_SIZE elements (see CountEvictor).
+ * @throws Exception if operator setup or element processing in the test harness fails
+ */
+ @Test
+ public void testCountEvictorEvictAfter() throws Exception {
+ AtomicInteger closeCalled = new AtomicInteger(0);
+ // Note: WINDOW_SIZE is passed to CountEvictor.of as the number of elements to keep.
+ final int WINDOW_SIZE = 4;
+ final int TRIGGER_COUNT = 2;
+ final boolean EVICT_AFTER = true;
+
+ TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+ (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+ ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+ new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+ EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+ GlobalWindows.create(),
+ new GlobalWindow.Serializer(),
+ new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
+ CountTrigger.of(TRIGGER_COUNT),
+ CountEvictor.of(WINDOW_SIZE,EVICT_AFTER),
+ 0);
+
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+
+ long initialTime = 0L;
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+
+
+ // key2 fires at 2 elements (sum 2) and again at 4 elements (sum 4); 1000 stays buffered.
+ // key1 fires at 2 elements (sum 2); 999 stays buffered.
+ // Long.MAX_VALUE is presumably the GlobalWindow's max timestamp.
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+
+ // Output is compared after sorting with ResultSortComparator.
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+ // key1 fires with 4 elements (sum 4). key2 fires with 6 elements (sum 6): all of them
+ // are counted because eviction runs after the function, which then trims key2 back to 4.
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+ // key2 now has the 4 retained elements plus 2 new ones, so it fires with sum 6.
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.close();
+
+ // RichSumReducer presumably records its close() calls in closeCalled; expect exactly one.
+ Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+
+ }
+
+ /**
+ * Tests TimeEvictor evictAfter behavior on GlobalWindows: when the trigger fires, the
+ * window function sums every buffered element, and old elements are evicted only
+ * after evaluation.
+ *
+ * <p>Expected sums assume CountTrigger fires every TRIGGER_COUNT elements per key and
+ * TimeEvictor drops elements lying 2 seconds or more before the newest buffered
+ * timestamp (see TimeEvictor).
+ * @throws Exception if operator setup or element processing in the test harness fails
+ */
+ @Test
+ public void testTimeEvictorEvictAfter() throws Exception {
+ AtomicInteger closeCalled = new AtomicInteger(0);
+ final int TRIGGER_COUNT = 2;
+ final boolean EVICT_AFTER = true;
+
+ TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+ (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+ ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+ new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+ EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+ GlobalWindows.create(),
+ new GlobalWindow.Serializer(),
+ new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
+ CountTrigger.of(TRIGGER_COUNT),
+ TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
+ 0);
+
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+ long initialTime = 0L;
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 4000));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2001));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1001));
+
+
+
+ // key2: [1000, 4000] fires with sum 2, then 1000 is evicted (3s before 4000).
+ // key1: [20, 0] fires with sum 2; 999 stays buffered.
+ // key2: [4000, 3500, 2001] fires with sum 3; 2001 is within 2s of 4000 and is kept.
+ // 1001 stays buffered.
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), Long.MAX_VALUE));
+
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1002));
+
+ // key1: [20, 0, 999, 10999] fires with sum 4.
+ // key2: [4000, 3500, 2001, 1001, 1002] fires with sum 5, evaluated before eviction.
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.close();
+
+ // RichSumReducer presumably records its close() calls in closeCalled; expect exactly one.
+ Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+
+ }
+
+ /**
+ * Tests TimeEvictor evictBefore behavior on tumbling event-time windows of WINDOW_SIZE
+ * seconds: old elements are removed before the window function sums the contents.
+ *
+ * <p>The single-argument TimeEvictor.of presumably defaults to evicting before
+ * evaluation, as the test name indicates. Expected sums also assume CountTrigger fires
+ * every TRIGGER_COUNT elements per key and window, and that TimeEvictor drops elements
+ * lying 2 seconds or more before the newest buffered timestamp.
+ * @throws Exception if operator setup or element processing in the test harness fails
+ */
+ @Test
+ public void testTimeEvictorEvictBefore() throws Exception {
+ AtomicInteger closeCalled = new AtomicInteger(0);
+ final int TRIGGER_COUNT = 2;
+ final int WINDOW_SIZE = 4;
+
+ TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+ (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+ ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+ new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+ EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new EvictingWindowOperator<>(
+ TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ new TimeWindow.Serializer(),
+ new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>(closeCalled)),
+ CountTrigger.of(TRIGGER_COUNT),
+ TimeEvictor.of(Time.seconds(2)),
+ 0);
+
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+ long initialTime = 0L;
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 5999));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2001));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1001));
+
+
+
+ // Every element except key1@5999 falls into window [0, 4000). The expected output
+ // timestamps (3999, 7999) are presumably each window's max timestamp (end - 1).
+ // key2: [1000, 3999] fires; 1000 is evicted first, giving sum 1.
+ // key1: [20, 0] fires with sum 2; 999 stays buffered; 5999 goes to window [4000, 8000).
+ // key2: [3999, 3500, 2001] fires with sum 3 (nothing evicted); 1001 stays buffered.
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 3999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 3999));
+
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 6500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1002));
+
+ // key1, window [4000, 8000): [5999, 6500] fires with sum 2.
+ // key2, window [0, 4000): [3999, 3500, 2001, 1001, 1002] fires; 1001 and 1002 are
+ // evicted before evaluation, giving sum 3.
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 7999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 3999));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.close();
+
+ // RichSumReducer presumably records its close() calls in closeCalled; expect exactly one.
+ Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+
+ }
+
+ /**
+ * Tests TimeEvictor (evictAfter) on StreamRecords that carry no timestamp.
+ * No element will be evicted from the window, so every firing sums all elements
+ * buffered so far for the key.
+ *
+ * <p>Expected sums assume CountTrigger fires every TRIGGER_COUNT elements per key.
+ * @throws Exception if operator setup or element processing in the test harness fails
+ */
+ @Test
+ public void testTimeEvictorNoTimestamp() throws Exception {
+ AtomicInteger closeCalled = new AtomicInteger(0);
+ final int TRIGGER_COUNT = 2;
+ final boolean EVICT_AFTER = true;
+
+ TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+ (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+ ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+ new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+ EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+ GlobalWindows.create(),
+ new GlobalWindow.Serializer(),
+ new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
+ CountTrigger.of(TRIGGER_COUNT),
+ TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
+ 0);
+
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ testHarness.open();
+
+ // All records are created without a timestamp.
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+
+
+
+ // Nothing is evicted, so the sums grow with the element count:
+ // key2 fires at 2 and 4 elements, and key1 fires at 2 elements.
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+
+ // key1 reaches 4 elements (sum 4), and key2 reaches 6 elements (sum 6).
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.close();
+
+ // RichSumReducer presumably records its close() calls in closeCalled; expect exactly one.
+ Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+
+ }
+
+ /**
+ * Tests DeltaEvictor evictBefore behavior: elements are removed before the window
+ * function sums the remaining contents.
+ *
+ * <p>Expected sums assume CountTrigger fires every TRIGGER_COUNT elements per key.
+ * They also assume DeltaEvictor evicts each element e for which
+ * getDelta(e, lastElement) >= THRESHOLD. With the delta function below, that means
+ * lastElement.f1 - e.f1 >= 2 (see DeltaEvictor).
+ * @throws Exception if operator setup or element processing in the test harness fails
+ */
+ @Test
+ public void testDeltaEvictorEvictBefore() throws Exception {
+ AtomicInteger closeCalled = new AtomicInteger(0);
+ final int TRIGGER_COUNT = 2;
+ final boolean EVICT_AFTER = false;
+ final int THRESHOLD = 2;
+
+ TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+ (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+ ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+ new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+ EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+ GlobalWindows.create(),
+ new GlobalWindow.Serializer(),
+ new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
+ CountTrigger.of(TRIGGER_COUNT),
+ DeltaEvictor.of(THRESHOLD, new DeltaFunction<Tuple2<String, Integer>>() {
+ // Delta is the difference of the integer fields: new.f1 - old.f1.
+ @Override
+ public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) {
+ return newDataPoint.f1 - oldDataPoint.f1;
+ }
+ }, EVICT_AFTER),
+ 0);
+
+
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+ long initialTime = 0L;
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 3999));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 5), initialTime + 999));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 1998));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), initialTime + 1999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+ // key2: [1, 4] fires; 1 is evicted first (4 - 1 >= 2), giving sum 4.
+ // key1: [1, 1] fires with sum 2; 5 stays buffered.
+ // key2: [4, 5, 6] fires; 4 is evicted (6 - 4 >= 2), giving sum 11. 1 stays buffered.
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 11), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 10999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), initialTime + 1000));
+
+ // key1: [1, 1, 5, 3] fires; both 1s are evicted (3 - 1 >= 2), giving sum 8.
+ // key2: [5, 6, 1, 10] fires; 5, 6 and 1 are evicted, giving sum 10.
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 8), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 10), Long.MAX_VALUE));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.close();
+
+ // RichSumReducer presumably records its close() calls in closeCalled; expect exactly one.
+ Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+ }
+
+ /**
+ * Tests DeltaEvictor evictAfter behavior: when the trigger fires, the window function
+ * sums every buffered element, and delta-based eviction happens only after evaluation.
+ *
+ * <p>Expected sums assume CountTrigger fires every TRIGGER_COUNT elements per key.
+ * They also assume DeltaEvictor evicts each element e for which
+ * getDelta(e, lastElement) >= THRESHOLD. With the delta function below, that means
+ * lastElement.f1 - e.f1 >= 2 (see DeltaEvictor).
+ * @throws Exception if operator setup or element processing in the test harness fails
+ */
+ @Test
+ public void testDeltaEvictorEvictAfter() throws Exception {
+ AtomicInteger closeCalled = new AtomicInteger(0);
+ final int TRIGGER_COUNT = 2;
+ final boolean EVICT_AFTER = true;
+ final int THRESHOLD = 2;
+
+ TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+ (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+ ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+ new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+ EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+ GlobalWindows.create(),
+ new GlobalWindow.Serializer(),
+ new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
+ CountTrigger.of(TRIGGER_COUNT),
+ DeltaEvictor.of(THRESHOLD, new DeltaFunction<Tuple2<String, Integer>>() {
+ // Delta is the difference of the integer fields: new.f1 - old.f1.
+ @Override
+ public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) {
+ return newDataPoint.f1 - oldDataPoint.f1;
+ }
+ }, EVICT_AFTER),
+ 0);
+
+
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+ long initialTime = 0L;
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 3999));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 5), initialTime + 999));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 1998));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), initialTime + 1999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+ // key2: [1, 4] fires with sum 5, then 1 is evicted.
+ // key1: [1, 1] fires with sum 2 (nothing evicted); 5 stays buffered.
+ // key2: [4, 5, 6] fires with sum 15, then 4 is evicted. 1 stays buffered.
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 15), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 9), initialTime + 10999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), initialTime + 1000));
+
+ // key1: [1, 1, 5, 9] fires with sum 16, and key2: [5, 6, 1, 10] fires with sum 22.
+ // Both are evaluated before eviction.
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 16), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 22), Long.MAX_VALUE));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.close();
+
+ // RichSumReducer presumably records its close() calls in closeCalled; expect exactly one.
+ Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+ }
+
@Test
@SuppressWarnings("unchecked")
public void testCountTrigger() throws Exception {
@@ -316,6 +777,7 @@ public class EvictingWindowOperatorTest {
for (Tuple2<String, Integer> t: input) {
sum += t.f1;
}
+
out.collect(new Tuple2<>(key, sum));
}