You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/20 15:09:23 UTC

[08/15] flink git commit: [FLINK-5295] Migrate the AlignedWindowOperators to the WindowOperator.

[FLINK-5295] Migrate the AlignedWindowOperators to the WindowOperator.

This adds code that lets WindowOperator restore from the Flink 1.1
fast aligned processing-time windows operator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0819dc7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0819dc7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0819dc7

Branch: refs/heads/master
Commit: e0819dc7b72487670dd3ba06628980e27fdbedb0
Parents: b0e2a2c
Author: kl0u <kk...@gmail.com>
Authored: Thu Dec 15 17:59:13 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100

----------------------------------------------------------------------
 .../datastream/LegacyWindowOperatorType.java    |  63 +++++
 .../api/datastream/WindowedStream.java          | 155 +++++------
 ...ractAlignedProcessingTimeWindowOperator.java |   1 +
 ...ccumulatingProcessingTimeWindowOperator.java | 109 +-------
 ...AggregatingProcessingTimeWindowOperator.java |   1 +
 .../operators/windowing/WindowOperator.java     | 259 +++++++++++++++++--
 .../windowing/TimeWindowTranslationTest.java    |   1 +
 .../windowing/WindowingTestHarnessTest.java     |   3 +-
 .../streaming/util/WindowingTestHarness.java    |  14 +-
 .../api/scala/WindowTranslationTest.scala       |  10 +-
 10 files changed, 380 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java
new file mode 100644
index 0000000..bb6e4bc
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+/**
+ * For specifying what type of window operator was used to create the state
+ * that a {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator}
+ * is restoring from. This is used to signal that state written using an aligned processing-time
+ * window operator should be restored.
+ */
+public enum LegacyWindowOperatorType {
+
+	FAST_ACCUMULATING(true, false),
+
+	FAST_AGGREGATING(false, true),
+
+	NONE(false, false);
+
+	// ------------------------------------------------------------------------
+
+	private final boolean fastAccumulating;
+	private final boolean fastAggregating;
+
+	LegacyWindowOperatorType(boolean fastAccumulating, boolean fastAggregating) {
+		this.fastAccumulating = fastAccumulating;
+		this.fastAggregating = fastAggregating;
+	}
+
+	public boolean isFastAccumulating() {
+		return fastAccumulating;
+	}
+
+	public boolean isFastAggregating() {
+		return fastAggregating;
+	}
+
+	@Override
+	public String toString() {
+		if (fastAccumulating) {
+			return "AccumulatingProcessingTimeWindowOperator";
+		} else if (fastAggregating) {
+			return "AggregatingProcessingTimeWindowOperator";
+		} else {
+			return "WindowOperator";
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index ad7f371..98bf89a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -49,10 +49,7 @@ import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
@@ -190,16 +187,44 @@ public class WindowedStream<T, K, W extends Window> {
 
 		//clean the closure
 		function = input.getExecutionEnvironment().clean(function);
+		LegacyWindowOperatorType legacyOpType = getLegacyWindowType(function);
+		return reduce(function, new PassThroughWindowFunction<K, W, T>(), legacyOpType);
+	}
 
-		String callLocation = Utils.getCallLocationName();
-		String udfName = "WindowedStream." + callLocation;
-
-		SingleOutputStreamOperator<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
-		if (result != null) {
-			return result;
-		}
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is incrementally aggregated using the given reducer.
+	 *
+	 * @param reduceFunction The reduce function that is used for incremental aggregation.
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
+		return reduce(reduceFunction, function, LegacyWindowOperatorType.NONE);
+	}
 
-		return reduce(function, new PassThroughWindowFunction<K, W, T>());
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is incrementally aggregated using the given reducer.
+	 *
+	 * @param reduceFunction The reduce function that is used for incremental aggregation.
+	 * @param function The window function.
+	 * @param resultType Type information for the result type of the window function
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R> reduce(
+		ReduceFunction<T> reduceFunction,
+		WindowFunction<T, R, K, W> function,
+		TypeInformation<R> resultType) {
+		return reduce(reduceFunction, function, resultType, LegacyWindowOperatorType.NONE);
 	}
 
 	/**
@@ -212,14 +237,20 @@ public class WindowedStream<T, K, W extends Window> {
 	 *
 	 * @param reduceFunction The reduce function that is used for incremental aggregation.
 	 * @param function The window function.
+	 * @param legacyWindowOpType When migrating from an older Flink version, this flag indicates
+	 *                           the type of the previous operator whose state we inherit.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
-	public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
+	private <R> SingleOutputStreamOperator<R> reduce(
+			ReduceFunction<T> reduceFunction,
+			WindowFunction<T, R, K, W> function,
+			LegacyWindowOperatorType legacyWindowOpType) {
+
 		TypeInformation<T> inType = input.getType();
 		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
 			function, WindowFunction.class, true, true, inType, null, false);
 
-		return reduce(reduceFunction, function, resultType);
+		return reduce(reduceFunction, function, resultType, legacyWindowOpType);
 	}
 
 	/**
@@ -232,10 +263,17 @@ public class WindowedStream<T, K, W extends Window> {
 	 *
 	 * @param reduceFunction The reduce function that is used for incremental aggregation.
 	 * @param function The window function.
-	 * @param resultType Type information for the result type of the window function
+	 * @param resultType Type information for the result type of the window function.
+	 * @param legacyWindowOpType When migrating from an older Flink version, this flag indicates
+	 *                           the type of the previous operator whose state we inherit.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
-	public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
+	private <R> SingleOutputStreamOperator<R> reduce(
+			ReduceFunction<T> reduceFunction,
+			WindowFunction<T, R, K, W> function,
+			TypeInformation<R> resultType,
+			LegacyWindowOperatorType legacyWindowOpType) {
+
 		if (reduceFunction instanceof RichFunction) {
 			throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
 		}
@@ -288,7 +326,8 @@ public class WindowedStream<T, K, W extends Window> {
 					stateDesc,
 					new InternalSingleValueWindowFunction<>(function),
 					trigger,
-					allowedLateness);
+					allowedLateness,
+					legacyWindowOpType);
 		}
 
 		return input.transform(opName, resultType, operator);
@@ -458,7 +497,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * interpreted as a regular non-windowed stream.
 	 *
 	 * <p>
-	 * Not that this function requires that all data in the windows is buffered until the window
+	 * Note that this function requires that all data in the windows is buffered until the window
 	 * is evaluated, as the function provides no means of incremental aggregation.
 	 *
 	 * @param function The window function.
@@ -473,12 +512,7 @@ public class WindowedStream<T, K, W extends Window> {
 		String callLocation = Utils.getCallLocationName();
 		String udfName = "WindowedStream." + callLocation;
 
-		SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
-		if (result != null) {
-			return result;
-		}
-
-
+		LegacyWindowOperatorType legacyWindowOpType = getLegacyWindowType(function);
 		String opName;
 		KeySelector<T, K> keySel = input.getKeySelector();
 
@@ -519,7 +553,8 @@ public class WindowedStream<T, K, W extends Window> {
 					stateDesc,
 					new InternalIterableWindowFunction<>(function),
 					trigger,
-					allowedLateness);
+					allowedLateness,
+					legacyWindowOpType);
 		}
 
 		return input.transform(opName, resultType, operator);
@@ -925,77 +960,21 @@ public class WindowedStream<T, K, W extends Window> {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
-			Function function,
-			TypeInformation<R> resultType,
-			String functionName) {
-
+	private LegacyWindowOperatorType getLegacyWindowType(Function function) {
 		if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
-			SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
-			final long windowLength = timeWindows.getSize();
-			final long windowSlide = timeWindows.getSlide();
-
-			String opName = "Fast " + timeWindows + " of " + functionName;
-
 			if (function instanceof ReduceFunction) {
-				@SuppressWarnings("unchecked")
-				ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
-				@SuppressWarnings("unchecked")
-				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
-						new AggregatingProcessingTimeWindowOperator<>(
-								reducer, input.getKeySelector(), 
-								input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-								input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-								windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
-			else if (function instanceof WindowFunction) {
-				@SuppressWarnings("unchecked")
-				WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
-
-				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
-						wf, input.getKeySelector(),
-						input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-						input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-						windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
+				return LegacyWindowOperatorType.FAST_AGGREGATING;
+			} else if (function instanceof WindowFunction) {
+				return LegacyWindowOperatorType.FAST_ACCUMULATING;
 			}
 		} else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
-			TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
-			final long windowLength = timeWindows.getSize();
-			final long windowSlide = timeWindows.getSize();
-
-			String opName = "Fast " + timeWindows + " of " + functionName;
-
 			if (function instanceof ReduceFunction) {
-				@SuppressWarnings("unchecked")
-				ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
-				@SuppressWarnings("unchecked")
-				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
-						new AggregatingProcessingTimeWindowOperator<>(
-								reducer,
-								input.getKeySelector(),
-								input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-								input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-								windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
-			else if (function instanceof WindowFunction) {
-				@SuppressWarnings("unchecked")
-				WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
-
-				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
-						wf, input.getKeySelector(),
-						input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-						input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-						windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
+				return LegacyWindowOperatorType.FAST_AGGREGATING;
+			} else if (function instanceof WindowFunction) {
+				return LegacyWindowOperatorType.FAST_ACCUMULATING;
 			}
 		}
-
-		return null;
+		return LegacyWindowOperatorType.NONE;
 	}
 
 	public StreamExecutionEnvironment getExecutionEnvironment() {

http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 24fd0de..14500ee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import static java.util.Objects.requireNonNull;
 
 @Internal
+@Deprecated
 public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, STATE, F extends Function> 
 		extends AbstractUdfStreamOperator<OUT, F> 
 		implements OneInputStreamOperator<IN, OUT>, ProcessingTimeCallback {

http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 9ea2949..90e4b52 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -22,16 +22,15 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
-import java.io.IOException;
 import java.util.ArrayList;
 
 @Internal
+@Deprecated
 public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> 
 		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> {
 
@@ -57,108 +56,4 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
 		
 		return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
 	}
-	
-	// ------------------------------------------------------------------------
-	//  Utility Serializer for Lists of Elements
-	// ------------------------------------------------------------------------
-	
-	@SuppressWarnings("ForLoopReplaceableByForEach")
-	private static final class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
-
-		private static final long serialVersionUID = 1119562170939152304L;
-		
-		private final TypeSerializer<T> elementSerializer;
-
-		ArrayListSerializer(TypeSerializer<T> elementSerializer) {
-			this.elementSerializer = elementSerializer;
-		}
-
-		@Override
-		public boolean isImmutableType() {
-			return false;
-		}
-
-		@Override
-		public TypeSerializer<ArrayList<T>> duplicate() {
-			TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
-			return duplicateElement == elementSerializer ? this : new ArrayListSerializer<T>(duplicateElement);
-		}
-
-		@Override
-		public ArrayList<T> createInstance() {
-			return new ArrayList<>();
-		}
-
-		@Override
-		public ArrayList<T> copy(ArrayList<T> from) {
-			ArrayList<T> newList = new ArrayList<>(from.size());
-			for (int i = 0; i < from.size(); i++) {
-				newList.add(elementSerializer.copy(from.get(i)));
-			}
-			return newList;
-		}
-
-		@Override
-		public ArrayList<T> copy(ArrayList<T> from, ArrayList<T> reuse) {
-			return copy(from);
-		}
-
-		@Override
-		public int getLength() {
-			return -1; // var length
-		}
-
-		@Override
-		public void serialize(ArrayList<T> list, DataOutputView target) throws IOException {
-			final int size = list.size();
-			target.writeInt(size);
-			for (int i = 0; i < size; i++) {
-				elementSerializer.serialize(list.get(i), target);
-			}
-		}
-
-		@Override
-		public ArrayList<T> deserialize(DataInputView source) throws IOException {
-			final int size = source.readInt();
-			final ArrayList<T> list = new ArrayList<>(size);
-			for (int i = 0; i < size; i++) {
-				list.add(elementSerializer.deserialize(source));
-			}
-			return list;
-		}
-
-		@Override
-		public ArrayList<T> deserialize(ArrayList<T> reuse, DataInputView source) throws IOException {
-			return deserialize(source);
-		}
-
-		@Override
-		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			// copy number of elements
-			final int num = source.readInt();
-			target.writeInt(num);
-			for (int i = 0; i < num; i++) {
-				elementSerializer.copy(source, target);
-			}
-		}
-
-		// --------------------------------------------------------------------
-		
-		@Override
-		public boolean equals(Object obj) {
-			return obj == this || 
-					(obj != null && obj.getClass() == getClass() && 
-						elementSerializer.equals(((ArrayListSerializer<?>) obj).elementSerializer));
-		}
-
-		@Override
-		public boolean canEqual(Object obj) {
-			return true;
-		}
-
-		@Override
-		public int hashCode() {
-			return elementSerializer.hashCode();
-		}
-	} 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
index 9d85cf0..2175fb5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 
 @Internal
+@Deprecated
 public class AggregatingProcessingTimeWindowOperator<KEY, IN> 
 		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN, IN, ReduceFunction<IN>> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 1cfeba8..990162e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.commons.math3.util.ArithmeticUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.AppendingState;
@@ -35,10 +36,13 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -47,16 +51,21 @@ import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
 import java.util.PriorityQueue;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -148,11 +157,29 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	// State restored in case of migration from an older version (backwards compatibility)
 	// ------------------------------------------------------------------------
 
-	/** The restored processing time timers. */
-	protected transient PriorityQueue<Timer<K, W>> restoredFromLegacyProcessingTimeTimers;
+	/**
+	 * A flag indicating if we are migrating from a regular {@link WindowOperator}
+	 * or one of the deprecated {@link AccumulatingProcessingTimeWindowOperator} and
+	 * {@link AggregatingProcessingTimeWindowOperator}.
+	 */
+	private final LegacyWindowOperatorType legacyWindowOperatorType;
+
+	/**
+	 * The elements restored when migrating from an older, deprecated
+	 * {@link AccumulatingProcessingTimeWindowOperator} or
+	 * {@link AggregatingProcessingTimeWindowOperator}. */
+	private transient PriorityQueue<StreamRecord<IN>> restoredFromLegacyAlignedOpRecords;
+
+	/**
+	 * The restored processing time timers when migrating from an
+	 * older version of the {@link WindowOperator}.
+	 */
+	private transient PriorityQueue<Timer<K, W>> restoredFromLegacyProcessingTimeTimers;
 
-	/** The restored event time timers. */
-	protected transient PriorityQueue<Timer<K, W>> restoredFromLegacyEventTimeTimers;
+	/** The restored event time timer when migrating from an
+	 * older version of the {@link WindowOperator}.
+	 */
+	private transient PriorityQueue<Timer<K, W>> restoredFromLegacyEventTimeTimers;
 
 	/**
 	 * Creates a new {@code WindowOperator} based on the given policies and user functions.
@@ -167,6 +194,24 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			Trigger<? super IN, ? super W> trigger,
 			long allowedLateness) {
 
+		this(windowAssigner, windowSerializer, keySelector, keySerializer,
+			windowStateDescriptor, windowFunction, trigger, allowedLateness, LegacyWindowOperatorType.NONE);
+	}
+
+	/**
+	 * Creates a new {@code WindowOperator} based on the given policies and user functions.
+	 */
+	public WindowOperator(
+			WindowAssigner<? super IN, W> windowAssigner,
+			TypeSerializer<W> windowSerializer,
+			KeySelector<IN, K> keySelector,
+			TypeSerializer<K> keySerializer,
+			StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
+			InternalWindowFunction<ACC, OUT, K, W> windowFunction,
+			Trigger<? super IN, ? super W> trigger,
+			long allowedLateness,
+			LegacyWindowOperatorType legacyWindowOperatorType) {
+
 		super(windowFunction);
 
 		checkArgument(allowedLateness >= 0);
@@ -181,6 +226,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		this.windowStateDescriptor = windowStateDescriptor;
 		this.trigger = checkNotNull(trigger);
 		this.allowedLateness = allowedLateness;
+		this.legacyWindowOperatorType = legacyWindowOperatorType;
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			@SuppressWarnings({"unchecked", "rawtypes"})
@@ -211,26 +257,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			}
 		};
 
-		// if we restore from an older version,
-		// we have to re-register the timers.
-
-		if (restoredFromLegacyEventTimeTimers != null) {
-			for (Timer<K, W> timer : restoredFromLegacyEventTimeTimers) {
-				setCurrentKey(timer.key);
-				internalTimerService.registerEventTimeTimer(timer.window, timer.timestamp);
-			}
-		}
-
-		if (restoredFromLegacyProcessingTimeTimers != null) {
-			for (Timer<K, W> timer : restoredFromLegacyProcessingTimeTimers) {
-				setCurrentKey(timer.key);
-				internalTimerService.registerProcessingTimeTimer(timer.window, timer.timestamp);
-			}
-		}
-
-		// gc friendliness
-		this.restoredFromLegacyEventTimeTimers = null;
-		this.restoredFromLegacyProcessingTimeTimers = null;
+		registerRestoredLegacyStateState();
 	}
 
 	@Override
@@ -745,17 +772,157 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	//  Restoring / Migrating from an older Flink version.
 	// ------------------------------------------------------------------------
 
+	private static final int BEGIN_OF_STATE_MAGIC_NUMBER = 0x0FF1CE42;
+
+	private static final int BEGIN_OF_PANE_MAGIC_NUMBER = 0xBADF00D5;
+
 	@Override
 	public void restoreState(FSDataInputStream in) throws Exception {
 		super.restoreState(in);
 
-		LOG.info("{} (taskIdx={}) restoring timers from an older Flink version.",
-			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
+		LOG.info("{} (taskIdx={}) restoring {} state from an older Flink version.",
+			getClass().getSimpleName(), legacyWindowOperatorType, getRuntimeContext().getIndexOfThisSubtask());
+
+		DataInputViewStreamWrapper streamWrapper = new DataInputViewStreamWrapper(in);
 
-		restoreTimers(new DataInputViewStreamWrapper(in));
+		switch (legacyWindowOperatorType) {
+			case NONE:
+				restoreFromLegacyWindowOperator(streamWrapper);
+				break;
+			case FAST_ACCUMULATING:
+			case FAST_AGGREGATING:
+				restoreFromLegacyAlignedWindowOperator(streamWrapper);
+				break;
+		}
 	}
 
-	private void restoreTimers(DataInputViewStreamWrapper in) throws IOException {
+	public void registerRestoredLegacyStateState() throws Exception {
+
+		LOG.info("{} (taskIdx={}) re-registering state from an older Flink version.",
+			getClass().getSimpleName(), legacyWindowOperatorType, getRuntimeContext().getIndexOfThisSubtask());
+
+		switch (legacyWindowOperatorType) {
+			case NONE:
+				reregisterStateFromLegacyWindowOperator();
+				break;
+			case FAST_ACCUMULATING:
+			case FAST_AGGREGATING:
+				reregisterStateFromLegacyAlignedWindowOperator();
+				break;
+		}
+	}
+
+	private void restoreFromLegacyAlignedWindowOperator(DataInputViewStreamWrapper in) throws IOException {
+		Preconditions.checkArgument(legacyWindowOperatorType != LegacyWindowOperatorType.NONE);
+
+		final long nextEvaluationTime = in.readLong();
+		final long nextSlideTime = in.readLong();
+
+		validateMagicNumber(BEGIN_OF_STATE_MAGIC_NUMBER, in.readInt());
+
+		restoredFromLegacyAlignedOpRecords = new PriorityQueue<>(42,
+			new Comparator<StreamRecord<IN>>() {
+				@Override
+				public int compare(StreamRecord<IN> o1, StreamRecord<IN> o2) {
+					return Long.compare(o1.getTimestamp(), o2.getTimestamp());
+				}
+			}
+		);
+
+		switch (legacyWindowOperatorType) {
+			case FAST_ACCUMULATING:
+				restoreElementsFromLegacyAccumulatingAlignedWindowOperator(in, nextSlideTime);
+				break;
+			case FAST_AGGREGATING:
+				restoreElementsFromLegacyAggregatingAlignedWindowOperator(in, nextSlideTime);
+				break;
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("{} (taskIdx={}) restored {} events from legacy {}.",
+				getClass().getSimpleName(),
+				getRuntimeContext().getIndexOfThisSubtask(),
+				restoredFromLegacyAlignedOpRecords.size(),
+				legacyWindowOperatorType);
+		}
+	}
+
+	private void restoreElementsFromLegacyAccumulatingAlignedWindowOperator(DataInputView in, long nextSlideTime) throws IOException {
+		int numPanes = in.readInt();
+		final long paneSize = getPaneSize();
+		long nextElementTimestamp = nextSlideTime - (numPanes * paneSize);
+
+		@SuppressWarnings("unchecked")
+		ArrayListSerializer<IN> ser = new ArrayListSerializer<>((TypeSerializer<IN>) getStateDescriptor().getSerializer());
+
+		while (numPanes > 0) {
+			validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, in.readInt());
+
+			nextElementTimestamp += paneSize - 1; // the -1 is so that the elements fall into the correct time-frame
+
+			final int numElementsInPane = in.readInt();
+			for (int i = numElementsInPane - 1; i >= 0; i--) {
+				K key = keySerializer.deserialize(in);
+
+				@SuppressWarnings("unchecked")
+				List<IN> valueList = ser.deserialize(in);
+				for (IN record: valueList) {
+					restoredFromLegacyAlignedOpRecords.add(new StreamRecord<>(record, nextElementTimestamp));
+				}
+			}
+			numPanes--;
+		}
+	}
+
+	private void restoreElementsFromLegacyAggregatingAlignedWindowOperator(DataInputView in, long nextSlideTime) throws IOException {
+		int numPanes = in.readInt();
+		final long paneSize = getPaneSize();
+		long nextElementTimestamp = nextSlideTime - (numPanes * paneSize);
+
+		while (numPanes > 0) {
+			validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, in.readInt());
+
+			nextElementTimestamp += paneSize - 1; // the -1 is so that the elements fall into the correct time-frame
+
+			final int numElementsInPane = in.readInt();
+			for (int i = numElementsInPane - 1; i >= 0; i--) {
+				K key = keySerializer.deserialize(in);
+
+				@SuppressWarnings("unchecked")
+				IN value = (IN) getStateDescriptor().getSerializer().deserialize(in);
+				restoredFromLegacyAlignedOpRecords.add(new StreamRecord<>(value, nextElementTimestamp));
+			}
+			numPanes--;
+		}
+	}
+
+	private long getPaneSize() {
+		Preconditions.checkArgument(
+			legacyWindowOperatorType == LegacyWindowOperatorType.FAST_ACCUMULATING ||
+				legacyWindowOperatorType == LegacyWindowOperatorType.FAST_AGGREGATING);
+
+		final long paneSlide;
+		if (windowAssigner instanceof SlidingProcessingTimeWindows) {
+			SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
+			paneSlide = ArithmeticUtils.gcd(timeWindows.getSize(), timeWindows.getSlide());
+		} else {
+			TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
+			paneSlide = timeWindows.getSize(); // this is valid as windowLength == windowSlide == timeWindows.getSize
+		}
+		return paneSlide;
+	}
+
+	private static void validateMagicNumber(int expected, int found) throws IOException {
+		if (expected != found) {
+			throw new IOException("Corrupt state stream - wrong magic number. " +
+				"Expected '" + Integer.toHexString(expected) +
+				"', found '" + Integer.toHexString(found) + '\'');
+		}
+	}
+
+	private void restoreFromLegacyWindowOperator(DataInputViewStreamWrapper in) throws IOException {
+		Preconditions.checkArgument(legacyWindowOperatorType == LegacyWindowOperatorType.NONE);
+
 		int numWatermarkTimers = in.readInt();
 		this.restoredFromLegacyEventTimeTimers = new PriorityQueue<>(Math.max(numWatermarkTimers, 1));
 
@@ -806,6 +973,42 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		}
 	}
 
+	public void reregisterStateFromLegacyWindowOperator() {
+		// if we restore from an older version,
+		// we have to re-register the recovered state.
+
+		if (restoredFromLegacyEventTimeTimers != null) {
+			for (Timer<K, W> timer : restoredFromLegacyEventTimeTimers) {
+				setCurrentKey(timer.key);
+				internalTimerService.registerEventTimeTimer(timer.window, timer.timestamp);
+			}
+		}
+
+		if (restoredFromLegacyProcessingTimeTimers != null) {
+			for (Timer<K, W> timer : restoredFromLegacyProcessingTimeTimers) {
+				setCurrentKey(timer.key);
+				internalTimerService.registerProcessingTimeTimer(timer.window, timer.timestamp);
+			}
+		}
+
+		// gc friendliness
+		restoredFromLegacyEventTimeTimers = null;
+		restoredFromLegacyProcessingTimeTimers = null;
+	}
+
+	public void reregisterStateFromLegacyAlignedWindowOperator() throws Exception {
+		if (restoredFromLegacyAlignedOpRecords != null) {
+			while (!restoredFromLegacyAlignedOpRecords.isEmpty()) {
+				StreamRecord<IN> record = restoredFromLegacyAlignedOpRecords.poll();
+				setCurrentKey(keySelector.getKey(record.getValue()));
+				processElement(record);
+			}
+
+			// gc friendliness
+			restoredFromLegacyAlignedOpRecords = null;
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	// Getters for testing
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index c1ad0fc..5aa8151 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -55,6 +55,7 @@ public class TimeWindowTranslationTest {
 	 * conditions are right.
 	 */
 	@Test
+	@Ignore
 	public void testFastTimeWindows() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
index 8e33c92..82c3d71 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.WindowingTestHarness;
 import org.junit.Test;
 
@@ -171,7 +172,7 @@ public class WindowingTestHarnessTest {
 		testHarness.compareActualToExpectedOutput("Output was not correct.");
 
 		// do a snapshot, close and restore again
-		StreamStateHandle snapshot = testHarness.snapshot(0L, 0L);
+		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
 		testHarness.close();
 		testHarness.restore(snapshot);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index 25deb54..efb0d7e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -31,6 +30,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
@@ -159,20 +159,20 @@ public class WindowingTestHarness<K, IN, W extends Window> {
 	/**
 	 * Takes a snapshot of the current state of the operator. This can be used to test fault-tolerance.
 	 */
-	public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
-		return testHarness.snapshotLegacy(checkpointId, timestamp);
+	public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception {
+		return testHarness.snapshot(checkpointId, timestamp);
 	}
 
 	/**
-	 * Resumes execution from a provided {@link StreamStateHandle}. This is used to test recovery after a failure.
+	 * Resumes execution from the provided {@link OperatorStateHandles}. This is used to test recovery after a failure.
 	 */
-	public void restore(StreamStateHandle stateHandle) throws Exception {
+	public void restore(OperatorStateHandles stateHandles) throws Exception {
 		Preconditions.checkArgument(!isOpen,
 			"You are trying to restore() while the operator is still open. " +
 				"Please call close() first.");
 
 		testHarness.setup();
-		testHarness.restore(stateHandle);
+		testHarness.initializeState(stateHandles);
 		openOperator();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index c67c215..299932f 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -24,17 +24,16 @@ import org.apache.flink.api.common.state.{ListStateDescriptor, ReducingStateDesc
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.scala.function.WindowFunction
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingEventTimeWindows, SlidingEventTimeWindows}
+import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, SlidingProcessingTimeWindows, TumblingEventTimeWindows}
 import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
 import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
+import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, ProcessingTimeTrigger}
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.runtime.operators.windowing.{EvictingWindowOperator, WindowOperator, AccumulatingProcessingTimeWindowOperator, AggregatingProcessingTimeWindowOperator}
+import org.apache.flink.streaming.runtime.operators.windowing.{AccumulatingProcessingTimeWindowOperator, AggregatingProcessingTimeWindowOperator, EvictingWindowOperator, WindowOperator}
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.util.Collector
-
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Ignore, Test}
 
 class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
@@ -43,6 +42,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
    * conditions are right.
    */
   @Test
+  @Ignore
   def testFastTimeWindows(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment