You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/05 22:17:04 UTC

[14/17] flink git commit: [hotfix] [streaming api] Remove obsolete and unused InputTypeSerializer from WindowOperator

[hotfix] [streaming api] Remove obsolete and unused InputTypeSerializer from WindowOperator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1cd8d4f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1cd8d4f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1cd8d4f4

Branch: refs/heads/master
Commit: 1cd8d4f418a707790c091fed2428627eae9da423
Parents: 47e4977
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Oct 4 23:49:54 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Oct 5 20:04:34 2016 +0200

----------------------------------------------------------------------
 .../operators/windowing/WindowOperator.java     | 20 +------
 .../windowing/EvictingWindowOperatorTest.java   |  6 ---
 .../operators/windowing/WindowOperatorTest.java | 56 +-------------------
 .../streaming/util/WindowingTestHarness.java    |  2 -
 4 files changed, 3 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1cd8d4f4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index de316e7..c5f1ca2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.AppendingState;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -34,7 +33,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -98,7 +96,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
-	implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
+	implements OneInputStreamOperator<IN, OUT>, Triggerable {
 
 	private static final long serialVersionUID = 1L;
 
@@ -115,12 +113,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
 
 	/**
-	 * This is used to copy the incoming element because it can be put into several window
-	 * buffers.
-	 */
-	protected TypeSerializer<IN> inputSerializer;
-
-	/**
 	 * For serializing the key in checkpoints.
 	 */
 	protected final TypeSerializer<K> keySerializer;
@@ -211,21 +203,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
-	public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
-		inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
-	}
-
-	@Override
 	public final void open() throws Exception {
 		super.open();
 
 		timestampedCollector = new TimestampedCollector<>(output);
 
-		if (inputSerializer == null) {
-			throw new IllegalStateException("Input serializer was not set.");
-		}
-
 		// these could already be initialized from restoreState()
 		if (watermarkTimers == null) {
 			watermarkTimers = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/1cd8d4f4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 681a334..8f3af15 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -82,8 +82,6 @@ public class EvictingWindowOperatorTest {
 				CountEvictor.of(WINDOW_SIZE),
 				0);
 
-		operator.setInputType(inputType, new ExecutionConfig());
-
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -152,8 +150,6 @@ public class EvictingWindowOperatorTest {
 			CountEvictor.of(WINDOW_SIZE),
 			0);
 
-		operator.setInputType(inputType, new ExecutionConfig());
-
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -221,8 +217,6 @@ public class EvictingWindowOperatorTest {
 			CountEvictor.of(WINDOW_SIZE),
 			0);
 
-		operator.setInputType(inputType, new ExecutionConfig());
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1cd8d4f4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index e98bc91..cda6524 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -180,8 +180,6 @@ public class WindowOperatorTest extends TestLogger {
 				EventTimeTrigger.create(),
 				0);
 
-		operator.setInputType(inputType, new ExecutionConfig());
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -216,8 +214,6 @@ public class WindowOperatorTest extends TestLogger {
 				EventTimeTrigger.create(),
 				0);
 
-		operator.setInputType(inputType, new ExecutionConfig());
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -317,8 +313,6 @@ public class WindowOperatorTest extends TestLogger {
 				EventTimeTrigger.create(),
 				0);
 
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -351,8 +345,6 @@ public class WindowOperatorTest extends TestLogger {
 				EventTimeTrigger.create(),
 				0);
 
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -388,8 +380,6 @@ public class WindowOperatorTest extends TestLogger {
 				EventTimeTrigger.create(),
 				0);
 
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -462,8 +452,6 @@ public class WindowOperatorTest extends TestLogger {
 				EventTimeTrigger.create(),
 				0);
 
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -538,8 +526,6 @@ public class WindowOperatorTest extends TestLogger {
 				PurgingTrigger.of(CountTrigger.of(4)),
 				0);
 
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 		
@@ -643,8 +629,6 @@ public class WindowOperatorTest extends TestLogger {
 				EventTimeTrigger.create(),
 				0);
 
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 		
@@ -703,8 +687,6 @@ public class WindowOperatorTest extends TestLogger {
 				ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
 				0);
 
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
@@ -793,9 +775,6 @@ public class WindowOperatorTest extends TestLogger {
 				PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)),
 				0);
 
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(
-				"Tuple2<String, Integer>"), new ExecutionConfig());
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
@@ -839,9 +818,6 @@ public class WindowOperatorTest extends TestLogger {
 
 		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(
-				"Tuple2<String, Integer>"), new ExecutionConfig());
-
 		testHarness.setup();
 		testHarness.restore(snapshot);
 		testHarness.open();
@@ -876,7 +852,6 @@ public class WindowOperatorTest extends TestLogger {
 		final int WINDOW_SLIDE = 1;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
-		TestTimeServiceProvider timer = new TestTimeServiceProvider();
 
 		TestTimeServiceProvider timer = new TestTimeServiceProvider();
 
@@ -957,7 +932,6 @@ public class WindowOperatorTest extends TestLogger {
 				ProcessingTimeTrigger.create(), 0);
 
 		TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1017,7 +991,6 @@ public class WindowOperatorTest extends TestLogger {
 				ProcessingTimeTrigger.create(), 0);
 
 		TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1090,7 +1063,6 @@ public class WindowOperatorTest extends TestLogger {
 				ProcessingTimeTrigger.create(), 0);
 
 		TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1159,8 +1131,7 @@ public class WindowOperatorTest extends TestLogger {
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
-
-		operator.setInputType(inputType, new ExecutionConfig());
+		
 		testHarness.open();
 		
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -1220,8 +1191,7 @@ public class WindowOperatorTest extends TestLogger {
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
-
-		operator.setInputType(inputType, new ExecutionConfig());
+		
 		testHarness.open();
 
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -1288,7 +1258,6 @@ public class WindowOperatorTest extends TestLogger {
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
-		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
 
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -1349,7 +1318,6 @@ public class WindowOperatorTest extends TestLogger {
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
-		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
 		
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -1422,8 +1390,6 @@ public class WindowOperatorTest extends TestLogger {
 				PurgingTrigger.of(EventTimeTrigger.create()),
 				LATENESS);
 
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
@@ -1514,8 +1480,6 @@ public class WindowOperatorTest extends TestLogger {
 				EventTimeTrigger.create(),
 				LATENESS);
 
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
@@ -1600,8 +1564,6 @@ public class WindowOperatorTest extends TestLogger {
 				PurgingTrigger.of(EventTimeTrigger.create()),
 				LATENESS);
 
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
@@ -1686,8 +1648,6 @@ public class WindowOperatorTest extends TestLogger {
 				EventTimeTrigger.create(),
 				LATENESS);
 
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
@@ -1781,8 +1741,6 @@ public class WindowOperatorTest extends TestLogger {
 				PurgingTrigger.of(EventTimeTrigger.create()),
 				LATENESS);
 
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
@@ -1868,8 +1826,6 @@ public class WindowOperatorTest extends TestLogger {
 				EventTimeTrigger.create(),
 				LATENESS);
 
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
@@ -1958,7 +1914,6 @@ public class WindowOperatorTest extends TestLogger {
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
-		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
 
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -2013,7 +1968,6 @@ public class WindowOperatorTest extends TestLogger {
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
-		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
 
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -2060,7 +2014,6 @@ public class WindowOperatorTest extends TestLogger {
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
-		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
 
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -2116,7 +2069,6 @@ public class WindowOperatorTest extends TestLogger {
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
-		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
 
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -2162,7 +2114,6 @@ public class WindowOperatorTest extends TestLogger {
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
-		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
 
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -2204,8 +2155,6 @@ public class WindowOperatorTest extends TestLogger {
 				EventTimeTrigger.create(),
 				LATENESS);
 
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
@@ -2261,7 +2210,6 @@ public class WindowOperatorTest extends TestLogger {
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
-		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
 
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/1cd8d4f4/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index d47136c..ab8b70f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -80,8 +80,6 @@ public class WindowingTestHarness<K, IN, W extends Window> {
 				trigger,
 				allowedLateness);
 
-		operator.setInputType(inputType, executionConfig);
-
 		timeServiceProvider = new TestTimeServiceProvider();
 		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, executionConfig, timeServiceProvider, keySelector, keyType);
 	}