Posted to commits@flink.apache.org by al...@apache.org on 2015/09/28 18:15:07 UTC

[11/12] flink git commit: [FLINK-2677] Add a general-purpose keyed-window operator

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
new file mode 100644
index 0000000..cddcc42
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -0,0 +1,115 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends WindowOperator<K, IN, OUT, W> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(EvictingWindowOperator.class);
+
+	private final Evictor<? super IN, ? super W> evictor;
+
+	public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+			KeySelector<IN, K> keySelector,
+			WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
+			KeyedWindowFunction<IN, OUT, K, W> windowFunction,
+			Trigger<? super IN, ? super W> trigger,
+			Evictor<? super IN, ? super W> evictor) {
+		super(windowAssigner, keySelector, windowBufferFactory, windowFunction, trigger);
+		this.evictor = evictor;
+	}
+
+	@Override
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	protected void emitWindow(K key, W window, boolean purge) throws Exception {
+
+		timestampedCollector.setTimestamp(window.getEnd());
+
+		Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
+
+		if (keyWindows == null) {
+			LOG.debug("Window {} for key {} already gone.", window, key);
+			return;
+		}
+
+		Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
+		if (purge) {
+			bufferAndTrigger = keyWindows.remove(window);
+		} else {
+			bufferAndTrigger = keyWindows.get(window);
+		}
+
+		if (bufferAndTrigger == null) {
+			LOG.debug("Window {} for key {} already gone.", window, key);
+			return;
+		}
+
+
+		EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) bufferAndTrigger.f0;
+
+		int toEvict = 0;
+		if (windowBuffer.size() > 0) {
+			// need some type trickery here...
+			toEvict = evictor.evict((Iterable) windowBuffer.getElements(), windowBuffer.size(), window);
+		}
+
+		windowBuffer.removeElements(toEvict);
+
+		userFunction.evaluate(key,
+				window,
+				bufferAndTrigger.f0.getUnpackedElements(),
+				timestampedCollector);
+
+		if (keyWindows.isEmpty()) {
+			windows.remove(key);
+		}
+	}
+
+	@Override
+	public EvictingWindowOperator<K, IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
+		super.enableSetProcessingTime(setProcessingTime);
+		return this;
+	}
+
+
+	// ------------------------------------------------------------------------
+	// Getters for testing
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	public Evictor<? super IN, ? super W> getEvictor() {
+		return evictor;
+	}
+}
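
A note on the evictor contract used in emitWindow() above: evict(elements, size, window) returns the number of elements that EvictingWindowOperator then drops from the head of the buffer via removeElements(). A minimal count-based evictor written against that contract could look roughly like the sketch below. The class name and the exact generic signature of evict() are assumptions (the raw cast and the "type trickery" comment above suggest the real interface differs slightly); the CountEvictor referenced elsewhere in this commit is the actual implementation.

    // Rough sketch, not the CountEvictor shipped with this commit: keep at most
    // maxCount elements by asking the operator to evict the oldest surplus ones.
    public class KeepLastElementsEvictor<T, W extends Window> implements Evictor<T, W> {

        private final int maxCount;

        public KeepLastElementsEvictor(int maxCount) {
            this.maxCount = maxCount;
        }

        @Override
        public int evict(Iterable<StreamRecord<T>> elements, int size, W window) {
            // EvictingWindowOperator removes this many elements from the start of the buffer.
            return size > maxCount ? size - maxCount : 0;
        }
    }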

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
index b1ff7e2..880c85c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
@@ -18,15 +18,41 @@
 
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.commons.lang.SerializationUtils;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.DeltaEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Count;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Delta;
 import org.apache.flink.streaming.api.windowing.windowpolicy.EventTime;
 import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
 import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
 
 /**
  * This class implements the conversion from window policies to concrete operator
@@ -55,7 +81,7 @@ public class PolicyToOperator {
 
 				@SuppressWarnings("unchecked")
 				OneInputStreamOperator<IN, OUT> op = (OneInputStreamOperator<IN, OUT>)
-						new AggregatingProcessingTimeWindowOperator<KEY, IN>(
+						new AggregatingProcessingTimeWindowOperator<>(
 								reducer, keySelector, windowLength, windowSlide);
 				return op;
 			}
@@ -63,17 +89,147 @@ public class PolicyToOperator {
 				@SuppressWarnings("unchecked")
 				KeyedWindowFunction<IN, OUT, KEY, Window> wf = (KeyedWindowFunction<IN, OUT, KEY, Window>) function;
 
-				return new AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>(
+				return new AccumulatingProcessingTimeWindowOperator<>(
 								wf, keySelector, windowLength, windowSlide);
 			}
 		}
 
 		// -- case 2: both policies are event time policies
 		if (window instanceof EventTime && (slide == null || slide instanceof EventTime)) {
-			// add event time implementation
+			final long windowLength = ((EventTime) window).toMilliseconds();
+			final long windowSlide = slide == null ? windowLength : ((EventTime) slide).toMilliseconds();
+
+			WindowAssigner<? super IN, TimeWindow> assigner;
+			if (windowSlide == windowLength) {
+				assigner = TumblingTimeWindows.of(windowLength);
+			} else {
+				assigner = SlidingTimeWindows.of(windowLength, windowSlide);
+			}
+			WindowBufferFactory<IN, ? extends WindowBuffer<IN>> windowBuffer;
+			if (function instanceof ReduceFunction) {
+				@SuppressWarnings("unchecked")
+				ReduceFunction<IN> reducer = (ReduceFunction<IN>) SerializationUtils.clone(function);
+				function = new ReduceWindowFunction<>(reducer);
+				windowBuffer = new PreAggregatingHeapWindowBuffer.Factory<>(reducer);
+			} else {
+				windowBuffer = new HeapWindowBuffer.Factory<>();
+			}
+			@SuppressWarnings("unchecked")
+			KeyedWindowFunction<IN, OUT, KEY, TimeWindow> windowFunction = (KeyedWindowFunction<IN, OUT, KEY, TimeWindow>) function;
+
+			return new WindowOperator<>(
+					assigner,
+					keySelector,
+					windowBuffer,
+					windowFunction,
+					WatermarkTrigger.create());
 		}
-		
-		throw new UnsupportedOperationException("The windowing mechanism does not yet support " + window.toString(slide));
+
+		// -- case 3: arbitrary trigger, no eviction
+		if (slide == null) {
+			Trigger<? super IN, GlobalWindow> trigger = policyToTrigger(window);
+			// we need to make them purging triggers because the trigger/eviction policy model
+			// expects that the window is purged when no slide is used
+			Trigger<? super IN, GlobalWindow> purgingTrigger = PurgingTrigger.of(trigger);
+
+			WindowBufferFactory<IN, ? extends WindowBuffer<IN>> windowBuffer;
+			if (function instanceof ReduceFunction) {
+				@SuppressWarnings("unchecked")
+				ReduceFunction<IN> reducer = (ReduceFunction<IN>) SerializationUtils.clone(function);
+				function = new ReduceWindowFunction<>(reducer);
+				windowBuffer = new PreAggregatingHeapWindowBuffer.Factory<>(reducer);
+			} else {
+				windowBuffer = new HeapWindowBuffer.Factory<>();
+			}
+
+			if (!(function instanceof KeyedWindowFunction)) {
+				throw new IllegalStateException("Windowing function is not of type KeyedWindowFunction.");
+			}
+			@SuppressWarnings("unchecked")
+			KeyedWindowFunction<IN, OUT, KEY, GlobalWindow> windowFunction = (KeyedWindowFunction<IN, OUT, KEY, GlobalWindow>) function;
+
+			return new WindowOperator<>(
+					GlobalWindows.<IN>create(),
+					keySelector,
+					windowBuffer,
+					windowFunction,
+					purgingTrigger);
+		}
+
+		// -- case 4: arbitrary trigger, arbitrary eviction
+		Trigger<? super IN, GlobalWindow> trigger = policyToTrigger(slide);
+		Evictor<? super IN, GlobalWindow> evictor = policyToEvictor(window);
+
+		WindowBufferFactory<IN, ? extends EvictingWindowBuffer<IN>> windowBuffer = new HeapWindowBuffer.Factory<>();
+		if (function instanceof ReduceFunction) {
+			@SuppressWarnings("unchecked")
+			ReduceFunction<IN> reducer = (ReduceFunction<IN>) SerializationUtils.clone(function);
+			function = new ReduceWindowFunction<>(reducer);
+		}
+
+		if (!(function instanceof KeyedWindowFunction)) {
+			throw new IllegalStateException("Windowing function is not of type KeyedWindowFunction.");
+		}
+
+		@SuppressWarnings("unchecked")
+		KeyedWindowFunction<IN, OUT, KEY, GlobalWindow> windowFunction = (KeyedWindowFunction<IN, OUT, KEY, GlobalWindow>) function;
+
+		EvictingWindowOperator<KEY, IN, OUT, GlobalWindow> op = new EvictingWindowOperator<>(
+				GlobalWindows.<IN>create(),
+				keySelector,
+				windowBuffer,
+				windowFunction,
+				trigger,
+				evictor);
+
+		if (window instanceof ProcessingTime) {
+			// special case, we need to instruct the window operator to store the processing time in
+			// the elements so that the evictor can work on that
+			op.enableSetProcessingTime(true);
+		}
+
+		return op;
+	}
+
+	private static <IN> Trigger<? super IN, GlobalWindow> policyToTrigger(WindowPolicy policy) {
+		if (policy instanceof EventTime) {
+			EventTime eventTime = (EventTime) policy;
+			return ContinuousWatermarkTrigger.of(eventTime.getSize());
+		} else if (policy instanceof ProcessingTime) {
+			ProcessingTime processingTime = (ProcessingTime) policy;
+			return ContinuousProcessingTimeTrigger.of(processingTime.getSize());
+		} else if (policy instanceof Count) {
+			Count count = (Count) policy;
+			return CountTrigger.of(count.getSize());
+		} else if (policy instanceof Delta) {
+			@SuppressWarnings({"unchecked", "rawtypes"})
+			Delta<IN> delta = (Delta) policy;
+			return DeltaTrigger.of(delta.getThreshold(), delta.getDeltaFunction());
+
+		}
+
+		throw new UnsupportedOperationException("Unsupported policy " + policy);
+	}
+
+	private static <IN> Evictor<? super IN, GlobalWindow> policyToEvictor(WindowPolicy policy) {
+		if (policy instanceof EventTime) {
+			EventTime eventTime = (EventTime) policy;
+			return TimeEvictor.of(eventTime.getSize());
+		} else if (policy instanceof ProcessingTime) {
+			ProcessingTime processingTime = (ProcessingTime) policy;
+			return TimeEvictor.of(processingTime.getSize());
+		} else if (policy instanceof Count) {
+			Count count = (Count) policy;
+			return CountEvictor.of(count.getSize());
+		} else if (policy instanceof Delta) {
+			@SuppressWarnings({"unchecked", "rawtypes"})
+			Delta<IN> delta = (Delta) policy;
+			return DeltaEvictor.of(delta.getThreshold(), delta.getDeltaFunction());
+
+		}
+
+
+		throw new UnsupportedOperationException("Unsupported policy " + policy);
 	}
 	
 	// ------------------------------------------------------------------------
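
Case 3 above relies on PurgingTrigger.of(trigger) to turn every firing of the wrapped trigger into a fire-and-purge, because without a slide policy the window contents must be discarded after each evaluation. A minimal sketch of such a wrapper is given below; it assumes the Trigger interface exposes exactly the onElement()/onTime()/duplicate() methods that WindowOperator calls (see the next file) and is not the actual PurgingTrigger implementation.

    // Sketch of a purging wrapper: FIRE from the nested trigger becomes FIRE_AND_PURGE.
    public class PurgingTriggerSketch<T, W extends Window> implements Trigger<T, W> {

        private final Trigger<T, W> nested;

        private PurgingTriggerSketch(Trigger<T, W> nested) {
            this.nested = nested;
        }

        public static <T, W extends Window> PurgingTriggerSketch<T, W> of(Trigger<T, W> nested) {
            return new PurgingTriggerSketch<>(nested);
        }

        @Override
        public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
            return purgeOnFire(nested.onElement(element, timestamp, window, ctx));
        }

        @Override
        public TriggerResult onTime(long time, TriggerContext ctx) throws Exception {
            return purgeOnFire(nested.onTime(time, ctx));
        }

        @Override
        public Trigger<T, W> duplicate() {
            return new PurgingTriggerSketch<>(nested.duplicate());
        }

        private static TriggerResult purgeOnFire(TriggerResult result) {
            return result == TriggerResult.FIRE ? TriggerResult.FIRE_AND_PURGE : result;
        }
    }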

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
new file mode 100644
index 0000000..cda4481
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -0,0 +1,320 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+public class WindowOperator<K, IN, OUT, W extends Window>
+		extends AbstractUdfStreamOperator<OUT, KeyedWindowFunction<IN, OUT, K, W>>
+		implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
+
+
+	private final WindowAssigner<? super IN, W> windowAssigner;
+	private final KeySelector<IN, K> keySelector;
+
+	private final Trigger<? super IN, ? super W> triggerTemplate;
+	private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
+
+	protected transient Map<K, Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>>> windows;
+
+	private transient Map<Long, Set<TriggerContext>> processingTimeTimers;
+	private transient Map<Long, Set<TriggerContext>> watermarkTimers;
+
+	protected transient TimestampedCollector<OUT> timestampedCollector;
+
+	private boolean setProcessingTime = false;
+
+	private TypeSerializer<IN> inputSerializer;
+
+	public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+			KeySelector<IN, K> keySelector,
+			WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
+			KeyedWindowFunction<IN, OUT, K, W> windowFunction,
+			Trigger<? super IN, ? super W> trigger) {
+
+		super(windowFunction);
+
+		this.windowAssigner = windowAssigner;
+		this.keySelector = keySelector;
+
+		this.windowBufferFactory = windowBufferFactory;
+		this.triggerTemplate = trigger;
+
+		setChainingStrategy(ChainingStrategy.ALWAYS);
+//		forceInputCopy();
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+		inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		windows = Maps.newHashMap();
+		watermarkTimers = Maps.newHashMap();
+		processingTimeTimers = Maps.newHashMap();
+		timestampedCollector = new TimestampedCollector<>(output);
+
+		if (inputSerializer == null) {
+			throw new IllegalStateException("Input serializer was not set.");
+		}
+
+		windowBufferFactory.setRuntimeContext(getRuntimeContext());
+		windowBufferFactory.open(parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		// emit the elements that we still keep
+		for (Map.Entry<K, Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>>> entry: windows.entrySet()) {
+			K key = entry.getKey();
+			Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = entry.getValue();
+			for (W window: keyWindows.keySet()) {
+				emitWindow(key, window, false);
+			}
+		}
+		windows.clear();
+		windowBufferFactory.close();
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		if (setProcessingTime) {
+			element.replace(element.getValue(), System.currentTimeMillis());
+		}
+		Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
+
+		K key = keySelector.getKey(element.getValue());
+
+		Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
+		if (keyWindows == null) {
+			keyWindows = Maps.newHashMap();
+			windows.put(key, keyWindows);
+		}
+
+		for (W window: elementWindows) {
+			Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger = keyWindows.get(window);
+			if (bufferAndTrigger == null) {
+				bufferAndTrigger = new Tuple2<>();
+				bufferAndTrigger.f0 = windowBufferFactory.create();
+				bufferAndTrigger.f1 = new TriggerContext(key, window, triggerTemplate.duplicate());
+				keyWindows.put(window, bufferAndTrigger);
+			}
+			StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
+			bufferAndTrigger.f0.storeElement(elementCopy);
+			Trigger.TriggerResult triggerResult = bufferAndTrigger.f1.trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, bufferAndTrigger.f1);
+			processTriggerResult(triggerResult, key, window);
+		}
+	}
+
+	protected void emitWindow(K key, W window, boolean purge) throws Exception {
+		timestampedCollector.setTimestamp(window.getEnd());
+
+		Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
+
+		if (keyWindows == null) {
+			LOG.debug("Window {} for key {} already gone.", window, key);
+			return;
+		}
+
+		Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
+		if (purge) {
+			bufferAndTrigger = keyWindows.remove(window);
+		} else {
+			bufferAndTrigger = keyWindows.get(window);
+		}
+
+		if (bufferAndTrigger == null) {
+			LOG.debug("Window {} for key {} already gone.", window, key);
+			return;
+		}
+
+
+		userFunction.evaluate(key,
+				window,
+				bufferAndTrigger.f0.getUnpackedElements(),
+				timestampedCollector);
+
+		if (keyWindows.isEmpty()) {
+			windows.remove(key);
+		}
+	}
+
+	private void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception {
+		switch (triggerResult) {
+			case FIRE:
+				emitWindow(key, window, false);
+				break;
+
+			case FIRE_AND_PURGE:
+				emitWindow(key, window, true);
+				break;
+
+			case CONTINUE:
+				// ignore
+		}
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		Set<Long> toRemove = Sets.newHashSet();
+
+		for (Map.Entry<Long, Set<TriggerContext>> triggers: watermarkTimers.entrySet()) {
+			if (triggers.getKey() <= mark.getTimestamp()) {
+				for (TriggerContext trigger: triggers.getValue()) {
+					Trigger.TriggerResult triggerResult = trigger.trigger.onTime(mark.getTimestamp(), trigger);
+					processTriggerResult(triggerResult, trigger.key, trigger.window);
+				}
+				toRemove.add(triggers.getKey());
+			}
+		}
+
+		for (Long l: toRemove) {
+			watermarkTimers.remove(l);
+		}
+		output.emitWatermark(mark);
+	}
+
+	@Override
+	public void trigger(long time) throws Exception {
+		Set<Long> toRemove = Sets.newHashSet();
+
+		for (Map.Entry<Long, Set<TriggerContext>> triggers: processingTimeTimers.entrySet()) {
+			if (triggers.getKey() < time) {
+				for (TriggerContext trigger: triggers.getValue()) {
+					Trigger.TriggerResult triggerResult = trigger.trigger.onTime(time, trigger);
+					processTriggerResult(triggerResult, trigger.key, trigger.window);
+				}
+				toRemove.add(triggers.getKey());
+			}
+		}
+
+		for (Long l: toRemove) {
+			processingTimeTimers.remove(l);
+		}
+	}
+
+	protected class TriggerContext implements Trigger.TriggerContext {
+		Trigger<? super IN, ? super W> trigger;
+		K key;
+		W window;
+
+		public TriggerContext(K key, W window, Trigger<? super IN, ? super W> trigger) {
+			this.key = key;
+			this.window = window;
+			this.trigger = trigger;
+		}
+
+		@Override
+		public void registerProcessingTimeTimer(long time) {
+			Set<TriggerContext> triggers = processingTimeTimers.get(time);
+			if (triggers == null) {
+				getRuntimeContext().registerTimer(time, WindowOperator.this);
+				triggers = Sets.newHashSet();
+				processingTimeTimers.put(time, triggers);
+			}
+			triggers.add(this);
+		}
+
+		@Override
+		public void registerWatermarkTimer(long time) {
+			Set<TriggerContext> triggers = watermarkTimers.get(time);
+			if (triggers == null) {
+				triggers = Sets.newHashSet();
+				watermarkTimers.put(time, triggers);
+			}
+			triggers.add(this);
+		}
+	}
+
+	/**
+	 * When this flag is enabled the current processing time is set as the timestamp of elements
+	 * upon arrival. This must be used, for example, when using the
+	 * {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} with processing
+	 * time semantics.
+	 */
+	public WindowOperator<K, IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
+		this.setProcessingTime = setProcessingTime;
+		return this;
+	}
+
+	// ------------------------------------------------------------------------
+	// Getters for testing
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	public Trigger<? super IN, ? super W> getTriggerTemplate() {
+		return triggerTemplate;
+	}
+
+	@VisibleForTesting
+	public KeySelector<IN, K> getKeySelector() {
+		return keySelector;
+	}
+
+	@VisibleForTesting
+	public WindowAssigner<? super IN, W> getWindowAssigner() {
+		return windowAssigner;
+	}
+
+	@VisibleForTesting
+	public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() {
+		return windowBufferFactory;
+	}
+
+	@VisibleForTesting
+	public boolean isSetProcessingTime() {
+		return setProcessingTime;
+	}
+}
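
To connect the pieces, the fragment below wires up a WindowOperator by hand for the tumbling event-time case, the same way case 2 in PolicyToOperator does. TupleKeySelector and SumReducer are stand-ins for the key selector and reduce function (similar classes appear in the EvictingWindowOperatorTest further down, though its SumReducer takes a counter argument); note that setInputType() must be called before open(), since open() fails fast when no input serializer has been set.

    // Fragment (imports as in the files above): tumbling 1-second event-time windows,
    // keyed by the String field, firing when the watermark passes the window end.
    WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
        new WindowOperator<>(
            TumblingTimeWindows.of(1000),
            new TupleKeySelector(),
            new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(),
            new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
            WatermarkTrigger.create());

    // The input serializer is mandatory; open() throws if it was never set.
    operator.setInputType(
        TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"),
        new ExecutionConfig());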

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
new file mode 100644
index 0000000..50e392b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+public interface EvictingWindowBuffer<T> extends WindowBuffer<T> {
+	public boolean removeElements(int count);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
new file mode 100644
index 0000000..092718a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayDeque;
+
+public class HeapWindowBuffer<T> implements EvictingWindowBuffer<T> {
+	private static final long serialVersionUID = 1L;
+
+	private ArrayDeque<StreamRecord<T>> elements;
+
+	protected HeapWindowBuffer() {
+		this.elements = new ArrayDeque<>();
+	}
+
+	@Override
+	public void storeElement(StreamRecord<T> element) {
+		elements.add(element);
+	}
+
+	@Override
+	public boolean removeElements(int count) {
+		// TODO determine if this can be done in a better way
+		for (int i = 0; i < count; i++) {
+			elements.removeFirst();
+		}
+		return false;
+	}
+
+	@Override
+	public Iterable<StreamRecord<T>> getElements() {
+		return elements;
+	}
+
+	@Override
+	public Iterable<T> getUnpackedElements() {
+		return FluentIterable.from(elements).transform(new Function<StreamRecord<T>, T>() {
+			@Override
+			public T apply(StreamRecord<T> record) {
+				return record.getValue();
+			}
+		});
+	}
+
+	@Override
+	public int size() {
+		return elements.size();
+	}
+
+	public static class Factory<T> implements WindowBufferFactory<T, HeapWindowBuffer<T>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void setRuntimeContext(RuntimeContext ctx) {}
+
+		@Override
+		public void open(Configuration config) {}
+
+		@Override
+		public void close() {}
+
+		@Override
+		public HeapWindowBuffer<T> create() {
+			return new HeapWindowBuffer<>();
+		}
+	}
+}
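
A small illustration of how EvictingWindowOperator drives this buffer: elements are appended with storeElement(), the evictor decides how many of the oldest records to drop, and removeElements(count) pops that many from the head of the deque. A rough sketch, with made-up values and an eviction count chosen by hand:

    HeapWindowBuffer<String> buffer = new HeapWindowBuffer.Factory<String>().create();

    buffer.storeElement(new StreamRecord<>("a", 1L));
    buffer.storeElement(new StreamRecord<>("b", 2L));
    buffer.storeElement(new StreamRecord<>("c", 3L));

    // Pretend the evictor asked to keep only the last two elements.
    int toEvict = buffer.size() - 2;   // 1
    buffer.removeElements(toEvict);    // drops "a" from the head of the deque

    for (String value : buffer.getUnpackedElements()) {
        System.out.println(value);     // prints "b", then "c"
    }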

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
new file mode 100644
index 0000000..85f90b0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collections;
+
+public class PreAggregatingHeapWindowBuffer<T> implements WindowBuffer<T> {
+	private static final long serialVersionUID = 1L;
+
+	private final ReduceFunction<T> reduceFunction;
+	private transient StreamRecord<T> data;
+
+	protected PreAggregatingHeapWindowBuffer(ReduceFunction<T> reduceFunction) {
+		this.reduceFunction = reduceFunction;
+	}
+
+	@Override
+	public void storeElement(StreamRecord<T> element) throws Exception {
+		if (data == null) {
+			data = new StreamRecord<>(element.getValue(), element.getTimestamp());
+		} else {
+			data.replace(reduceFunction.reduce(data.getValue(), element.getValue()));
+		}
+	}
+
+	@Override
+	public Iterable<StreamRecord<T>> getElements() {
+		return Collections.singleton(data);
+	}
+
+	@Override
+	public Iterable<T> getUnpackedElements() {
+		return Collections.singleton(data.getValue());
+	}
+
+	@Override
+	public int size() {
+		return 1;
+	}
+
+	public static class Factory<T> implements WindowBufferFactory<T, PreAggregatingHeapWindowBuffer<T>> {
+		private static final long serialVersionUID = 1L;
+
+		private final ReduceFunction<T> reduceFunction;
+
+		public Factory(ReduceFunction<T> reduceFunction) {
+			this.reduceFunction = reduceFunction;
+		}
+
+		@Override
+		public void setRuntimeContext(RuntimeContext ctx) {
+			FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
+		}
+
+		@Override
+		public void open(Configuration config) throws Exception {
+			FunctionUtils.openFunction(reduceFunction, config);
+		}
+
+		@Override
+		public void close() throws Exception {
+			FunctionUtils.closeFunction(reduceFunction);
+		}
+
+		@Override
+		public PreAggregatingHeapWindowBuffer<T> create() {
+			return new PreAggregatingHeapWindowBuffer<T>(reduceFunction);
+		}
+	}
+}
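
In contrast to the plain heap buffer, this buffer keeps only one running record and applies the ReduceFunction eagerly on every storeElement() call, so a window backed by it never holds more than a single pre-aggregated value. A rough sketch (inside a method declared to throw Exception, since storeElement() can throw), with a trivial summing reducer assumed for illustration:

    ReduceFunction<Integer> sum = new ReduceFunction<Integer>() {
        @Override
        public Integer reduce(Integer a, Integer b) {
            return a + b;
        }
    };

    PreAggregatingHeapWindowBuffer<Integer> buffer =
        new PreAggregatingHeapWindowBuffer.Factory<>(sum).create();

    buffer.storeElement(new StreamRecord<>(1, 0L));
    buffer.storeElement(new StreamRecord<>(2, 0L));
    buffer.storeElement(new StreamRecord<>(3, 0L));

    // Only the running aggregate is kept: prints 6.
    System.out.println(buffer.getUnpackedElements().iterator().next());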

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
new file mode 100644
index 0000000..8c891d5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+
+public interface WindowBuffer<T> extends Serializable {
+
+	public void storeElement(StreamRecord<T> element) throws Exception;
+
+	public Iterable<StreamRecord<T>> getElements();
+
+	public Iterable<T> getUnpackedElements();
+
+	public int size();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
new file mode 100644
index 0000000..4a7f6df
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.Serializable;
+
+public interface WindowBufferFactory<T, B extends WindowBuffer<T>> extends Serializable {
+	void setRuntimeContext(RuntimeContext ctx);
+	void open(Configuration config) throws Exception;
+	void close() throws Exception;
+	B create();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
new file mode 100644
index 0000000..3d9605e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class EvictingWindowOperatorTest {
+
+	// Used to count how many times close() is called on the SumReducer
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testCountTrigger() throws Exception {
+		AtomicInteger closeCalled = new AtomicInteger(0);
+
+		final int WINDOW_SIZE = 4;
+		final int WINDOW_SLIDE = 2;
+
+		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+				GlobalWindows.create(),
+				new TupleKeySelector(),
+				new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(),
+				new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
+				CountTrigger.of(WINDOW_SLIDE),
+				CountEvictor.of(WINDOW_SIZE));
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// The global window actually ignores these timestamps...
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.close();
+
+		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+
+
+	}
+
+	// ------------------------------------------------------------------------
+	//  UDFs
+	// ------------------------------------------------------------------------
+
+	public static class SumReducer extends RichReduceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private boolean openCalled = false;
+
+		private AtomicInteger closeCalled;
+
+		public SumReducer(AtomicInteger closeCalled) {
+			this.closeCalled = closeCalled;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			openCalled = true;
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			closeCalled.incrementAndGet();
+		}
+
+		@Override
+		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
+				Tuple2<String, Integer> value2) throws Exception {
+			if (!openCalled) {
+				Assert.fail("Open was not called");
+			}
+			return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private static class ResultSortComparator implements Comparator<Object> {
+		@Override
+		public int compare(Object o1, Object o2) {
+			if (o1 instanceof Watermark || o2 instanceof Watermark) {
+				return 0;
+			} else {
+				StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String, Integer>>) o1;
+				StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String, Integer>>) o2;
+				if (sr0.getTimestamp() != sr1.getTimestamp()) {
+					return (int) (sr0.getTimestamp() - sr1.getTimestamp());
+				}
+				int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0);
+				if (comparison != 0) {
+					return comparison;
+				} else {
+					return sr0.getValue().f1 - sr1.getValue().f1;
+				}
+			}
+		}
+	}
+
+	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getKey(Tuple2<String, Integer> value) throws Exception {
+			return value.f0;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyWindowTranslationTest.java
new file mode 100644
index 0000000..6f42514
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyWindowTranslationTest.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Count;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Delta;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Time;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * These tests verify that the api calls on
+ * {@link org.apache.flink.streaming.api.datastream.KeyedWindowDataStream} instantiate
+ * the correct window operator.
+ */
+public class PolicyWindowTranslationTest extends StreamingMultipleProgramsTestBase {
+
+	/**
+	 * These tests ensure that the fast aligned time windows operator is used if the
+	 * conditions are right.
+	 */
+	@Test
+	public void testFastTimeWindows() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(0)
+				.window(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
+				.reduceWindow(reducer);
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
+
+		DataStream<Tuple2<String, Integer>> window2 = source
+				.keyBy(0)
+				.window(Time.of(1000, TimeUnit.MILLISECONDS))
+				.mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, Window>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void evaluate(Tuple tuple,
+							Window window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+		Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testNonEvicting() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(0)
+				.window(Count.of(200))
+				.reduceWindow(reducer);
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof WindowOperator);
+		WindowOperator winOperator1 = (WindowOperator) operator1;
+		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof PurgingTrigger);
+		Assert.assertTrue(((PurgingTrigger)winOperator1.getTriggerTemplate()).getNestedTrigger() instanceof CountTrigger);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof GlobalWindows);
+		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
+
+		DataStream<Tuple2<String, Integer>> window2 = source
+				.keyBy(0)
+				.window(Delta.of(15.0, new DeltaFunction<Object>() {
+					@Override
+					public double getDelta(Object oldDataPoint, Object newDataPoint) {
+						return 0;
+					}
+				}))
+				.mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, Window>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void evaluate(Tuple tuple,
+							Window window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+		Assert.assertTrue(operator2 instanceof WindowOperator);
+		WindowOperator winOperator2 = (WindowOperator) operator2;
+		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof PurgingTrigger);
+		Assert.assertTrue(((PurgingTrigger)winOperator2.getTriggerTemplate()).getNestedTrigger() instanceof DeltaTrigger);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof GlobalWindows);
+		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testEvicting() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(0)
+				.window(Time.of(1000, TimeUnit.MILLISECONDS), Count.of(100))
+				.reduceWindow(reducer);
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
+		EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1;
+		// ensure that the operator sets the current processing time as the timestamp
+		Assert.assertTrue(winOperator1.isSetProcessingTime());
+		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof GlobalWindows);
+		Assert.assertTrue(winOperator1.getEvictor() instanceof TimeEvictor);
+		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+
+		DataStream<Tuple2<String, Integer>> window2 = source
+				.keyBy(0)
+				.window(Count.of(1000), Delta.of(1.0, new DeltaFunction<Object>() {
+					@Override
+					public double getDelta(Object oldDataPoint, Object newDataPoint) {
+						return 0;
+					}
+				}))
+				.mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, Window>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void evaluate(Tuple tuple,
+							Window window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+		Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
+		EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2;
+		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof DeltaTrigger);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof GlobalWindows);
+		Assert.assertTrue(winOperator2.getEvictor() instanceof CountEvictor);
+		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+	}
+
+	// ------------------------------------------------------------------------
+	//  UDFs
+	// ------------------------------------------------------------------------
+
+	public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
+			return value1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerWindowTranslationTest.java
new file mode 100644
index 0000000..5078c8c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerWindowTranslationTest.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * These tests verify that the API calls on
+ * {@link org.apache.flink.streaming.api.datastream.KeyedTriggerWindowDataStream} instantiate
+ * the correct window operator.
+ */
+public class TriggerWindowTranslationTest extends StreamingMultipleProgramsTestBase {
+
+	/**
+	 * These tests ensure that the fast aligned processing-time windows operator is used when the
+	 * conditions for it are met.
+	 */
+	@Test
+	public void testFastTimeWindows() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(0)
+				.window(SlidingProcessingTimeWindows.of(1000, 100))
+				.reduceWindow(reducer);
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
+
+		DataStream<Tuple2<String, Integer>> window2 = source
+				.keyBy(0)
+				.window(SlidingProcessingTimeWindows.of(1000, 100))
+				.mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void evaluate(Tuple tuple,
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+		Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testNonEvicting() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(0)
+				.window(SlidingProcessingTimeWindows.of(1000, 100))
+				.trigger(CountTrigger.of(100))
+				.reduceWindow(reducer);
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof WindowOperator);
+		WindowOperator winOperator1 = (WindowOperator) operator1;
+		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
+
+		DataStream<Tuple2<String, Integer>> window2 = source
+				.keyBy(0)
+				.window(TumblingProcessingTimeWindows.of(1000))
+				.trigger(CountTrigger.of(100))
+				.mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void evaluate(Tuple tuple,
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+		Assert.assertTrue(operator2 instanceof WindowOperator);
+		WindowOperator winOperator2 = (WindowOperator) operator2;
+		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testEvicting() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(0)
+				.window(SlidingProcessingTimeWindows.of(1000, 100))
+				.evictor(CountEvictor.of(100))
+				.reduceWindow(reducer);
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
+		EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1;
+		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
+		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+
+		DataStream<Tuple2<String, Integer>> window2 = source
+				.keyBy(0)
+				.window(TumblingProcessingTimeWindows.of(1000))
+				.trigger(CountTrigger.of(100))
+				.evictor(TimeEvictor.of(100))
+				.mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void evaluate(Tuple tuple,
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+		Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
+		EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2;
+		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
+		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+	}
+
+	// ------------------------------------------------------------------------
+	//  UDFs
+	// ------------------------------------------------------------------------
+
+	public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
+			return value1;
+		}
+	}
+}