You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/21 17:14:26 UTC

[11/11] flink git commit: [FLINK-4877] Refactor Operator TestHarnesses to use Common Base Class

[FLINK-4877] Refactor Operator TestHarnesses to use Common Base Class

This also introduces KeyedTwoInputStreamOperatorTestHarness which
is similar to KeyedOneInputStreamOperatorTestHarness


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f305baab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f305baab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f305baab

Branch: refs/heads/master
Commit: f305baabbcd9b035b696b8b7a4334deaf1b28d23
Parents: 0859a69
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Sep 29 16:04:29 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 21 19:03:05 2016 +0200

----------------------------------------------------------------------
 .../runtime/streamrecord/StreamRecord.java      |   2 +-
 .../util/AbstractStreamOperatorTestHarness.java | 366 +++++++++++++++++++
 .../KeyedOneInputStreamOperatorTestHarness.java |  18 +-
 .../KeyedTwoInputStreamOperatorTestHarness.java | 144 ++++++++
 .../util/OneInputStreamOperatorTestHarness.java | 328 +----------------
 .../util/TwoInputStreamOperatorTestHarness.java | 130 +------
 6 files changed, 537 insertions(+), 451 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
index 9f75161..da606a9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
@@ -171,7 +171,7 @@ public final class StreamRecord<T> extends StreamElement {
 		else if (o != null && getClass() == o.getClass()) {
 			StreamRecord<?> that = (StreamRecord<?>) o;
 			return this.hasTimestamp == that.hasTimestamp &&
-					this.timestamp == that.timestamp &&
+					(!this.hasTimestamp || this.timestamp == that.timestamp) &&
 					(this.value == null ? that.value == null : this.value.equals(that.value));
 		}
 		else {

http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
new file mode 100644
index 0000000..a61d995
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ClosableRegistry;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+/**
+ * Base class for {@code AbstractStreamOperator} test harnesses.
+ */
+public class AbstractStreamOperatorTestHarness<OUT> {
+
+	public static final int MAX_PARALLELISM = 10;
+
+	final protected StreamOperator<OUT> operator;
+
+	final protected ConcurrentLinkedQueue<Object> outputList;
+
+	final protected StreamConfig config;
+
+	final protected ExecutionConfig executionConfig;
+
+	final protected TestProcessingTimeService processingTimeService;
+
+	final protected StreamTask<?, ?> mockTask;
+
+	ClosableRegistry closableRegistry;
+
+	// use this as default for tests
+	protected AbstractStateBackend stateBackend = new MemoryStateBackend();
+
+	private final Object checkpointLock;
+
+	/**
+	 * Whether setup() was called on the operator. This is reset when calling close().
+	 */
+	private boolean setupCalled = false;
+	private boolean initializeCalled = false;
+
+	private volatile boolean wasFailedExternally = false;
+
+	public AbstractStreamOperatorTestHarness(StreamOperator<OUT> operator) throws Exception {
+		this(operator, new ExecutionConfig());
+	}
+
+	public AbstractStreamOperatorTestHarness(
+			StreamOperator<OUT> operator,
+			ExecutionConfig executionConfig) throws Exception {
+		this.operator = operator;
+		this.outputList = new ConcurrentLinkedQueue<>();
+		Configuration underlyingConfig = new Configuration();
+		this.config = new StreamConfig(underlyingConfig);
+		this.config.setCheckpointingEnabled(true);
+		this.executionConfig = executionConfig;
+		this.closableRegistry = new ClosableRegistry();
+		this.checkpointLock = new Object();
+
+		final Environment env = new MockEnvironment(
+				"MockTask",
+				3 * 1024 * 1024,
+				new MockInputSplitProvider(),
+				1024,
+				underlyingConfig,
+				executionConfig,
+				MAX_PARALLELISM,
+				1, 0);
+
+		mockTask = mock(StreamTask.class);
+		processingTimeService = new TestProcessingTimeService();
+		processingTimeService.setCurrentTime(0);
+
+		when(mockTask.getName()).thenReturn("Mock Task");
+		when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
+		when(mockTask.getConfiguration()).thenReturn(config);
+		when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
+		when(mockTask.getEnvironment()).thenReturn(env);
+		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
+		when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
+		when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				wasFailedExternally = true;
+				return null;
+			}
+		}).when(mockTask).handleAsyncException(any(String.class), any(Throwable.class));
+
+		try {
+			doAnswer(new Answer<CheckpointStreamFactory>() {
+				@Override
+				public CheckpointStreamFactory answer(InvocationOnMock invocationOnMock) throws Throwable {
+
+					final StreamOperator<?> operator = (StreamOperator<?>) invocationOnMock.getArguments()[0];
+					return stateBackend.createStreamFactory(new JobID(), operator.getClass().getSimpleName());
+				}
+			}).when(mockTask).createCheckpointStreamFactory(any(StreamOperator.class));
+		} catch (Exception e) {
+			throw new RuntimeException(e.getMessage(), e);
+		}
+
+		try {
+			doAnswer(new Answer<OperatorStateBackend>() {
+				@Override
+				public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
+					final StreamOperator<?> operator = (StreamOperator<?>) invocationOnMock.getArguments()[0];
+					final Collection<OperatorStateHandle> stateHandles = (Collection<OperatorStateHandle>) invocationOnMock.getArguments()[1];
+					OperatorStateBackend osb;
+					if (null == stateHandles) {
+						osb = stateBackend.createOperatorStateBackend(env, operator.getClass().getSimpleName());
+					} else {
+						osb = stateBackend.restoreOperatorStateBackend(env, operator.getClass().getSimpleName(), stateHandles);
+					}
+					mockTask.getCancelables().registerClosable(osb);
+					return osb;
+				}
+			}).when(mockTask).createOperatorStateBackend(any(StreamOperator.class), any(Collection.class));
+		} catch (Exception e) {
+			throw new RuntimeException(e.getMessage(), e);
+		}
+
+		doAnswer(new Answer<ProcessingTimeService>() {
+			@Override
+			public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable {
+				return processingTimeService;
+			}
+		}).when(mockTask).getProcessingTimeService();
+	}
+
+	public void setStateBackend(AbstractStateBackend stateBackend) {
+		this.stateBackend = stateBackend;
+	}
+
+	public Object getCheckpointLock() {
+		return mockTask.getCheckpointLock();
+	}
+
+	public Environment getEnvironment() {
+		return this.mockTask.getEnvironment();
+	}
+
+	/**
+	 * Get all the output from the task. This contains StreamRecords and Events interleaved.
+	 */
+	public ConcurrentLinkedQueue<Object> getOutput() {
+		return outputList;
+	}
+
+	/**
+	 * Get all the output from the task and clear the output buffer.
+	 * This contains only StreamRecords.
+	 */
+	@SuppressWarnings("unchecked")
+	public List<StreamRecord<? extends OUT>> extractOutputStreamRecords() {
+		List<StreamRecord<? extends OUT>> resultElements = new LinkedList<>();
+		for (Object e: getOutput()) {
+			if (e instanceof StreamRecord) {
+				resultElements.add((StreamRecord<OUT>) e);
+			}
+		}
+		return resultElements;
+	}
+
+	/**
+	 * Calls
+	 * {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
+	 */
+	public void setup() throws Exception {
+		operator.setup(mockTask, config, new MockOutput());
+		setupCalled = true;
+	}
+
+	/**
+	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
+	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
+	 * if it was not called before.
+	 */
+	public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
+		if (!setupCalled) {
+			setup();
+		}
+		operator.initializeState(operatorStateHandles);
+		initializeCalled = true;
+	}
+
+
+	/**
+	 * Calls {@link StreamOperator#open()}. This also
+	 * calls {@link StreamOperator#setup(StreamTask, StreamConfig, Output)}
+	 * if it was not called before.
+	 */
+	public void open() throws Exception {
+		if (!initializeCalled) {
+			initializeState(null);
+		}
+		operator.open();
+	}
+
+	/**
+	 * Calls {@link StreamOperator#snapshotState(long, long, CheckpointStreamFactory)}.
+	 */
+	public OperatorSnapshotResult snapshot(long checkpointId, long timestamp) throws Exception {
+
+		CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory(
+				new JobID(),
+				"test_op");
+
+		return operator.snapshotState(checkpointId, timestamp, streamFactory);
+	}
+
+	/**
+	 * Calls {@link StreamCheckpointedOperator#snapshotState(FSDataOutputStream, long, long)} if
+	 * the operator implements this interface.
+	 */
+	@Deprecated
+	public StreamStateHandle snapshotLegacy(long checkpointId, long timestamp) throws Exception {
+
+		CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory(
+				new JobID(),
+				"test_op").createCheckpointStateOutputStream(checkpointId, timestamp);
+		if(operator instanceof StreamCheckpointedOperator) {
+			((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp);
+			return outStream.closeAndGetHandle();
+		} else {
+			throw new RuntimeException("Operator is not StreamCheckpointedOperator");
+		}
+	}
+
+	/**
+	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)} ()}
+	 */
+	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
+		operator.notifyOfCompletedCheckpoint(checkpointId);
+	}
+
+	/**
+	 * Calls {@link StreamCheckpointedOperator#restoreState(FSDataInputStream)} if
+	 * the operator implements this interface.
+	 */	@Deprecated
+	public void restore(StreamStateHandle snapshot) throws Exception {
+		if(operator instanceof StreamCheckpointedOperator) {
+			try (FSDataInputStream in = snapshot.openInputStream()) {
+				((StreamCheckpointedOperator) operator).restoreState(in);
+			}
+		} else {
+			throw new RuntimeException("Operator is not StreamCheckpointedOperator");
+		}
+	}
+
+	/**
+	 * Calls close and dispose on the operator.
+	 */
+	public void close() throws Exception {
+		operator.close();
+		operator.dispose();
+		if (processingTimeService != null) {
+			processingTimeService.shutdownService();
+		}
+		setupCalled = false;
+	}
+
+	public void setProcessingTime(long time) throws Exception {
+		processingTimeService.setCurrentTime(time);
+	}
+
+	public long getProcessingTime() {
+		return processingTimeService.getCurrentProcessingTime();
+	}
+
+	public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
+		this.config.setTimeCharacteristic(timeCharacteristic);
+	}
+
+	public TimeCharacteristic getTimeCharacteristic() {
+		return this.config.getTimeCharacteristic();
+	}
+
+	public boolean wasFailedExternally() {
+		return wasFailedExternally;
+	}
+
+	private class MockOutput implements Output<StreamRecord<OUT>> {
+
+		private TypeSerializer<OUT> outputSerializer;
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			outputList.add(mark);
+		}
+
+		@Override
+		public void emitLatencyMarker(LatencyMarker latencyMarker) {
+			outputList.add(latencyMarker);
+		}
+
+		@Override
+		public void collect(StreamRecord<OUT> element) {
+			if (outputSerializer == null) {
+				outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
+			}
+			if (element.hasTimestamp()) {
+				outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()),element.getTimestamp()));
+			} else {
+				outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue())));
+			}
+		}
+
+		@Override
+		public void close() {
+			// ignore
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 9c9d11b..99527e7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -50,7 +50,6 @@ import static org.mockito.Mockito.doAnswer;
 /**
  * Extension of {@link OneInputStreamOperatorTestHarness} that allows the operator to get
  * a {@link KeyedStateBackend}.
- *
  */
 public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 		extends OneInputStreamOperatorTestHarness<IN, OUT> {
@@ -171,7 +170,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 	}
 
 	/**
-	 * 
+	 *
 	 */
 	@Override
 	public void restore(StreamStateHandle snapshot) throws Exception {
@@ -189,21 +188,12 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 		}
 	}
 
-	/**
-	 * Calls close and dispose on the operator.
-	 */
-	public void close() throws Exception {
-		super.close();
-		if (keyedStateBackend != null) {
-			keyedStateBackend.dispose();
-		}
-	}
-
 	@Override
 	public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
-		if (null != operatorStateHandles) {
-			this.restoredKeyedState = operatorStateHandles.getManagedKeyedState();
+		if (operatorStateHandles != null) {
+			restoredKeyedState = operatorStateHandles.getManagedKeyedState();
 		}
+
 		super.initializeState(operatorStateHandles);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
new file mode 100644
index 0000000..2e9885c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Collection;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doAnswer;
+
+/**
+ * Extension of {@link TwoInputStreamOperatorTestHarness} that allows the operator to get
+ * a {@link KeyedStateBackend}.
+ */
+public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
+		extends TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
+
+	// in case the operator creates one we store it here so that we
+	// can snapshot its state
+	private AbstractKeyedStateBackend<?> keyedStateBackend = null;
+
+	// when we restore we keep the state here so that we can call restore
+	// when the operator requests the keyed state backend
+	private Collection<KeyGroupsStateHandle> restoredKeyedState = null;
+
+	public KeyedTwoInputStreamOperatorTestHarness(
+			TwoInputStreamOperator<IN1, IN2, OUT> operator,
+			final KeySelector<IN1, K> keySelector1,
+			final KeySelector<IN2, K> keySelector2,
+			TypeInformation<K> keyType) throws Exception {
+		super(operator);
+
+		ClosureCleaner.clean(keySelector1, false);
+		ClosureCleaner.clean(keySelector2, false);
+		config.setStatePartitioner(0, keySelector1);
+		config.setStatePartitioner(1, keySelector2);
+		config.setStateKeySerializer(keyType.createSerializer(executionConfig));
+		config.setNumberOfKeyGroups(MAX_PARALLELISM);
+
+		setupMockTaskCreateKeyedBackend();
+	}
+
+	public KeyedTwoInputStreamOperatorTestHarness(
+			TwoInputStreamOperator<IN1, IN2, OUT> operator,
+			ExecutionConfig executionConfig,
+			KeySelector<IN1, K> keySelector1,
+			KeySelector<IN2, K> keySelector2,
+			TypeInformation<K> keyType) throws Exception {
+		super(operator, executionConfig);
+
+		ClosureCleaner.clean(keySelector1, false);
+		ClosureCleaner.clean(keySelector2, false);
+		config.setStatePartitioner(0, keySelector1);
+		config.setStatePartitioner(1, keySelector2);
+		config.setStateKeySerializer(keyType.createSerializer(executionConfig));
+		config.setNumberOfKeyGroups(MAX_PARALLELISM);
+
+		setupMockTaskCreateKeyedBackend();
+	}
+
+	private void setupMockTaskCreateKeyedBackend() {
+
+		try {
+			doAnswer(new Answer<KeyedStateBackend>() {
+				@Override
+				public KeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
+
+					final TypeSerializer keySerializer = (TypeSerializer) invocationOnMock.getArguments()[0];
+					final int numberOfKeyGroups = (Integer) invocationOnMock.getArguments()[1];
+					final KeyGroupRange keyGroupRange = (KeyGroupRange) invocationOnMock.getArguments()[2];
+
+					if(keyedStateBackend != null) {
+						keyedStateBackend.close();
+					}
+
+					if (restoredKeyedState == null) {
+						keyedStateBackend = stateBackend.createKeyedStateBackend(
+								mockTask.getEnvironment(),
+								new JobID(),
+								"test_op",
+								keySerializer,
+								numberOfKeyGroups,
+								keyGroupRange,
+								mockTask.getEnvironment().getTaskKvStateRegistry());
+						return keyedStateBackend;
+					} else {
+						keyedStateBackend = stateBackend.restoreKeyedStateBackend(
+								mockTask.getEnvironment(),
+								new JobID(),
+								"test_op",
+								keySerializer,
+								numberOfKeyGroups,
+								keyGroupRange,
+								restoredKeyedState,
+								mockTask.getEnvironment().getTaskKvStateRegistry());
+						restoredKeyedState = null;
+						return keyedStateBackend;
+					}
+				}
+			}).when(mockTask).createKeyedStateBackend(any(TypeSerializer.class), anyInt(), any(KeyGroupRange.class));
+		} catch (Exception e) {
+			throw new RuntimeException(e.getMessage(), e);
+		}
+	}
+
+	@Override
+	public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
+		if (restoredKeyedState != null) {
+			restoredKeyedState = operatorStateHandles.getManagedKeyedState();
+		}
+
+		super.initializeState(operatorStateHandles);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 5b277bf..a3e095a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -18,89 +18,23 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.ClosableRegistry;
-import org.apache.flink.runtime.state.OperatorStateBackend;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.util.InstantiationUtil;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.RunnableFuture;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * A test harness for testing a {@link OneInputStreamOperator}.
  *
- * <p>
- * This mock task provides the operator with a basic runtime context and allows pushing elements
+ * <p>This mock task provides the operator with a basic runtime context and allows pushing elements
  * and watermarks into the operator. {@link java.util.Deque}s containing the emitted elements
  * and watermarks can be retrieved. You are free to modify these.
  */
-public class OneInputStreamOperatorTestHarness<IN, OUT> {
-
-	public static final int MAX_PARALLELISM = 10;
-
-	final OneInputStreamOperator<IN, OUT> operator;
-
-	final ConcurrentLinkedQueue<Object> outputList;
-
-	final StreamConfig config;
-
-	final ExecutionConfig executionConfig;
-
-	final TestProcessingTimeService processingTimeService;
-
-	StreamTask<?, ?> mockTask;
-
-	ClosableRegistry closableRegistry;
-
-	// use this as default for tests
-	AbstractStateBackend stateBackend = new MemoryStateBackend();
+public class OneInputStreamOperatorTestHarness<IN, OUT>
+		extends AbstractStreamOperatorTestHarness<OUT> {
 
-	private final Object checkpointLock;
-
-	/**
-	 * Whether setup() was called on the operator. This is reset when calling close().
-	 */
-	private boolean setupCalled = false;
-	private boolean initializeCalled = false;
-
-	private volatile boolean wasFailedExternally = false;
+	private final OneInputStreamOperator<IN, OUT> oneInputOperator;
 
 	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) throws Exception {
 		this(operator, new ExecutionConfig());
@@ -109,268 +43,24 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	public OneInputStreamOperatorTestHarness(
 			OneInputStreamOperator<IN, OUT> operator,
 			ExecutionConfig executionConfig) throws Exception {
-		this.operator = operator;
-		this.outputList = new ConcurrentLinkedQueue<>();
-		Configuration underlyingConfig = new Configuration();
-		this.config = new StreamConfig(underlyingConfig);
-		this.config.setCheckpointingEnabled(true);
-		this.executionConfig = executionConfig;
-		this.closableRegistry = new ClosableRegistry();
-
-		this.checkpointLock = new Object();
-
-		final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig, executionConfig, MAX_PARALLELISM, 1, 0);
-		mockTask = mock(StreamTask.class);
-		processingTimeService = new TestProcessingTimeService();
-		processingTimeService.setCurrentTime(0);
-
-		when(mockTask.getName()).thenReturn("Mock Task");
-		when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
-		when(mockTask.getConfiguration()).thenReturn(config);
-		when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
-		when(mockTask.getEnvironment()).thenReturn(env);
-		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-		when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
-		when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
-
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) throws Throwable {
-				wasFailedExternally = true;
-				return null;
-			}
-		}).when(mockTask).handleAsyncException(any(String.class), any(Throwable.class));
-
-		try {
-			doAnswer(new Answer<CheckpointStreamFactory>() {
-				@Override
-				public CheckpointStreamFactory answer(InvocationOnMock invocationOnMock) throws Throwable {
-
-					final StreamOperator<?> operator = (StreamOperator<?>) invocationOnMock.getArguments()[0];
-					return stateBackend.createStreamFactory(new JobID(), operator.getClass().getSimpleName());
-				}
-			}).when(mockTask).createCheckpointStreamFactory(any(StreamOperator.class));
-		} catch (Exception e) {
-			throw new RuntimeException(e.getMessage(), e);
-		}
-
-		try {
-			doAnswer(new Answer<OperatorStateBackend>() {
-				@Override
-				public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
-					final StreamOperator<?> operator = (StreamOperator<?>) invocationOnMock.getArguments()[0];
-					final Collection<OperatorStateHandle> stateHandles = (Collection<OperatorStateHandle>) invocationOnMock.getArguments()[1];
-					OperatorStateBackend osb;
-					if (null == stateHandles) {
-						osb = stateBackend.createOperatorStateBackend(env, operator.getClass().getSimpleName());
-					} else {
-						osb = stateBackend.restoreOperatorStateBackend(env, operator.getClass().getSimpleName(), stateHandles);
-					}
-					mockTask.getCancelables().registerClosable(osb);
-					return osb;
-				}
-			}).when(mockTask).createOperatorStateBackend(any(StreamOperator.class), any(Collection.class));
-		} catch (Exception e) {
-			throw new RuntimeException(e.getMessage(), e);
-		}
-
-		doAnswer(new Answer<ProcessingTimeService>() {
-			@Override
-			public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable {
-				return processingTimeService;
-			}
-		}).when(mockTask).getProcessingTimeService();
-	}
-
-	public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
-		this.config.setTimeCharacteristic(timeCharacteristic);
-	}
-
-	public TimeCharacteristic getTimeCharacteristic() {
-		return this.config.getTimeCharacteristic();
-	}
-
-	public boolean wasFailedExternally() {
-		return wasFailedExternally;
-	}
-
-	public void setStateBackend(AbstractStateBackend stateBackend) {
-		this.stateBackend = stateBackend;
-	}
-
-	public Object getCheckpointLock() {
-		return mockTask.getCheckpointLock();
-	}
-
-	public Environment getEnvironment() {
-		return this.mockTask.getEnvironment();
-	}
-
-	/**
-	 * Get all the output from the task. This contains StreamRecords and Events interleaved.
-	 */
-	public ConcurrentLinkedQueue<Object> getOutput() {
-		return outputList;
-	}
-
-	/**
-	 * Get all the output from the task and clear the output buffer.
-	 * This contains only StreamRecords.
-	 */
-	@SuppressWarnings("unchecked")
-	public List<StreamRecord<? extends OUT>> extractOutputStreamRecords() {
-		List<StreamRecord<? extends OUT>> resultElements = new LinkedList<>();
-		for (Object e: getOutput()) {
-			if (e instanceof StreamRecord) {
-				resultElements.add((StreamRecord<OUT>) e);
-			}
-		}
-		return resultElements;
-	}
-
-	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
-	 */
-	public void setup() throws Exception {
-		operator.setup(mockTask, config, new MockOutput());
-		setupCalled = true;
-	}
-
-	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
-	 * if it was not called before.
-	 */
-	public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
-		if (!setupCalled) {
-			setup();
-		}
-		operator.initializeState(operatorStateHandles);
-		initializeCalled = true;
-	}
-
-	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}.
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)} if it
-	 * was not called before.
-	 */
-	public void open() throws Exception {
-		if (!initializeCalled) {
-			initializeState(null);
-		}
-		operator.open();
-	}
-
-	/**
-	 *
-	 */
-	public OperatorSnapshotResult snapshot(long checkpointId, long timestamp) throws Exception {
-
-		CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory(
-				new JobID(),
-				"test_op");
-
-		return operator.snapshotState(checkpointId, timestamp, streamFactory);
-	}
-
-	/**
-	 *
-	 */
-	@Deprecated
-	public StreamStateHandle snapshotLegacy(long checkpointId, long timestamp) throws Exception {
-
-		CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory(
-				new JobID(),
-				"test_op").createCheckpointStateOutputStream(checkpointId, timestamp);
-		if(operator instanceof StreamCheckpointedOperator) {
-			((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp);
-			return outStream.closeAndGetHandle();
-		} else {
-			throw new RuntimeException("Operator is not StreamCheckpointedOperator");
-		}
-	}
-
-	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)} ()}
-	 */
-	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
-		operator.notifyOfCompletedCheckpoint(checkpointId);
-	}
+		super(operator, executionConfig);
 
-	/**
-	 *
-	 */
-	@Deprecated
-	public void restore(StreamStateHandle snapshot) throws Exception {
-		if(operator instanceof StreamCheckpointedOperator) {
-			try (FSDataInputStream in = snapshot.openInputStream()) {
-				((StreamCheckpointedOperator) operator).restoreState(in);
-			}
-		} else {
-			throw new RuntimeException("Operator is not StreamCheckpointedOperator");
-		}
-	}
-
-	/**
-	 * Calls close and dispose on the operator.
-	 */
-	public void close() throws Exception {
-		operator.close();
-		operator.dispose();
-		if (processingTimeService != null) {
-			processingTimeService.shutdownService();
-		}
-		setupCalled = false;
+		this.oneInputOperator = operator;
 	}
 
 	public void processElement(StreamRecord<IN> element) throws Exception {
 		operator.setKeyContextElement1(element);
-		operator.processElement(element);
+		oneInputOperator.processElement(element);
 	}
 
 	public void processElements(Collection<StreamRecord<IN>> elements) throws Exception {
 		for (StreamRecord<IN> element: elements) {
 			operator.setKeyContextElement1(element);
-			operator.processElement(element);
-		}
-	}
-
-	public void setProcessingTime(long time) throws Exception {
-		synchronized (checkpointLock) {
-			processingTimeService.setCurrentTime(time);
+			oneInputOperator.processElement(element);
 		}
 	}
 
 	public void processWatermark(Watermark mark) throws Exception {
-		operator.processWatermark(mark);
-	}
-
-	private class MockOutput implements Output<StreamRecord<OUT>> {
-
-		private TypeSerializer<OUT> outputSerializer;
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			outputList.add(mark);
-		}
-
-		@Override
-		public void emitLatencyMarker(LatencyMarker latencyMarker) {
-			outputList.add(latencyMarker);
-		}
-
-		@Override
-		public void collect(StreamRecord<OUT> element) {
-			if (outputSerializer == null) {
-				outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
-			}
-			outputList.add(new StreamRecord<OUT>(outputSerializer.copy(element.getValue()),
-					element.getTimestamp()));
-		}
-
-		@Override
-		public void close() {
-			// ignore
-		}
+		oneInputOperator.processWatermark(mark);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index 7df6848..95eea98 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -19,26 +19,9 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.ClosableRegistry;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * A test harness for testing a {@link TwoInputStreamOperator}.
@@ -48,122 +31,35 @@ import static org.mockito.Mockito.when;
  * and watermarks into the operator. {@link java.util.Deque}s containing the emitted elements
  * and watermarks can be retrieved. you are free to modify these.
  */
-public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
-
-	TwoInputStreamOperator<IN1, IN2, OUT> operator;
-
-	final ConcurrentLinkedQueue<Object> outputList;
-
-	final ExecutionConfig executionConfig;
-
-	final Object checkpointLock;
+public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT>extends AbstractStreamOperatorTestHarness<OUT> {
 
-	final ClosableRegistry closableRegistry;
+	private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator;
 
-	boolean initializeCalled = false;
-
-	public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) {
-		this(operator, new StreamConfig(new Configuration()));
+	public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) throws Exception {
+		this(operator, new ExecutionConfig());
 	}
 		
-	public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator, StreamConfig config) {
-		this.operator = operator;
-		this.outputList = new ConcurrentLinkedQueue<Object>();
-		this.executionConfig = new ExecutionConfig();
-		this.checkpointLock = new Object();
-		this.closableRegistry = new ClosableRegistry();
-
-		Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
-		StreamTask<?, ?> mockTask = mock(StreamTask.class);
-		when(mockTask.getName()).thenReturn("Mock Task");
-		when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
-		when(mockTask.getConfiguration()).thenReturn(config);
-		when(mockTask.getEnvironment()).thenReturn(env);
-		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-		when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
-
-		operator.setup(mockTask, new StreamConfig(new Configuration()), new MockOutput());
-	}
-
-	/**
-	 * Get all the output from the task. This contains StreamRecords and Events interleaved. Use
-	 * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
-	 * to extract only the StreamRecords.
-	 */
-	public ConcurrentLinkedQueue<Object> getOutput() {
-		return outputList;
-	}
+	public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator, ExecutionConfig executionConfig) throws Exception {
+		super(operator, executionConfig);
 
-	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
-	 */
-	public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
-		operator.initializeState(operatorStateHandles);
-		initializeCalled = true;
-	}
-
-	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}.
-	 */
-	public void open() throws Exception {
-		if(!initializeCalled) {
-			initializeState(mock(OperatorStateHandles.class));
-		}
-
-		operator.open();
-	}
-
-	/**
-	 * Calls close on the operator.
-	 */
-	public void close() throws Exception {
-		operator.close();
+		this.twoInputOperator = operator;
 	}
 
 	public void processElement1(StreamRecord<IN1> element) throws Exception {
-		operator.processElement1(element);
+		twoInputOperator.setKeyContextElement1(element);
+		twoInputOperator.processElement1(element);
 	}
 
 	public void processElement2(StreamRecord<IN2> element) throws Exception {
-		operator.processElement2(element);
+		twoInputOperator.setKeyContextElement2(element);
+		twoInputOperator.processElement2(element);
 	}
 
 	public void processWatermark1(Watermark mark) throws Exception {
-		operator.processWatermark1(mark);
+		twoInputOperator.processWatermark1(mark);
 	}
 
 	public void processWatermark2(Watermark mark) throws Exception {
-		operator.processWatermark2(mark);
-	}
-
-	private class MockOutput implements Output<StreamRecord<OUT>> {
-
-		private TypeSerializer<OUT> outputSerializer;
-
-		@Override
-		@SuppressWarnings("unchecked")
-		public void emitWatermark(Watermark mark) {
-			outputList.add(mark);
-		}
-
-		@Override
-		public void emitLatencyMarker(LatencyMarker latencyMarker) {
-			outputList.add(latencyMarker);
-		}
-
-		@Override
-		@SuppressWarnings("unchecked")
-		public void collect(StreamRecord<OUT> element) {
-			if (outputSerializer == null) {
-				outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
-			}
-			outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()),
-					element.getTimestamp()));
-		}
-
-		@Override
-		public void close() {
-			// ignore
-		}
+		twoInputOperator.processWatermark2(mark);
 	}
 }