You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/03/25 14:07:15 UTC
[flink] 10/11: [FLINK-16316][operators] Implement new
AbstractStreamOperatorV2 as a replacement for AbstractStreamOperator
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 16a0334879ac468b8cb5bda67116b5eab6549d1a
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed Feb 26 09:39:53 2020 +0100
[FLINK-16316][operators] Implement new AbstractStreamOperatorV2 as a replacement for AbstractStreamOperator
The new base class for operators tries to address a couple of limitations of AbstractStreamOperator, such as:
- lack of support for multiple inputs
- initialisation via the setup(...) method instead of the constructor
---
.../api/operators/AbstractStreamOperator.java | 10 +-
...Operator.java => AbstractStreamOperatorV2.java} | 288 +++++++--------------
.../api/operators/StreamOperatorParameters.java | 6 +-
.../runtime/tasks/MultipleInputStreamTaskTest.java | 65 ++++-
.../util/TestBoundedMultipleInputOperator.java | 8 +-
.../streaming/runtime/MultipleInputITCase.java | 34 +--
6 files changed, 182 insertions(+), 229 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 6507ce9..61925f4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -72,7 +72,11 @@ import java.util.Optional;
* the timer service, timer callbacks are also guaranteed not to be called concurrently with
* methods on {@code StreamOperator}.
*
- * @param <OUT> The output type of the operator
+ * <p>Note, this class is going to be removed and replaced in the future by {@link AbstractStreamOperatorV2}.
+ * However, as {@link AbstractStreamOperatorV2} is currently experimental, {@link AbstractStreamOperator}
+ * has not been deprecated just yet.
+ *
+ * @param <OUT> The output type of the operator.
*/
@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
@@ -385,15 +389,18 @@ public abstract class AbstractStreamOperator<OUT>
* to interact with systems such as broadcast variables and managed state. This also allows
* to register timers.
*/
+ @VisibleForTesting
public StreamingRuntimeContext getRuntimeContext() {
return runtimeContext;
}
@SuppressWarnings("unchecked")
+ @VisibleForTesting
public <K> KeyedStateBackend<K> getKeyedStateBackend() {
return stateHandler.getKeyedStateBackend();
}
+ @VisibleForTesting
public OperatorStateBackend getOperatorStateBackend() {
return stateHandler.getOperatorStateBackend();
}
@@ -402,6 +409,7 @@ public abstract class AbstractStreamOperator<OUT>
* Returns the {@link ProcessingTimeService} responsible for getting the current
* processing time and registering timers.
*/
+ @VisibleForTesting
public ProcessingTimeService getProcessingTimeService() {
return processingTimeService;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
similarity index 66%
copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
index 6507ce9..a4ad021 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
@@ -18,7 +18,7 @@
package org.apache.flink.streaming.api.operators;
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.KeyedStateStore;
@@ -56,118 +56,98 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Serializable;
+import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
/**
- * Base class for all stream operators. Operators that contain a user function should extend the class
- * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class).
+ * New base class for all stream operators, intended to eventually replace {@link AbstractStreamOperator}.
+ * Currently intended to work smoothly just with {@link MultipleInputStreamOperator}.
*
- * <p>For concrete implementations, one of the following two interfaces must also be implemented, to
- * mark the operator as unary or binary:
- * {@link OneInputStreamOperator} or {@link TwoInputStreamOperator}.
+ * <p>One notable difference in comparison to {@link AbstractStreamOperator} is the lack of
+ * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation
+ * in the constructor, and the removal of some tight coupling with classes like {@link StreamTask}.
*
- * <p>Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
- * the timer service, timer callbacks are also guaranteed not to be called concurrently with
- * methods on {@code StreamOperator}.
+ * <p>Methods are guaranteed not to be called concurrently.
*
* @param <OUT> The output type of the operator
*/
-@PublicEvolving
-public abstract class AbstractStreamOperator<OUT>
- implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, CheckpointedStreamOperator, Serializable {
-
- private static final long serialVersionUID = 1L;
-
+@Experimental
+public abstract class AbstractStreamOperatorV2<OUT> implements StreamOperator<OUT>, CheckpointedStreamOperator {
/** The logger used by the operator class and its subclasses. */
- protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperator.class);
-
- // ----------- configuration properties -------------
-
- // A sane default for most operators
- protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
-
- // ---------------- runtime fields ------------------
-
- /** The task that contains this operator (and other operators in the same chain). */
- private transient StreamTask<?, ?> container;
-
- protected transient StreamConfig config;
-
- protected transient Output<StreamRecord<OUT>> output;
-
- /** The runtime context for UDFs. */
- private transient StreamingRuntimeContext runtimeContext;
-
- // ---------------- key/value state ------------------
-
- /**
- * {@code KeySelector} for extracting a key from an element being processed. This is used to
- * scope keyed state to a key. This is null if the operator is not a keyed operator.
- *
- * <p>This is for elements from the first input.
- */
- private transient KeySelector<?, ?> stateKeySelector1;
-
- /**
- * {@code KeySelector} for extracting a key from an element being processed. This is used to
- * scope keyed state to a key. This is null if the operator is not a keyed operator.
- *
- * <p>This is for elements from the second input.
- */
- private transient KeySelector<?, ?> stateKeySelector2;
+ protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperatorV2.class);
- private transient StreamOperatorStateHandler stateHandler;
-
- private transient InternalTimeServiceManager<?> timeServiceManager;
-
- // --------------- Metrics ---------------------------
+ protected final StreamConfig config;
+ protected final Output<StreamRecord<OUT>> output;
+ private final StreamingRuntimeContext runtimeContext;
+ private final ExecutionConfig executionConfig;
+ private final ClassLoader userCodeClassLoader;
+ private final CloseableRegistry cancelables;
+ private final long[] inputWatermarks;
/** Metric group for the operator. */
- protected transient OperatorMetricGroup metrics;
-
- protected transient LatencyStats latencyStats;
+ protected final OperatorMetricGroup metrics;
+ protected final LatencyStats latencyStats;
+ protected final ProcessingTimeService processingTimeService;
- // ---------------- time handler ------------------
-
- protected transient ProcessingTimeService processingTimeService;
-
- // ---------------- two-input operator watermarks ------------------
+ private StreamOperatorStateHandler stateHandler;
+ private InternalTimeServiceManager<?> timeServiceManager;
// We keep track of watermarks from both inputs, the combined input is the minimum
// Once the minimum advances we emit a new watermark for downstream operators
private long combinedWatermark = Long.MIN_VALUE;
- private long input1Watermark = Long.MIN_VALUE;
- private long input2Watermark = Long.MIN_VALUE;
-
- // ------------------------------------------------------------------------
- // Life Cycle
- // ------------------------------------------------------------------------
- @Override
- public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
- final Environment environment = containingTask.getEnvironment();
- this.container = containingTask;
- this.config = config;
+ public AbstractStreamOperatorV2(StreamOperatorParameters<OUT> parameters, int numberOfInputs) {
+ inputWatermarks = new long[numberOfInputs];
+ Arrays.fill(inputWatermarks, Long.MIN_VALUE);
+ final Environment environment = parameters.getContainingTask().getEnvironment();
+ config = parameters.getStreamConfig();
+ CountingOutput<OUT> countingOutput;
+ OperatorMetricGroup operatorMetricGroup;
try {
- OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName());
- this.output = new CountingOutput(output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
+ operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName());
+ countingOutput = new CountingOutput(parameters.getOutput(), operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
if (config.isChainStart()) {
operatorMetricGroup.getIOMetricGroup().reuseInputMetricsForTask();
}
if (config.isChainEnd()) {
operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
}
- this.metrics = operatorMetricGroup;
} catch (Exception e) {
LOG.warn("An error occurred while instantiating task metrics.", e);
- this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
- this.output = output;
+ countingOutput = null;
+ operatorMetricGroup = null;
+ }
+
+ if (countingOutput == null || operatorMetricGroup == null) {
+ metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
+ output = parameters.getOutput();
+ }
+ else {
+ metrics = operatorMetricGroup;
+ output = countingOutput;
}
+ latencyStats = createLatencyStats(
+ environment.getTaskManagerInfo().getConfiguration(),
+ parameters.getContainingTask().getIndexInSubtaskGroup());
+
+ processingTimeService = Preconditions.checkNotNull(parameters.getProcessingTimeService());
+ executionConfig = parameters.getContainingTask().getExecutionConfig();
+ userCodeClassLoader = parameters.getContainingTask().getUserCodeClassLoader();
+ cancelables = parameters.getContainingTask().getCancelables();
+
+ runtimeContext = new StreamingRuntimeContext(
+ environment,
+ environment.getAccumulatorRegistry().getUserMap(),
+ operatorMetricGroup,
+ getOperatorID(),
+ processingTimeService,
+ null);
+ }
+
+ private LatencyStats createLatencyStats(Configuration taskManagerConfig, int indexInSubtaskGroup) {
try {
- Configuration taskManagerConfig = environment.getTaskManagerInfo().getConfiguration();
int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
if (historySize <= 0) {
LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
@@ -187,40 +167,20 @@ public abstract class AbstractStreamOperator<OUT>
granularity);
}
TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent();
- this.latencyStats = new LatencyStats(jobMetricGroup.addGroup("latency"),
+ return new LatencyStats(jobMetricGroup.addGroup("latency"),
historySize,
- container.getIndexInSubtaskGroup(),
+ indexInSubtaskGroup,
getOperatorID(),
granularity);
} catch (Exception e) {
LOG.warn("An error occurred while instantiating latency metrics.", e);
- this.latencyStats = new LatencyStats(
+ return new LatencyStats(
UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"),
1,
0,
new OperatorID(),
LatencyStats.Granularity.SINGLE);
}
-
- this.runtimeContext = new StreamingRuntimeContext(
- environment,
- environment.getAccumulatorRegistry().getUserMap(),
- getMetricGroup(),
- getOperatorID(),
- getProcessingTimeService(),
- null);
-
- stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
- stateKeySelector2 = config.getStatePartitioner(1, getUserCodeClassloader());
- }
-
- /**
- * @deprecated The {@link ProcessingTimeService} instance should be passed by the operator
- * constructor and this method will be removed along with {@link SetupableStreamOperator}.
- */
- @Deprecated
- public void setProcessingTimeService(ProcessingTimeService processingTimeService) {
- this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
}
@Override
@@ -230,14 +190,8 @@ public abstract class AbstractStreamOperator<OUT>
@Override
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception {
-
final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
- final StreamTask<?, ?> containingTask =
- Preconditions.checkNotNull(getContainingTask());
- final CloseableRegistry streamTaskCloseableRegistry =
- Preconditions.checkNotNull(containingTask.getCancelables());
-
final StreamOperatorStateContext context =
streamTaskStateManager.streamOperatorStateContext(
getOperatorID(),
@@ -245,13 +199,12 @@ public abstract class AbstractStreamOperator<OUT>
getProcessingTimeService(),
this,
keySerializer,
- streamTaskCloseableRegistry,
+ cancelables,
metrics);
- stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);
+ stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), cancelables);
timeServiceManager = context.internalTimerServiceManager();
stateHandler.initializeOperatorState(this);
- runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
}
/**
@@ -301,10 +254,7 @@ public abstract class AbstractStreamOperator<OUT>
}
@Override
- public final OperatorSnapshotFutures snapshotState(
- long checkpointId,
- long timestamp,
- CheckpointOptions checkpointOptions,
+ public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
CheckpointStreamFactory factory) throws Exception {
return stateHandler.snapshotState(
this,
@@ -350,19 +300,15 @@ public abstract class AbstractStreamOperator<OUT>
* @return The job's execution config.
*/
public ExecutionConfig getExecutionConfig() {
- return container.getExecutionConfig();
+ return executionConfig;
}
public StreamConfig getOperatorConfig() {
return config;
}
- public StreamTask<?, ?> getContainingTask() {
- return container;
- }
-
public ClassLoader getUserCodeClassloader() {
- return container.getUserCodeClassLoader();
+ return userCodeClassLoader;
}
/**
@@ -390,10 +336,12 @@ public abstract class AbstractStreamOperator<OUT>
}
@SuppressWarnings("unchecked")
+ @VisibleForTesting
public <K> KeyedStateBackend<K> getKeyedStateBackend() {
- return stateHandler.getKeyedStateBackend();
+ return (KeyedStateBackend<K>) stateHandler.getKeyedStateBackend();
}
+ @VisibleForTesting
public OperatorStateBackend getOperatorStateBackend() {
return stateHandler.getOperatorStateBackend();
}
@@ -402,6 +350,7 @@ public abstract class AbstractStreamOperator<OUT>
* Returns the {@link ProcessingTimeService} responsible for getting the current
* processing time and registering timers.
*/
+ @VisibleForTesting
public ProcessingTimeService getProcessingTimeService() {
return processingTimeService;
}
@@ -417,8 +366,8 @@ public abstract class AbstractStreamOperator<OUT>
}
protected <N, S extends State, T> S getOrCreateKeyedState(
- TypeSerializer<N> namespaceSerializer,
- StateDescriptor<S, T> stateDescriptor) throws Exception {
+ TypeSerializer<N> namespaceSerializer,
+ StateDescriptor<S, T> stateDescriptor) throws Exception {
return stateHandler.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
}
@@ -429,25 +378,13 @@ public abstract class AbstractStreamOperator<OUT>
* @throws Exception Thrown, if the state backend cannot create the key/value state.
*/
protected <S extends State, N> S getPartitionedState(
- N namespace,
- TypeSerializer<N> namespaceSerializer,
- StateDescriptor<S, ?> stateDescriptor) throws Exception {
+ N namespace,
+ TypeSerializer<N> namespaceSerializer,
+ StateDescriptor<S, ?> stateDescriptor) throws Exception {
return stateHandler.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
}
- @Override
- @SuppressWarnings({"unchecked", "rawtypes"})
- public void setKeyContextElement1(StreamRecord record) throws Exception {
- setKeyContextElement(record, stateKeySelector1);
- }
-
- @Override
- @SuppressWarnings({"unchecked", "rawtypes"})
- public void setKeyContextElement2(StreamRecord record) throws Exception {
- setKeyContextElement(record, stateKeySelector2);
- }
-
- private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
+ protected <T> void internalSetKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
if (selector != null) {
Object key = selector.getKey(record.getValue());
setCurrentKey(key);
@@ -464,43 +401,11 @@ public abstract class AbstractStreamOperator<OUT>
return stateHandler.getCurrentKey();
}
- public KeyedStateStore getKeyedStateStore() {
+ public Optional<KeyedStateStore> getKeyedStateStore() {
if (stateHandler == null) {
- return null;
+ return Optional.empty();
}
- return stateHandler.getKeyedStateStore().orElse(null);
- }
-
- // ------------------------------------------------------------------------
- // Context and chaining properties
- // ------------------------------------------------------------------------
-
- @Override
- public final void setChainingStrategy(ChainingStrategy strategy) {
- this.chainingStrategy = strategy;
- }
-
- @Override
- public final ChainingStrategy getChainingStrategy() {
- return chainingStrategy;
- }
-
- // ------------------------------------------------------------------------
- // Metrics
- // ------------------------------------------------------------------------
-
- // ------- One input stream
- public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
- reportOrForwardLatencyMarker(latencyMarker);
- }
-
- // ------- Two input stream
- public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
- reportOrForwardLatencyMarker(latencyMarker);
- }
-
- public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
- reportOrForwardLatencyMarker(latencyMarker);
+ return stateHandler.getKeyedStateStore();
}
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
@@ -536,6 +441,7 @@ public abstract class AbstractStreamOperator<OUT>
*
* @param <N> The type of the timer namespace.
*/
+ @VisibleForTesting
public <K, N> InternalTimerService<N> getInternalTimerService(
String name,
TypeSerializer<N> namespaceSerializer,
@@ -558,18 +464,12 @@ public abstract class AbstractStreamOperator<OUT>
output.emitWatermark(mark);
}
- public void processWatermark1(Watermark mark) throws Exception {
- input1Watermark = mark.getTimestamp();
- long newMin = Math.min(input1Watermark, input2Watermark);
- if (newMin > combinedWatermark) {
- combinedWatermark = newMin;
- processWatermark(new Watermark(combinedWatermark));
+ protected void reportWatermark(Watermark mark, int inputId) throws Exception {
+ inputWatermarks[inputId] = mark.getTimestamp();
+ long newMin = mark.getTimestamp();
+ for (long inputWatermark : inputWatermarks) {
+ newMin = Math.min(inputWatermark, newMin);
}
- }
-
- public void processWatermark2(Watermark mark) throws Exception {
- input2Watermark = mark.getTimestamp();
- long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
processWatermark(new Watermark(combinedWatermark));
@@ -591,6 +491,16 @@ public abstract class AbstractStreamOperator<OUT>
return timeServiceManager == null ? 0 : timeServiceManager.numEventTimeTimers();
}
+ @Override
+ public void setKeyContextElement1(StreamRecord<?> record) throws Exception {
+ throw new IllegalStateException("This method should never be called. Use Input class instead");
+ }
+
+ @Override
+ public void setKeyContextElement2(StreamRecord<?> record) throws Exception {
+ throw new IllegalStateException("This method should never be called. Use Input class instead");
+ }
+
protected Optional<InternalTimeServiceManager<?>> getTimeServiceManager() {
return Optional.ofNullable(timeServiceManager);
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
index 1f6f1a0..5aaffcc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
@@ -25,9 +25,9 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
/**
- * Helper class to construct {@link StreamOperatorBase}. Wraps couple of internal parameters
- * to simplify for users construction of classes extending {@link StreamOperatorBase} and to
- * allow for backward compatible changes in the {@link StreamOperatorBase}'s constructor.
+ * Helper class to construct {@link AbstractStreamOperatorV2}. Wraps a couple of internal parameters
+ * to simplify the construction of classes extending {@link AbstractStreamOperatorV2} for users and to
+ * allow for backward compatible changes in the {@link AbstractStreamOperatorV2}'s constructor.
*
* @param <OUT> The output type of an operator that will be constructed using {@link StreamOperatorParameters}.
*/
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index fc0144b..88c4f9a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -30,9 +30,12 @@ import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.runtime.io.InputStatus;
import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
@@ -68,7 +71,7 @@ public class MultipleInputStreamTaskTest {
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.INT_TYPE_INFO)
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
- .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperator())
+ .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory())
.build()) {
long initialTime = 0L;
@@ -98,7 +101,7 @@ public class MultipleInputStreamTaskTest {
.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
- .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperator())
+ .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory())
.build()) {
ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
long initialTime = 0L;
@@ -148,7 +151,7 @@ public class MultipleInputStreamTaskTest {
.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
- .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperator())
+ .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory())
.build()) {
ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
long initialTime = 0L;
@@ -217,7 +220,7 @@ public class MultipleInputStreamTaskTest {
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
- .setupOperatorChain(new DuplicatingOperator())
+ .setupOperatorChain(new DuplicatingOperatorFactory())
.chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.finish()
@@ -246,9 +249,13 @@ public class MultipleInputStreamTaskTest {
}
}
- static class DuplicatingOperator extends AbstractStreamOperator<String>
+ static class DuplicatingOperator extends AbstractStreamOperatorV2<String>
implements MultipleInputStreamOperator<String> {
+ public DuplicatingOperator(StreamOperatorParameters<String> parameters) {
+ super(parameters, 3);
+ }
+
@Override
public List<Input> getInputs() {
return Arrays.asList(new DuplicatingInput(), new DuplicatingInput(), new DuplicatingInput());
@@ -270,7 +277,7 @@ public class MultipleInputStreamTaskTest {
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
- .setupOperatorChain(new TestBoundedMultipleInputOperator("Operator0"))
+ .setupOperatorChain(new TestBoundedMultipleInputOperatorFactory())
.chain(new TestBoundedOneInputStreamOperator("Operator1"), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.finish()
.build();
@@ -317,7 +324,7 @@ public class MultipleInputStreamTaskTest {
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
- .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperator())
+ .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory())
.build()) {
ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
@@ -352,12 +359,16 @@ public class MultipleInputStreamTaskTest {
// This must only be used in one test, otherwise the static fields will be changed
// by several tests concurrently
private static class MapToStringMultipleInputOperator
- extends AbstractStreamOperator<String> implements MultipleInputStreamOperator<String> {
+ extends AbstractStreamOperatorV2<String> implements MultipleInputStreamOperator<String> {
private static final long serialVersionUID = 1L;
private boolean openCalled;
private boolean closeCalled;
+ public MapToStringMultipleInputOperator(StreamOperatorParameters<String> parameters) {
+ super(parameters, 3);
+ }
+
@Override
public void open() throws Exception {
super.open();
@@ -418,5 +429,41 @@ public class MultipleInputStreamTaskTest {
return value.toString();
}
}
+
+ private static class TestBoundedMultipleInputOperatorFactory extends AbstractStreamOperatorFactory<String> {
+ @Override
+ public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
+ return (T) new TestBoundedMultipleInputOperator("Operator0", parameters);
+ }
+
+ @Override
+ public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
+ return TestBoundedMultipleInputOperator.class;
+ }
+ }
+
+ private static class DuplicatingOperatorFactory extends AbstractStreamOperatorFactory<String> {
+ @Override
+ public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
+ return (T) new DuplicatingOperator(parameters);
+ }
+
+ @Override
+ public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
+ return DuplicatingOperator.class;
+ }
+ }
+
+ private static class MapToStringMultipleInputOperatorFactory extends AbstractStreamOperatorFactory<String> {
+ @Override
+ public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
+ return (T) new MapToStringMultipleInputOperator(parameters);
+ }
+
+ @Override
+ public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
+ return MapToStringMultipleInputOperator.class;
+ }
+ }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java
index 89d242a..e6ebe84 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java
@@ -18,10 +18,11 @@
package org.apache.flink.streaming.util;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.Arrays;
@@ -30,14 +31,15 @@ import java.util.List;
/**
* A test operator class implementing {@link BoundedMultiInput}.
*/
-public class TestBoundedMultipleInputOperator extends AbstractStreamOperator<String>
+public class TestBoundedMultipleInputOperator extends AbstractStreamOperatorV2<String>
implements MultipleInputStreamOperator<String>, BoundedMultiInput {
private static final long serialVersionUID = 1L;
private final String name;
- public TestBoundedMultipleInputOperator(String name) {
+ public TestBoundedMultipleInputOperator(String name, StreamOperatorParameters<String> parameters) {
+ super(parameters, 3);
this.name = name;
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java
index f81bac9..32b6424 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java
@@ -21,12 +21,11 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -84,13 +83,14 @@ public class MultipleInputITCase extends AbstractTestBase {
/**
* 3 input operator that sums all of it inputs.
- * TODO: provide non {@link SetupableStreamOperator} variant of {@link AbstractStreamOperator}?
- * TODO: provide non {@link AbstractStreamOperator} seems to pre-override processWatermark1/2 and other
- * methods that are not defined there?
*/
- public static class SumAllInputOperator extends AbstractStreamOperator<Long> implements MultipleInputStreamOperator<Long> {
+ public static class SumAllInputOperator extends AbstractStreamOperatorV2<Long> implements MultipleInputStreamOperator<Long> {
private long sum;
+ public SumAllInputOperator(StreamOperatorParameters<Long> parameters) {
+ super(parameters, 3);
+ }
+
@Override
public List<Input> getInputs() {
return Arrays.asList(
@@ -114,29 +114,15 @@ public class MultipleInputITCase extends AbstractTestBase {
/**
* Factory for {@link SumAllInputOperator}.
*/
- public static class SumAllInputOperatorFactory implements StreamOperatorFactory<Long> {
- private ChainingStrategy chainingStrategy;
-
+ public static class SumAllInputOperatorFactory extends AbstractStreamOperatorFactory<Long> {
@Override
public <T extends StreamOperator<Long>> T createStreamOperator(StreamOperatorParameters<Long> parameters) {
- SumAllInputOperator sumAllInputOperator = new SumAllInputOperator();
- sumAllInputOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
- return (T) sumAllInputOperator;
- }
-
- @Override
- public void setChainingStrategy(ChainingStrategy chainingStrategy) {
- this.chainingStrategy = chainingStrategy;
- }
-
- @Override
- public ChainingStrategy getChainingStrategy() {
- return chainingStrategy;
+ return (T) new SumAllInputOperator(parameters);
}
@Override
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
- throw new UnsupportedOperationException();
+ return SumAllInputOperator.class;
}
}
}