Posted to commits@flink.apache.org by yi...@apache.org on 2022/08/09 01:55:51 UTC

[flink] branch master updated: [FLINK-28380][runtime] Produce one intermediate dataset for multiple consumer job vertices consuming the same data

This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 60bc87c0b83 [FLINK-28380][runtime] Produce one intermediate dataset for multiple consumer job vertices consuming the same data
60bc87c0b83 is described below

commit 60bc87c0b83149c4f19d7e54af2d967087a277fb
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Mon Jul 25 16:22:50 2022 +0800

    [FLINK-28380][runtime] Produce one intermediate dataset for multiple consumer job vertices consuming the same data
    
    This closes #20351.
---
 .../io/network/partition/ResultPartitionType.java  |  25 +++-
 .../streaming/api/graph/NonChainedOutput.java      | 146 +++++++++++++++++++++
 .../flink/streaming/api/graph/StreamConfig.java    |  43 +++---
 .../api/graph/StreamingJobGraphGenerator.java      | 141 +++++++++++++++-----
 .../partitioner/CustomPartitionerWrapper.java      |  20 +++
 .../streaming/runtime/tasks/OperatorChain.java     |  71 +++++-----
 .../flink/streaming/runtime/tasks/StreamTask.java  |  20 +--
 .../api/graph/StreamingJobGraphGeneratorTest.java  |  93 ++++++++++++-
 .../ForwardForConsecutiveHashPartitionerTest.java  |  26 ++--
 .../ForwardForUnspecifiedPartitionerTest.java      |  25 ++--
 .../runtime/tasks/StreamConfigChainer.java         |  69 +++++-----
 .../tasks/StreamTaskMailboxTestHarnessBuilder.java |  25 +++-
 .../runtime/tasks/StreamTaskTestHarness.java       |  30 +++--
 .../flink/streaming/util/MockStreamConfig.java     |  29 ++--
 .../runtime/partitioner/BinaryHashPartitioner.java |  20 +++
 15 files changed, 588 insertions(+), 195 deletions(-)
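
This change applies when several consumer job vertices read identical data from one
producer. Previously, each consumer edge got its own intermediate dataset (the old
connect() created a fresh IntermediateDataSetID per edge), so the same records were
produced and stored once per consumer; now, reconsumable blocking partitions with
identical consumer-visible properties are produced once and shared. A minimal sketch
of a topology that benefits (names and values are illustrative; batch runtime mode is
set so the exchanges are blocking and therefore reconsumable):

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

    public class SharedIntermediateDataSetSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
            DataStream<Integer> source = env.fromElements(1, 2, 3);
            // Two blocking consumers with the same partitioner and parallelism:
            // after this commit the source vertex produces one intermediate
            // dataset that both sinks consume, instead of two identical ones.
            source.rebalance().addSink(new DiscardingSink<>()).setParallelism(2);
            source.rebalance().addSink(new DiscardingSink<>()).setParallelism(2);
            env.execute("shared-intermediate-dataset");
        }
    }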

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
index c82e55a8321..830a2b30687 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -32,7 +32,7 @@ public enum ResultPartitionType {
      * {@link #PIPELINED} partitions), but only released through the scheduler, when it determines
      * that the partition is no longer needed.
      */
-    BLOCKING(false, false, ConsumingConstraint.BLOCKING, ReleaseBy.SCHEDULER),
+    BLOCKING(true, false, false, ConsumingConstraint.BLOCKING, ReleaseBy.SCHEDULER),
 
     /**
      * BLOCKING_PERSISTENT partitions are similar to {@link #BLOCKING} partitions, but have a
@@ -45,7 +45,7 @@ public enum ResultPartitionType {
      * scenarios, like when the TaskManager exits or when the TaskManager loses connection to
      * JobManager / ResourceManager for too long.
      */
-    BLOCKING_PERSISTENT(false, true, ConsumingConstraint.BLOCKING, ReleaseBy.SCHEDULER),
+    BLOCKING_PERSISTENT(true, false, true, ConsumingConstraint.BLOCKING, ReleaseBy.SCHEDULER),
 
     /**
      * A pipelined streaming data exchange. This is applicable to both bounded and unbounded
@@ -57,7 +57,7 @@ public enum ResultPartitionType {
      * <p>This result partition type may keep an arbitrary amount of data in-flight, in contrast to
      * the {@link #PIPELINED_BOUNDED} variant.
      */
-    PIPELINED(false, false, ConsumingConstraint.MUST_BE_PIPELINED, ReleaseBy.UPSTREAM),
+    PIPELINED(false, false, false, ConsumingConstraint.MUST_BE_PIPELINED, ReleaseBy.UPSTREAM),
 
     /**
      * Pipelined partitions with a bounded (local) buffer pool.
@@ -70,7 +70,8 @@ public enum ResultPartitionType {
      * <p>For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there
      * are no checkpoint barriers.
      */
-    PIPELINED_BOUNDED(true, false, ConsumingConstraint.MUST_BE_PIPELINED, ReleaseBy.UPSTREAM),
+    PIPELINED_BOUNDED(
+            false, true, false, ConsumingConstraint.MUST_BE_PIPELINED, ReleaseBy.UPSTREAM),
 
     /**
      * Pipelined partitions with a bounded (local) buffer pool to support downstream task to
@@ -81,7 +82,8 @@ public enum ResultPartitionType {
      * in that {@link #PIPELINED_APPROXIMATE} partition can be reconnected after down stream task
      * fails.
      */
-    PIPELINED_APPROXIMATE(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.UPSTREAM),
+    PIPELINED_APPROXIMATE(
+            false, true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.UPSTREAM),
 
     /**
      * Hybrid partitions with a bounded (local) buffer pool to support downstream task to
@@ -89,7 +91,12 @@ public enum ResultPartitionType {
      *
      * <p>Hybrid partitions can be consumed any time, whether fully produced or not.
      */
-    HYBRID(false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER);
+    HYBRID(false, false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER);
+
+    /**
+     * Can this result partition be consumed by multiple downstream consumers multiple times?
+     */
+    private final boolean isReconsumable;
 
     /** Does this partition use a limited number of (network) buffers? */
     private final boolean isBounded;
@@ -125,10 +132,12 @@ public enum ResultPartitionType {
 
     /** Specifies the behaviour of an intermediate result partition at runtime. */
     ResultPartitionType(
+            boolean isReconsumable,
             boolean isBounded,
             boolean isPersistent,
             ConsumingConstraint consumingConstraint,
             ReleaseBy releaseBy) {
+        this.isReconsumable = isReconsumable;
         this.isBounded = isBounded;
         this.isPersistent = isPersistent;
         this.consumingConstraint = consumingConstraint;
@@ -200,4 +209,8 @@ public enum ResultPartitionType {
     public boolean supportCompression() {
         return isBlockingOrBlockingPersistentResultPartition() || this == HYBRID;
     }
+
+    public boolean isReconsumable() {
+        return isReconsumable;
+    }
 }
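
With the extra leading constructor argument above, only the blocking partition types
report themselves as reconsumable. A minimal sketch of what the new flag returns for
each type, matching the constructor calls in this diff (plain Java assertions, run
with -ea):

    import org.apache.flink.runtime.io.network.partition.ResultPartitionType;

    public class ReconsumableFlagSketch {
        public static void main(String[] args) {
            // Blocking partitions outlive consumption, so several downstream
            // consumers can each read them, possibly multiple times.
            assert ResultPartitionType.BLOCKING.isReconsumable();
            assert ResultPartitionType.BLOCKING_PERSISTENT.isReconsumable();
            // Pipelined and hybrid partitions are consumed as they are
            // produced and cannot be handed to a second consumer vertex.
            assert !ResultPartitionType.PIPELINED.isReconsumable();
            assert !ResultPartitionType.PIPELINED_BOUNDED.isReconsumable();
            assert !ResultPartitionType.PIPELINED_APPROXIMATE.isReconsumable();
            assert !ResultPartitionType.HYBRID.isReconsumable();
        }
    }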
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/NonChainedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/NonChainedOutput.java
new file mode 100644
index 00000000000..20d357c14e4
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/NonChainedOutput.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.util.OutputTag;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Used by operator chain and represents a non-chained output of the corresponding stream operator.
+ */
+@Internal
+public class NonChainedOutput implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Is unaligned checkpoint supported. */
+    private final boolean supportsUnalignedCheckpoints;
+
+    /** ID of the producer {@link StreamNode}. */
+    private final int sourceNodeId;
+
+    /** Parallelism of the consumer vertex. */
+    private final int consumerParallelism;
+
+    /** Max parallelism of the consumer vertex. */
+    private final int consumerMaxParallelism;
+
+    /** Buffer flush timeout of this output. */
+    private final long bufferTimeout;
+
+    /** ID of the produced intermediate dataset. */
+    private final IntermediateDataSetID dataSetId;
+
+    /** Whether this intermediate dataset is a persistent dataset or not. */
+    private final boolean isPersistentDataSet;
+
+    /** The side-output tag (if any). */
+    private final OutputTag<?> outputTag;
+
+    /** The corresponding data partitioner. */
+    private final StreamPartitioner<?> partitioner;
+
+    /** Target {@link ResultPartitionType}. */
+    private final ResultPartitionType partitionType;
+
+    public NonChainedOutput(
+            boolean supportsUnalignedCheckpoints,
+            int sourceNodeId,
+            int consumerParallelism,
+            int consumerMaxParallelism,
+            long bufferTimeout,
+            boolean isPersistentDataSet,
+            IntermediateDataSetID dataSetId,
+            OutputTag<?> outputTag,
+            StreamPartitioner<?> partitioner,
+            ResultPartitionType partitionType) {
+        this.supportsUnalignedCheckpoints = supportsUnalignedCheckpoints;
+        this.sourceNodeId = sourceNodeId;
+        this.consumerParallelism = consumerParallelism;
+        this.consumerMaxParallelism = consumerMaxParallelism;
+        this.bufferTimeout = bufferTimeout;
+        this.isPersistentDataSet = isPersistentDataSet;
+        this.dataSetId = dataSetId;
+        this.outputTag = outputTag;
+        this.partitioner = partitioner;
+        this.partitionType = partitionType;
+    }
+
+    public boolean supportsUnalignedCheckpoints() {
+        return supportsUnalignedCheckpoints;
+    }
+
+    public int getSourceNodeId() {
+        return sourceNodeId;
+    }
+
+    public int getConsumerParallelism() {
+        return consumerParallelism;
+    }
+
+    public int getConsumerMaxParallelism() {
+        return consumerMaxParallelism;
+    }
+
+    public long getBufferTimeout() {
+        return bufferTimeout;
+    }
+
+    public IntermediateDataSetID getDataSetId() {
+        return dataSetId;
+    }
+
+    public IntermediateDataSetID getPersistentDataSetId() {
+        return isPersistentDataSet ? dataSetId : null;
+    }
+
+    public OutputTag<?> getOutputTag() {
+        return outputTag;
+    }
+
+    public StreamPartitioner<?> getPartitioner() {
+        return partitioner;
+    }
+
+    public ResultPartitionType getPartitionType() {
+        return partitionType;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        NonChainedOutput output = (NonChainedOutput) o;
+        return Objects.equals(dataSetId, output.dataSetId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(dataSetId);
+    }
+}
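
Note that equals/hashCode are based solely on the dataset ID. That is what lets the
job graph generator deduplicate vertex-level outputs with a LinkedHashSet further
below: two consumer edges that map to the same NonChainedOutput collapse into one
entry. A minimal sketch (the null output tag and partitioner are placeholders, which
is safe here because only the dataset ID participates in equality):

    import java.util.LinkedHashSet;
    import java.util.Set;

    import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
    import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
    import org.apache.flink.streaming.api.graph.NonChainedOutput;

    public class NonChainedOutputDedupSketch {
        public static void main(String[] args) {
            IntermediateDataSetID dataSetId = new IntermediateDataSetID();
            NonChainedOutput output =
                    new NonChainedOutput(
                            true, 1, 2, 2, 100L, false, dataSetId, null, null,
                            ResultPartitionType.BLOCKING);
            Set<NonChainedOutput> vertexOutputs = new LinkedHashSet<>();
            vertexOutputs.add(output);
            vertexOutputs.add(output); // second consumer edge reusing the output
            assert vertexOutputs.size() == 1;
        }
    }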
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 8c85b4343d2..57cb1bca1ba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.util.CorruptConfigurationException;
 import org.apache.flink.runtime.state.CheckpointStorage;
@@ -91,8 +92,8 @@ public class StreamConfig implements Serializable {
     private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out";
     private static final String TYPE_SERIALIZER_SIDEOUT_PREFIX = "typeSerializer_sideout_";
     private static final String ITERATON_WAIT = "iterationWait";
-    private static final String NONCHAINED_OUTPUTS = "nonChainedOutputs";
-    private static final String EDGES_IN_ORDER = "edgesInOrder";
+    private static final String OP_NONCHAINED_OUTPUTS = "opNonChainedOutputs";
+    private static final String VERTEX_NONCHAINED_OUTPUTS = "vertexNonChainedOutputs";
     private static final String IN_STREAM_EDGES = "inStreamEdges";
     private static final String OPERATOR_NAME = "operatorName";
     private static final String OPERATOR_ID = "operatorID";
@@ -434,15 +435,16 @@ public class StreamConfig implements Serializable {
         return config.getInteger(NUMBER_OF_OUTPUTS, 0);
     }
 
-    public void setNonChainedOutputs(List<StreamEdge> outputvertexIDs) {
-        toBeSerializedConfigObjects.put(NONCHAINED_OUTPUTS, outputvertexIDs);
+    /** Sets the operator level non-chained outputs. */
+    public void setOperatorNonChainedOutputs(List<NonChainedOutput> nonChainedOutputs) {
+        toBeSerializedConfigObjects.put(OP_NONCHAINED_OUTPUTS, nonChainedOutputs);
     }
 
-    public List<StreamEdge> getNonChainedOutputs(ClassLoader cl) {
+    public List<NonChainedOutput> getOperatorNonChainedOutputs(ClassLoader cl) {
         try {
-            List<StreamEdge> nonChainedOutputs =
-                    InstantiationUtil.readObjectFromConfig(this.config, NONCHAINED_OUTPUTS, cl);
-            return nonChainedOutputs == null ? new ArrayList<StreamEdge>() : nonChainedOutputs;
+            List<NonChainedOutput> nonChainedOutputs =
+                    InstantiationUtil.readObjectFromConfig(this.config, OP_NONCHAINED_OUTPUTS, cl);
+            return nonChainedOutputs == null ? new ArrayList<>() : nonChainedOutputs;
         } catch (Exception e) {
             throw new StreamTaskException("Could not instantiate non chained outputs.", e);
         }
@@ -520,15 +522,20 @@ public class StreamConfig implements Serializable {
                 ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, alignedCheckpointTimeout);
     }
 
-    public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
-        toBeSerializedConfigObjects.put(EDGES_IN_ORDER, outEdgeList);
+    /**
+     * Sets the job vertex level non-chained outputs. The given output list must be in the same
+     * order as {@link JobVertex#getProducedDataSets()}.
+     */
+    public void setVertexNonChainedOutputs(List<NonChainedOutput> nonChainedOutputs) {
+        toBeSerializedConfigObjects.put(VERTEX_NONCHAINED_OUTPUTS, nonChainedOutputs);
     }
 
-    public List<StreamEdge> getOutEdgesInOrder(ClassLoader cl) {
+    public List<NonChainedOutput> getVertexNonChainedOutputs(ClassLoader cl) {
         try {
-            List<StreamEdge> outEdgesInOrder =
-                    InstantiationUtil.readObjectFromConfig(this.config, EDGES_IN_ORDER, cl);
-            return outEdgesInOrder == null ? new ArrayList<StreamEdge>() : outEdgesInOrder;
+            List<NonChainedOutput> nonChainedOutputs =
+                    InstantiationUtil.readObjectFromConfig(
+                            this.config, VERTEX_NONCHAINED_OUTPUTS, cl);
+            return nonChainedOutputs == null ? new ArrayList<>() : nonChainedOutputs;
         } catch (Exception e) {
             throw new StreamTaskException("Could not instantiate outputs in order.", e);
         }
@@ -721,11 +728,11 @@ public class StreamConfig implements Serializable {
         builder.append("=======================");
         builder.append("\nNumber of non-chained inputs: ").append(getNumberOfNetworkInputs());
         builder.append("\nNumber of non-chained outputs: ").append(getNumberOfOutputs());
-        builder.append("\nOutput names: ").append(getNonChainedOutputs(cl));
+        builder.append("\nOutput names: ").append(getOperatorNonChainedOutputs(cl));
         builder.append("\nPartitioning:");
-        for (StreamEdge output : getNonChainedOutputs(cl)) {
-            int outputname = output.getTargetId();
-            builder.append("\n\t").append(outputname).append(": ").append(output.getPartitioner());
+        for (NonChainedOutput output : getOperatorNonChainedOutputs(cl)) {
+            String outputName = output.getDataSetId().toString();
+            builder.append("\n\t").append(outputName).append(": ").append(output.getPartitioner());
         }
 
         builder.append("\nChained subtasks: ").append(getChainedOutputs(cl));
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 4cf9ed7c990..e44ed175626 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -97,9 +97,11 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -176,6 +178,8 @@ public class StreamingJobGraphGenerator {
     // Futures for the serialization of operator coordinators
     private final List<CompletableFuture<Void>> coordinatorSerializationFutures = new ArrayList<>();
 
+    private final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs;
+
     private StreamingJobGraphGenerator(
             StreamGraph streamGraph, @Nullable JobID jobID, Executor serializationExecutor) {
         this.streamGraph = streamGraph;
@@ -192,6 +196,7 @@ public class StreamingJobGraphGenerator {
         this.chainedInputOutputFormats = new HashMap<>();
         this.physicalEdgesInOrder = new ArrayList<>();
         this.serializationExecutor = Preconditions.checkNotNull(serializationExecutor);
+        this.opIntermediateOutputs = new HashMap<>();
 
         jobGraph = new JobGraph(jobID, streamGraph.getJobName());
     }
@@ -677,11 +682,15 @@ public class StreamingJobGraphGenerator {
                 config.setChainIndex(chainIndex);
                 config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
 
+                LinkedHashSet<NonChainedOutput> transitiveOutputs = new LinkedHashSet<>();
                 for (StreamEdge edge : transitiveOutEdges) {
-                    connect(startNodeId, edge);
+                    NonChainedOutput output =
+                            opIntermediateOutputs.get(edge.getSourceId()).get(edge);
+                    transitiveOutputs.add(output);
+                    connect(startNodeId, edge, output);
                 }
 
-                config.setOutEdgesInOrder(transitiveOutEdges);
+                config.setVertexNonChainedOutputs(new ArrayList<>(transitiveOutputs));
                 config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
 
             } else {
@@ -937,8 +946,10 @@ public class StreamingJobGraphGenerator {
 
         config.setStreamOperatorFactory(vertex.getOperatorFactory());
 
-        config.setNumberOfOutputs(nonChainableOutputs.size());
-        config.setNonChainedOutputs(nonChainableOutputs);
+        List<NonChainedOutput> deduplicatedOutputs =
+                mayReuseNonChainedOutputs(vertexID, nonChainableOutputs);
+        config.setNumberOfOutputs(deduplicatedOutputs.size());
+        config.setOperatorNonChainedOutputs(deduplicatedOutputs);
         config.setChainedOutputs(chainableOutputs);
 
         config.setTimeCharacteristic(streamGraph.getTimeCharacteristic());
@@ -975,6 +986,75 @@ public class StreamingJobGraphGenerator {
         vertexConfigs.put(vertexID, config);
     }
 
+    private List<NonChainedOutput> mayReuseNonChainedOutputs(
+            int vertexId, List<StreamEdge> consumerEdges) {
+        if (consumerEdges.isEmpty()) {
+            return new ArrayList<>();
+        }
+        List<NonChainedOutput> outputs = new ArrayList<>(consumerEdges.size());
+        Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge =
+                opIntermediateOutputs.computeIfAbsent(vertexId, ignored -> new HashMap<>());
+        for (StreamEdge consumerEdge : consumerEdges) {
+            checkState(vertexId == consumerEdge.getSourceId(), "Vertex id must be the same.");
+            int consumerParallelism =
+                    streamGraph.getStreamNode(consumerEdge.getTargetId()).getParallelism();
+            int consumerMaxParallelism =
+                    streamGraph.getStreamNode(consumerEdge.getTargetId()).getMaxParallelism();
+            StreamPartitioner<?> partitioner = consumerEdge.getPartitioner();
+            ResultPartitionType partitionType = getResultPartitionType(consumerEdge);
+            IntermediateDataSetID dataSetId = new IntermediateDataSetID();
+
+            boolean isPersistentDataSet =
+                    isPersistentIntermediateDataset(partitionType, consumerEdge);
+            if (isPersistentDataSet) {
+                partitionType = ResultPartitionType.BLOCKING_PERSISTENT;
+                dataSetId = consumerEdge.getIntermediateDatasetIdToProduce();
+            }
+
+            NonChainedOutput output =
+                    new NonChainedOutput(
+                            consumerEdge.supportsUnalignedCheckpoints(),
+                            consumerEdge.getSourceId(),
+                            consumerParallelism,
+                            consumerMaxParallelism,
+                            consumerEdge.getBufferTimeout(),
+                            isPersistentDataSet,
+                            dataSetId,
+                            consumerEdge.getOutputTag(),
+                            partitioner,
+                            partitionType);
+            if (!partitionType.isReconsumable()) {
+                outputs.add(output);
+                outputsConsumedByEdge.put(consumerEdge, output);
+            } else {
+                NonChainedOutput reusableOutput = null;
+                for (NonChainedOutput outputCandidate : outputsConsumedByEdge.values()) {
+                    // the target output can be reused if it has the same partitioner and
+                    // consumer parallelism; reusing the same output can improve performance
+                    if (outputCandidate.getPartitionType().isReconsumable()
+                            && consumerParallelism == outputCandidate.getConsumerParallelism()
+                            && consumerMaxParallelism == outputCandidate.getConsumerMaxParallelism()
+                            && outputCandidate.getPartitionType() == partitionType
+                            && Objects.equals(
+                                    outputCandidate.getPersistentDataSetId(),
+                                    consumerEdge.getIntermediateDatasetIdToProduce())
+                            && Objects.equals(
+                                    outputCandidate.getOutputTag(), consumerEdge.getOutputTag())
+                            && Objects.equals(partitioner, outputCandidate.getPartitioner())) {
+                        reusableOutput = outputCandidate;
+                        outputsConsumedByEdge.put(consumerEdge, reusableOutput);
+                        break;
+                    }
+                }
+                if (reusableOutput == null) {
+                    outputs.add(output);
+                    outputsConsumedByEdge.put(consumerEdge, output);
+                }
+            }
+        }
+        return outputs;
+    }
+
     private void tryConvertPartitionerForDynamicGraph(
             List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {
 
@@ -1026,7 +1106,7 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    private void connect(Integer headOfChain, StreamEdge edge) {
+    private void connect(Integer headOfChain, StreamEdge edge, NonChainedOutput output) {
 
         physicalEdgesInOrder.add(edge);
 
@@ -1039,37 +1119,13 @@ public class StreamingJobGraphGenerator {
 
         downStreamConfig.setNumberOfNetworkInputs(downStreamConfig.getNumberOfNetworkInputs() + 1);
 
-        StreamPartitioner<?> partitioner = edge.getPartitioner();
-
-        ResultPartitionType resultPartitionType;
-        switch (edge.getExchangeMode()) {
-            case PIPELINED:
-                resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;
-                break;
-            case BATCH:
-                resultPartitionType = ResultPartitionType.BLOCKING;
-                break;
-            case HYBRID:
-                resultPartitionType = ResultPartitionType.HYBRID;
-                break;
-            case UNDEFINED:
-                resultPartitionType = determineResultPartitionType(partitioner);
-                break;
-            default:
-                throw new UnsupportedOperationException(
-                        "Data exchange mode " + edge.getExchangeMode() + " is not supported yet.");
-        }
+        StreamPartitioner<?> partitioner = output.getPartitioner();
+        ResultPartitionType resultPartitionType = output.getPartitionType();
 
         if (resultPartitionType == ResultPartitionType.HYBRID) {
             hasHybridResultPartition = true;
         }
 
-        IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
-        if (isPersistentIntermediateDataset(resultPartitionType, edge)) {
-            resultPartitionType = ResultPartitionType.BLOCKING_PERSISTENT;
-            intermediateDataSetID = edge.getIntermediateDatasetIdToProduce();
-        }
-
         checkBufferTimeout(resultPartitionType, edge);
 
         JobEdge jobEdge;
@@ -1079,7 +1135,7 @@ public class StreamingJobGraphGenerator {
                             headVertex,
                             DistributionPattern.POINTWISE,
                             resultPartitionType,
-                            intermediateDataSetID,
+                            opIntermediateOutputs.get(edge.getSourceId()).get(edge).getDataSetId(),
                             partitioner.isBroadcast());
         } else {
             jobEdge =
@@ -1087,7 +1143,7 @@ public class StreamingJobGraphGenerator {
                             headVertex,
                             DistributionPattern.ALL_TO_ALL,
                             resultPartitionType,
-                            intermediateDataSetID,
+                            opIntermediateOutputs.get(edge.getSourceId()).get(edge).getDataSetId(),
                             partitioner.isBroadcast());
         }
 
@@ -1125,7 +1181,24 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    private ResultPartitionType determineResultPartitionType(StreamPartitioner<?> partitioner) {
+    private ResultPartitionType getResultPartitionType(StreamEdge edge) {
+        switch (edge.getExchangeMode()) {
+            case PIPELINED:
+                return ResultPartitionType.PIPELINED_BOUNDED;
+            case BATCH:
+                return ResultPartitionType.BLOCKING;
+            case HYBRID:
+                return ResultPartitionType.HYBRID;
+            case UNDEFINED:
+                return determineUndefinedResultPartitionType(edge.getPartitioner());
+            default:
+                throw new UnsupportedOperationException(
+                        "Data exchange mode " + edge.getExchangeMode() + " is not supported yet.");
+        }
+    }
+
+    private ResultPartitionType determineUndefinedResultPartitionType(
+            StreamPartitioner<?> partitioner) {
         switch (streamGraph.getGlobalStreamExchangeMode()) {
             case ALL_EDGES_BLOCKING:
                 return ResultPartitionType.BLOCKING;
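
The reuse rule in mayReuseNonChainedOutputs, extracted into a stand-alone predicate
for readability (a sketch; the class and method names are illustrative, the getters
are the ones added in this diff):

    import java.util.Objects;

    import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
    import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
    import org.apache.flink.streaming.api.graph.NonChainedOutput;
    import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
    import org.apache.flink.util.OutputTag;

    final class OutputReusePredicate {
        // An existing output can serve an additional consumer edge only if
        // the partition outlives consumption (isReconsumable) and every
        // consumer-visible property matches.
        static boolean canReuse(
                NonChainedOutput candidate,
                ResultPartitionType partitionType,
                StreamPartitioner<?> partitioner,
                int consumerParallelism,
                int consumerMaxParallelism,
                IntermediateDataSetID persistentDataSetId,
                OutputTag<?> outputTag) {
            return candidate.getPartitionType().isReconsumable()
                    && candidate.getPartitionType() == partitionType
                    && candidate.getConsumerParallelism() == consumerParallelism
                    && candidate.getConsumerMaxParallelism() == consumerMaxParallelism
                    && Objects.equals(candidate.getPersistentDataSetId(), persistentDataSetId)
                    && Objects.equals(candidate.getOutputTag(), outputTag)
                    && Objects.equals(candidate.getPartitioner(), partitioner);
        }
    }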
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
index 2bf229e1c12..e3c5cb442a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.InstantiationUtil;
 
 import java.io.IOException;
+import java.util.Objects;
 
 /**
  * Partitioner that selects the channel with a user defined partitioner function on a key.
@@ -76,6 +77,25 @@ public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
         }
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final CustomPartitionerWrapper<?, ?> that = (CustomPartitionerWrapper<?, ?>) o;
+        return numberOfChannels == that.numberOfChannels
+                && Objects.equals(partitioner, that.partitioner)
+                && Objects.equals(keySelector, that.keySelector);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), partitioner, keySelector);
+    }
+
     @Override
     public boolean isPointwise() {
         return false;
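
equals/hashCode are added to CustomPartitionerWrapper because the reuse check above
compares partitioners with Objects.equals; without them, two otherwise identical
custom-partitioned edges could never share an output. A minimal sketch (the lambda
partitioner and key selector are illustrative; wrappers built from the same
instances compare equal):

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;

    public class CustomPartitionerEqualitySketch {
        public static void main(String[] args) {
            Partitioner<Integer> userPartitioner = (key, numPartitions) -> key % numPartitions;
            KeySelector<Integer, Integer> keySelector = value -> value;
            CustomPartitionerWrapper<Integer, Integer> a =
                    new CustomPartitionerWrapper<>(userPartitioner, keySelector);
            CustomPartitionerWrapper<Integer, Integer> b =
                    new CustomPartitionerWrapper<>(userPartitioner, keySelector);
            // Same underlying partitioner and key selector, same channel
            // count: the wrappers are interchangeable for output reuse.
            assert a.equals(b) && a.hashCode() == b.hashCode();
        }
    }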
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index d5f94d42412..dc41173dd8e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.io.network.api.StopMode;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
@@ -40,6 +41,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.graph.NonChainedOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.BoundedMultiInput;
@@ -164,10 +166,11 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
 
         // create the final output stream writers
         // we iterate through all the out edges from this job vertex and create a stream output
-        List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
-        Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap =
-                new HashMap<>(outEdgesInOrder.size());
-        this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
+        List<NonChainedOutput> outputsInOrder =
+                configuration.getVertexNonChainedOutputs(userCodeClassloader);
+        Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs =
+                new HashMap<>(outputsInOrder.size());
+        this.streamOutputs = new RecordWriterOutput<?>[outputsInOrder.size()];
         this.finishedOnRestoreInput =
                 this.isTaskDeployedAsFinished()
                         ? new FinishedOnRestoreInput(
@@ -178,11 +181,11 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
         boolean success = false;
         try {
             createChainOutputs(
-                    outEdgesInOrder,
+                    outputsInOrder,
                     recordWriterDelegate,
                     chainedConfigs,
                     containingTask,
-                    streamOutputMap);
+                    recordWriterOutputs);
 
             // we create the chain of operators and grab the collector that leads into the chain
             List<StreamOperatorWrapper<?, ?>> allOpWrappers =
@@ -193,7 +196,7 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
                             configuration,
                             chainedConfigs,
                             userCodeClassloader,
-                            streamOutputMap,
+                            recordWriterOutputs,
                             allOpWrappers,
                             containingTask.getMailboxExecutorFactory());
 
@@ -499,40 +502,41 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
     // ------------------------------------------------------------------------
 
     private void createChainOutputs(
-            List<StreamEdge> outEdgesInOrder,
+            List<NonChainedOutput> outputsInOrder,
             RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
             Map<Integer, StreamConfig> chainedConfigs,
             StreamTask<OUT, OP> containingTask,
-            Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
-        for (int i = 0; i < outEdgesInOrder.size(); i++) {
-            StreamEdge outEdge = outEdgesInOrder.get(i);
+            Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs) {
+        for (int i = 0; i < outputsInOrder.size(); ++i) {
+            NonChainedOutput output = outputsInOrder.get(i);
 
-            RecordWriterOutput<?> streamOutput =
+            RecordWriterOutput<?> recordWriterOutput =
                     createStreamOutput(
                             recordWriterDelegate.getRecordWriter(i),
-                            outEdge,
-                            chainedConfigs.get(outEdge.getSourceId()),
+                            output,
+                            chainedConfigs.get(output.getSourceNodeId()),
                             containingTask.getEnvironment());
 
-            this.streamOutputs[i] = streamOutput;
-            streamOutputMap.put(outEdge, streamOutput);
+            this.streamOutputs[i] = recordWriterOutput;
+            recordWriterOutputs.put(output.getDataSetId(), recordWriterOutput);
         }
     }
 
     private RecordWriterOutput<OUT> createStreamOutput(
             RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
-            StreamEdge edge,
+            NonChainedOutput streamOutput,
             StreamConfig upStreamConfig,
             Environment taskEnvironment) {
-        OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput
+        OutputTag sideOutputTag =
+                streamOutput.getOutputTag(); // OutputTag, return null if not sideOutput
 
         TypeSerializer outSerializer;
 
-        if (edge.getOutputTag() != null) {
+        if (streamOutput.getOutputTag() != null) {
             // side output
             outSerializer =
                     upStreamConfig.getTypeSerializerSideOut(
-                            edge.getOutputTag(),
+                            streamOutput.getOutputTag(),
                             taskEnvironment.getUserCodeClassLoader().asClassLoader());
         } else {
             // main output
@@ -546,7 +550,7 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
                         recordWriter,
                         outSerializer,
                         sideOutputTag,
-                        edge.supportsUnalignedCheckpoints()));
+                        streamOutput.supportsUnalignedCheckpoints()));
     }
 
     @SuppressWarnings("rawtypes")
@@ -647,18 +651,19 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
             StreamConfig operatorConfig,
             Map<Integer, StreamConfig> chainedConfigs,
             ClassLoader userCodeClassloader,
-            Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
+            Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs,
             List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
             MailboxExecutorFactory mailboxExecutorFactory) {
-        List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs =
-                new ArrayList<>(4);
+        List<WatermarkGaugeExposingOutput<StreamRecord<T>>> allOutputs = new ArrayList<>(4);
 
         // create collectors for the network outputs
-        for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
+        for (NonChainedOutput streamOutput :
+                operatorConfig.getOperatorNonChainedOutputs(userCodeClassloader)) {
             @SuppressWarnings("unchecked")
-            RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
+            RecordWriterOutput<T> recordWriterOutput =
+                    (RecordWriterOutput<T>) recordWriterOutputs.get(streamOutput.getDataSetId());
 
-            allOutputs.add(new Tuple2<>(output, outputEdge));
+            allOutputs.add(recordWriterOutput);
         }
 
         // Create collectors for the chained outputs
@@ -672,22 +677,22 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
                             chainedOpConfig,
                             chainedConfigs,
                             userCodeClassloader,
-                            streamOutputs,
+                            recordWriterOutputs,
                             allOperatorWrappers,
                             outputEdge.getOutputTag(),
                             mailboxExecutorFactory);
-            allOutputs.add(new Tuple2<>(output, outputEdge));
+            allOutputs.add(output);
         }
 
         if (allOutputs.size() == 1) {
-            return allOutputs.get(0).f0;
+            return allOutputs.get(0);
         } else {
             // send to N outputs. Note that this includes the special case
             // of sending to zero outputs
             @SuppressWarnings({"unchecked"})
             Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
             for (int i = 0; i < allOutputs.size(); i++) {
-                asArray[i] = allOutputs.get(i).f0;
+                asArray[i] = allOutputs.get(i);
             }
 
             // This is the inverse of creating the normal ChainingOutput.
@@ -710,7 +715,7 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
             StreamConfig operatorConfig,
             Map<Integer, StreamConfig> chainedConfigs,
             ClassLoader userCodeClassloader,
-            Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
+            Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs,
             List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
             OutputTag<IN> outputTag,
             MailboxExecutorFactory mailboxExecutorFactory) {
@@ -722,7 +727,7 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
                         operatorConfig,
                         chainedConfigs,
                         userCodeClassloader,
-                        streamOutputs,
+                        recordWriterOutputs,
                         allOperatorWrappers,
                         mailboxExecutorFactory);
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2e0610113b9..6e1080fba55 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -78,8 +78,8 @@ import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.graph.NonChainedOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -1593,26 +1593,26 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                     StreamConfig configuration, Environment environment) {
         List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters =
                 new ArrayList<>();
-        List<StreamEdge> outEdgesInOrder =
-                configuration.getOutEdgesInOrder(
+        List<NonChainedOutput> outputsInOrder =
+                configuration.getVertexNonChainedOutputs(
                         environment.getUserCodeClassLoader().asClassLoader());
 
-        for (int i = 0; i < outEdgesInOrder.size(); i++) {
-            StreamEdge edge = outEdgesInOrder.get(i);
+        int index = 0;
+        for (NonChainedOutput streamOutput : outputsInOrder) {
             recordWriters.add(
                     createRecordWriter(
-                            edge,
-                            i,
+                            streamOutput,
+                            index++,
                             environment,
                             environment.getTaskInfo().getTaskNameWithSubtasks(),
-                            edge.getBufferTimeout()));
+                            streamOutput.getBufferTimeout()));
         }
         return recordWriters;
     }
 
     @SuppressWarnings("unchecked")
     private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
-            StreamEdge edge,
+            NonChainedOutput streamOutput,
             int outputIndex,
             Environment environment,
             String taskNameWithSubtask,
@@ -1625,7 +1625,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
         try {
             outputPartitioner =
                     InstantiationUtil.clone(
-                            (StreamPartitioner<OUT>) edge.getPartitioner(),
+                            (StreamPartitioner<OUT>) streamOutput.getPartitioner(),
                             environment.getUserCodeClassLoader().asClassLoader());
         } catch (Exception e) {
             ExceptionUtils.rethrow(e);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index b38046bd7d2..4b6dd73ca7c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -49,6 +49,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
 import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobType;
@@ -123,6 +125,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.runtime.jobgraph.DistributionPattern.POINTWISE;
 import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.areOperatorsChainable;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -869,9 +872,9 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
         for (JobVertex vertex : jobGraph.getVertices()) {
             final StreamConfig streamConfig = new StreamConfig(vertex.getConfiguration());
-            for (StreamEdge streamEdge :
-                    streamConfig.getOutEdgesInOrder(this.getClass().getClassLoader())) {
-                assertThat(streamEdge.getBufferTimeout(), equalTo(-1L));
+            for (NonChainedOutput output :
+                    streamConfig.getVertexNonChainedOutputs(this.getClass().getClassLoader())) {
+                assertThat(output.getBufferTimeout(), equalTo(-1L));
             }
         }
     }
@@ -1606,6 +1609,90 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                 .isEqualTo(cacheTransformation.getDatasetId());
     }
 
+    /**
+     * Tests that multiple downstream consumer vertices can reuse the same intermediate blocking
+     * dataset if they have the same parallelism and partitioner.
+     */
+    @Test
+    public void testIntermediateDataSetReuse() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setBufferTimeout(-1);
+        DataStream<Integer> source = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+        // these two vertices can reuse the same intermediate dataset
+        source.rebalance().addSink(new DiscardingSink<>()).setParallelism(2).name("sink1");
+        source.rebalance().addSink(new DiscardingSink<>()).setParallelism(2).name("sink2");
+
+        // this cannot reuse the same intermediate dataset because of different parallelism
+        source.rebalance().addSink(new DiscardingSink<>()).setParallelism(3);
+
+        // this cannot reuse the same intermediate dataset because of a different partitioner
+        source.broadcast().addSink(new DiscardingSink<>()).setParallelism(2);
+
+        // these two vertices cannot reuse the same intermediate dataset because of the pipelined edges
+        source.forward().addSink(new DiscardingSink<>()).setParallelism(1).disableChaining();
+        source.forward().addSink(new DiscardingSink<>()).setParallelism(1).disableChaining();
+
+        DataStream<Integer> mapStream = source.forward().map(value -> value).setParallelism(1);
+
+        // these two vertices can reuse the same intermediate dataset
+        mapStream.broadcast().addSink(new DiscardingSink<>()).setParallelism(2).name("sink3");
+        mapStream.broadcast().addSink(new DiscardingSink<>()).setParallelism(2).name("sink4");
+
+        StreamGraph streamGraph = env.getStreamGraph();
+        streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.FORWARD_EDGES_PIPELINED);
+        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+        List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+        Assertions.assertThat(vertices.size()).isEqualTo(9);
+
+        JobVertex sourceVertex = vertices.get(0);
+        List<IntermediateDataSetID> producedDataSet =
+                sourceVertex.getProducedDataSets().stream()
+                        .map(IntermediateDataSet::getId)
+                        .collect(Collectors.toList());
+        Assertions.assertThat(producedDataSet.size()).isEqualTo(6);
+
+        JobVertex sinkVertex1 = checkNotNull(findJobVertexWithName(vertices, "sink1"));
+        JobVertex sinkVertex2 = checkNotNull(findJobVertexWithName(vertices, "sink2"));
+        JobVertex sinkVertex3 = checkNotNull(findJobVertexWithName(vertices, "sink3"));
+        JobVertex sinkVertex4 = checkNotNull(findJobVertexWithName(vertices, "sink4"));
+
+        Assertions.assertThat(sinkVertex2.getInputs().get(0).getSource().getId())
+                .isEqualTo(sinkVertex1.getInputs().get(0).getSource().getId());
+        Assertions.assertThat(sinkVertex4.getInputs().get(0).getSource().getId())
+                .isEqualTo(sinkVertex3.getInputs().get(0).getSource().getId());
+        Assertions.assertThat(sinkVertex3.getInputs().get(0).getSource().getId())
+                .isNotEqualTo(sinkVertex1.getInputs().get(0).getSource().getId());
+
+        StreamConfig streamConfig = new StreamConfig(sourceVertex.getConfiguration());
+        List<IntermediateDataSetID> nonChainedOutputs =
+                streamConfig.getOperatorNonChainedOutputs(getClass().getClassLoader()).stream()
+                        .map(NonChainedOutput::getDataSetId)
+                        .collect(Collectors.toList());
+        Assertions.assertThat(nonChainedOutputs.size()).isEqualTo(5);
+        Assertions.assertThat(
+                        nonChainedOutputs.contains(
+                                sinkVertex3.getInputs().get(0).getSource().getId()))
+                .isFalse();
+
+        List<IntermediateDataSetID> streamOutputsInOrder =
+                streamConfig.getVertexNonChainedOutputs(getClass().getClassLoader()).stream()
+                        .map(NonChainedOutput::getDataSetId)
+                        .collect(Collectors.toList());
+        Assertions.assertThat(streamOutputsInOrder.size()).isEqualTo(6);
+        Assertions.assertThat(streamOutputsInOrder.toArray()).isEqualTo(producedDataSet.toArray());
+    }
+
+    private static JobVertex findJobVertexWithName(List<JobVertex> vertices, String name) {
+        for (JobVertex jobVertex : vertices) {
+            if (jobVertex.getName().contains(name)) {
+                return jobVertex;
+            }
+        }
+        return null;
+    }
+
     private JobGraph createJobGraphWithDescription(
             StreamExecutionEnvironment env, String... inputNames) {
         env.setParallelism(1);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java
index 9e0fd6cf19b..c33adbd92a8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java
@@ -19,24 +19,22 @@ package org.apache.flink.streaming.runtime.partitioner;
 
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.graph.NonChainedOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
 
 import java.util.List;
 
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
 /** Test for {@link ForwardForConsecutiveHashPartitioner}. */
-public class ForwardForConsecutiveHashPartitionerTest extends TestLogger {
+class ForwardForConsecutiveHashPartitionerTest extends TestLogger {
 
     @Test
-    public void testConvertToForwardPartitioner() {
+    void testConvertToForwardPartitioner() {
         testConvertToForwardPartitioner(StreamExchangeMode.BATCH);
         testConvertToForwardPartitioner(StreamExchangeMode.PIPELINED);
         testConvertToForwardPartitioner(StreamExchangeMode.UNDEFINED);
@@ -50,16 +48,16 @@ public class ForwardForConsecutiveHashPartitionerTest extends TestLogger {
                         new ForwardForConsecutiveHashPartitioner<>(
                                 new KeyGroupStreamPartitioner<>(record -> 0L, 100)));
         List<JobVertex> jobVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-        assertThat(jobVertices.size(), is(1));
+        Assertions.assertThat(jobVertices.size()).isEqualTo(1);
         JobVertex vertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
 
         StreamConfig sourceConfig = new StreamConfig(vertex.getConfiguration());
         StreamEdge edge = sourceConfig.getChainedOutputs(getClass().getClassLoader()).get(0);
-        assertThat(edge.getPartitioner(), instanceOf(ForwardPartitioner.class));
+        Assertions.assertThat(edge.getPartitioner()).isInstanceOf(ForwardPartitioner.class);
     }
 
     @Test
-    public void testConvertToHashPartitioner() {
+    void testConvertToHashPartitioner() {
         testConvertToHashPartitioner(StreamExchangeMode.BATCH);
         testConvertToHashPartitioner(StreamExchangeMode.PIPELINED);
         testConvertToHashPartitioner(StreamExchangeMode.UNDEFINED);
@@ -73,11 +71,13 @@ public class ForwardForConsecutiveHashPartitionerTest extends TestLogger {
                         new ForwardForConsecutiveHashPartitioner<>(
                                 new KeyGroupStreamPartitioner<>(record -> 0L, 100)));
         List<JobVertex> jobVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-        assertThat(jobVertices.size(), is(2));
+        Assertions.assertThat(jobVertices.size()).isEqualTo(2);
         JobVertex sourceVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
 
         StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
-        StreamEdge edge = sourceConfig.getNonChainedOutputs(getClass().getClassLoader()).get(0);
-        assertThat(edge.getPartitioner(), instanceOf(KeyGroupStreamPartitioner.class));
+        NonChainedOutput output =
+                sourceConfig.getOperatorNonChainedOutputs(getClass().getClassLoader()).get(0);
+        Assertions.assertThat(output.getPartitioner())
+                .isInstanceOf(KeyGroupStreamPartitioner.class);
     }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForUnspecifiedPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForUnspecifiedPartitionerTest.java
index c5b846cea1a..1f736ad19de 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForUnspecifiedPartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForUnspecifiedPartitionerTest.java
@@ -19,46 +19,45 @@ package org.apache.flink.streaming.runtime.partitioner;
 
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.graph.NonChainedOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
 
 import java.util.List;
 
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
 /** Test for {@link ForwardForUnspecifiedPartitioner}. */
-public class ForwardForUnspecifiedPartitionerTest extends TestLogger {
+class ForwardForUnspecifiedPartitionerTest extends TestLogger {
 
     @Test
-    public void testConvertToForwardPartitioner() {
+    void testConvertToForwardPartitioner() {
         JobGraph jobGraph =
                 StreamPartitionerTestUtils.createJobGraph(
                         "group1", "group1", new ForwardForUnspecifiedPartitioner<>());
         List<JobVertex> jobVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-        assertThat(jobVertices.size(), is(1));
+        Assertions.assertThat(jobVertices.size()).isEqualTo(1);
         JobVertex vertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
 
         StreamConfig sourceConfig = new StreamConfig(vertex.getConfiguration());
         StreamEdge edge = sourceConfig.getChainedOutputs(getClass().getClassLoader()).get(0);
-        assertThat(edge.getPartitioner(), instanceOf(ForwardPartitioner.class));
+        Assertions.assertThat(edge.getPartitioner()).isInstanceOf(ForwardPartitioner.class);
     }
 
     @Test
-    public void testConvertToRescalePartitioner() {
+    void testConvertToRescalePartitioner() {
         JobGraph jobGraph =
                 StreamPartitionerTestUtils.createJobGraph(
                         "group1", "group2", new ForwardForUnspecifiedPartitioner<>());
         List<JobVertex> jobVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-        assertThat(jobVertices.size(), is(2));
+        Assertions.assertThat(jobVertices.size()).isEqualTo(2);
         JobVertex sourceVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
 
         StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
-        StreamEdge edge = sourceConfig.getNonChainedOutputs(getClass().getClassLoader()).get(0);
-        assertThat(edge.getPartitioner(), instanceOf(RescalePartitioner.class));
+        NonChainedOutput output =
+                sourceConfig.getOperatorNonChainedOutputs(getClass().getClassLoader()).get(0);
+        Assertions.assertThat(output.getPartitioner()).isInstanceOf(RescalePartitioner.class);
     }
 }
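
ForwardForUnspecifiedPartitioner follows the same shape, except that the
fallback for a non-chainable edge is RESCALE rather than hash. A hedged sketch
of that decision (again not the generator's literal code):

    // group1 -> group1 chains and becomes FORWARD; group1 -> group2 cannot
    // chain and becomes RESCALE, matching the two tests above.
    static <T> StreamPartitioner<T> resolveUnspecified(boolean chainable) {
        return chainable ? new ForwardPartitioner<>() : new RescalePartitioner<>();
    }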
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
index 5efb7914744..37520e0515c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
@@ -21,7 +21,10 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.graph.NonChainedOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamNode;
@@ -166,36 +169,34 @@ public class StreamConfigChainer<OWNER> {
 
     public OWNER finish() {
         checkState(chainIndex > 0, "Use finishForSingletonOperatorChain");
-        List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
+        List<NonChainedOutput> outEdgesInOrder = new LinkedList<>();
 
         StreamNode sourceVertex =
                 new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null);
         for (int i = 0; i < numberOfNonChainedOutputs; ++i) {
-            StreamEdge streamEdge =
-                    new StreamEdge(
-                            sourceVertex,
-                            new StreamNode(
-                                    chainIndex + i,
-                                    null,
-                                    null,
-                                    (StreamOperator<?>) null,
-                                    null,
-                                    null),
-                            0,
+            NonChainedOutput streamOutput =
+                    new NonChainedOutput(
+                            true,
+                            sourceVertex.getId(),
+                            1,
+                            1,
+                            100,
+                            false,
+                            new IntermediateDataSetID(),
+                            null,
                             new BroadcastPartitioner<>(),
-                            null);
-            streamEdge.setBufferTimeout(1);
-            outEdgesInOrder.add(streamEdge);
+                            ResultPartitionType.PIPELINED_BOUNDED);
+            outEdgesInOrder.add(streamOutput);
         }
 
         tailConfig.setChainEnd();
         tailConfig.setNumberOfOutputs(numberOfNonChainedOutputs);
-        tailConfig.setOutEdgesInOrder(outEdgesInOrder);
-        tailConfig.setNonChainedOutputs(outEdgesInOrder);
+        tailConfig.setVertexNonChainedOutputs(outEdgesInOrder);
+        tailConfig.setOperatorNonChainedOutputs(outEdgesInOrder);
         chainedConfigs.values().forEach(StreamConfig::serializeAllConfigs);
 
         headConfig.setAndSerializeTransitiveChainedTaskConfigs(chainedConfigs);
-        headConfig.setOutEdgesInOrder(outEdgesInOrder);
+        headConfig.setVertexNonChainedOutputs(outEdgesInOrder);
         headConfig.serializeAllConfigs();
 
         return owner;
@@ -209,7 +210,7 @@ public class StreamConfigChainer<OWNER> {
                 new AbstractStreamOperator<OUT>() {
                     private static final long serialVersionUID = 1L;
                 };
-        List<StreamEdge> outEdgesInOrder = new LinkedList<>();
+        List<NonChainedOutput> streamOutputs = new LinkedList<>();
 
         StreamNode sourceVertexDummy =
                 new StreamNode(
@@ -220,31 +221,27 @@ public class StreamConfigChainer<OWNER> {
                         "source dummy",
                         SourceStreamTask.class);
         for (int i = 0; i < numberOfNonChainedOutputs; ++i) {
-            StreamNode targetVertexDummy =
-                    new StreamNode(
-                            MAIN_NODE_ID + 1 + i,
-                            "group",
+            streamOutputs.add(
+                    new NonChainedOutput(
+                            true,
+                            sourceVertexDummy.getId(),
+                            1,
+                            1,
+                            100,
+                            false,
+                            new IntermediateDataSetID(),
                             null,
-                            dummyOperator,
-                            "target dummy",
-                            SourceStreamTask.class);
-
-            outEdgesInOrder.add(
-                    new StreamEdge(
-                            sourceVertexDummy,
-                            targetVertexDummy,
-                            0,
                             new BroadcastPartitioner<>(),
-                            null));
+                            ResultPartitionType.PIPELINED_BOUNDED));
         }
 
         headConfig.setVertexID(0);
         headConfig.setNumberOfOutputs(1);
-        headConfig.setOutEdgesInOrder(outEdgesInOrder);
-        headConfig.setNonChainedOutputs(outEdgesInOrder);
+        headConfig.setVertexNonChainedOutputs(streamOutputs);
+        headConfig.setOperatorNonChainedOutputs(streamOutputs);
         chainedConfigs.values().forEach(StreamConfig::serializeAllConfigs);
         headConfig.setAndSerializeTransitiveChainedTaskConfigs(chainedConfigs);
-        headConfig.setOutEdgesInOrder(outEdgesInOrder);
+        headConfig.setVertexNonChainedOutputs(streamOutputs);
         headConfig.setTypeSerializerOut(outputSerializer);
         headConfig.serializeAllConfigs();
 
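The NonChainedOutput constructor calls repeated throughout these test
utilities take ten positional arguments. One plausible reading of the
positions, inferred from the values used in this diff rather than from the
constructor's declaration, so treat every annotation below as an assumption:

    new NonChainedOutput(
            true,                         // unaligned checkpoints supported (assumed)
            sourceVertex.getId(),         // id of the producing stream node
            1,                            // consumer parallelism (assumed)
            1,                            // consumer max parallelism (assumed)
            100,                          // buffer timeout in milliseconds (assumed)
            false,                        // persistent dataset flag (assumed)
            new IntermediateDataSetID(),  // intermediate dataset this output produces
            null,                         // side-output tag, null for the main output
            new BroadcastPartitioner<>(), // partitioner applied to outgoing records
            ResultPartitionType.PIPELINED_BOUNDED);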
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
index 299db5d10a3..7fb875ada35 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
@@ -27,8 +27,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
@@ -43,6 +45,7 @@ import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
 import org.apache.flink.runtime.throughput.BufferDebloatConfiguration;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.NonChainedOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
 import org.apache.flink.streaming.api.graph.StreamConfig.NetworkInputConfig;
@@ -340,7 +343,7 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
                         ? StreamConfigChainer.MAIN_NODE_ID
                         : Collections.max(transitiveChainedTaskConfigs.keySet());
 
-        List<StreamEdge> outEdgesInOrder = new LinkedList<>();
+        List<StreamEdge> chainedOutputs = new LinkedList<>();
 
         StreamEdge sourceToMainEdge =
                 new StreamEdge(
@@ -355,12 +358,26 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
                         0,
                         new ForwardPartitioner<>(),
                         null);
-        outEdgesInOrder.add(sourceToMainEdge);
+        chainedOutputs.add(sourceToMainEdge);
+
+        List<NonChainedOutput> streamOutputsInOrder = new LinkedList<>();
+        streamOutputsInOrder.add(
+                new NonChainedOutput(
+                        true,
+                        sourceToMainEdge.getSourceId(),
+                        1,
+                        1,
+                        100,
+                        false,
+                        new IntermediateDataSetID(),
+                        null,
+                        new ForwardPartitioner<>(),
+                        ResultPartitionType.PIPELINED_BOUNDED));
 
         StreamConfig sourceConfig = new StreamConfig(new Configuration());
         sourceConfig.setTimeCharacteristic(streamConfig.getTimeCharacteristic());
-        sourceConfig.setOutEdgesInOrder(outEdgesInOrder);
-        sourceConfig.setChainedOutputs(outEdgesInOrder);
+        sourceConfig.setVertexNonChainedOutputs(streamOutputsInOrder);
+        sourceConfig.setChainedOutputs(chainedOutputs);
         sourceConfig.setTypeSerializerOut(sourceInput.getSourceSerializer());
         sourceConfig.setOperatorID(sourceInput.getOperatorId());
         sourceConfig.setStreamOperatorFactory(sourceInput.getSourceOperatorFactory());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 5a9930ef709..805b267726b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -34,7 +34,9 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.EndOfData;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.StopMode;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.memory.MemoryManager;
@@ -50,8 +52,8 @@ import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.NonChainedOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
@@ -237,24 +239,26 @@ public class StreamTaskTestHarness<OUT> {
                     private static final long serialVersionUID = 1L;
                 };
 
-        List<StreamEdge> outEdgesInOrder = new LinkedList<>();
+        List<NonChainedOutput> streamOutputs = new LinkedList<>();
         StreamNode sourceVertexDummy =
                 new StreamNode(
                         0, "group", null, dummyOperator, "source dummy", SourceStreamTask.class);
-        StreamNode targetVertexDummy =
-                new StreamNode(
-                        1, "group", null, dummyOperator, "target dummy", SourceStreamTask.class);
 
-        outEdgesInOrder.add(
-                new StreamEdge(
-                        sourceVertexDummy,
-                        targetVertexDummy,
-                        0,
+        streamOutputs.add(
+                new NonChainedOutput(
+                        true,
+                        sourceVertexDummy.getId(),
+                        1,
+                        1,
+                        100,
+                        false,
+                        new IntermediateDataSetID(),
+                        null,
                         new BroadcastPartitioner<>(),
-                        null /* output tag */));
+                        ResultPartitionType.PIPELINED_BOUNDED));
 
-        streamConfig.setOutEdgesInOrder(outEdgesInOrder);
-        streamConfig.setNonChainedOutputs(outEdgesInOrder);
+        streamConfig.setVertexNonChainedOutputs(streamOutputs);
+        streamConfig.setOperatorNonChainedOutputs(streamOutputs);
         streamConfig.serializeAllConfigs();
     }
 
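StreamConfigChainer, StreamTaskMailboxTestHarnessBuilder and
StreamTaskTestHarness all repeat the same setup: build one pipelined broadcast
output and register it both at the vertex level and at the operator level. As
a sketch, that shared pattern could be factored into a hypothetical helper
(configureBroadcastOutput is not part of this patch):

    static void configureBroadcastOutput(StreamConfig config, int sourceNodeId) {
        NonChainedOutput output =
                new NonChainedOutput(
                        true, sourceNodeId, 1, 1, 100, false,
                        new IntermediateDataSetID(), null,
                        new BroadcastPartitioner<>(),
                        ResultPartitionType.PIPELINED_BOUNDED);
        // Vertex-level outputs describe the physical result partitions written
        // by the task; operator-level outputs describe the logical outputs of
        // the chain's tail operator. In these harnesses the two coincide.
        config.setVertexNonChainedOutputs(Collections.singletonList(output));
        config.setOperatorNonChainedOutputs(Collections.singletonList(output));
        config.serializeAllConfigs();
    }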
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
index 11453a61106..3080e55f512 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
@@ -19,9 +19,11 @@ package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.graph.NonChainedOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -51,21 +53,24 @@ public class MockStreamConfig extends StreamConfig {
 
         StreamNode sourceVertex =
                 new StreamNode(0, null, null, dummyOperator, "source", SourceStreamTask.class);
-        StreamNode targetVertex =
-                new StreamNode(1, null, null, dummyOperator, "target", SourceStreamTask.class);
 
-        List<StreamEdge> outEdgesInOrder = new ArrayList<>(numberOfOutputs);
+        List<NonChainedOutput> streamOutputs = new ArrayList<>(numberOfOutputs);
         for (int i = 0; i < numberOfOutputs; i++) {
-            outEdgesInOrder.add(
-                    new StreamEdge(
-                            sourceVertex,
-                            targetVertex,
-                            numberOfOutputs,
+            streamOutputs.add(
+                    new NonChainedOutput(
+                            true,
+                            sourceVertex.getId(),
+                            1,
+                            1,
+                            100,
+                            false,
+                            new IntermediateDataSetID(),
+                            null,
                             new BroadcastPartitioner<>(),
-                            null));
+                            ResultPartitionType.PIPELINED_BOUNDED));
         }
-        setOutEdgesInOrder(outEdgesInOrder);
-        setNonChainedOutputs(outEdgesInOrder);
+        setVertexNonChainedOutputs(streamOutputs);
+        setOperatorNonChainedOutputs(streamOutputs);
         serializeAllConfigs();
     }
 }
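
The reason the config now tracks two lists is the core of this change: several
operator-level outputs that ship the same data with equal partitioners can be
backed by a single physical vertex-level output, i.e. one intermediate dataset
consumed by multiple downstream job vertices. An illustrative deduplication
(not the actual StreamingJobGraphGenerator logic) might look like:

    // Reuse one physical output for operator outputs whose partitioners
    // compare equal; value-based equals/hashCode on partitioners is what
    // makes this keying sound.
    Map<StreamPartitioner<?>, NonChainedOutput> byPartitioner = new LinkedHashMap<>();
    for (NonChainedOutput out : operatorOutputs) {
        byPartitioner.putIfAbsent(out.getPartitioner(), out);
    }
    List<NonChainedOutput> vertexOutputs = new ArrayList<>(byPartitioner.values());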
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/partitioner/BinaryHashPartitioner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/partitioner/BinaryHashPartitioner.java
index e04ce75ee9c..faa18cafebf 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/partitioner/BinaryHashPartitioner.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/partitioner/BinaryHashPartitioner.java
@@ -76,6 +76,26 @@ public class BinaryHashPartitioner extends StreamPartitioner<RowData> {
         return false;
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final BinaryHashPartitioner that = (BinaryHashPartitioner) o;
+        return numberOfChannels == that.numberOfChannels
+                && Arrays.equals(hashFieldNames, that.hashFieldNames);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + Arrays.hashCode(hashFieldNames);
+        return result;
+    }
+
     @Override
     public String toString() {
         return "HASH" + Arrays.toString(hashFieldNames);