You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/03/25 14:07:15 UTC

[flink] 10/11: [FLINK-16316][operators] Implement new AbstractStreamOperatorV2 as a replacement for AbstractStreamOperator

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 16a0334879ac468b8cb5bda67116b5eab6549d1a
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed Feb 26 09:39:53 2020 +0100

    [FLINK-16316][operators] Implement new AbstractStreamOperatorV2 as a replacement for AbstractStreamOperator
    
    The new base class for operators tries to address a couple of limitations of AbstractStreamOperator, such as:
    - lack of support for multiple inputs
    - reliance on the setup(...) method for initialisation instead of the constructor
---
 .../api/operators/AbstractStreamOperator.java      |  10 +-
 ...Operator.java => AbstractStreamOperatorV2.java} | 288 +++++++--------------
 .../api/operators/StreamOperatorParameters.java    |   6 +-
 .../runtime/tasks/MultipleInputStreamTaskTest.java |  65 ++++-
 .../util/TestBoundedMultipleInputOperator.java     |   8 +-
 .../streaming/runtime/MultipleInputITCase.java     |  34 +--
 6 files changed, 182 insertions(+), 229 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 6507ce9..61925f4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -72,7 +72,11 @@ import java.util.Optional;
  * the timer service, timer callbacks are also guaranteed not to be called concurrently with
  * methods on {@code StreamOperator}.
  *
- * @param <OUT> The output type of the operator
+ * <p>Note, this class is going to be removed and replaced in the future by {@link AbstractStreamOperatorV2}.
+ * However, as {@link AbstractStreamOperatorV2} is currently experimental, {@link AbstractStreamOperator}
+ * has not been deprecated just yet.
+ *
+ * @param <OUT> The output type of the operator.
  */
 @PublicEvolving
 public abstract class AbstractStreamOperator<OUT>
@@ -385,15 +389,18 @@ public abstract class AbstractStreamOperator<OUT>
 	 * to interact with systems such as broadcast variables and managed state. This also allows
 	 * to register timers.
 	 */
+	@VisibleForTesting
 	public StreamingRuntimeContext getRuntimeContext() {
 		return runtimeContext;
 	}
 
 	@SuppressWarnings("unchecked")
+	@VisibleForTesting
 	public <K> KeyedStateBackend<K> getKeyedStateBackend() {
 		return stateHandler.getKeyedStateBackend();
 	}
 
+	@VisibleForTesting
 	public OperatorStateBackend getOperatorStateBackend() {
 		return stateHandler.getOperatorStateBackend();
 	}
@@ -402,6 +409,7 @@ public abstract class AbstractStreamOperator<OUT>
 	 * Returns the {@link ProcessingTimeService} responsible for getting the current
 	 * processing time and registering timers.
 	 */
+	@VisibleForTesting
 	public ProcessingTimeService getProcessingTimeService() {
 		return processingTimeService;
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
similarity index 66%
copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
index 6507ce9..a4ad021 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.KeyedStateStore;
@@ -56,118 +56,98 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Locale;
 import java.util.Optional;
 
 /**
- * Base class for all stream operators. Operators that contain a user function should extend the class
- * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class).
+ * New base class for all stream operators, intended to eventually replace {@link AbstractStreamOperator}.
+ * Currently intended to work smoothly just with {@link MultipleInputStreamOperator}.
  *
- * <p>For concrete implementations, one of the following two interfaces must also be implemented, to
- * mark the operator as unary or binary:
- * {@link OneInputStreamOperator} or {@link TwoInputStreamOperator}.
+ * <p>One notable difference in comparison to {@link AbstractStreamOperator} is the lack of
+ * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation
+ * in the constructor, as well as the removal of some tight coupling with classes like {@link StreamTask}.
  *
- * <p>Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
- * the timer service, timer callbacks are also guaranteed not to be called concurrently with
- * methods on {@code StreamOperator}.
+ * <p>Methods are guaranteed not to be called concurrently.
  *
  * @param <OUT> The output type of the operator
  */
-@PublicEvolving
-public abstract class AbstractStreamOperator<OUT>
-		implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, CheckpointedStreamOperator, Serializable {
-
-	private static final long serialVersionUID = 1L;
-
+@Experimental
+public abstract class AbstractStreamOperatorV2<OUT> implements StreamOperator<OUT>, CheckpointedStreamOperator {
 	/** The logger used by the operator class and its subclasses. */
-	protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperator.class);
-
-	// ----------- configuration properties -------------
-
-	// A sane default for most operators
-	protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
-
-	// ---------------- runtime fields ------------------
-
-	/** The task that contains this operator (and other operators in the same chain). */
-	private transient StreamTask<?, ?> container;
-
-	protected transient StreamConfig config;
-
-	protected transient Output<StreamRecord<OUT>> output;
-
-	/** The runtime context for UDFs. */
-	private transient StreamingRuntimeContext runtimeContext;
-
-	// ---------------- key/value state ------------------
-
-	/**
-	 * {@code KeySelector} for extracting a key from an element being processed. This is used to
-	 * scope keyed state to a key. This is null if the operator is not a keyed operator.
-	 *
-	 * <p>This is for elements from the first input.
-	 */
-	private transient KeySelector<?, ?> stateKeySelector1;
-
-	/**
-	 * {@code KeySelector} for extracting a key from an element being processed. This is used to
-	 * scope keyed state to a key. This is null if the operator is not a keyed operator.
-	 *
-	 * <p>This is for elements from the second input.
-	 */
-	private transient KeySelector<?, ?> stateKeySelector2;
+	protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperatorV2.class);
 
-	private transient StreamOperatorStateHandler stateHandler;
-
-	private transient InternalTimeServiceManager<?> timeServiceManager;
-
-	// --------------- Metrics ---------------------------
+	protected final StreamConfig config;
+	protected final Output<StreamRecord<OUT>> output;
+	private final StreamingRuntimeContext runtimeContext;
+	private final ExecutionConfig executionConfig;
+	private final ClassLoader userCodeClassLoader;
+	private final CloseableRegistry cancelables;
+	private final long[] inputWatermarks;
 
 	/** Metric group for the operator. */
-	protected transient OperatorMetricGroup metrics;
-
-	protected transient LatencyStats latencyStats;
+	protected final OperatorMetricGroup metrics;
+	protected final LatencyStats latencyStats;
+	protected final ProcessingTimeService processingTimeService;
 
-	// ---------------- time handler ------------------
-
-	protected transient ProcessingTimeService processingTimeService;
-
-	// ---------------- two-input operator watermarks ------------------
+	private StreamOperatorStateHandler stateHandler;
+	private InternalTimeServiceManager<?> timeServiceManager;
 
 	// We keep track of watermarks from both inputs, the combined input is the minimum
 	// Once the minimum advances we emit a new watermark for downstream operators
 	private long combinedWatermark = Long.MIN_VALUE;
-	private long input1Watermark = Long.MIN_VALUE;
-	private long input2Watermark = Long.MIN_VALUE;
-
-	// ------------------------------------------------------------------------
-	//  Life Cycle
-	// ------------------------------------------------------------------------
 
-	@Override
-	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
-		final Environment environment = containingTask.getEnvironment();
-		this.container = containingTask;
-		this.config = config;
+	public AbstractStreamOperatorV2(StreamOperatorParameters<OUT> parameters, int numberOfInputs) {
+		inputWatermarks = new long[numberOfInputs];
+		Arrays.fill(inputWatermarks, Long.MIN_VALUE);
+		final Environment environment = parameters.getContainingTask().getEnvironment();
+		config = parameters.getStreamConfig();
+		CountingOutput<OUT> countingOutput;
+		OperatorMetricGroup operatorMetricGroup;
 		try {
-			OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName());
-			this.output = new CountingOutput(output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
+			operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName());
+			countingOutput = new CountingOutput(parameters.getOutput(), operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
 			if (config.isChainStart()) {
 				operatorMetricGroup.getIOMetricGroup().reuseInputMetricsForTask();
 			}
 			if (config.isChainEnd()) {
 				operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
 			}
-			this.metrics = operatorMetricGroup;
 		} catch (Exception e) {
 			LOG.warn("An error occurred while instantiating task metrics.", e);
-			this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
-			this.output = output;
+			countingOutput = null;
+			operatorMetricGroup = null;
+		}
+
+		if (countingOutput == null || operatorMetricGroup == null) {
+			metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
+			output = parameters.getOutput();
+		}
+		else {
+			metrics = operatorMetricGroup;
+			output = countingOutput;
 		}
 
+		latencyStats = createLatencyStats(
+			environment.getTaskManagerInfo().getConfiguration(),
+			parameters.getContainingTask().getIndexInSubtaskGroup());
+
+		processingTimeService = Preconditions.checkNotNull(parameters.getProcessingTimeService());
+		executionConfig = parameters.getContainingTask().getExecutionConfig();
+		userCodeClassLoader = parameters.getContainingTask().getUserCodeClassLoader();
+		cancelables = parameters.getContainingTask().getCancelables();
+
+		runtimeContext = new StreamingRuntimeContext(
+			environment,
+			environment.getAccumulatorRegistry().getUserMap(),
+			operatorMetricGroup,
+			getOperatorID(),
+			processingTimeService,
+			null);
+	}
+
+	private LatencyStats createLatencyStats(Configuration taskManagerConfig, int indexInSubtaskGroup) {
 		try {
-			Configuration taskManagerConfig = environment.getTaskManagerInfo().getConfiguration();
 			int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
 			if (historySize <= 0) {
 				LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
@@ -187,40 +167,20 @@ public abstract class AbstractStreamOperator<OUT>
 					granularity);
 			}
 			TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent();
-			this.latencyStats = new LatencyStats(jobMetricGroup.addGroup("latency"),
+			return new LatencyStats(jobMetricGroup.addGroup("latency"),
 				historySize,
-				container.getIndexInSubtaskGroup(),
+				indexInSubtaskGroup,
 				getOperatorID(),
 				granularity);
 		} catch (Exception e) {
 			LOG.warn("An error occurred while instantiating latency metrics.", e);
-			this.latencyStats = new LatencyStats(
+			return new LatencyStats(
 				UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"),
 				1,
 				0,
 				new OperatorID(),
 				LatencyStats.Granularity.SINGLE);
 		}
-
-		this.runtimeContext = new StreamingRuntimeContext(
-			environment,
-			environment.getAccumulatorRegistry().getUserMap(),
-			getMetricGroup(),
-			getOperatorID(),
-			getProcessingTimeService(),
-			null);
-
-		stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
-		stateKeySelector2 = config.getStatePartitioner(1, getUserCodeClassloader());
-	}
-
-	/**
-	 * @deprecated The {@link ProcessingTimeService} instance should be passed by the operator
-	 * constructor and this method will be removed along with {@link SetupableStreamOperator}.
-	 */
-	@Deprecated
-	public void setProcessingTimeService(ProcessingTimeService processingTimeService) {
-		this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
 	}
 
 	@Override
@@ -230,14 +190,8 @@ public abstract class AbstractStreamOperator<OUT>
 
 	@Override
 	public final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception {
-
 		final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
 
-		final StreamTask<?, ?> containingTask =
-			Preconditions.checkNotNull(getContainingTask());
-		final CloseableRegistry streamTaskCloseableRegistry =
-			Preconditions.checkNotNull(containingTask.getCancelables());
-
 		final StreamOperatorStateContext context =
 			streamTaskStateManager.streamOperatorStateContext(
 				getOperatorID(),
@@ -245,13 +199,12 @@ public abstract class AbstractStreamOperator<OUT>
 				getProcessingTimeService(),
 				this,
 				keySerializer,
-				streamTaskCloseableRegistry,
+				cancelables,
 				metrics);
 
-		stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);
+		stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), cancelables);
 		timeServiceManager = context.internalTimerServiceManager();
 		stateHandler.initializeOperatorState(this);
-		runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
 	}
 
 	/**
@@ -301,10 +254,7 @@ public abstract class AbstractStreamOperator<OUT>
 	}
 
 	@Override
-	public final OperatorSnapshotFutures snapshotState(
-			long checkpointId,
-			long timestamp,
-			CheckpointOptions checkpointOptions,
+	public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
 			CheckpointStreamFactory factory) throws Exception {
 		return stateHandler.snapshotState(
 			this,
@@ -350,19 +300,15 @@ public abstract class AbstractStreamOperator<OUT>
 	 * @return The job's execution config.
 	 */
 	public ExecutionConfig getExecutionConfig() {
-		return container.getExecutionConfig();
+		return executionConfig;
 	}
 
 	public StreamConfig getOperatorConfig() {
 		return config;
 	}
 
-	public StreamTask<?, ?> getContainingTask() {
-		return container;
-	}
-
 	public ClassLoader getUserCodeClassloader() {
-		return container.getUserCodeClassLoader();
+		return userCodeClassLoader;
 	}
 
 	/**
@@ -390,10 +336,12 @@ public abstract class AbstractStreamOperator<OUT>
 	}
 
 	@SuppressWarnings("unchecked")
+	@VisibleForTesting
 	public <K> KeyedStateBackend<K> getKeyedStateBackend() {
-		return stateHandler.getKeyedStateBackend();
+		return (KeyedStateBackend<K>) stateHandler.getKeyedStateBackend();
 	}
 
+	@VisibleForTesting
 	public OperatorStateBackend getOperatorStateBackend() {
 		return stateHandler.getOperatorStateBackend();
 	}
@@ -402,6 +350,7 @@ public abstract class AbstractStreamOperator<OUT>
 	 * Returns the {@link ProcessingTimeService} responsible for getting the current
 	 * processing time and registering timers.
 	 */
+	@VisibleForTesting
 	public ProcessingTimeService getProcessingTimeService() {
 		return processingTimeService;
 	}
@@ -417,8 +366,8 @@ public abstract class AbstractStreamOperator<OUT>
 	}
 
 	protected <N, S extends State, T> S getOrCreateKeyedState(
-			TypeSerializer<N> namespaceSerializer,
-			StateDescriptor<S, T> stateDescriptor) throws Exception {
+		TypeSerializer<N> namespaceSerializer,
+		StateDescriptor<S, T> stateDescriptor) throws Exception {
 		return stateHandler.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
 	}
 
@@ -429,25 +378,13 @@ public abstract class AbstractStreamOperator<OUT>
 	 * @throws Exception Thrown, if the state backend cannot create the key/value state.
 	 */
 	protected <S extends State, N> S getPartitionedState(
-			N namespace,
-			TypeSerializer<N> namespaceSerializer,
-			StateDescriptor<S, ?> stateDescriptor) throws Exception {
+		N namespace,
+		TypeSerializer<N> namespaceSerializer,
+		StateDescriptor<S, ?> stateDescriptor) throws Exception {
 		return stateHandler.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
 	}
 
-	@Override
-	@SuppressWarnings({"unchecked", "rawtypes"})
-	public void setKeyContextElement1(StreamRecord record) throws Exception {
-		setKeyContextElement(record, stateKeySelector1);
-	}
-
-	@Override
-	@SuppressWarnings({"unchecked", "rawtypes"})
-	public void setKeyContextElement2(StreamRecord record) throws Exception {
-		setKeyContextElement(record, stateKeySelector2);
-	}
-
-	private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
+	protected <T> void internalSetKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
 		if (selector != null) {
 			Object key = selector.getKey(record.getValue());
 			setCurrentKey(key);
@@ -464,43 +401,11 @@ public abstract class AbstractStreamOperator<OUT>
 		return stateHandler.getCurrentKey();
 	}
 
-	public KeyedStateStore getKeyedStateStore() {
+	public Optional<KeyedStateStore> getKeyedStateStore() {
 		if (stateHandler == null) {
-			return null;
+			return Optional.empty();
 		}
-		return stateHandler.getKeyedStateStore().orElse(null);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Context and chaining properties
-	// ------------------------------------------------------------------------
-
-	@Override
-	public final void setChainingStrategy(ChainingStrategy strategy) {
-		this.chainingStrategy = strategy;
-	}
-
-	@Override
-	public final ChainingStrategy getChainingStrategy() {
-		return chainingStrategy;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Metrics
-	// ------------------------------------------------------------------------
-
-	// ------- One input stream
-	public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
-		reportOrForwardLatencyMarker(latencyMarker);
-	}
-
-	// ------- Two input stream
-	public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
-		reportOrForwardLatencyMarker(latencyMarker);
-	}
-
-	public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
-		reportOrForwardLatencyMarker(latencyMarker);
+		return stateHandler.getKeyedStateStore();
 	}
 
 	protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
@@ -536,6 +441,7 @@ public abstract class AbstractStreamOperator<OUT>
 	 *
 	 * @param <N> The type of the timer namespace.
 	 */
+	@VisibleForTesting
 	public <K, N> InternalTimerService<N> getInternalTimerService(
 			String name,
 			TypeSerializer<N> namespaceSerializer,
@@ -558,18 +464,12 @@ public abstract class AbstractStreamOperator<OUT>
 		output.emitWatermark(mark);
 	}
 
-	public void processWatermark1(Watermark mark) throws Exception {
-		input1Watermark = mark.getTimestamp();
-		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark) {
-			combinedWatermark = newMin;
-			processWatermark(new Watermark(combinedWatermark));
+	protected void reportWatermark(Watermark mark, int inputId) throws Exception {
+		inputWatermarks[inputId] = mark.getTimestamp();
+		long newMin = mark.getTimestamp();
+		for (long inputWatermark : inputWatermarks) {
+			newMin = Math.min(inputWatermark, newMin);
 		}
-	}
-
-	public void processWatermark2(Watermark mark) throws Exception {
-		input2Watermark = mark.getTimestamp();
-		long newMin = Math.min(input1Watermark, input2Watermark);
 		if (newMin > combinedWatermark) {
 			combinedWatermark = newMin;
 			processWatermark(new Watermark(combinedWatermark));
@@ -591,6 +491,16 @@ public abstract class AbstractStreamOperator<OUT>
 		return timeServiceManager == null ? 0 : timeServiceManager.numEventTimeTimers();
 	}
 
+	@Override
+	public void setKeyContextElement1(StreamRecord<?> record) throws Exception {
+		throw new IllegalStateException("This method should never be called. Use Input class instead");
+	}
+
+	@Override
+	public void setKeyContextElement2(StreamRecord<?> record) throws Exception {
+		throw new IllegalStateException("This method should never be called. Use Input class instead");
+	}
+
 	protected Optional<InternalTimeServiceManager<?>> getTimeServiceManager() {
 		return Optional.ofNullable(timeServiceManager);
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
index 1f6f1a0..5aaffcc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
@@ -25,9 +25,9 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 /**
- * Helper  class to construct {@link StreamOperatorBase}. Wraps couple of internal parameters
- * to simplify for users construction of classes extending {@link StreamOperatorBase} and to
- * allow for backward compatible changes in the {@link StreamOperatorBase}'s constructor.
+ * Helper class to construct {@link AbstractStreamOperatorV2}. Wraps a couple of internal parameters
+ * to simplify, for users, the construction of classes extending {@link AbstractStreamOperatorV2} and to
+ * allow for backward compatible changes in the {@link AbstractStreamOperatorV2}'s constructor.
  *
  * @param <OUT> The output type of an operator that will be constructed using {@link StreamOperatorParameters}.
  */
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index fc0144b..88c4f9a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -30,9 +30,12 @@ import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
 import org.apache.flink.streaming.api.operators.Input;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
 import org.apache.flink.streaming.runtime.io.InputStatus;
 import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
@@ -68,7 +71,7 @@ public class MultipleInputStreamTaskTest {
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.INT_TYPE_INFO)
 				.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
-				.setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperator())
+				.setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory())
 				.build()) {
 
 			long initialTime = 0L;
@@ -98,7 +101,7 @@ public class MultipleInputStreamTaskTest {
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
 				.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
 				.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
-				.setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperator())
+				.setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory())
 				.build()) {
 			ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
 			long initialTime = 0L;
@@ -148,7 +151,7 @@ public class MultipleInputStreamTaskTest {
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
 				.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
 				.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
-				.setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperator())
+				.setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory())
 				.build()) {
 			ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
 			long initialTime = 0L;
@@ -217,7 +220,7 @@ public class MultipleInputStreamTaskTest {
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
-				.setupOperatorChain(new DuplicatingOperator())
+				.setupOperatorChain(new DuplicatingOperatorFactory())
 				.chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
 				.chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
 				.finish()
@@ -246,9 +249,13 @@ public class MultipleInputStreamTaskTest {
 		}
 	}
 
-	static class DuplicatingOperator extends AbstractStreamOperator<String>
+	static class DuplicatingOperator extends AbstractStreamOperatorV2<String>
 		implements MultipleInputStreamOperator<String> {
 
+		public DuplicatingOperator(StreamOperatorParameters<String> parameters) {
+			super(parameters, 3);
+		}
+
 		@Override
 		public List<Input> getInputs() {
 			return Arrays.asList(new DuplicatingInput(), new DuplicatingInput(), new DuplicatingInput());
@@ -270,7 +277,7 @@ public class MultipleInputStreamTaskTest {
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
-				.setupOperatorChain(new TestBoundedMultipleInputOperator("Operator0"))
+				.setupOperatorChain(new TestBoundedMultipleInputOperatorFactory())
 				.chain(new TestBoundedOneInputStreamOperator("Operator1"), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
 				.finish()
 				.build();
@@ -317,7 +324,7 @@ public class MultipleInputStreamTaskTest {
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
-					.setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperator())
+					.setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory())
 					.build()) {
 			ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
 
@@ -352,12 +359,16 @@ public class MultipleInputStreamTaskTest {
 	// This must only be used in one test, otherwise the static fields will be changed
 	// by several tests concurrently
 	private static class MapToStringMultipleInputOperator
-			extends AbstractStreamOperator<String> implements MultipleInputStreamOperator<String> {
+			extends AbstractStreamOperatorV2<String> implements MultipleInputStreamOperator<String> {
 		private static final long serialVersionUID = 1L;
 
 		private boolean openCalled;
 		private boolean closeCalled;
 
+		public MapToStringMultipleInputOperator(StreamOperatorParameters<String> parameters) {
+			super(parameters, 3);
+		}
+
 		@Override
 		public void open() throws Exception {
 			super.open();
@@ -418,5 +429,41 @@ public class MultipleInputStreamTaskTest {
 			return value.toString();
 		}
 	}
+
+	private static class TestBoundedMultipleInputOperatorFactory extends AbstractStreamOperatorFactory<String> {
+		@Override
+		public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
+			return (T) new TestBoundedMultipleInputOperator("Operator0", parameters);
+		}
+
+		@Override
+		public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
+			return TestBoundedMultipleInputOperator.class;
+		}
+	}
+
+	private static class DuplicatingOperatorFactory extends AbstractStreamOperatorFactory<String> {
+		@Override
+		public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
+			return (T) new DuplicatingOperator(parameters);
+		}
+
+		@Override
+		public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
+			return DuplicatingOperator.class;
+		}
+	}
+
+	private static class MapToStringMultipleInputOperatorFactory extends AbstractStreamOperatorFactory<String> {
+		@Override
+		public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
+			return (T) new MapToStringMultipleInputOperator(parameters);
+		}
+
+		@Override
+		public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
+			return MapToStringMultipleInputOperator.class;
+		}
+	}
 }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java
index 89d242a..e6ebe84 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
 import org.apache.flink.streaming.api.operators.BoundedMultiInput;
 import org.apache.flink.streaming.api.operators.Input;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Arrays;
@@ -30,14 +31,15 @@ import java.util.List;
 /**
  * A test operator class implementing {@link BoundedMultiInput}.
  */
-public class TestBoundedMultipleInputOperator extends AbstractStreamOperator<String>
+public class TestBoundedMultipleInputOperator extends AbstractStreamOperatorV2<String>
 	implements MultipleInputStreamOperator<String>, BoundedMultiInput {
 
 	private static final long serialVersionUID = 1L;
 
 	private final String name;
 
-	public TestBoundedMultipleInputOperator(String name) {
+	public TestBoundedMultipleInputOperator(String name, StreamOperatorParameters<String> parameters) {
+		super(parameters, 3); // NOTE(review): 3 is presumably the number of inputs — confirm against AbstractStreamOperatorV2
 		this.name = name;
 	}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java
index f81bac9..32b6424 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java
@@ -21,12 +21,11 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
 import org.apache.flink.streaming.api.operators.Input;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -84,13 +83,14 @@ public class MultipleInputITCase extends AbstractTestBase {
 
 	/**
 	 * 3 input operator that sums all of it inputs.
-	 * TODO: provide non {@link SetupableStreamOperator} variant of {@link AbstractStreamOperator}?
-	 * TODO: provide non {@link AbstractStreamOperator} seems to pre-override processWatermark1/2 and other
-	 * methods that are not defined there?
 	 */
-	public static class SumAllInputOperator extends AbstractStreamOperator<Long> implements MultipleInputStreamOperator<Long> {
+	public static class SumAllInputOperator extends AbstractStreamOperatorV2<Long> implements MultipleInputStreamOperator<Long> {
 		private long sum;
 
+		public SumAllInputOperator(StreamOperatorParameters<Long> parameters) {
+			super(parameters, 3); // NOTE(review): 3 presumably matches the "3 input operator" in the class doc — confirm against AbstractStreamOperatorV2
+		}
+
 		@Override
 		public List<Input> getInputs() {
 			return Arrays.asList(
@@ -114,29 +114,15 @@ public class MultipleInputITCase extends AbstractTestBase {
 	/**
 	 * Factory for {@link SumAllInputOperator}.
 	 */
-	public static class SumAllInputOperatorFactory implements StreamOperatorFactory<Long> {
-		private ChainingStrategy chainingStrategy;
-
+	public static class SumAllInputOperatorFactory extends AbstractStreamOperatorFactory<Long> {
 		@Override
 		public <T extends StreamOperator<Long>> T createStreamOperator(StreamOperatorParameters<Long> parameters) {
-			SumAllInputOperator sumAllInputOperator = new SumAllInputOperator();
-			sumAllInputOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
-			return (T) sumAllInputOperator;
-		}
-
-		@Override
-		public void setChainingStrategy(ChainingStrategy chainingStrategy) {
-			this.chainingStrategy = chainingStrategy;
-		}
-
-		@Override
-		public ChainingStrategy getChainingStrategy() {
-			return chainingStrategy;
+			return (T) new SumAllInputOperator(parameters); // replaces the removed no-arg construction + setup(...) call; unchecked cast to caller-chosen T
 		}
 
 		@Override
 		public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
-			throw new UnsupportedOperationException();
+			return SumAllInputOperator.class; // previously threw UnsupportedOperationException
 		}
 	}
 }