You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/15 09:07:28 UTC

flink git commit: [FLINK-4174] Enhance evictor functionality

Repository: flink
Updated Branches:
  refs/heads/master 62192c783 -> 74bb7bb63


[FLINK-4174] Enhance evictor functionality


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74bb7bb6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74bb7bb6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74bb7bb6

Branch: refs/heads/master
Commit: 74bb7bb63919ce6de5736d52e4e5a254cf9b6509
Parents: 62192c7
Author: Vishnu Viswanath <vi...@gmail.com>
Authored: Mon Oct 31 18:21:04 2016 -0500
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Nov 15 10:05:24 2016 +0100

----------------------------------------------------------------------
 .../api/windowing/evictors/CountEvictor.java    |  54 ++-
 .../api/windowing/evictors/DeltaEvictor.java    |  56 ++-
 .../api/windowing/evictors/Evictor.java         |  52 ++-
 .../api/windowing/evictors/TimeEvictor.java     |  84 +++-
 .../windowing/EvictingWindowOperator.java       | 112 ++++-
 .../operators/windowing/TimestampedValue.java   | 112 +++++
 .../operators/windowing/WindowOperator.java     |   4 +-
 .../windowing/EvictingWindowOperatorTest.java   | 462 +++++++++++++++++++
 8 files changed, 886 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
index dc82521..8f5b2d9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
@@ -19,7 +19,9 @@ package org.apache.flink.streaming.api.windowing.evictors;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import java.util.Iterator;
 
 /**
  * An {@link Evictor} that keeps up to a certain amount of elements.
@@ -31,26 +33,68 @@ public class CountEvictor<W extends Window> implements Evictor<Object, W> {
 	private static final long serialVersionUID = 1L;
 
 	private final long maxCount;
+	private final boolean doEvictAfter;
+
+	private CountEvictor(long count,boolean doEvictAfter) {
+		this.maxCount = count;
+		this.doEvictAfter = doEvictAfter;
+	}
 
 	private CountEvictor(long count) {
 		this.maxCount = count;
+		this.doEvictAfter = false;
 	}
 
 	@Override
-	public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
-		if (size > maxCount) {
-			return (int) (size - maxCount);
+	public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
+		if (!doEvictAfter) {
+			evict(elements, size, ctx);
+		}
+	}
+
+
+	@Override
+	public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size,W window, EvictorContext ctx) {
+		if (doEvictAfter) {
+			evict(elements, size, ctx);
+		}
+	}
+
+	private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
+		if (size <= maxCount) {
+			return;
 		} else {
-			return 0;
+			int evictedCount = 0;
+			for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
+				iterator.next();
+				evictedCount++;
+				if (evictedCount > size - maxCount) {
+					break;
+				} else {
+					iterator.remove();
+				}
+			}
 		}
 	}
 
 	/**
 	 * Creates a {@code CountEvictor} that keeps the given number of elements.
+	 * Eviction is done before the window function.
 	 *
 	 * @param maxCount The number of elements to keep in the pane.
 	 */
 	public static <W extends Window> CountEvictor<W> of(long maxCount) {
 		return new CountEvictor<>(maxCount);
 	}
+
+	/**
+	 * Creates a {@code CountEvictor} that keeps the given number of elements in the pane
+	 * Eviction is done before/after the window function based on the value of doEvictAfter.
+	 *
+	 * @param maxCount The number of elements to keep in the pane.
+	 * @param doEvictAfter Whether to do eviction after the window function.
+     */
+	public static <W extends Window> CountEvictor<W> of(long maxCount, boolean doEvictAfter) {
+		return new CountEvictor<>(maxCount,doEvictAfter);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
index ef4dad6..7ae33b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
@@ -21,15 +21,16 @@ import com.google.common.collect.Iterables;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import java.util.Iterator;
 
 /**
  * An {@link Evictor} that keeps elements based on a {@link DeltaFunction} and a threshold.
  *
  * <p>
  * Eviction starts from the first element of the buffer and removes all elements from the buffer
- * which have a higher delta then the threshold. As soon as there is an element with a lower delta,
- * the eviction stops.
+ * which have a higher delta then the threshold.
  *
  * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
  */
@@ -39,24 +40,42 @@ public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
 
 	DeltaFunction<T> deltaFunction;
 	private double threshold;
+	private final boolean doEvictAfter;
 
 	private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction) {
 		this.deltaFunction = deltaFunction;
 		this.threshold = threshold;
+		this.doEvictAfter = false;
+	}
+
+	private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction, boolean doEvictAfter) {
+		this.deltaFunction = deltaFunction;
+		this.threshold = threshold;
+		this.doEvictAfter = doEvictAfter;
 	}
 
 	@Override
-	public int evict(Iterable<StreamRecord<T>> elements, int size, W window) {
-		StreamRecord<T> lastElement = Iterables.getLast(elements);
-		int toEvict = 0;
-		for (StreamRecord<T> element : elements) {
-			if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) < this.threshold) {
-				break;
-			}
-			toEvict++;
+	public void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext ctx) {
+		if (!doEvictAfter) {
+			evict(elements, size, ctx);
 		}
+	}
 
-		return toEvict;
+	@Override
+	public void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext ctx) {
+		if (doEvictAfter) {
+			evict(elements, size, ctx);
+		}
+	}
+
+	private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
+		TimestampedValue<T> lastElement = Iterables.getLast(elements);
+		for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){
+			TimestampedValue<T> element = iterator.next();
+			if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {
+				iterator.remove();
+			}
+		}
 	}
 
 	@Override
@@ -66,6 +85,7 @@ public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
 
 	/**
 	 * Creates a {@code DeltaEvictor} from the given threshold and {@code DeltaFunction}.
+	 * Eviction is done before the window function.
 	 *
 	 * @param threshold The threshold
 	 * @param deltaFunction The {@code DeltaFunction}
@@ -73,4 +93,16 @@ public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
 	public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
 		return new DeltaEvictor<>(threshold, deltaFunction);
 	}
+
+	/**
+	 * Creates a {@code DeltaEvictor} from the given threshold, {@code DeltaFunction}.
+	 * Eviction is done before/after the window function based on the value of doEvictAfter.
+	 *
+	 * @param threshold The threshold
+	 * @param deltaFunction The {@code DeltaFunction}
+	 * @param doEvictAfter Whether eviction should be done after window function
+     */
+	public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction, boolean doEvictAfter) {
+		return new DeltaEvictor<>(threshold, deltaFunction, doEvictAfter);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
index d8e0daa..02e93eb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
@@ -18,14 +18,17 @@
 package org.apache.flink.streaming.api.windowing.evictors;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
 import java.io.Serializable;
 
 /**
- * An {@code Evictor} can remove elements from a pane before it is being processed and after
- * window evaluation was triggered by a
- * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
+ * An {@code Evictor} can remove elements from a pane before/after the evaluation of WindowFunction and
+ * after the window evaluation gets triggered by a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}
  *
  * <p>
  * A pane is the bucket of elements that have the same key (assigned by the
@@ -41,13 +44,48 @@ import java.io.Serializable;
 public interface Evictor<T, W extends Window> extends Serializable {
 
 	/**
-	 * Computes how many elements should be removed from the pane. The result specifies how
-	 * many elements should be removed from the beginning.
+	 * Optionally evicts elements. Called before windowing function.
+	 *
+	 * @param elements The elements currently in the pane.
+	 * @param size The current number of elements in the pane.
+	 * @param window The {@link Window}
+	 * @param evictorContext The context for the Evictor
+     */
+	void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
+
+	/**
+	 * Optionally evicts elements. Called after windowing function.
 	 *
 	 * @param elements The elements currently in the pane.
 	 * @param size The current number of elements in the pane.
 	 * @param window The {@link Window}
+	 * @param evictorContext The context for the Evictor
 	 */
-	int evict(Iterable<StreamRecord<T>> elements, int size, W window);
+	void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
+
+
+	/**
+	 * A context object that is given to {@link Evictor} methods
+	 */
+	interface EvictorContext {
+
+		/**
+		 * Returns the current processing time, as returned by
+		 * the {@link ProcessingTimeService#getCurrentProcessingTime}.
+		 */
+		long getCurrentProcessingTime();
+
+		/**
+		 * Returns the metric group for this {@link Evictor}. This is the same metric
+		 * group that would be returned from {@link RuntimeContext#getMetricGroup()} in a user
+		 * function.
+		 *
+		 * <p>You must not call methods that create metric objects
+		 * (such as {@link MetricGroup#counter(int)} multiple times but instead call once
+		 * and store the metric object in a field.
+		 */
+		MetricGroup getMetricGroup();
+
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
index 369a7ae..33d1cb5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -17,16 +17,18 @@
  */
 package org.apache.flink.streaming.api.windowing.evictors;
 
-import com.google.common.collect.Iterables;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import java.util.Iterator;
 
 /**
  * An {@link Evictor} that keeps elements for a certain amount of time. Elements older
- * than {@code current_time - keep_time} are evicted.
+ * than {@code current_time - keep_time} are evicted. The current_time is time associated
+ * with {@link TimestampedValue}
  *
  * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
  */
@@ -35,23 +37,71 @@ public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
 	private static final long serialVersionUID = 1L;
 
 	private final long windowSize;
+	private final boolean doEvictAfter;
 
 	public TimeEvictor(long windowSize) {
 		this.windowSize = windowSize;
+		this.doEvictAfter = false;
+	}
+
+	public TimeEvictor(long windowSize, boolean doEvictAfter) {
+		this.windowSize = windowSize;
+		this.doEvictAfter = doEvictAfter;
 	}
 
+
 	@Override
-	public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
-		int toEvict = 0;
-		long currentTime = Iterables.getLast(elements).getTimestamp();
+	public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
+		if(!doEvictAfter) {
+			evict(elements,size,ctx);
+		}
+	}
+
+	@Override
+	public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
+		if(doEvictAfter) {
+			evict(elements,size,ctx);
+		}
+	}
+
+	private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
+		if (!hasTimestamp(elements)) {
+			return;
+		}
+
+		long currentTime = getMaxTimestamp(elements);
 		long evictCutoff = currentTime - windowSize;
-		for (StreamRecord<Object> record: elements) {
-			if (record.getTimestamp() > evictCutoff) {
-				break;
+
+		for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
+			TimestampedValue<Object> record = iterator.next();
+			if (record.getTimestamp() <= evictCutoff) {
+				iterator.remove();
 			}
-			toEvict++;
 		}
-		return toEvict;
+	}
+
+	/**
+	 * Returns true if the first element in the Iterable of {@link TimestampedValue} has a timestamp.
+     */
+	private boolean hasTimestamp(Iterable<TimestampedValue<Object>> elements) {
+		Iterator<TimestampedValue<Object>> it = elements.iterator();
+		if (it.hasNext()) {
+			return it.next().hasTimestamp();
+		}
+		return false;
+	}
+
+	/**
+	 * @param elements The elements currently in the pane.
+	 * @return The maximum value of timestamp among the elements.
+     */
+	private long getMaxTimestamp(Iterable<TimestampedValue<Object>> elements) {
+		long currentTime = Long.MIN_VALUE;
+		for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
+			TimestampedValue<Object> record = iterator.next();
+			currentTime = Math.max(currentTime, record.getTimestamp());
+		}
+		return currentTime;
 	}
 
 	@Override
@@ -66,10 +116,22 @@ public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
 
 	/**
 	 * Creates a {@code TimeEvictor} that keeps the given number of elements.
+	 * Eviction is done before the window function.
 	 *
 	 * @param windowSize The amount of time for which to keep elements.
 	 */
 	public static <W extends Window> TimeEvictor<W> of(Time windowSize) {
 		return new TimeEvictor<>(windowSize.toMilliseconds());
 	}
+
+	/**
+	 * Creates a {@code TimeEvictor} that keeps the given number of elements.
+	 * Eviction is done before/after the window function based on the value of doEvictAfter.
+	 *
+	 * @param windowSize The amount of time for which to keep elements.
+	 * @param doEvictAfter Whether eviction is done after window function.
+     */
+	public static <W extends Window> TimeEvictor<W> of(Time windowSize, boolean doEvictAfter) {
+		return new TimeEvictor<>(windowSize.toMilliseconds(),doEvictAfter);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index f9b409e..3be3f5a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -47,8 +48,8 @@ import static java.util.Objects.requireNonNull;
  * A {@link WindowOperator} that also allows an {@link Evictor} to be used.
  *
  * <p>
- * The {@code Evictor} is used to evict elements from panes before processing a window and after
- * a {@link Trigger} has fired.
+ * The {@code Evictor} is used to remove elements from a pane before/after the evaluation of WindowFunction and
+ * after the window evaluation gets triggered by a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
  *
  * @param <K> The type of key returned by the {@code KeySelector}.
  * @param <IN> The type of the incoming elements.
@@ -62,6 +63,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
 	private final Evictor<? super IN, ? super W> evictor;
 
+	protected transient EvictorContext evictorContext = new EvictorContext(null, null);
+
 	private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor;
 
 	public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
@@ -146,6 +149,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
 				context.key = key;
 				context.window = actualWindow;
+				evictorContext.key = key;
+				evictorContext.window = actualWindow;
 
 				// we might have already fired because of a merge but still call onElement
 				// on the (possibly merged) window
@@ -158,7 +163,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 						// if we have no state, there is nothing to do
 						continue;
 					}
-					fire(actualWindow, contents);
+					fire(actualWindow, contents, windowState);
 				}
 
 				if (combinedTriggerResult.isPurge()) {
@@ -183,6 +188,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
 				context.key = key;
 				context.window = window;
+				evictorContext.key = key;
+				evictorContext.window = window;
 
 				TriggerResult triggerResult = context.onElement(element);
 
@@ -192,7 +199,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 						// if we have no state, there is nothing to do
 						continue;
 					}
-					fire(window, contents);
+					fire(window, contents, windowState);
 				}
 
 				if (triggerResult.isPurge()) {
@@ -209,6 +216,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
 		context.key = timer.getKey();
 		context.window = timer.getNamespace();
+		evictorContext.key = timer.getKey();
+		evictorContext.window = timer.getNamespace();
 
 		ListState<StreamRecord<IN>> windowState;
 		MergingWindowSet<W> mergingWindows = null;
@@ -238,7 +247,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
 		TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
 		if (triggerResult.isFire()) {
-			fire(context.window, contents);
+			fire(context.window, contents, windowState);
 		}
 
 		if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
@@ -250,6 +259,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 	public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
 		context.key = timer.getKey();
 		context.window = timer.getNamespace();
+		evictorContext.key = timer.getKey();
+		evictorContext.window = timer.getNamespace();
 
 		ListState<StreamRecord<IN>> windowState;
 		MergingWindowSet<W> mergingWindows = null;
@@ -276,7 +287,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
 		TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
 		if (triggerResult.isFire()) {
-			fire(context.window, contents);
+			fire(context.window, contents, windowState);
 		}
 
 		if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
@@ -284,22 +295,79 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 		}
 	}
 
-	private void fire(W window, Iterable<StreamRecord<IN>> contents) throws Exception {
+	private void fire(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
 		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
 
 		// Work around type system restrictions...
-		int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);
-
-		FluentIterable<IN> projectedContents = FluentIterable
+		FluentIterable<TimestampedValue<IN>> recordsWithTimestamp = FluentIterable
 			.from(contents)
-			.skip(toEvict)
-			.transform(new Function<StreamRecord<IN>, IN>() {
+			.transform(new Function<StreamRecord<IN>, TimestampedValue<IN>>() {
+				@Override
+				public TimestampedValue<IN> apply(StreamRecord<IN> input) {
+					return TimestampedValue.from(input);
+				}
+			});
+		evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
+
+		FluentIterable<IN> projectedContents = recordsWithTimestamp
+			.transform(new Function<TimestampedValue<IN>, IN>() {
 				@Override
-				public IN apply(StreamRecord<IN> input) {
+				public IN apply(TimestampedValue<IN> input) {
 					return input.getValue();
 				}
 			});
+
 		userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
+		evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
+
+
+		//work around to fix FLINK-4369, remove the evicted elements from the windowState.
+		//this is inefficient, but there is no other way to remove elements from ListState, which is an AppendingState.
+		windowState.clear();
+		for(TimestampedValue<IN> record : recordsWithTimestamp) {
+			windowState.add(record.getStreamRecord());
+		}
+	}
+
+
+	/**
+	 * {@code EvictorContext} is a utility for handling {@code Evictor} invocations. It can be reused
+	 * by setting the {@code key} and {@code window} fields. No internal state must be kept in
+	 * the {@code EvictorContext}.
+	 */
+
+	class EvictorContext implements Evictor.EvictorContext {
+
+		protected K key;
+		protected W window;
+
+		public EvictorContext(K key, W window) {
+			this.key = key;
+			this.window = window;
+		}
+
+		@Override
+		public long getCurrentProcessingTime() {
+			return EvictingWindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
+		}
+
+		@Override
+		public MetricGroup getMetricGroup() {
+			return EvictingWindowOperator.this.getMetricGroup();
+		}
+
+
+		public K getKey() {
+			return key;
+		}
+
+		void evictBefore(Iterable<TimestampedValue<IN>> elements, int size) {
+			evictor.evictBefore((Iterable)elements, size, window, this);
+		}
+
+		void evictAfter(Iterable<TimestampedValue<IN>>  elements, int size) {
+			evictor.evictAfter((Iterable)elements, size, window, this);
+		}
 	}
 
 	private void cleanup(W window,
@@ -314,6 +382,24 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 		context.clear();
 	}
 
+	@Override
+	public void open() throws Exception {
+		super.open();
+		evictorContext = new EvictorContext(null,null);
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		evictorContext = null;
+	}
+
+	@Override
+	public void dispose() throws Exception{
+		super.dispose();
+		evictorContext = null;
+	}
+
 	// ------------------------------------------------------------------------
 	// Getters for testing
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java
new file mode 100644
index 0000000..9c63a69
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Stores the value and the timestamp of the record.
+ *
+ * @param <T> The type encapsulated value
+ */
+@PublicEvolving
+public class TimestampedValue<T> {
+
+	/** The actual value held by this record */
+	private T value;
+
+	/** The timestamp of the record */
+	private long timestamp;
+
+	/** Flag whether the timestamp is actually set */
+	private boolean hasTimestamp;
+
+	/**
+	 * Creates a new TimestampedValue. The record does not have a timestamp.
+	 */
+	public TimestampedValue(T value) {
+		this.value = value;
+	}
+
+	/**
+	 * Creates a new TimestampedValue wrapping the given value. The timestamp is set to the
+	 * given timestamp.
+	 *
+	 * @param value The value to wrap in this {@link TimestampedValue}
+	 * @param timestamp The timestamp in milliseconds
+	 */
+	public TimestampedValue(T value, long timestamp) {
+		this.value = value;
+		this.timestamp = timestamp;
+		this.hasTimestamp = true;
+	}
+
+	/**
+	 * @return The value wrapped in this {@link TimestampedValue}.
+	 */
+	public T getValue() {
+		return value;
+	}
+
+	/**
+	 * @return The timestamp associated with this stream value in milliseconds.
+     */
+	public long getTimestamp() {
+		if (hasTimestamp) {
+			return timestamp;
+		} else {
+			throw new IllegalStateException(
+					"Record has no timestamp. Is the time characteristic set to 'ProcessingTime', or " +
+							"did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
+		}
+	}
+
+	/**
+	 * Checks whether this record has a timestamp.
+	 *
+	 * @return True if the record has a timestamp, false if not.
+	 */
+	public boolean hasTimestamp() {
+		return hasTimestamp;
+	}
+
+	/**
+	 * Creates a {@link StreamRecord} from this TimestampedValue.
+     */
+	public StreamRecord<T> getStreamRecord() {
+		StreamRecord<T> streamRecord = new StreamRecord<>(value);
+		if (hasTimestamp) {
+			streamRecord.setTimestamp(timestamp);
+		}
+		return streamRecord;
+	}
+
+	/**
+	 * Creates a TimestampedValue from given {@link StreamRecord}.
+	 *
+	 * @param streamRecord The StreamRecord object from which TimestampedValue is to be created.
+     */
+	public static <T> TimestampedValue<T> from(StreamRecord<T> streamRecord) {
+		if (streamRecord.hasTimestamp()) {
+			return new TimestampedValue<>(streamRecord.getValue(), streamRecord.getTimestamp());
+		} else {
+			return new TimestampedValue<>(streamRecord.getValue());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 229d97d..6ff3999 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -181,7 +181,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	}
 
 	@Override
-	public final void open() throws Exception {
+	public void open() throws Exception {
 		super.open();
 
 		timestampedCollector = new TimestampedCollector<>(output);
@@ -200,7 +200,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	}
 
 	@Override
-	public final void close() throws Exception {
+	public void close() throws Exception {
 		super.close();
 		timestampedCollector = null;
 		context = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 2e3d090..46495b0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -29,10 +29,13 @@ import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ReduceIterableWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.DeltaEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
@@ -58,6 +61,464 @@ public class EvictingWindowOperatorTest {
 
 	// For counting if close() is called the correct number of times on the SumReducer
 
+	/**
+	 * Tests CountEvictor evictAfter behavior
+	 * @throws Exception
+     */
+	@Test
+	public void testCountEvictorEvictAfter() throws Exception {
+		AtomicInteger closeCalled = new AtomicInteger(0);
+		final int WINDOW_SIZE = 4;
+		final int TRIGGER_COUNT = 2;
+		final boolean EVICT_AFTER = true;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+			(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+			new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+			GlobalWindows.create(),
+			new GlobalWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
+			CountTrigger.of(TRIGGER_COUNT),
+			CountEvictor.of(WINDOW_SIZE,EVICT_AFTER),
+			0);
+
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.close();
+
+		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+
+	}
+
+	/**
+	 * Tests TimeEvictor evictAfter behavior
+	 * @throws Exception
+	 */
+	@Test
+	public void testTimeEvictorEvictAfter() throws Exception {
+		AtomicInteger closeCalled = new AtomicInteger(0);
+		final int TRIGGER_COUNT = 2;
+		final boolean EVICT_AFTER = true;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+			(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+			new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+			GlobalWindows.create(),
+			new GlobalWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
+			CountTrigger.of(TRIGGER_COUNT),
+			TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
+			0);
+
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 4000));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2001));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1001));
+
+
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), Long.MAX_VALUE));
+
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1002));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.close();
+
+		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+
+	}
+
+	/**
+	 * Tests TimeEvictor evictBefore behavior
+	 * @throws Exception
+	 */
+	@Test
+	public void testTimeEvictorEvictBefore() throws Exception {
+		AtomicInteger closeCalled = new AtomicInteger(0);
+		final int TRIGGER_COUNT = 2;
+		final int WINDOW_SIZE = 4;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+			(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+			new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new EvictingWindowOperator<>(
+			TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+			new TimeWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>(closeCalled)),
+			CountTrigger.of(TRIGGER_COUNT),
+			TimeEvictor.of(Time.seconds(2)),
+			0);
+
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 5999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2001));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1001));
+
+
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 3999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 3999));
+
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 6500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1002));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 7999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 3999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.close();
+
+		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+
+	}
+
+	/**
+	 * Tests time evictor, if no timestamp information in the StreamRecord
+	 * No element will be evicted from the window
+	 * @throws Exception
+	 */
+	@Test
+	public void testTimeEvictorNoTimestamp() throws Exception {
+		AtomicInteger closeCalled = new AtomicInteger(0);
+		final int TRIGGER_COUNT = 2;
+		final boolean EVICT_AFTER = true;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+			(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+			new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+			GlobalWindows.create(),
+			new GlobalWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
+			CountTrigger.of(TRIGGER_COUNT),
+			TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
+			0);
+
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+
+
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.close();
+
+		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+
+	}
+
+	/**
+	 * Tests DeltaEvictor, evictBefore behavior
+	 * @throws Exception
+	 */
+	@Test
+	public void testDeltaEvictorEvictBefore() throws Exception {
+		AtomicInteger closeCalled = new AtomicInteger(0);
+		final int TRIGGER_COUNT = 2;
+		final boolean EVICT_AFTER = false;
+		final int THRESHOLD = 2;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+			(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+			new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+			GlobalWindows.create(),
+			new GlobalWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
+			CountTrigger.of(TRIGGER_COUNT),
+			DeltaEvictor.of(THRESHOLD, new DeltaFunction<Tuple2<String, Integer>>() {
+				@Override
+				public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) {
+					return newDataPoint.f1 - oldDataPoint.f1;
+				}
+			}, EVICT_AFTER),
+			0);
+
+
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 3999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 5), initialTime + 999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 11), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 10999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), initialTime + 1000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 8), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 10), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.close();
+
+		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+	}
+
+	/**
+	 * Tests DeltaEvictor, evictAfter behavior
+	 * @throws Exception
+	 */
+	@Test
+	public void testDeltaEvictorEvictAfter() throws Exception {
+		AtomicInteger closeCalled = new AtomicInteger(0);
+		final int TRIGGER_COUNT = 2;
+		final boolean EVICT_AFTER = true;
+		final int THRESHOLD = 2;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+			(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+			new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+			GlobalWindows.create(),
+			new GlobalWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
+			CountTrigger.of(TRIGGER_COUNT),
+			DeltaEvictor.of(THRESHOLD, new DeltaFunction<Tuple2<String, Integer>>() {
+				@Override
+				public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) {
+					return newDataPoint.f1 - oldDataPoint.f1;
+				}
+			}, EVICT_AFTER),
+			0);
+
+
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 3999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 5), initialTime + 999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 15), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 9), initialTime + 10999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), initialTime + 1000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 16), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 22), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.close();
+
+		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+	}
+
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testCountTrigger() throws Exception {
@@ -316,6 +777,7 @@ public class EvictingWindowOperatorTest {
 			for (Tuple2<String, Integer> t: input) {
 				sum += t.f1;
 			}
+
 			out.collect(new Tuple2<>(key, sum));
 
 		}