Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/25 08:10:15 UTC

[GitHub] [flink] wsry opened a new pull request, #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

wsry opened a new pull request, #20350:
URL: https://github.com/apache/flink/pull/20350

   ## What is the purpose of the change
   
   Currently, one intermediate dataset can only be consumed by one downstream consumer job vertex. If multiple consumer vertices consume the same output of the same upstream vertex, multiple intermediate datasets are produced. We can optimize this behavior to produce only one intermediate dataset that is shared by all consumer vertices. As the first step, we should allow multiple downstream consumer job vertices to share the same intermediate dataset at the scheduler side. (Note that this optimization only works for blocking shuffle, because a pipelined shuffle result partition cannot be consumed multiple times.)
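   
   For illustration, a rough sketch of how two consumer job vertices can be wired to one shared blocking dataset at the JobGraph level, using the `connectNewDataSetAsInput` overload that appears in the test utilities of this PR (the vertex names here are just an example):
   
   ```
   JobVertex producer = new JobVertex("producer");
   JobVertex consumer1 = new JobVertex("consumer1");
   JobVertex consumer2 = new JobVertex("consumer2");
   
   // Both consumers reference the same IntermediateDataSetID, so only one
   // intermediate dataset is produced and shared between them.
   IntermediateDataSetID dataSetId = new IntermediateDataSetID();
   consumer1.connectNewDataSetAsInput(
           producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING, dataSetId, false);
   consumer2.connectNewDataSetAsInput(
           producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING, dataSetId, false);
   ```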
   
   ## Brief change log
   
     - Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side
   
   
   ## Verifying this change
   
   This change added tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zhuzhurk commented on a diff in pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20350:
URL: https://github.com/apache/flink/pull/20350#discussion_r935529490


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java:
##########
@@ -88,6 +95,10 @@ public boolean isEmpty() {
         return resultPartitions.isEmpty();
     }
 
+    public int getNumConsumers() {

Review Comment:
   It's better to add a comment to explain that the number of consumers of a `ConsumedPartitionGroup` can be different even if the groups contain the same `IntermediateResultPartition`s, in dynamic graph cases.
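
   For example, a possible wording (just a sketch; it assumes the value is backed by a `numConsumers` field):

   ```
   /**
    * Gets the number of consumers of this group. Note that in dynamic graph cases, two
    * ConsumedPartitionGroups can contain the same IntermediateResultPartitions but still have
    * different numbers of consumers, because consumer job vertices are created and initialized
    * lazily.
    */
   public int getNumConsumers() {
       return numConsumers;
   }
   ```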



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java:
##########
@@ -145,13 +144,17 @@ private static Map<IntermediateDataSetID, Integer> getMaxSubpartitionNums(
 
         for (int i = 0; i < producedDataSets.size(); i++) {
             IntermediateDataSet producedDataSet = producedDataSets.get(i);
-            JobEdge outputEdge = checkNotNull(producedDataSet.getConsumer());
-            ExecutionJobVertex consumerJobVertex = ejvs.apply(outputEdge.getTarget().getID());
-            int maxNum =
-                    EdgeManagerBuildUtil.computeMaxEdgesToTargetExecutionVertex(
-                            ejv.getParallelism(),
-                            consumerJobVertex.getParallelism(),
-                            outputEdge.getDistributionPattern());
+            int maxNum = 0;
+            for (JobEdge outputEdge : producedDataSet.getConsumers()) {
+                ExecutionJobVertex consumerJobVertex = ejvs.apply(outputEdge.getTarget().getID());
+                maxNum =
+                        Math.max(
+                                maxNum,
+                                EdgeManagerBuildUtil.computeMaxEdgesToTargetExecutionVertex(
+                                        ejv.getParallelism(),
+                                        consumerJobVertex.getParallelism(),
+                                        outputEdge.getDistributionPattern()));

Review Comment:
   Maybe just compute it from the first consumer edge? This method is only used for non-dynamic graphs, so the consumer vertices' parallelisms and distribution patterns must be the same. It's also better to add a comment to explain this assumption.
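
   A possible shape for that simplification (a sketch only, reusing the identifiers from the diff above):

   ```
   // All consumers of this data set must have the same parallelism and distribution pattern,
   // because this method is only used for non-dynamic graphs. It is therefore enough to
   // compute the value from the first consumer edge.
   JobEdge outputEdge = producedDataSet.getConsumers().get(0);
   ExecutionJobVertex consumerJobVertex = ejvs.apply(outputEdge.getTarget().getID());
   int maxNum =
           EdgeManagerBuildUtil.computeMaxEdgesToTargetExecutionVertex(
                   ejv.getParallelism(),
                   consumerJobVertex.getParallelism(),
                   outputEdge.getDistributionPattern());
   ```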



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java:
##########
@@ -56,6 +67,26 @@ public IntermediateResultPartition(
         this.producer = producer;
         this.partitionId = new IntermediateResultPartitionID(totalResult.getId(), partitionNumber);
         this.edgeManager = edgeManager;
+        this.consumerVertices = totalResult.getConsumerVertices();
+    }
+
+    public void releaseConsumedPartitionGroup(ConsumedPartitionGroup partitionGroup) {
+        releasedPartitionGroups.add(partitionGroup);
+    }
+
+    public boolean canBeReleased() {
+        if (releasedPartitionGroups.size()
+                != edgeManager.getNumberOfConsumedPartitionGroupsById(partitionId)) {
+            return false;
+        }
+        for (JobVertexID jobVertexID : consumerVertices) {

Review Comment:
   nit: jobVertexID -> jobVertexId



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java:
##########
@@ -162,20 +176,52 @@ int getNumParallelProducers() {
         return numParallelProducers;
     }
 
-    ExecutionJobVertex getConsumerExecutionJobVertex() {
-        final JobEdge consumer = checkNotNull(intermediateDataSet.getConsumer());
-        final JobVertexID consumerJobVertexId = consumer.getTarget().getID();
-        return checkNotNull(getProducer().getGraph().getJobVertex(consumerJobVertexId));
+    int getConsumersParallelism() {
+        List<JobEdge> consumers = intermediateDataSet.getConsumers();
+        checkState(!consumers.isEmpty());
+
+        InternalExecutionGraphAccessor graph = getProducer().getGraph();
+        int consumersParallelism =
+                graph.getJobVertex(consumers.get(0).getTarget().getID()).getParallelism();
+        if (consumers.size() == 1) {
+            return consumersParallelism;
+        }
+
+        // sanity check, all consumer vertices must have the same parallelism
+        for (JobVertexID jobVertexID : consumerVertices) {

Review Comment:
   Better to add some comments to explain the assumptions that
   - the parallelisms will all be -1 for vertices that are not initially assigned a parallelism (this also means the parallelisms of the downstream vertices are not decided yet at this moment)
   - the parallelisms must be the same for vertices that are initially assigned a parallelism
   
   The above is just my understanding; you may need to organize it in a clearer way.
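
   One possible way to phrase those assumptions directly in `getConsumersParallelism()` (only a sketch of the comment wording):

   ```
   // Sanity check: all consumer vertices of this result must have the same parallelism:
   // 1. If they are not initially assigned a parallelism (dynamic graph), the parallelisms are
   //    all -1 here, i.e. the parallelisms of the downstream vertices are not decided yet.
   // 2. If they are initially assigned a parallelism, those parallelisms must be identical,
   //    because they all consume the whole result.
   ```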



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java:
##########
@@ -528,4 +546,150 @@ public static void verifyGeneratedExecutionJobVertex(
             subtaskIndex++;
         }
     }
+
+    public static SchedulerBase createSchedulerAndDeploy(

Review Comment:
   This method should be added to `SchedulerTestingUtils`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java:
##########
@@ -251,11 +279,24 @@ public static ExecutionGraph createExecutionGraph(
             v2.setMaxParallelism(consumerMaxParallelism);
         }
 
-        v2.connectNewDataSetAsInput(v1, distributionPattern, ResultPartitionType.BLOCKING);
+        final JobVertex v3 = new JobVertex("v3");

Review Comment:
   To double-confirm: does this change help verify that the previous test still passes without exceptions when there are multiple consumer job vertices?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java:
##########
@@ -56,6 +67,26 @@ public IntermediateResultPartition(
         this.producer = producer;
         this.partitionId = new IntermediateResultPartitionID(totalResult.getId(), partitionNumber);
         this.edgeManager = edgeManager;
+        this.consumerVertices = totalResult.getConsumerVertices();
+    }
+
+    public void releaseConsumedPartitionGroup(ConsumedPartitionGroup partitionGroup) {
+        releasedPartitionGroups.add(partitionGroup);
+    }
+
+    public boolean canBeReleased() {
+        if (releasedPartitionGroups.size()
+                != edgeManager.getNumberOfConsumedPartitionGroupsById(partitionId)) {
+            return false;
+        }
+        for (JobVertexID jobVertexID : consumerVertices) {
+            // for adaptive scheduler, if any consumer vertex is still not initialized, this result

Review Comment:
   adaptive scheduler -> dynamic graph
   
   This is more accurate.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java:
##########
@@ -74,36 +74,21 @@ public class JobEdge implements java.io.Serializable {
      * @param source The data set that is at the source of this edge.
      * @param target The operation that is at the target of this edge.
      * @param distributionPattern The pattern that defines how the connection behaves in parallel.
+     * @param isBroadcast Whether the source broadcasts data to the target.
      */
     public JobEdge(
-            IntermediateDataSet source, JobVertex target, DistributionPattern distributionPattern) {
+            IntermediateDataSet source,
+            JobVertex target,
+            DistributionPattern distributionPattern,
+            boolean isBroadcast) {

Review Comment:
   With this change, `source` can be final and is never null. The `sourceId` field is no longer needed; the id should be retrieved from `source` instead. `isIdReference` can be removed as well, along with its usages.
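
   A rough sketch of what the simplified class could look like (assuming `IntermediateDataSet#getId()` is available; not the actual change):

   ```
   public class JobEdge implements java.io.Serializable {

       /** The data set at the source of the edge; with this change it is never null. */
       private final IntermediateDataSet source;

       public JobEdge(
               IntermediateDataSet source,
               JobVertex target,
               DistributionPattern distributionPattern,
               boolean isBroadcast) {
           this.source = checkNotNull(source);
           // ... other fields as before; no sourceId / isIdReference fields are needed
       }

       public IntermediateDataSetID getSourceId() {
           // The id is always derived from the source data set.
           return source.getId();
       }
   }
   ```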



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java:
##########
@@ -47,6 +49,15 @@ public class IntermediateResultPartition {
     /** Whether this partition has produced some data. */
     private boolean hasDataProduced = false;
 
+    /**
+     * Released {@link ConsumedPartitionGroup}s for this result partition. This result partition can
+     * be released if all {@link ConsumedPartitionGroup}s are released.
+     */
+    private final Set<ConsumedPartitionGroup> releasedPartitionGroups = new HashSet<>();
+
+    /** All consumer job vertex ids of the corresponding {@link IntermediateResult}. */
+    private final List<JobVertexID> consumerVertices;

Review Comment:
   It's better to add a `getConsumerJobVertices` method that returns `totalResult.getConsumerVertices()`, instead of maintaining the list itself.
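
   For example (a sketch, assuming `totalResult` is kept as a field as it is today):

   ```
   /** Returns all consumer job vertex ids of the corresponding IntermediateResult. */
   private List<JobVertexID> getConsumerJobVertices() {
       return totalResult.getConsumerVertices();
   }
   ```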



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##########
@@ -718,31 +715,34 @@ public CompletableFuture<?> suspend() {
     }
 
     private void updatePartitionConsumers(final IntermediateResultPartition partition) {
-        final Optional<ConsumerVertexGroup> consumerVertexGroup =
-                partition.getConsumerVertexGroupOptional();
-        if (!consumerVertexGroup.isPresent()) {
+        final List<ConsumerVertexGroup> consumerVertexGroups = partition.getConsumerVertexGroups();
+        if (consumerVertexGroups.isEmpty()) {
             return;
         }
-        for (ExecutionVertexID consumerVertexId : consumerVertexGroup.get()) {
-            final ExecutionVertex consumerVertex =
-                    vertex.getExecutionGraphAccessor().getExecutionVertexOrThrow(consumerVertexId);
-            final Execution consumer = consumerVertex.getCurrentExecutionAttempt();
-            final ExecutionState consumerState = consumer.getState();
-
-            // ----------------------------------------------------------------
-            // Consumer is recovering or running => send update message now
-            // Consumer is deploying => cache the partition info which would be
-            // sent after switching to running
-            // ----------------------------------------------------------------
-            if (consumerState == DEPLOYING
-                    || consumerState == RUNNING
-                    || consumerState == INITIALIZING) {
-                final PartitionInfo partitionInfo = createPartitionInfo(partition);
-
-                if (consumerState == DEPLOYING) {
-                    consumerVertex.cachePartitionInfo(partitionInfo);
-                } else {
-                    consumer.sendUpdatePartitionInfoRpcCall(Collections.singleton(partitionInfo));
+        for (ConsumerVertexGroup consumerVertexGroup : consumerVertexGroups) {
+            for (ExecutionVertexID consumerVertexId : consumerVertexGroup) {
+                final ExecutionVertex consumerVertex =

Review Comment:
   The `consumerVertex` can be visited multiple times. It's better to compute a set of vertices and iterate over them later.
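
   A sketch of that deduplication (illustrative only):

   ```
   // Collect the distinct consumer vertices first so that each one is updated at most once,
   // even if it appears in several ConsumerVertexGroups.
   final Set<ExecutionVertexID> consumerVertexIds = new HashSet<>();
   for (ConsumerVertexGroup consumerVertexGroup : consumerVertexGroups) {
       for (ExecutionVertexID consumerVertexId : consumerVertexGroup) {
           consumerVertexIds.add(consumerVertexId);
       }
   }
   for (ExecutionVertexID consumerVertexId : consumerVertexIds) {
       final ExecutionVertex consumerVertex =
               vertex.getExecutionGraphAccessor().getExecutionVertexOrThrow(consumerVertexId);
       // ... send or cache the partition info as in the existing loop body
   }
   ```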



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/BlockingResultPartitionReleaseTest.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.TestingBlobWriter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSchedulerAndDeploy;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.transitionTasksToFinished;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests that blocking result partitions are properly released. */
+class BlockingResultPartitionReleaseTest {
+
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    private ScheduledExecutorService scheduledExecutorService;
+    private ComponentMainThreadExecutor mainThreadExecutor;
+    private ManuallyTriggeredScheduledExecutorService ioExecutor;
+
+    @BeforeEach
+    void setup() {
+        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+        mainThreadExecutor =
+                ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                        scheduledExecutorService);
+        ioExecutor = new ManuallyTriggeredScheduledExecutorService();
+    }
+
+    @AfterEach
+    void teardown() {
+        if (scheduledExecutorService != null) {
+            scheduledExecutorService.shutdownNow();
+        }
+    }
+
+    @Test
+    void testMultipleConsumersForAdaptiveScheduler() throws Exception {
+        testResultPartitionConsumedByMultiConsumers(true);
+    }
+
+    @Test
+    void testMultipleConsumersForDefaultScheduler() throws Exception {
+        testResultPartitionConsumedByMultiConsumers(false);
+    }
+
+    private void testResultPartitionConsumedByMultiConsumers(boolean isAdaptive) throws Exception {
+        int parallelism = 2;
+        JobID jobId = new JobID();
+        JobVertex producer = ExecutionGraphTestUtils.createNoOpVertex("producer", parallelism);
+        JobVertex consumer1 = ExecutionGraphTestUtils.createNoOpVertex("consumer1", parallelism);
+        JobVertex consumer2 = ExecutionGraphTestUtils.createNoOpVertex("consumer2", parallelism);
+
+        TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
+        SchedulerBase scheduler =
+                createSchedulerAndDeploy(
+                        isAdaptive,
+                        jobId,
+                        producer,
+                        new JobVertex[] {consumer1, consumer2},
+                        DistributionPattern.ALL_TO_ALL,
+                        new TestingBlobWriter(Integer.MAX_VALUE),
+                        mainThreadExecutor,
+                        ioExecutor,
+                        partitionTracker,
+                        EXECUTOR_RESOURCE.getExecutor());
+        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
+
+        assertThat(partitionTracker.releasedPartitions).isEmpty();
+
+        CompletableFuture.runAsync(
+                        () -> transitionTasksToFinished(executionGraph, consumer1.getID()),
+                        mainThreadExecutor)
+                .join();
+        ioExecutor.triggerAll();
+
+        assertThat(partitionTracker.releasedPartitions).isEmpty();
+
+        CompletableFuture.runAsync(
+                        () -> transitionTasksToFinished(executionGraph, consumer2.getID()),
+                        mainThreadExecutor)
+                .join();
+        ioExecutor.triggerAll();
+
+        assertThat(partitionTracker.releasedPartitions.size()).isEqualTo(parallelism);
+        for (int i = 0; i < parallelism; ++i) {
+            ExecutionJobVertex ejv = checkNotNull(executionGraph.getJobVertex(producer.getID()));
+            assertThat(

Review Comment:
   It looks to me like what should be verified here is that `releasedPartitions` includes all the partitions produced by the producer. It can be written as:
   ```
           ExecutionJobVertex ejv = checkNotNull(executionGraph.getJobVertex(producer.getID()));
           assertThat(
                           partitionTracker.releasedPartitions.stream()
                                   .map(ResultPartitionID::getPartitionId))
                   .containsExactlyInAnyOrder(
                           Arrays.stream(ejv.getProducedDataSets()[0].getPartitions())
                                   .map(IntermediateResultPartition::getPartitionId)
                                   .toArray(IntermediateResultPartitionID[]::new));
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/EdgeManagerTest.java:
##########
@@ -85,13 +76,72 @@ public void testGetConsumedPartitionGroup() throws Exception {
         assertEquals(groupRetrievedByDownstreamVertex, groupRetrievedByIntermediateResultPartition);
 
         ConsumedPartitionGroup groupRetrievedByScheduledResultPartition =
-                scheduler
-                        .getExecutionGraph()
-                        .getSchedulingTopology()
+                eg.getSchedulingTopology()
                         .getResultPartition(consumedPartition.getPartitionId())
                         .getConsumedPartitionGroups()
                         .get(0);
 
         assertEquals(groupRetrievedByDownstreamVertex, groupRetrievedByScheduledResultPartition);
     }
+
+    @Test
+    public void testCalculateNumberOfConsumers() throws Exception {
+        testCalculateNumberOfConsumers(5, 2, ALL_TO_ALL, new int[] {2, 2});
+        testCalculateNumberOfConsumers(5, 2, POINTWISE, new int[] {1, 1});
+        testCalculateNumberOfConsumers(2, 5, ALL_TO_ALL, new int[] {5, 5, 5, 5, 5});
+        testCalculateNumberOfConsumers(2, 5, POINTWISE, new int[] {3, 3, 3, 2, 2});
+        testCalculateNumberOfConsumers(5, 5, ALL_TO_ALL, new int[] {5, 5, 5, 5, 5});
+        testCalculateNumberOfConsumers(5, 5, POINTWISE, new int[] {1, 1, 1, 1, 1});
+    }
+
+    private void testCalculateNumberOfConsumers(
+            int producerParallelism,
+            int consumerParallelism,
+            DistributionPattern distributionPattern,
+            int[] expectedConsumers)
+            throws Exception {
+        JobVertex producer = new JobVertex("producer");
+        JobVertex consumer = new JobVertex("consumer");
+        ExecutionGraph eg =
+                buildExecutionGraph(
+                        producer,
+                        consumer,
+                        producerParallelism,
+                        consumerParallelism,
+                        distributionPattern);
+        List<ConsumedPartitionGroup> partitionGroups =
+                Arrays.stream(checkNotNull(eg.getJobVertex(consumer.getID())).getTaskVertices())
+                        .flatMap(ev -> ev.getAllConsumedPartitionGroups().stream())
+                        .collect(Collectors.toList());
+        int index = 0;
+        for (ConsumedPartitionGroup partitionGroup : partitionGroups) {
+            Assertions.assertThat(partitionGroup.getNumConsumers())
+                    .isEqualTo(expectedConsumers[index++]);
+        }
+    }
+
+    private DefaultExecutionGraph buildExecutionGraph(
+            JobVertex producer,
+            JobVertex consumer,
+            int producerParallelism,
+            int consumerParallelism,
+            DistributionPattern distributionPattern)
+            throws Exception {
+        producer.setParallelism(producerParallelism);
+        consumer.setParallelism(consumerParallelism);
+
+        producer.setInvokableClass(NoOpInvokable.class);
+        consumer.setInvokableClass(NoOpInvokable.class);
+
+        consumer.connectNewDataSetAsInput(
+                producer, distributionPattern, ResultPartitionType.BLOCKING);
+
+        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(producer, consumer);
+        SchedulerBase scheduler =
+                SchedulerTestingUtils.createScheduler(
+                        jobGraph,
+                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        EXECUTOR_RESOURCE.getExecutor());
+        return (DefaultExecutionGraph) scheduler.getExecutionGraph();

Review Comment:
   Seems there is no need to convert it to a `DefaultExecutionGraph`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java:
##########
@@ -47,6 +49,15 @@ public class IntermediateResultPartition {
     /** Whether this partition has produced some data. */
     private boolean hasDataProduced = false;
 
+    /**
+     * Released {@link ConsumedPartitionGroup}s for this result partition. This result partition can
+     * be released if all {@link ConsumedPartitionGroup}s are released.
+     */
+    private final Set<ConsumedPartitionGroup> releasedPartitionGroups = new HashSet<>();

Review Comment:
   It looks to me like it's enough to record the number of releasable groups.
   
   I also think it's better to name it `releasablePartitionGroups` and rename `releaseConsumedPartitionGroup()` to `markPartitionGroupReleasable()`; the comments should be updated as well.
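
   A sketch of the counter-based variant (the names are only suggestions):

   ```
   /**
    * Number of ConsumedPartitionGroups that have become releasable. This result partition can
    * be released once all of its consumed partition groups are releasable.
    */
   private int numReleasablePartitionGroups;

   public void markPartitionGroupReleasable() {
       numReleasablePartitionGroups++;
   }

   public boolean canBeReleased() {
       if (numReleasablePartitionGroups
               != edgeManager.getNumberOfConsumedPartitionGroupsById(partitionId)) {
           return false;
       }
       // ... consumer vertex initialization check as in the diff above
       return true;
   }
   ```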



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/BlockingResultPartitionReleaseTest.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.TestingBlobWriter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSchedulerAndDeploy;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.transitionTasksToFinished;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests that blocking result partitions are properly released. */
+class BlockingResultPartitionReleaseTest {
+
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    private ScheduledExecutorService scheduledExecutorService;
+    private ComponentMainThreadExecutor mainThreadExecutor;
+    private ManuallyTriggeredScheduledExecutorService ioExecutor;
+
+    @BeforeEach
+    void setup() {
+        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+        mainThreadExecutor =
+                ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                        scheduledExecutorService);
+        ioExecutor = new ManuallyTriggeredScheduledExecutorService();
+    }
+
+    @AfterEach
+    void teardown() {
+        if (scheduledExecutorService != null) {
+            scheduledExecutorService.shutdownNow();
+        }
+    }
+
+    @Test
+    void testMultipleConsumersForAdaptiveScheduler() throws Exception {

Review Comment:
   -> testMultipleConsumersForAdaptiveBatchScheduler



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java:
##########
@@ -528,4 +546,150 @@ public static void verifyGeneratedExecutionJobVertex(
             subtaskIndex++;
         }
     }
+
+    public static SchedulerBase createSchedulerAndDeploy(
+            boolean isAdaptive,
+            JobID jobId,
+            JobVertex producer,
+            JobVertex[] consumers,
+            DistributionPattern distributionPattern,
+            BlobWriter blobWriter,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            ScheduledExecutorService ioExecutor,
+            JobMasterPartitionTracker partitionTracker,
+            ScheduledExecutorService scheduledExecutor)
+            throws Exception {
+        final List<JobVertex> vertices = new ArrayList<>(Collections.singletonList(producer));
+        IntermediateDataSetID dataSetId = new IntermediateDataSetID();
+        for (JobVertex consumer : consumers) {
+            consumer.connectNewDataSetAsInput(
+                    producer, distributionPattern, ResultPartitionType.BLOCKING, dataSetId, false);
+            vertices.add(consumer);
+        }
+
+        final SchedulerBase scheduler =
+                createScheduler(
+                        isAdaptive,
+                        jobId,
+                        vertices,
+                        blobWriter,
+                        mainThreadExecutor,
+                        ioExecutor,
+                        partitionTracker,
+                        scheduledExecutor);
+        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
+        if (isAdaptive) {
+            initializeExecutionJobVertex(producer.getID(), executionGraph, mainThreadExecutor);
+        }
+        final TestingLogicalSlotBuilder slotBuilder = new TestingLogicalSlotBuilder();
+
+        CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                // Deploy upstream source vertices
+                                deployTasks(executionGraph, producer.getID(), slotBuilder);
+                                // Transition upstream vertices into FINISHED
+                                transitionTasksToFinished(executionGraph, producer.getID());
+                                // Deploy downstream sink vertices
+                                for (JobVertex consumer : consumers) {
+                                    if (isAdaptive) {
+                                        initializeExecutionJobVertex(
+                                                consumer.getID(), executionGraph);
+                                    }
+                                    deployTasks(executionGraph, consumer.getID(), slotBuilder);
+                                }
+                            } catch (Exception e) {
+                                throw new RuntimeException("Exceptions shouldn't happen here.", e);
+                            }
+                        },
+                        mainThreadExecutor)
+                .join();
+        return scheduler;
+    }
+
+    private static void initializeExecutionJobVertex(
+            JobVertexID jobVertex,
+            ExecutionGraph executionGraph,
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        CompletableFuture.runAsync(
+                        () -> initializeExecutionJobVertex(jobVertex, executionGraph),
+                        mainThreadExecutor)
+                .join();
+    }
+
+    private static void initializeExecutionJobVertex(
+            JobVertexID jobVertex, ExecutionGraph executionGraph) {
+        try {
+            executionGraph.initializeJobVertex(
+                    executionGraph.getJobVertex(jobVertex), System.currentTimeMillis());
+            executionGraph.notifyNewlyInitializedJobVertices(
+                    Collections.singletonList(executionGraph.getJobVertex(jobVertex)));
+        } catch (JobException exception) {
+            throw new RuntimeException(exception);
+        }
+    }
+
+    private static DefaultScheduler createScheduler(
+            boolean isAdaptive,
+            JobID jobId,
+            List<JobVertex> jobVertices,
+            BlobWriter blobWriter,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            ScheduledExecutorService ioExecutor,
+            JobMasterPartitionTracker partitionTracker,
+            ScheduledExecutorService scheduledExecutor)
+            throws Exception {
+        final JobGraph jobGraph =
+                JobGraphBuilder.newBatchJobGraphBuilder()
+                        .setJobId(jobId)
+                        .addJobVertices(jobVertices)
+                        .build();
+
+        final DefaultSchedulerBuilder builder =
+                new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, scheduledExecutor)
+                        .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0))
+                        .setBlobWriter(blobWriter)
+                        .setIoExecutor(ioExecutor)
+                        .setPartitionTracker(partitionTracker);
+        return isAdaptive ? builder.buildAdaptiveBatchJobScheduler() : builder.build();
+    }
+
+    private static void deployTasks(
+            ExecutionGraph executionGraph,
+            JobVertexID jobVertexID,
+            TestingLogicalSlotBuilder slotBuilder)
+            throws JobException, ExecutionException, InterruptedException {
+
+        for (ExecutionVertex vertex :
+                Objects.requireNonNull(executionGraph.getJobVertex(jobVertexID))
+                        .getTaskVertices()) {
+            LogicalSlot slot = slotBuilder.createTestingLogicalSlot();
+
+            Execution execution = vertex.getCurrentExecutionAttempt();
+            execution.registerProducedPartitions(slot.getTaskManagerLocation()).get();
+            execution.transitionState(ExecutionState.SCHEDULED);
+
+            vertex.tryAssignResource(slot);
+            vertex.deploy();
+        }
+    }
+
+    public static void transitionTasksToFinished(

Review Comment:
   It's better to move these two methods next to `ExecutionGraphTestUtils#finishAllVertices()`, rename them, and (if possible) implement them in a similar way.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java:
##########
@@ -528,4 +546,150 @@ public static void verifyGeneratedExecutionJobVertex(
             subtaskIndex++;
         }
     }
+
+    public static SchedulerBase createSchedulerAndDeploy(
+            boolean isAdaptive,
+            JobID jobId,
+            JobVertex producer,
+            JobVertex[] consumers,
+            DistributionPattern distributionPattern,
+            BlobWriter blobWriter,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            ScheduledExecutorService ioExecutor,
+            JobMasterPartitionTracker partitionTracker,
+            ScheduledExecutorService scheduledExecutor)
+            throws Exception {
+        final List<JobVertex> vertices = new ArrayList<>(Collections.singletonList(producer));
+        IntermediateDataSetID dataSetId = new IntermediateDataSetID();
+        for (JobVertex consumer : consumers) {
+            consumer.connectNewDataSetAsInput(
+                    producer, distributionPattern, ResultPartitionType.BLOCKING, dataSetId, false);
+            vertices.add(consumer);
+        }
+
+        final SchedulerBase scheduler =
+                createScheduler(
+                        isAdaptive,
+                        jobId,
+                        vertices,
+                        blobWriter,
+                        mainThreadExecutor,
+                        ioExecutor,
+                        partitionTracker,
+                        scheduledExecutor);
+        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
+        if (isAdaptive) {
+            initializeExecutionJobVertex(producer.getID(), executionGraph, mainThreadExecutor);

Review Comment:
   Can we move it into the `runAsync` below? Then we can remove the `initializeExecutionJobVertex(JobVertexID, ExecutionGraph, ComponentMainThreadExecutor)` overload.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java:
##########
@@ -294,42 +324,7 @@ private void testRemoveCacheForPointwiseEdgeAfterFailover(
         assertEquals(expectedAfter, blobWriter.numberOfBlobs());
     }
 
-    private DefaultScheduler createSchedulerAndDeploy(

Review Comment:
   It's better to keep this method for the convenience of the tests in this test class. It can simply delegate to the `createSchedulerAndDeploy` in the util class, though.
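
   For example, the convenience wrapper could simply delegate (a sketch; the exact signature and the final location of the shared util are up to you, and the executor/tracker fields are assumed to exist in this test class):

   ```
   private DefaultScheduler createSchedulerAndDeploy(
           JobID jobId,
           JobVertex producer,
           JobVertex[] consumers,
           DistributionPattern distributionPattern,
           BlobWriter blobWriter)
           throws Exception {
       // Delegate to the shared util so the tests in this class stay concise.
       return (DefaultScheduler)
               ExecutionGraphTestUtils.createSchedulerAndDeploy(
                       false,
                       jobId,
                       producer,
                       consumers,
                       distributionPattern,
                       blobWriter,
                       mainThreadExecutor,
                       ioExecutor,
                       NoOpJobMasterPartitionTracker.INSTANCE,
                       EXECUTOR_RESOURCE.getExecutor());
   }
   ```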



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java:
##########
@@ -47,6 +49,15 @@ public class IntermediateResultPartition {
     /** Whether this partition has produced some data. */
     private boolean hasDataProduced = false;
 
+    /**
+     * Released {@link ConsumedPartitionGroup}s for this result partition. This result partition can
+     * be released if all {@link ConsumedPartitionGroup}s are released.
+     */
+    private final Set<ConsumedPartitionGroup> releasedPartitionGroups = new HashSet<>();
+
+    /** All consumer job vertex ids of the corresponding {@link IntermediateResult}. */
+    private final List<JobVertexID> consumerVertices;

Review Comment:
   Or maybe we do not need this method because it is used only once, in `canBeReleased()`.





[GitHub] [flink] wsry commented on pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
wsry commented on PR #20350:
URL: https://github.com/apache/flink/pull/20350#issuecomment-1207396637

   > Thanks for addressing all the comments! @wsry The change looks good to me. Would you squash it? I will merge it after CI gives green.
   
   @zhuzhurk Thanks a lot. Rebased and squashed.




[GitHub] [flink] wsry commented on a diff in pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
wsry commented on code in PR #20350:
URL: https://github.com/apache/flink/pull/20350#discussion_r938361767


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java:
##########
@@ -528,4 +546,150 @@ public static void verifyGeneratedExecutionJobVertex(
             subtaskIndex++;
         }
     }
+
+    public static SchedulerBase createSchedulerAndDeploy(
+            boolean isAdaptive,
+            JobID jobId,
+            JobVertex producer,
+            JobVertex[] consumers,
+            DistributionPattern distributionPattern,
+            BlobWriter blobWriter,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            ScheduledExecutorService ioExecutor,
+            JobMasterPartitionTracker partitionTracker,
+            ScheduledExecutorService scheduledExecutor)
+            throws Exception {
+        final List<JobVertex> vertices = new ArrayList<>(Collections.singletonList(producer));
+        IntermediateDataSetID dataSetId = new IntermediateDataSetID();
+        for (JobVertex consumer : consumers) {
+            consumer.connectNewDataSetAsInput(
+                    producer, distributionPattern, ResultPartitionType.BLOCKING, dataSetId, false);
+            vertices.add(consumer);
+        }
+
+        final SchedulerBase scheduler =
+                createScheduler(
+                        isAdaptive,
+                        jobId,
+                        vertices,
+                        blobWriter,
+                        mainThreadExecutor,
+                        ioExecutor,
+                        partitionTracker,
+                        scheduledExecutor);
+        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
+        if (isAdaptive) {
+            initializeExecutionJobVertex(producer.getID(), executionGraph, mainThreadExecutor);
+        }
+        final TestingLogicalSlotBuilder slotBuilder = new TestingLogicalSlotBuilder();
+
+        CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                // Deploy upstream source vertices
+                                deployTasks(executionGraph, producer.getID(), slotBuilder);
+                                // Transition upstream vertices into FINISHED
+                                transitionTasksToFinished(executionGraph, producer.getID());
+                                // Deploy downstream sink vertices
+                                for (JobVertex consumer : consumers) {
+                                    if (isAdaptive) {
+                                        initializeExecutionJobVertex(
+                                                consumer.getID(), executionGraph);
+                                    }
+                                    deployTasks(executionGraph, consumer.getID(), slotBuilder);
+                                }
+                            } catch (Exception e) {
+                                throw new RuntimeException("Exceptions shouldn't happen here.", e);
+                            }
+                        },
+                        mainThreadExecutor)
+                .join();
+        return scheduler;
+    }
+
+    private static void initializeExecutionJobVertex(
+            JobVertexID jobVertex,
+            ExecutionGraph executionGraph,
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        CompletableFuture.runAsync(
+                        () -> initializeExecutionJobVertex(jobVertex, executionGraph),
+                        mainThreadExecutor)
+                .join();
+    }
+
+    private static void initializeExecutionJobVertex(
+            JobVertexID jobVertex, ExecutionGraph executionGraph) {
+        try {
+            executionGraph.initializeJobVertex(
+                    executionGraph.getJobVertex(jobVertex), System.currentTimeMillis());
+            executionGraph.notifyNewlyInitializedJobVertices(
+                    Collections.singletonList(executionGraph.getJobVertex(jobVertex)));
+        } catch (JobException exception) {
+            throw new RuntimeException(exception);
+        }
+    }
+
+    private static DefaultScheduler createScheduler(
+            boolean isAdaptive,
+            JobID jobId,
+            List<JobVertex> jobVertices,
+            BlobWriter blobWriter,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            ScheduledExecutorService ioExecutor,
+            JobMasterPartitionTracker partitionTracker,
+            ScheduledExecutorService scheduledExecutor)
+            throws Exception {
+        final JobGraph jobGraph =
+                JobGraphBuilder.newBatchJobGraphBuilder()
+                        .setJobId(jobId)
+                        .addJobVertices(jobVertices)
+                        .build();
+
+        final DefaultSchedulerBuilder builder =
+                new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, scheduledExecutor)
+                        .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0))
+                        .setBlobWriter(blobWriter)
+                        .setIoExecutor(ioExecutor)
+                        .setPartitionTracker(partitionTracker);
+        return isAdaptive ? builder.buildAdaptiveBatchJobScheduler() : builder.build();
+    }
+
+    private static void deployTasks(
+            ExecutionGraph executionGraph,
+            JobVertexID jobVertexID,
+            TestingLogicalSlotBuilder slotBuilder)
+            throws JobException, ExecutionException, InterruptedException {
+
+        for (ExecutionVertex vertex :
+                Objects.requireNonNull(executionGraph.getJobVertex(jobVertexID))
+                        .getTaskVertices()) {
+            LogicalSlot slot = slotBuilder.createTestingLogicalSlot();
+
+            Execution execution = vertex.getCurrentExecutionAttempt();
+            execution.registerProducedPartitions(slot.getTaskManagerLocation()).get();
+            execution.transitionState(ExecutionState.SCHEDULED);
+
+            vertex.tryAssignResource(slot);
+            vertex.deploy();
+        }
+    }
+
+    public static void transitionTasksToFinished(

Review Comment:
   1. Moved and renamed the methods.
   2. Using a similar implementation would throw an exception and fail the tests.





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20350:
URL: https://github.com/apache/flink/pull/20350#discussion_r938553227


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java:
##########
@@ -191,18 +181,17 @@ public void testUpdateTopology() throws Exception {
 
         executionGraph.initializeJobVertex(ejv1, 0L);
         adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv1));
-        assertThat(IterableUtils.toStream(adapter.getVertices()).count(), is(3L));
+        Assertions.assertThat(IterableUtils.toStream(adapter.getVertices()).count()).isEqualTo(3L);

Review Comment:
   It seems this comment is not fully addressed. There is no need to convert an `Iterable` to a `Stream` to do a `hasSize` check.
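
   That is, something along these lines (assuming `getVertices()` returns an `Iterable`):

   ```
   assertThat(adapter.getVertices()).hasSize(3);
   ```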





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #20350:
URL: https://github.com/apache/flink/pull/20350#discussion_r936187326


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -73,7 +75,7 @@ public class JobVertex implements java.io.Serializable {
     private final List<OperatorIDPair> operatorIDs;
 
     /** List of produced data sets, one per writer. */

Review Comment:
   The comment should also be updated.





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20350:
URL: https://github.com/apache/flink/pull/20350#discussion_r935083583


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java:
##########
@@ -106,11 +95,11 @@ public void testExecutionAttemptIdInTwoIdenticalJobsIsNotSame() throws Exception
         eg1.attachJobGraph(ordered);
         eg2.attachJobGraph(ordered);
 
-        assertThat(
-                Sets.intersection(
-                        eg1.getRegisteredExecutions().keySet(),
-                        eg2.getRegisteredExecutions().keySet()),
-                is(empty()));
+        Assertions.assertThat(

Review Comment:
   It's better to statically import `Assertions.assertThat`.
   This also applies to other test files.
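
   For example (a sketch of the statically imported form, based on the original assertion above):

   ```
   import static org.assertj.core.api.Assertions.assertThat;

   // ...
   assertThat(
                   Sets.intersection(
                           eg1.getRegisteredExecutions().keySet(),
                           eg2.getRegisteredExecutions().keySet()))
           .isEmpty();
   ```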



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java:
##########
@@ -57,31 +58,24 @@
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
-import static junit.framework.TestCase.assertSame;
-import static junit.framework.TestCase.assertTrue;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionGraph;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
 import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
 import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
 import static org.apache.flink.runtime.jobgraph.DistributionPattern.ALL_TO_ALL;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
 
 /** Unit tests for {@link DefaultExecutionTopology}. */
-public class DefaultExecutionTopologyTest extends TestLogger {
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+class DefaultExecutionTopologyTest extends TestLogger {

Review Comment:
   A JUnit 5 test should not extend TestLogger.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java:
##########
@@ -35,55 +35,48 @@
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.TestLogger;
 
 import org.assertj.core.api.Assertions;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 /** Tests for {@link IntermediateResultPartition}. */
 public class IntermediateResultPartitionTest extends TestLogger {

Review Comment:
   Now the test class no longer needs to be public or to extend TestLogger.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java:
##########
@@ -29,164 +29,132 @@
 import org.apache.flink.runtime.scheduler.VertexParallelismStore;
 import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Collections;
 import java.util.concurrent.ScheduledExecutorService;
 
-import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
 /** Test for {@link ExecutionJobVertex} */
-public class ExecutionJobVertexTest {
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+class ExecutionJobVertexTest {
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     @Test
-    public void testParallelismGreaterThanMaxParallelism() {
+    void testParallelismGreaterThanMaxParallelism() {
         JobVertex jobVertex = new JobVertex("testVertex");
         jobVertex.setInvokableClass(AbstractInvokable.class);
         // parallelism must be smaller than the max parallelism
         jobVertex.setParallelism(172);
         jobVertex.setMaxParallelism(4);
 
-        assertThrows(
-                "higher than the max parallelism",
-                JobException.class,
-                () -> ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex));
+        Assertions.assertThatThrownBy(
+                        () -> ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex))
+                .isInstanceOf(JobException.class)
+                .hasMessageContaining("higher than the max parallelism");
     }
 
     @Test
-    public void testLazyInitialization() throws Exception {
+    void testLazyInitialization() throws Exception {
         final int parallelism = 3;
         final int configuredMaxParallelism = 12;
         final ExecutionJobVertex ejv =
                 createDynamicExecutionJobVertex(parallelism, configuredMaxParallelism, -1);
 
-        assertThat(ejv.getParallelism(), is(parallelism));
-        assertThat(ejv.getMaxParallelism(), is(configuredMaxParallelism));
-        assertThat(ejv.isInitialized(), is(false));
+        Assertions.assertThat(ejv.getParallelism()).isEqualTo(parallelism);
+        Assertions.assertThat(ejv.getMaxParallelism()).isEqualTo(configuredMaxParallelism);
+        Assertions.assertThat(ejv.isInitialized()).isFalse();
 
-        assertThat(ejv.getTaskVertices().length, is(0));
+        Assertions.assertThat(ejv.getTaskVertices()).isEmpty();
 
-        try {
-            ejv.getInputs();
-            Assert.fail("failure is expected");
-        } catch (IllegalStateException e) {
-            // ignore
-        }
+        Assertions.assertThatThrownBy(ejv::getInputs).isInstanceOf(IllegalStateException.class);
 
-        try {
-            ejv.getProducedDataSets();
-            Assert.fail("failure is expected");
-        } catch (IllegalStateException e) {
-            // ignore
-        }
+        Assertions.assertThatThrownBy(ejv::getProducedDataSets)
+                .isInstanceOf(IllegalStateException.class);
 
-        try {
-            ejv.getSplitAssigner();
-            Assert.fail("failure is expected");
-        } catch (IllegalStateException e) {
-            // ignore
-        }
+        Assertions.assertThatThrownBy(ejv::getSplitAssigner)
+                .isInstanceOf(IllegalStateException.class);
 
-        try {
-            ejv.getOperatorCoordinators();
-            Assert.fail("failure is expected");
-        } catch (IllegalStateException e) {
-            // ignore
-        }
+        Assertions.assertThatThrownBy(ejv::getOperatorCoordinators)
+                .isInstanceOf(IllegalStateException.class);
 
-        try {
-            ejv.connectToPredecessors(Collections.emptyMap());
-            Assert.fail("failure is expected");
-        } catch (IllegalStateException e) {
-            // ignore
-        }
+        Assertions.assertThatThrownBy(() -> ejv.connectToPredecessors(Collections.emptyMap()))
+                .isInstanceOf(IllegalStateException.class);
 
-        try {
-            ejv.executionVertexFinished();
-            Assert.fail("failure is expected");
-        } catch (IllegalStateException e) {
-            // ignore
-        }
+        Assertions.assertThatThrownBy(ejv::executionVertexFinished)
+                .isInstanceOf(IllegalStateException.class);
 
-        try {
-            ejv.executionVertexUnFinished();
-            Assert.fail("failure is expected");
-        } catch (IllegalStateException e) {
-            // ignore
-        }
+        Assertions.assertThatThrownBy(ejv::executionVertexUnFinished)
+                .isInstanceOf(IllegalStateException.class);
 
         initializeVertex(ejv);
 
-        assertThat(ejv.isInitialized(), is(true));
-        assertThat(ejv.getTaskVertices().length, is(3));
-        assertThat(ejv.getInputs().size(), is(0));
-        assertThat(ejv.getProducedDataSets().length, is(1));
-        assertThat(ejv.getOperatorCoordinators().size(), is(0));
+        Assertions.assertThat(ejv.isInitialized()).isTrue();
+        Assertions.assertThat(ejv.getTaskVertices().length).isEqualTo(3);
+        Assertions.assertThat(ejv.getInputs().size()).isEqualTo(0);

Review Comment:
   nit: can use `isEmpty()`
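   For example, a minimal sketch of the suggested form:
   ```
   Assertions.assertThat(ejv.getInputs()).isEmpty();
   ```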



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java:
##########
@@ -365,23 +327,21 @@ public void testRegisterConsumedPartitionGroupToEdgeManager() throws Exception {
         IntermediateResultPartition partition1 = result.getPartitions()[0];
         IntermediateResultPartition partition2 = result.getPartitions()[1];
 
-        assertEquals(
-                partition1.getConsumedPartitionGroups().get(0),
-                partition2.getConsumedPartitionGroups().get(0));
+        Assertions.assertThat(partition2.getConsumedPartitionGroups().get(0))
+                .isEqualTo(partition1.getConsumedPartitionGroups().get(0));
 
         ConsumedPartitionGroup consumedPartitionGroup =
                 partition1.getConsumedPartitionGroups().get(0);
         Set<IntermediateResultPartitionID> partitionIds = new HashSet<>();
         for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
             partitionIds.add(partitionId);
         }
-        assertThat(
-                partitionIds,
-                containsInAnyOrder(partition1.getPartitionId(), partition2.getPartitionId()));
+        Assertions.assertThat(partitionIds)
+                .contains(partition1.getPartitionId(), partition2.getPartitionId());

Review Comment:
   contains -> containsExactlyInAnyOrder
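   A sketch of the suggested assertion; unlike plain `contains`, it also fails if the set
   holds unexpected extra elements:
   ```
   Assertions.assertThat(partitionIds)
           .containsExactlyInAnyOrder(partition1.getPartitionId(), partition2.getPartitionId());
   ```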



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java:
##########
@@ -191,18 +181,17 @@ public void testUpdateTopology() throws Exception {
 
         executionGraph.initializeJobVertex(ejv1, 0L);
         adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv1));
-        assertThat(IterableUtils.toStream(adapter.getVertices()).count(), is(3L));
+        Assertions.assertThat(IterableUtils.toStream(adapter.getVertices()).count()).isEqualTo(3L);

Review Comment:
   can be
   ```
   assertThat(adapter.getVertices()).hasSize(3);
   ```
   
   This also applies to the verifications below.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java:
##########
@@ -52,50 +53,48 @@
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSchedulerAndDeploy;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.transitionTaskToFinished;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.transitionTasksToFinished;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 
 /**
  * Tests for removing cached {@link ShuffleDescriptor}s when the related partitions are no longer
  * valid. Currently, there are two scenarios as illustrated in {@link
  * IntermediateResult#clearCachedInformationForPartitionGroup}.
  */
-public class RemoveCachedShuffleDescriptorTest extends TestLogger {
+class RemoveCachedShuffleDescriptorTest extends TestLogger {

Review Comment:
   no need to extend TestLogger (and it would not work anyway, because it relies on JUnit 4 rules)
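   A sketch of the suggested declaration (logging still works because the
   TestLoggerExtension is activated by default for all tests, as noted later in this
   thread):
   ```
   /** Tests for removing cached ShuffleDescriptors when the related partitions are no longer valid. */
   class RemoveCachedShuffleDescriptorTest {
       // test methods unchanged
   }
   ```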



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java:
##########
@@ -75,25 +71,26 @@ public void setUp() {
     }
 
     @Test
-    public void testGetPartitionState() {
+    void testGetPartitionState() {
         for (ResultPartitionState state : ResultPartitionState.values()) {
             resultPartitionState.setResultPartitionState(state);
-            assertEquals(state, resultPartition.getState());
+            Assertions.assertThat(resultPartition.getState()).isEqualTo(state);
         }
     }
 
     @Test
-    public void testGetConsumerVertexGroup() {
+    void testGetConsumerVertexGroup() {
 
-        assertTrue(resultPartition.getConsumerVertexGroups().isEmpty());
+        Assertions.assertThat(resultPartition.getConsumerVertexGroups()).isEmpty();
 
         // test update consumers
         ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
         consumerVertexGroups.put(
                 resultPartition.getId(),
                 Collections.singletonList(ConsumerVertexGroup.fromSingleVertex(executionVertexId)));
-        assertFalse(resultPartition.getConsumerVertexGroups().isEmpty());
-        assertThat(resultPartition.getConsumerVertexGroups().get(0), contains(executionVertexId));
+        Assertions.assertThat(resultPartition.getConsumerVertexGroups().isEmpty()).isFalse();

Review Comment:
   nit: can use `isNotEmpty()`
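   For example (sketch):
   ```
   Assertions.assertThat(resultPartition.getConsumerVertexGroups()).isNotEmpty();
   ```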



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java:
##########
@@ -29,19 +29,19 @@
 import org.apache.flink.util.IterableUtils;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
 
 import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
-import static org.junit.Assert.assertEquals;
 
 /** Unit tests for {@link DefaultExecutionVertex}. */
-public class DefaultExecutionVertexTest extends TestLogger {
+class DefaultExecutionVertexTest extends TestLogger {

Review Comment:
   should no longer extend TestLogger



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtilTest.java:
##########
@@ -83,14 +81,14 @@ public void testIsolatedVertices() throws Exception {
      * </pre>
      */
     @Test
-    public void testVariousResultPartitionTypesBetweenVertices() throws Exception {
+    void testVariousResultPartitionTypesBetweenVertices() throws Exception {
         testThreeVerticesConnectSequentially(false, true, 1, 2);
         testThreeVerticesConnectSequentially(false, false, 0);
         testThreeVerticesConnectSequentially(true, true, 1, 3);
     }
 
     private void testThreeVerticesConnectSequentially(
-            boolean isForward1, boolean isForward2, int numOfGroups, int... groupSizes)
+            boolean isForward1, boolean isForward2, int numOfGroups, Integer... groupSizes)

Review Comment:
   Why is this change needed?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java:
##########
@@ -37,14 +38,9 @@
 import java.util.function.Supplier;
 
 import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 /** Unit tests for {@link DefaultResultPartition}. */
-public class DefaultResultPartitionTest extends TestLogger {
+class DefaultResultPartitionTest extends TestLogger {

Review Comment:
   This also applies to the other test classes



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java:
##########
@@ -122,50 +116,46 @@ public void testResultPartitionStateSupplier() {
         final DefaultResultPartition schedulingResultPartition =
                 adapter.getResultPartition(intermediateResultPartition.getPartitionId());
 
-        assertEquals(ResultPartitionState.CREATED, schedulingResultPartition.getState());
+        Assertions.assertThat(schedulingResultPartition.getState())
+                .isEqualTo(ResultPartitionState.CREATED);
 
         intermediateResultPartition.markDataProduced();
-        assertEquals(ResultPartitionState.CONSUMABLE, schedulingResultPartition.getState());
+        Assertions.assertThat(schedulingResultPartition.getState())
+                .isEqualTo(ResultPartitionState.CONSUMABLE);
     }
 
     @Test
-    public void testGetVertexOrThrow() {
-        try {
-            adapter.getVertex(new ExecutionVertexID(new JobVertexID(), 0));
-            fail("get not exist vertex");
-        } catch (IllegalArgumentException exception) {
-            // expected
-        }
+    void testGetVertexOrThrow() {
+        Assertions.assertThatThrownBy(
+                        () -> adapter.getVertex(new ExecutionVertexID(new JobVertexID(), 0)))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testResultPartitionOrThrow() {
-        try {
-            adapter.getResultPartition(new IntermediateResultPartitionID());
-            fail("get not exist result partition");
-        } catch (IllegalArgumentException exception) {
-            // expected
-        }
+    void testResultPartitionOrThrow() {
+        Assertions.assertThatThrownBy(
+                        () -> adapter.getResultPartition(new IntermediateResultPartitionID()))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testGetAllPipelinedRegions() {
+    void testGetAllPipelinedRegions() {
         final Iterable<DefaultSchedulingPipelinedRegion> allPipelinedRegions =
                 adapter.getAllPipelinedRegions();
-        assertEquals(1, Iterables.size(allPipelinedRegions));
+        Assertions.assertThat(Iterables.size(allPipelinedRegions)).isEqualTo(1);

Review Comment:
   can be 
   ```
   assertThat(allPipelinedRegions).hasSize(1);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java:
##########
@@ -37,14 +38,9 @@
 import java.util.function.Supplier;
 
 import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 /** Unit tests for {@link DefaultResultPartition}. */
-public class DefaultResultPartitionTest extends TestLogger {
+class DefaultResultPartitionTest extends TestLogger {

Review Comment:
   should no longer extend TestLogger



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java:
##########
@@ -86,101 +80,76 @@ public void testMultipleConsumersVertices() {
     }
 
     @Test
-    public void testConnectDirectly() {
+    void testConnectDirectly() {
         JobVertex source = new JobVertex("source");
         JobVertex target = new JobVertex("target");
         target.connectNewDataSetAsInput(
                 source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 
-        assertTrue(source.isInputVertex());
-        assertFalse(source.isOutputVertex());
-        assertFalse(target.isInputVertex());
-        assertTrue(target.isOutputVertex());
+        Assertions.assertThat(source.isInputVertex()).isTrue();
+        Assertions.assertThat(source.isOutputVertex()).isFalse();
+        Assertions.assertThat(target.isInputVertex()).isFalse();
+        Assertions.assertThat(target.isOutputVertex()).isTrue();
 
-        assertEquals(1, source.getNumberOfProducedIntermediateDataSets());
-        assertEquals(1, target.getNumberOfInputs());
+        Assertions.assertThat(source.getNumberOfProducedIntermediateDataSets()).isEqualTo(1);
+        Assertions.assertThat(target.getNumberOfInputs()).isEqualTo(1);
 
-        assertEquals(target.getInputs().get(0).getSource(), source.getProducedDataSets().get(0));
+        Assertions.assertThat(source.getProducedDataSets().get(0))
+                .isEqualTo(target.getInputs().get(0).getSource());
 
-        assertEquals(target, source.getProducedDataSets().get(0).getConsumers().get(0).getTarget());
+        Assertions.assertThat(source.getProducedDataSets().get(0).getConsumers().get(0).getTarget())
+                .isEqualTo(target);
     }
 
     @Test
-    public void testOutputFormat() {
-        try {
-            final InputOutputFormatVertex vertex = new InputOutputFormatVertex("Name");
-
-            OperatorID operatorID = new OperatorID();
-            Configuration parameters = new Configuration();
-            parameters.setString("test_key", "test_value");
-            new InputOutputFormatContainer(Thread.currentThread().getContextClassLoader())
-                    .addOutputFormat(operatorID, new TestingOutputFormat(parameters))
-                    .addParameters(operatorID, parameters)
-                    .write(new TaskConfig(vertex.getConfiguration()));
-
-            final ClassLoader cl = new TestClassLoader();
-
-            try {
-                vertex.initializeOnMaster(cl);
-                fail("Did not throw expected exception.");
-            } catch (TestException e) {
-                // all good
-            }
+    void testOutputFormat() throws Exception {
+        final InputOutputFormatVertex vertex = new InputOutputFormatVertex("Name");
 
-            InputOutputFormatVertex copy = InstantiationUtil.clone(vertex);
-            ClassLoader ctxCl = Thread.currentThread().getContextClassLoader();
-            try {
-                copy.initializeOnMaster(cl);
-                fail("Did not throw expected exception.");
-            } catch (TestException e) {
-                // all good
-            }
-            assertEquals(
-                    "Previous classloader was not restored.",
-                    ctxCl,
-                    Thread.currentThread().getContextClassLoader());
-
-            try {
-                copy.finalizeOnMaster(cl);
-                fail("Did not throw expected exception.");
-            } catch (TestException e) {
-                // all good
-            }
-            assertEquals(
-                    "Previous classloader was not restored.",
-                    ctxCl,
-                    Thread.currentThread().getContextClassLoader());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        OperatorID operatorID = new OperatorID();
+        Configuration parameters = new Configuration();
+        parameters.setString("test_key", "test_value");
+        new InputOutputFormatContainer(Thread.currentThread().getContextClassLoader())
+                .addOutputFormat(operatorID, new TestingOutputFormat(parameters))
+                .addParameters(operatorID, parameters)
+                .write(new TaskConfig(vertex.getConfiguration()));
+
+        final ClassLoader cl = new TestClassLoader();
+
+        Assertions.assertThatThrownBy(() -> vertex.initializeOnMaster(cl))
+                .isInstanceOf(TestException.class);
+
+        InputOutputFormatVertex copy = InstantiationUtil.clone(vertex);
+        ClassLoader ctxCl = Thread.currentThread().getContextClassLoader();
+        Assertions.assertThatThrownBy(() -> copy.initializeOnMaster(cl))
+                .isInstanceOf(TestException.class);
+
+        Assertions.assertThat(Thread.currentThread().getContextClassLoader()).isEqualTo(ctxCl);
+
+        Assertions.assertThatThrownBy(() -> copy.finalizeOnMaster(cl))
+                .isInstanceOf(TestException.class);
+        Assertions.assertThat(Thread.currentThread().getContextClassLoader()).isEqualTo(ctxCl);

Review Comment:
   ```
   Assertions.assertThat(Thread.currentThread().getContextClassLoader())
           .as("Previous classloader was not restored.")
           .isEqualTo(ctxCl);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java:
##########
@@ -335,17 +327,20 @@ private static void assertPartitionsEquals(
                 }
             }
             List<ConsumerVertexGroup> adaptedConsumers = adaptedPartition.getConsumerVertexGroups();
-            assertFalse(adaptedConsumers.isEmpty());
+            Assertions.assertThat(adaptedConsumers).isNotEmpty();
             for (ExecutionVertexID originalId : originalConsumerIds) {
                 // it is sufficient to verify that some vertex exists with the correct ID here,
                 // since deep equality is verified later in the main loop
                 // this DOES rely on an implicit assumption that the vertices objects returned by
                 // the topology are
                 // identical to those stored in the partition
-                assertTrue(
-                        adaptedConsumers.stream()
-                                .flatMap(IterableUtils::toStream)
-                                .anyMatch(adaptedConsumer -> adaptedConsumer.equals(originalId)));
+                Assertions.assertThat(

Review Comment:
   It looks to me that we can simplify it to:
   ```
   Assertions.assertThat(adaptedConsumers).anyMatch(adaptedConsumer -> adaptedConsumer.equals(originalId));
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java:
##########
@@ -29,164 +29,132 @@
 import org.apache.flink.runtime.scheduler.VertexParallelismStore;
 import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Collections;
 import java.util.concurrent.ScheduledExecutorService;
 
-import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
 /** Test for {@link ExecutionJobVertex} */
-public class ExecutionJobVertexTest {
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+class ExecutionJobVertexTest {
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     @Test
-    public void testParallelismGreaterThanMaxParallelism() {
+    void testParallelismGreaterThanMaxParallelism() {
         JobVertex jobVertex = new JobVertex("testVertex");
         jobVertex.setInvokableClass(AbstractInvokable.class);
         // parallelism must be smaller than the max parallelism
         jobVertex.setParallelism(172);
         jobVertex.setMaxParallelism(4);
 
-        assertThrows(
-                "higher than the max parallelism",
-                JobException.class,
-                () -> ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex));
+        Assertions.assertThatThrownBy(
+                        () -> ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex))
+                .isInstanceOf(JobException.class)
+                .hasMessageContaining("higher than the max parallelism");
     }
 
     @Test
-    public void testLazyInitialization() throws Exception {
+    void testLazyInitialization() throws Exception {
         final int parallelism = 3;
         final int configuredMaxParallelism = 12;
         final ExecutionJobVertex ejv =
                 createDynamicExecutionJobVertex(parallelism, configuredMaxParallelism, -1);
 
-        assertThat(ejv.getParallelism(), is(parallelism));
-        assertThat(ejv.getMaxParallelism(), is(configuredMaxParallelism));
-        assertThat(ejv.isInitialized(), is(false));
+        Assertions.assertThat(ejv.getParallelism()).isEqualTo(parallelism);
+        Assertions.assertThat(ejv.getMaxParallelism()).isEqualTo(configuredMaxParallelism);
+        Assertions.assertThat(ejv.isInitialized()).isFalse();
 
-        assertThat(ejv.getTaskVertices().length, is(0));
+        Assertions.assertThat(ejv.getTaskVertices()).isEmpty();
 
-        try {
-            ejv.getInputs();
-            Assert.fail("failure is expected");
-        } catch (IllegalStateException e) {
-            // ignore
-        }
+        Assertions.assertThatThrownBy(ejv::getInputs).isInstanceOf(IllegalStateException.class);
 
-        try {
-            ejv.getProducedDataSets();
-            Assert.fail("failure is expected");
-        } catch (IllegalStateException e) {
-            // ignore
-        }
+        Assertions.assertThatThrownBy(ejv::getProducedDataSets)
+                .isInstanceOf(IllegalStateException.class);
 
-        try {
-            ejv.getSplitAssigner();
-            Assert.fail("failure is expected");
-        } catch (IllegalStateException e) {
-            // ignore
-        }
+        Assertions.assertThatThrownBy(ejv::getSplitAssigner)
+                .isInstanceOf(IllegalStateException.class);
 
-        try {
-            ejv.getOperatorCoordinators();
-            Assert.fail("failure is expected");
-        } catch (IllegalStateException e) {
-            // ignore
-        }
+        Assertions.assertThatThrownBy(ejv::getOperatorCoordinators)
+                .isInstanceOf(IllegalStateException.class);
 
-        try {
-            ejv.connectToPredecessors(Collections.emptyMap());
-            Assert.fail("failure is expected");
-        } catch (IllegalStateException e) {
-            // ignore
-        }
+        Assertions.assertThatThrownBy(() -> ejv.connectToPredecessors(Collections.emptyMap()))
+                .isInstanceOf(IllegalStateException.class);
 
-        try {
-            ejv.executionVertexFinished();
-            Assert.fail("failure is expected");
-        } catch (IllegalStateException e) {
-            // ignore
-        }
+        Assertions.assertThatThrownBy(ejv::executionVertexFinished)
+                .isInstanceOf(IllegalStateException.class);
 
-        try {
-            ejv.executionVertexUnFinished();
-            Assert.fail("failure is expected");
-        } catch (IllegalStateException e) {
-            // ignore
-        }
+        Assertions.assertThatThrownBy(ejv::executionVertexUnFinished)
+                .isInstanceOf(IllegalStateException.class);
 
         initializeVertex(ejv);
 
-        assertThat(ejv.isInitialized(), is(true));
-        assertThat(ejv.getTaskVertices().length, is(3));
-        assertThat(ejv.getInputs().size(), is(0));
-        assertThat(ejv.getProducedDataSets().length, is(1));
-        assertThat(ejv.getOperatorCoordinators().size(), is(0));
+        Assertions.assertThat(ejv.isInitialized()).isTrue();
+        Assertions.assertThat(ejv.getTaskVertices().length).isEqualTo(3);

Review Comment:
   nit: can use `hasSize`
   This also applies to the lines below.
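   For example (sketch):
   ```
   Assertions.assertThat(ejv.getTaskVertices()).hasSize(3);
   ```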





[GitHub] [flink] wsry commented on a diff in pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
wsry commented on code in PR #20350:
URL: https://github.com/apache/flink/pull/20350#discussion_r937983848


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java:
##########
@@ -35,55 +35,48 @@
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.TestLogger;
 
 import org.assertj.core.api.Assertions;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 /** Tests for {@link IntermediateResultPartition}. */
 public class IntermediateResultPartitionTest extends TestLogger {

Review Comment:
   1. This test class is used by another test, so the ```public``` modifier cannot be removed.
   2. Replace TestLogger with ```@ExtendWith({TestLoggerExtension.class})```
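   A sketch of what that could look like (not the exact PR code):
   ```
   @ExtendWith({TestLoggerExtension.class})
   public class IntermediateResultPartitionTest {
       // ...
   }
   ```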





[GitHub] [flink] wsry commented on a diff in pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
wsry commented on code in PR #20350:
URL: https://github.com/apache/flink/pull/20350#discussion_r939480667


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java:
##########
@@ -35,55 +35,48 @@
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.TestLogger;
 
 import org.assertj.core.api.Assertions;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 /** Tests for {@link IntermediateResultPartition}. */
 public class IntermediateResultPartitionTest extends TestLogger {

Review Comment:
   Removed ```@ExtendWith({TestLoggerExtension.class})```





[GitHub] [flink] flinkbot commented on pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20350:
URL: https://github.com/apache/flink/pull/20350#issuecomment-1193733348

   ## CI report:
   
   * 120097ceda6d3730a60bffddf718de4c1643b09e UNKNOWN
   
   Bot commands: the @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build




[GitHub] [flink] wsry commented on a diff in pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
wsry commented on code in PR #20350:
URL: https://github.com/apache/flink/pull/20350#discussion_r938069755


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java:
##########
@@ -251,11 +279,24 @@ public static ExecutionGraph createExecutionGraph(
             v2.setMaxParallelism(consumerMaxParallelism);
         }
 
-        v2.connectNewDataSetAsInput(v1, distributionPattern, ResultPartitionType.BLOCKING);
+        final JobVertex v3 = new JobVertex("v3");

Review Comment:
   Yes, exactly.





[GitHub] [flink] wsry commented on pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
wsry commented on PR #20350:
URL: https://github.com/apache/flink/pull/20350#issuecomment-1207144254

   @zhuzhurk I have updated the PR and added two fixup commits. Please take another look.




[GitHub] [flink] wsry commented on a diff in pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
wsry commented on code in PR #20350:
URL: https://github.com/apache/flink/pull/20350#discussion_r937996143


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtilTest.java:
##########
@@ -83,14 +81,14 @@ public void testIsolatedVertices() throws Exception {
      * </pre>
      */
     @Test
-    public void testVariousResultPartitionTypesBetweenVertices() throws Exception {
+    void testVariousResultPartitionTypesBetweenVertices() throws Exception {
         testThreeVerticesConnectSequentially(false, true, 1, 2);
         testThreeVerticesConnectSequentially(false, false, 0);
         testThreeVerticesConnectSequentially(true, true, 1, 3);
     }
 
     private void testThreeVerticesConnectSequentially(
-            boolean isForward1, boolean isForward2, int numOfGroups, int... groupSizes)
+            boolean isForward1, boolean isForward2, int numOfGroups, Integer... groupSizes)

Review Comment:
   Because AssertJ's ```contains``` method does not accept an int array; its varargs expect boxed elements.
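   A small illustration of the constraint (hypothetical snippet, not the test code): an
   `int[]` parameter cannot be forwarded as the object varargs that `contains` expects,
   while `Integer...` can:
   ```
   static void check(List<Integer> actualSizes, Integer... expectedSizes) {
       Assertions.assertThat(actualSizes).contains(expectedSizes); // compiles
   }
   // With `int... expectedSizes`, the call above would not compile because an int[]
   // cannot be passed where Integer varargs are expected.
   ```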





[GitHub] [flink] wsry commented on pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
wsry commented on PR #20350:
URL: https://github.com/apache/flink/pull/20350#issuecomment-1206038365

   @zhuzhurk @TanYuxin-tyx Thanks for the review and feedback. I have updated the PR.




[GitHub] [flink] zhuzhurk commented on a diff in pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20350:
URL: https://github.com/apache/flink/pull/20350#discussion_r938549010


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java:
##########
@@ -35,55 +35,48 @@
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.TestLogger;
 
 import org.assertj.core.api.Assertions;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 /** Tests for {@link IntermediateResultPartition}. */
 public class IntermediateResultPartitionTest extends TestLogger {

Review Comment:
   `@ExtendWith({TestLoggerExtension.class})` is not needed because it is activated by default for all tests.





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20350:
URL: https://github.com/apache/flink/pull/20350#discussion_r938553227


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java:
##########
@@ -191,18 +181,17 @@ public void testUpdateTopology() throws Exception {
 
         executionGraph.initializeJobVertex(ejv1, 0L);
         adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv1));
-        assertThat(IterableUtils.toStream(adapter.getVertices()).count(), is(3L));
+        Assertions.assertThat(IterableUtils.toStream(adapter.getVertices()).count()).isEqualTo(3L);

Review Comment:
   It seems this comment is not fully addressed. There is no need to convert an `Iterable` to a `Stream` to do the `hasSize` check.
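   For example (sketch), the assertion can operate on the `Iterable` directly:
   ```
   Assertions.assertThat(adapter.getVertices()).hasSize(3);
   ```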





[GitHub] [flink] wsry commented on a diff in pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
wsry commented on code in PR #20350:
URL: https://github.com/apache/flink/pull/20350#discussion_r938058776


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java:
##########
@@ -47,6 +49,15 @@ public class IntermediateResultPartition {
     /** Whether this partition has produced some data. */
     private boolean hasDataProduced = false;
 
+    /**
+     * Released {@link ConsumedPartitionGroup}s for this result partition. This result partition can
+     * be released if all {@link ConsumedPartitionGroup}s are released.
+     */
+    private final Set<ConsumedPartitionGroup> releasedPartitionGroups = new HashSet<>();

Review Comment:
   1. I use a hash set because it tolerates duplicate releases, though maybe there is no such case currently.
   2. Renamed.
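   Roughly the idea, as a sketch (names follow this thread; `canBeReleased` is an assumed
   helper for illustration, not necessarily the method in the PR):
   ```
   private final Set<ConsumedPartitionGroup> releasedPartitionGroups = new HashSet<>();

   void markPartitionGroupReleasable(ConsumedPartitionGroup partitionGroup) {
       releasedPartitionGroups.add(partitionGroup); // adding the same group twice is a no-op
   }

   boolean canBeReleased() {
       // the partition is releasable once every consuming group has been marked released
       return releasedPartitionGroups.size() == getConsumedPartitionGroups().size();
   }
   ```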





[GitHub] [flink] wsry commented on pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
wsry commented on PR #20350:
URL: https://github.com/apache/flink/pull/20350#issuecomment-1207183205

   @flinkbot run azure




[GitHub] [flink] zhuzhurk commented on a diff in pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20350:
URL: https://github.com/apache/flink/pull/20350#discussion_r938582717


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java:
##########
@@ -249,6 +251,24 @@ public static void completeCancellingForAllVertices(ExecutionGraph eg) {
         }
     }
 
+    public static void finishJobVertex(ExecutionGraph executionGraph, JobVertexID jobVertexID) {
+

Review Comment:
   jobVertexID -> jobVertexId
   
   I prefer to remove this empty line.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##########
@@ -718,31 +716,37 @@ public CompletableFuture<?> suspend() {
     }
 
     private void updatePartitionConsumers(final IntermediateResultPartition partition) {
-        final Optional<ConsumerVertexGroup> consumerVertexGroup =
-                partition.getConsumerVertexGroupOptional();
-        if (!consumerVertexGroup.isPresent()) {
+        final List<ConsumerVertexGroup> consumerVertexGroups = partition.getConsumerVertexGroups();
+        if (consumerVertexGroups.isEmpty()) {
             return;
         }
-        for (ExecutionVertexID consumerVertexId : consumerVertexGroup.get()) {
-            final ExecutionVertex consumerVertex =
-                    vertex.getExecutionGraphAccessor().getExecutionVertexOrThrow(consumerVertexId);
-            final Execution consumer = consumerVertex.getCurrentExecutionAttempt();
-            final ExecutionState consumerState = consumer.getState();
-
-            // ----------------------------------------------------------------
-            // Consumer is recovering or running => send update message now
-            // Consumer is deploying => cache the partition info which would be
-            // sent after switching to running
-            // ----------------------------------------------------------------
-            if (consumerState == DEPLOYING
-                    || consumerState == RUNNING
-                    || consumerState == INITIALIZING) {
-                final PartitionInfo partitionInfo = createPartitionInfo(partition);
-
-                if (consumerState == DEPLOYING) {
-                    consumerVertex.cachePartitionInfo(partitionInfo);
-                } else {
-                    consumer.sendUpdatePartitionInfoRpcCall(Collections.singleton(partitionInfo));
+        final Set<ExecutionVertexID> updatedVertices = new HashSet<>();
+        for (ConsumerVertexGroup consumerVertexGroup : consumerVertexGroups) {
+            for (ExecutionVertexID consumerVertexId : consumerVertexGroup) {
+                final ExecutionVertex consumerVertex =
+                        vertex.getExecutionGraphAccessor()
+                                .getExecutionVertexOrThrow(consumerVertexId);
+                final Execution consumer = consumerVertex.getCurrentExecutionAttempt();
+                final ExecutionState consumerState = consumer.getState();
+
+                // ----------------------------------------------------------------
+                // Consumer is recovering or running => send update message now
+                // Consumer is deploying => cache the partition info which would be
+                // sent after switching to running
+                // ----------------------------------------------------------------
+                if ((consumerState == DEPLOYING
+                                || consumerState == RUNNING
+                                || consumerState == INITIALIZING)
+                        && !updatedVertices.contains(consumerVertexId)) {

Review Comment:
   Maybe do this check at the beginning of the inner loop, to avoid executing some unnecessary logic?
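   Roughly (a hypothetical restructuring, not the exact PR code):
   ```
   for (ConsumerVertexGroup consumerVertexGroup : consumerVertexGroups) {
       for (ExecutionVertexID consumerVertexId : consumerVertexGroup) {
           if (!updatedVertices.add(consumerVertexId)) {
               // already handled via another consumer vertex group
               continue;
           }
           // ... resolve the consumer vertex, then send or cache the partition info ...
       }
   }
   ```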



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java:
##########
@@ -58,6 +66,25 @@ public IntermediateResultPartition(
         this.edgeManager = edgeManager;
     }
 
+    public void releaseConsumedPartitionGroup(ConsumedPartitionGroup partitionGroup) {

Review Comment:
   -> markPartitionGroupReleasable()





[GitHub] [flink] wsry merged pull request #20350: [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Posted by GitBox <gi...@apache.org>.
wsry merged PR #20350:
URL: https://github.com/apache/flink/pull/20350

