You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/07 15:26:33 UTC

[5/6] flink git commit: [FLINK-4957] Remove Key Serializer Parameter getInternalTimerService()

[FLINK-4957] Remove Key Serializer Parameter getInternalTimerService()

It's not needed because we can get the key serializer from the keyed
state backend.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b873ac3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b873ac3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b873ac3

Branch: refs/heads/master
Commit: 0b873ac343343fd1d7716d075b54e66324374f47
Parents: 06fb9f1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 28 14:34:47 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 7 16:25:57 2016 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/KeyedStream.java     |  2 +-
 .../api/operators/AbstractStreamOperator.java     | 13 ++++++-------
 .../api/operators/StreamTimelyFlatMap.java        |  9 ++-------
 .../api/operators/co/CoStreamTimelyFlatMap.java   | 11 ++---------
 .../operators/windowing/WindowOperator.java       |  2 +-
 .../api/operators/AbstractStreamOperatorTest.java |  1 -
 .../api/operators/TimelyFlatMapTest.java          | 18 +++++++++---------
 .../api/operators/co/TimelyCoFlatMapTest.java     | 16 ++++++++--------
 8 files changed, 29 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index c938f5b..4063b60 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -231,7 +231,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 			TypeInformation<R> outputType) {
 
 		StreamTimelyFlatMap<KEY, T, R> operator =
-				new StreamTimelyFlatMap<>(keyType.createSerializer(getExecutionConfig()), clean(flatMapper));
+				new StreamTimelyFlatMap<>(clean(flatMapper));
 
 		return transform("Flat Map", outputType, operator);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7b555b7..839abf8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -772,24 +772,21 @@ public abstract class AbstractStreamOperator<OUT>
 	 *
 	 * @param name The name of the requested timer service. If no service exists under the given
 	 *             name a new one will be created and returned.
-	 * @param keySerializer {@code TypeSerializer} for the keys of the timers.
 	 * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
 	 * @param triggerable The {@link Triggerable} that should be invoked when timers fire
 	 *
-	 * @param <K> The type of the timer keys.
 	 * @param <N> The type of the timer namespace.
 	 */
-	public <K, N> InternalTimerService<N> getInternalTimerService(
+	public <N> InternalTimerService<N> getInternalTimerService(
 			String name,
-			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer,
-			Triggerable<K, N> triggerable) {
+			Triggerable<?, N> triggerable) {
 		if (getKeyedStateBackend() == null) {
 			throw new UnsupportedOperationException("Timers can only be used on keyed operators.");
 		}
 
 		@SuppressWarnings("unchecked")
-		HeapInternalTimerService<K, N> timerService = (HeapInternalTimerService<K, N>) timerServices.get(name);
+		HeapInternalTimerService<Object, N> timerService = (HeapInternalTimerService<Object, N>) timerServices.get(name);
 
 		if (timerService == null) {
 			timerService = new HeapInternalTimerService<>(
@@ -799,7 +796,9 @@ public abstract class AbstractStreamOperator<OUT>
 				getRuntimeContext().getProcessingTimeService());
 			timerServices.put(name, timerService);
 		}
-		timerService.startTimerService(keySerializer, namespaceSerializer, triggerable);
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		Triggerable rawTriggerable = (Triggerable) triggerable;
+		timerService.startTimerService(getKeyedStateBackend().getKeySerializer(), namespaceSerializer, rawTriggerable);
 		return timerService;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
index 962f264..d507ba6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.SimpleTimerService;
@@ -34,17 +33,13 @@ public class StreamTimelyFlatMap<K, IN, OUT>
 
 	private static final long serialVersionUID = 1L;
 
-	private final TypeSerializer<K> keySerializer;
-
 	private transient TimestampedCollector<OUT> collector;
 
 	private transient TimerService timerService;
 
-	public StreamTimelyFlatMap(TypeSerializer<K> keySerializer, TimelyFlatMapFunction<IN, OUT> flatMapper) {
+	public StreamTimelyFlatMap(TimelyFlatMapFunction<IN, OUT> flatMapper) {
 		super(flatMapper);
 
-		this.keySerializer = keySerializer;
-
 		chainingStrategy = ChainingStrategy.ALWAYS;
 	}
 
@@ -54,7 +49,7 @@ public class StreamTimelyFlatMap<K, IN, OUT>
 		collector = new TimestampedCollector<>(output);
 
 		InternalTimerService<VoidNamespace> internalTimerService =
-				getInternalTimerService("user-timers", keySerializer, VoidNamespaceSerializer.INSTANCE, this);
+				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
 
 		this.timerService = new SimpleTimerService(internalTimerService);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
index df2320f..212aafd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.operators.co;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.SimpleTimerService;
@@ -40,18 +39,12 @@ public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
 
 	private static final long serialVersionUID = 1L;
 
-	private final TypeSerializer<K> keySerializer;
-
 	private transient TimestampedCollector<OUT> collector;
 
 	private transient TimerService timerService;
 
-	public CoStreamTimelyFlatMap(
-			TypeSerializer<K> keySerializer,
-			TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
+	public CoStreamTimelyFlatMap(TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
 		super(flatMapper);
-
-		this.keySerializer = keySerializer;
 	}
 
 	@Override
@@ -60,7 +53,7 @@ public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
 		collector = new TimestampedCollector<>(output);
 
 		InternalTimerService<VoidNamespace> internalTimerService =
-				getInternalTimerService("user-timers", keySerializer, VoidNamespaceSerializer.INSTANCE, this);
+				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
 
 		this.timerService = new SimpleTimerService(internalTimerService);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index c465767..229d97d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -187,7 +187,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		timestampedCollector = new TimestampedCollector<>(output);
 
 		internalTimerService =
-				getInternalTimerService("window-timers", keySerializer, windowSerializer, this);
+				getInternalTimerService("window-timers", windowSerializer, this);
 
 		context = new Context(null, null);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index fd05353..2fb0089 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -448,7 +448,6 @@ public class AbstractStreamOperatorTest {
 
 			this.timerService = getInternalTimerService(
 					"test-timers",
-					IntSerializer.INSTANCE,
 					VoidNamespaceSerializer.INSTANCE,
 					this);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
index f3b09eb..46b52ee 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
@@ -49,7 +49,7 @@ public class TimelyFlatMapTest extends TestLogger {
 	public void testCurrentEventTime() throws Exception {
 
 		StreamTimelyFlatMap<Integer, Integer, String> operator =
-				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
+				new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -79,7 +79,7 @@ public class TimelyFlatMapTest extends TestLogger {
 	public void testCurrentProcessingTime() throws Exception {
 
 		StreamTimelyFlatMap<Integer, Integer, String> operator =
-				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
+				new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -107,7 +107,7 @@ public class TimelyFlatMapTest extends TestLogger {
 	public void testEventTimeTimers() throws Exception {
 
 		StreamTimelyFlatMap<Integer, Integer, Integer> operator =
-				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
+				new StreamTimelyFlatMap<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
 
 		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -137,7 +137,7 @@ public class TimelyFlatMapTest extends TestLogger {
 	public void testProcessingTimeTimers() throws Exception {
 
 		StreamTimelyFlatMap<Integer, Integer, Integer> operator =
-				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
+				new StreamTimelyFlatMap<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
 
 		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -166,7 +166,7 @@ public class TimelyFlatMapTest extends TestLogger {
 	public void testEventTimeTimerWithState() throws Exception {
 
 		StreamTimelyFlatMap<Integer, Integer, String> operator =
-				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
+				new StreamTimelyFlatMap<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -206,7 +206,7 @@ public class TimelyFlatMapTest extends TestLogger {
 	public void testProcessingTimeTimerWithState() throws Exception {
 
 		StreamTimelyFlatMap<Integer, Integer, String> operator =
-				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
+				new StreamTimelyFlatMap<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -239,7 +239,7 @@ public class TimelyFlatMapTest extends TestLogger {
 	public void testSnapshotAndRestore() throws Exception {
 
 		StreamTimelyFlatMap<Integer, Integer, String> operator =
-				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+				new StreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -254,7 +254,7 @@ public class TimelyFlatMapTest extends TestLogger {
 
 		testHarness.close();
 
-		operator = new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+		operator = new StreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
 
 		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
 
@@ -352,7 +352,7 @@ public class TimelyFlatMapTest extends TestLogger {
 		private static final long serialVersionUID = 1L;
 
 		private final ValueStateDescriptor<Integer> state =
-				new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE, null);
+				new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE,  null);
 
 		private final TimeDomain timeDomain;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
index 25808f4..cb5d6c2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
@@ -49,7 +49,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
 	public void testCurrentEventTime() throws Exception {
 
 		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction());
+				new CoStreamTimelyFlatMap<>(new WatermarkQueryingFlatMapFunction());
 
 		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
 				new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -85,7 +85,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
 	public void testCurrentProcessingTime() throws Exception {
 
 		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction());
+				new CoStreamTimelyFlatMap<>(new ProcessingTimeQueryingFlatMapFunction());
 
 		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
 				new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -117,7 +117,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
 	public void testEventTimeTimers() throws Exception {
 
 		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new EventTimeTriggeringFlatMapFunction());
+				new CoStreamTimelyFlatMap<>(new EventTimeTriggeringFlatMapFunction());
 
 		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
 				new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -156,7 +156,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
 	public void testProcessingTimeTimers() throws Exception {
 
 		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeTriggeringFlatMapFunction());
+				new CoStreamTimelyFlatMap<>(new ProcessingTimeTriggeringFlatMapFunction());
 
 		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
 				new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -193,7 +193,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
 	public void testEventTimeTimerWithState() throws Exception {
 
 		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new EventTimeTriggeringStatefulFlatMapFunction());
+				new CoStreamTimelyFlatMap<>(new EventTimeTriggeringStatefulFlatMapFunction());
 
 		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
 				new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -242,7 +242,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
 	public void testProcessingTimeTimerWithState() throws Exception {
 
 		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeTriggeringStatefulFlatMapFunction());
+				new CoStreamTimelyFlatMap<>(new ProcessingTimeTriggeringStatefulFlatMapFunction());
 
 		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
 				new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -279,7 +279,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
 	public void testSnapshotAndRestore() throws Exception {
 
 		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+				new CoStreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
 
 		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
 				new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -299,7 +299,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
 
 		testHarness.close();
 
-		operator = new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+		operator = new CoStreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
 
 		testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
 				operator,