You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/05 22:17:04 UTC
[14/17] flink git commit: [hotfix] [streaming api] Remove obsolete and unused InputTypeSerializer from WindowOperator
[hotfix] [streaming api] Remove obsolete and unused InputTypeSerializer from WindowOperator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1cd8d4f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1cd8d4f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1cd8d4f4
Branch: refs/heads/master
Commit: 1cd8d4f418a707790c091fed2428627eae9da423
Parents: 47e4977
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Oct 4 23:49:54 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Oct 5 20:04:34 2016 +0200
----------------------------------------------------------------------
.../operators/windowing/WindowOperator.java | 20 +------
.../windowing/EvictingWindowOperatorTest.java | 6 ---
.../operators/windowing/WindowOperatorTest.java | 56 +-------------------
.../streaming/util/WindowingTestHarness.java | 2 -
4 files changed, 3 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1cd8d4f4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index de316e7..c5f1ca2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -34,7 +33,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.fs.FSDataInputStream;
@@ -98,7 +96,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
- implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
+ implements OneInputStreamOperator<IN, OUT>, Triggerable {
private static final long serialVersionUID = 1L;
@@ -115,12 +113,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
/**
- * This is used to copy the incoming element because it can be put into several window
- * buffers.
- */
- protected TypeSerializer<IN> inputSerializer;
-
- /**
* For serializing the key in checkpoints.
*/
protected final TypeSerializer<K> keySerializer;
@@ -211,21 +203,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
@Override
- @SuppressWarnings("unchecked")
- public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
- inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
- }
-
- @Override
public final void open() throws Exception {
super.open();
timestampedCollector = new TimestampedCollector<>(output);
- if (inputSerializer == null) {
- throw new IllegalStateException("Input serializer was not set.");
- }
-
// these could already be initialized from restoreState()
if (watermarkTimers == null) {
watermarkTimers = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/1cd8d4f4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 681a334..8f3af15 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -82,8 +82,6 @@ public class EvictingWindowOperatorTest {
CountEvictor.of(WINDOW_SIZE),
0);
- operator.setInputType(inputType, new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -152,8 +150,6 @@ public class EvictingWindowOperatorTest {
CountEvictor.of(WINDOW_SIZE),
0);
- operator.setInputType(inputType, new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -221,8 +217,6 @@ public class EvictingWindowOperatorTest {
CountEvictor.of(WINDOW_SIZE),
0);
- operator.setInputType(inputType, new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
http://git-wip-us.apache.org/repos/asf/flink/blob/1cd8d4f4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index e98bc91..cda6524 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -180,8 +180,6 @@ public class WindowOperatorTest extends TestLogger {
EventTimeTrigger.create(),
0);
- operator.setInputType(inputType, new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -216,8 +214,6 @@ public class WindowOperatorTest extends TestLogger {
EventTimeTrigger.create(),
0);
- operator.setInputType(inputType, new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -317,8 +313,6 @@ public class WindowOperatorTest extends TestLogger {
EventTimeTrigger.create(),
0);
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -351,8 +345,6 @@ public class WindowOperatorTest extends TestLogger {
EventTimeTrigger.create(),
0);
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -388,8 +380,6 @@ public class WindowOperatorTest extends TestLogger {
EventTimeTrigger.create(),
0);
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -462,8 +452,6 @@ public class WindowOperatorTest extends TestLogger {
EventTimeTrigger.create(),
0);
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -538,8 +526,6 @@ public class WindowOperatorTest extends TestLogger {
PurgingTrigger.of(CountTrigger.of(4)),
0);
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -643,8 +629,6 @@ public class WindowOperatorTest extends TestLogger {
EventTimeTrigger.create(),
0);
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -703,8 +687,6 @@ public class WindowOperatorTest extends TestLogger {
ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
0);
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
@@ -793,9 +775,6 @@ public class WindowOperatorTest extends TestLogger {
PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)),
0);
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(
- "Tuple2<String, Integer>"), new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
@@ -839,9 +818,6 @@ public class WindowOperatorTest extends TestLogger {
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(
- "Tuple2<String, Integer>"), new ExecutionConfig());
-
testHarness.setup();
testHarness.restore(snapshot);
testHarness.open();
@@ -876,7 +852,6 @@ public class WindowOperatorTest extends TestLogger {
final int WINDOW_SLIDE = 1;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
- TestTimeServiceProvider timer = new TestTimeServiceProvider();
TestTimeServiceProvider timer = new TestTimeServiceProvider();
@@ -957,7 +932,6 @@ public class WindowOperatorTest extends TestLogger {
ProcessingTimeTrigger.create(), 0);
TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1017,7 +991,6 @@ public class WindowOperatorTest extends TestLogger {
ProcessingTimeTrigger.create(), 0);
TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1090,7 +1063,6 @@ public class WindowOperatorTest extends TestLogger {
ProcessingTimeTrigger.create(), 0);
TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1159,8 +1131,7 @@ public class WindowOperatorTest extends TestLogger {
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
-
- operator.setInputType(inputType, new ExecutionConfig());
+
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -1220,8 +1191,7 @@ public class WindowOperatorTest extends TestLogger {
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
-
- operator.setInputType(inputType, new ExecutionConfig());
+
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -1288,7 +1258,6 @@ public class WindowOperatorTest extends TestLogger {
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
- operator.setInputType(inputType, new ExecutionConfig());
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -1349,7 +1318,6 @@ public class WindowOperatorTest extends TestLogger {
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
- operator.setInputType(inputType, new ExecutionConfig());
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -1422,8 +1390,6 @@ public class WindowOperatorTest extends TestLogger {
PurgingTrigger.of(EventTimeTrigger.create()),
LATENESS);
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
@@ -1514,8 +1480,6 @@ public class WindowOperatorTest extends TestLogger {
EventTimeTrigger.create(),
LATENESS);
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
@@ -1600,8 +1564,6 @@ public class WindowOperatorTest extends TestLogger {
PurgingTrigger.of(EventTimeTrigger.create()),
LATENESS);
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
@@ -1686,8 +1648,6 @@ public class WindowOperatorTest extends TestLogger {
EventTimeTrigger.create(),
LATENESS);
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
@@ -1781,8 +1741,6 @@ public class WindowOperatorTest extends TestLogger {
PurgingTrigger.of(EventTimeTrigger.create()),
LATENESS);
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
@@ -1868,8 +1826,6 @@ public class WindowOperatorTest extends TestLogger {
EventTimeTrigger.create(),
LATENESS);
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
@@ -1958,7 +1914,6 @@ public class WindowOperatorTest extends TestLogger {
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
- operator.setInputType(inputType, new ExecutionConfig());
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -2013,7 +1968,6 @@ public class WindowOperatorTest extends TestLogger {
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
- operator.setInputType(inputType, new ExecutionConfig());
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -2060,7 +2014,6 @@ public class WindowOperatorTest extends TestLogger {
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
- operator.setInputType(inputType, new ExecutionConfig());
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -2116,7 +2069,6 @@ public class WindowOperatorTest extends TestLogger {
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
- operator.setInputType(inputType, new ExecutionConfig());
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -2162,7 +2114,6 @@ public class WindowOperatorTest extends TestLogger {
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
- operator.setInputType(inputType, new ExecutionConfig());
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -2204,8 +2155,6 @@ public class WindowOperatorTest extends TestLogger {
EventTimeTrigger.create(),
LATENESS);
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
@@ -2261,7 +2210,6 @@ public class WindowOperatorTest extends TestLogger {
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
- operator.setInputType(inputType, new ExecutionConfig());
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/1cd8d4f4/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index d47136c..ab8b70f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -80,8 +80,6 @@ public class WindowingTestHarness<K, IN, W extends Window> {
trigger,
allowedLateness);
- operator.setInputType(inputType, executionConfig);
-
timeServiceProvider = new TestTimeServiceProvider();
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, executionConfig, timeServiceProvider, keySelector, keyType);
}