Posted to commits@flink.apache.org by zh...@apache.org on 2024/01/22 11:03:51 UTC

(flink) branch master updated (27e6ac83617 -> c3c836216ea)

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 27e6ac83617 [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty
     new feb5f926706 [hotfix] Remove redundant lazy initialization of LazyInitializedCoordinatorContext.
     new c3c836216ea [FLINK-33768] Support dynamic source parallelism inference for batch jobs

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../connector/source/DynamicFilteringInfo.java}    |  16 +-
 .../source/DynamicParallelismInference.java        |  66 +++++++++
 .../executiongraph/DefaultExecutionGraph.java      |  27 ++--
 .../runtime/executiongraph/ExecutionGraph.java     |  11 +-
 .../runtime/executiongraph/ExecutionJobVertex.java |  89 ++++++++----
 .../SpeculativeExecutionJobVertex.java             |  13 +-
 .../coordination/OperatorCoordinatorHolder.java    |  30 +++-
 .../RecreateOnResetOperatorCoordinator.java        |   1 -
 .../DefaultOperatorCoordinatorHandler.java         |   6 +-
 .../scheduler/OperatorCoordinatorHandler.java      |   3 +-
 .../adaptivebatch/AdaptiveBatchScheduler.java      | 161 ++++++++++++++++++---
 ...faultVertexParallelismAndInputInfosDecider.java |  25 +++-
 .../VertexParallelismAndInputInfosDecider.java     |  18 +++
 .../source/coordinator/SourceCoordinator.java      |  57 ++++++++
 .../coordinator/SourceCoordinatorContext.java      |  36 ++++-
 .../DefaultExecutionGraphConstructionTest.java     |   8 +-
 .../executiongraph/EdgeManagerBuildUtilTest.java   |  18 +--
 .../executiongraph/ExecutionJobVertexTest.java     |  11 +-
 .../DefaultOperatorCoordinatorHandlerTest.java     |  11 +-
 .../runtime/scheduler/DefaultSchedulerBuilder.java |  43 ++++--
 .../runtime/scheduler/SchedulerTestingUtils.java   |   5 +-
 .../SsgNetworkMemoryCalculationUtilsTest.java      |  20 +--
 .../adapter/DefaultExecutionTopologyTest.java      |  19 +--
 .../runtime/scheduler/adaptive/ExecutingTest.java  |   8 +-
 .../adaptive/StateTrackingMockExecutionGraph.java  |   3 +-
 .../TestingOperatorCoordinatorHandler.java         |   3 +-
 .../adaptivebatch/AdaptiveBatchSchedulerTest.java  |  34 ++++-
 ...tVertexParallelismAndInputInfosDeciderTest.java |  33 ++++-
 .../source/coordinator/SourceCoordinatorTest.java  |  20 +++
 .../source/coordinator/TestingSplitEnumerator.java |   8 +-
 .../connector/source/DynamicFilteringEvent.java    |   3 +-
 .../scheduling/AdaptiveBatchSchedulerITCase.java   |  78 ++++++++--
 32 files changed, 677 insertions(+), 207 deletions(-)
 copy flink-core/src/main/java/org/apache/flink/{core/io/Versioned.java => api/connector/source/DynamicFilteringInfo.java} (70%)
 create mode 100644 flink-core/src/main/java/org/apache/flink/api/connector/source/DynamicParallelismInference.java


(flink) 02/02: [FLINK-33768] Support dynamic source parallelism inference for batch jobs

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c3c836216eaaaf24c1add3b490c8f425fda01d7c
Author: sunxia <xi...@gmail.com>
AuthorDate: Wed Jan 10 19:15:32 2024 +0800

    [FLINK-33768] Support dynamic source parallelism inference for batch jobs
    
    This closes #24087.
---
 .../api/connector/source/DynamicFilteringInfo.java |  29 ++++
 .../source/DynamicParallelismInference.java        |  66 +++++++++
 .../executiongraph/DefaultExecutionGraph.java      |  27 ++--
 .../runtime/executiongraph/ExecutionGraph.java     |  11 +-
 .../runtime/executiongraph/ExecutionJobVertex.java |  89 ++++++++----
 .../SpeculativeExecutionJobVertex.java             |  13 +-
 .../coordination/OperatorCoordinatorHolder.java    |  29 +++-
 .../RecreateOnResetOperatorCoordinator.java        |   1 -
 .../DefaultOperatorCoordinatorHandler.java         |   6 +-
 .../scheduler/OperatorCoordinatorHandler.java      |   3 +-
 .../adaptivebatch/AdaptiveBatchScheduler.java      | 161 ++++++++++++++++++---
 ...faultVertexParallelismAndInputInfosDecider.java |  25 +++-
 .../VertexParallelismAndInputInfosDecider.java     |  18 +++
 .../source/coordinator/SourceCoordinator.java      |  57 ++++++++
 .../coordinator/SourceCoordinatorContext.java      |  36 ++++-
 .../DefaultExecutionGraphConstructionTest.java     |   8 +-
 .../executiongraph/EdgeManagerBuildUtilTest.java   |  18 +--
 .../executiongraph/ExecutionJobVertexTest.java     |  11 +-
 .../DefaultOperatorCoordinatorHandlerTest.java     |  11 +-
 .../runtime/scheduler/DefaultSchedulerBuilder.java |  43 ++++--
 .../runtime/scheduler/SchedulerTestingUtils.java   |   5 +-
 .../SsgNetworkMemoryCalculationUtilsTest.java      |  20 +--
 .../adapter/DefaultExecutionTopologyTest.java      |  19 +--
 .../runtime/scheduler/adaptive/ExecutingTest.java  |   8 +-
 .../adaptive/StateTrackingMockExecutionGraph.java  |   3 +-
 .../TestingOperatorCoordinatorHandler.java         |   3 +-
 .../adaptivebatch/AdaptiveBatchSchedulerTest.java  |  34 ++++-
 ...tVertexParallelismAndInputInfosDeciderTest.java |  33 ++++-
 .../source/coordinator/SourceCoordinatorTest.java  |  20 +++
 .../source/coordinator/TestingSplitEnumerator.java |   8 +-
 .../connector/source/DynamicFilteringEvent.java    |   3 +-
 .../scheduling/AdaptiveBatchSchedulerITCase.java   |  78 ++++++++--
 32 files changed, 701 insertions(+), 195 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/DynamicFilteringInfo.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/DynamicFilteringInfo.java
new file mode 100644
index 00000000000..b5ee8ab5c5e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/DynamicFilteringInfo.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A decorative interface indicating that the implementing class holds dynamic partition filtering
+ * data. The actual information needs to be obtained from the implementing class,
+ * org.apache.flink.table.connector.source.DynamicFilteringEvent.
+ */
+@PublicEvolving
+public interface DynamicFilteringInfo {}
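
For illustration, an event class carries this marker by implementing it alongside SourceEvent;
the coordinator later recognizes such events via an instanceof check (see
SourceCoordinator#getSourceDynamicFilteringInfo further down). The real implementor is
DynamicFilteringEvent in flink-table; the sketch below is purely hypothetical:

    import org.apache.flink.api.connector.source.DynamicFilteringInfo;
    import org.apache.flink.api.connector.source.SourceEvent;

    /** Illustrative only: a source event that also carries dynamic filtering data. */
    public class ExampleFilteringEvent implements SourceEvent, DynamicFilteringInfo {

        // Placeholder payload; the real DynamicFilteringEvent carries table-specific data.
        private final byte[] serializedFilteringData;

        public ExampleFilteringEvent(byte[] serializedFilteringData) {
            this.serializedFilteringData = serializedFilteringData;
        }

        public byte[] getSerializedFilteringData() {
            return serializedFilteringData;
        }
    }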
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/DynamicParallelismInference.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/DynamicParallelismInference.java
new file mode 100644
index 00000000000..a9ec578c844
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/DynamicParallelismInference.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Optional;
+
+/**
+ * Sources that implement this interface will dynamically infer the source's parallelism when it is
+ * unspecified. The interface is invoked by the Flink runtime before the source vertex is scheduled.
+ *
+ * <p>The implementations typically work together with the {@link Source} and are currently only
+ * effective for batch jobs that use the adaptive batch scheduler.
+ */
+@PublicEvolving
+public interface DynamicParallelismInference {
+    /** A context that provides the information needed for dynamic parallelism decisions. */
+    @PublicEvolving
+    interface Context {
+        /**
+         * Get the dynamic filtering info of the source vertex.
+         *
+         * @return the dynamic filtering info, or empty if it is not available.
+         */
+        Optional<DynamicFilteringInfo> getDynamicFilteringInfo();
+
+        /**
+         * Get the upper bound for the inferred parallelism.
+         *
+         * @return the upper bound for the inferred parallelism.
+         */
+        int getParallelismInferenceUpperBound();
+
+        /**
+         * Get the average data volume (in bytes) that each task instance is expected to process.
+         *
+         * @return the data volume per task in bytes.
+         */
+        long getDataVolumePerTask();
+    }
+
+    /**
+     * The method is invoked on the master (JobManager) before the initialization of the source
+     * vertex.
+     *
+     * @param context The context that provides the information for the parallelism decision.
+     * @return the dynamically inferred parallelism of the source.
+     */
+    int inferParallelism(Context context);
+}
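
A connector would typically implement this interface on its Source class. As a minimal
hypothetical sketch (SizeBasedParallelismInference and its byte estimate are illustrative, not
part of this commit), the inference might divide an estimated total input size by the per-task
data volume and clamp the result to the provided upper bound:

    import org.apache.flink.api.connector.source.DynamicParallelismInference;

    /** Hypothetical sketch: infer parallelism from an estimated total input size. */
    public class SizeBasedParallelismInference implements DynamicParallelismInference {

        // Assumed to be known to the connector, e.g. from file metadata.
        private final long estimatedTotalBytes;

        public SizeBasedParallelismInference(long estimatedTotalBytes) {
            this.estimatedTotalBytes = estimatedTotalBytes;
        }

        @Override
        public int inferParallelism(Context context) {
            long bytesPerTask = context.getDataVolumePerTask();
            // Ceiling division: one task per dataVolumePerTask bytes.
            long tasks = (estimatedTotalBytes + bytesPerTask - 1) / bytesPerTask;
            // Clamp into [1, upper bound] so the scheduler's limit is respected.
            return (int) Math.max(1, Math.min(tasks, context.getParallelismInferenceUpperBound()));
        }
    }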
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index 12859bc91d8..845c10f2d42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -830,9 +830,9 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
                 tasks.size(),
                 intermediateResults.size());
 
-        attachJobVertices(verticesToAttach);
+        attachJobVertices(verticesToAttach, jobManagerJobMetricGroup);
         if (!isDynamic) {
-            initializeJobVertices(verticesToAttach, jobManagerJobMetricGroup);
+            initializeJobVertices(verticesToAttach);
         }
 
         // the topology assigning should happen before notifying new vertices to failoverStrategy
@@ -843,7 +843,9 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
     }
 
     /** Attach job vertices without initializing them. */
-    private void attachJobVertices(List<JobVertex> topologicallySorted) throws JobException {
+    private void attachJobVertices(
+            List<JobVertex> topologicallySorted, JobManagerJobMetricGroup jobManagerJobMetricGroup)
+            throws JobException {
         for (JobVertex jobVertex : topologicallySorted) {
 
             if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
@@ -856,7 +858,11 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
             // create the execution job vertex and attach it to the graph
             ExecutionJobVertex ejv =
                     executionJobVertexFactory.createExecutionJobVertex(
-                            this, jobVertex, parallelismInfo);
+                            this,
+                            jobVertex,
+                            parallelismInfo,
+                            coordinatorStore,
+                            jobManagerJobMetricGroup);
 
             ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
             if (previousTask != null) {
@@ -871,14 +877,12 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
         }
     }
 
-    private void initializeJobVertices(
-            List<JobVertex> topologicallySorted, JobManagerJobMetricGroup jobManagerJobMetricGroup)
-            throws JobException {
+    private void initializeJobVertices(List<JobVertex> topologicallySorted) throws JobException {
         final long createTimestamp = System.currentTimeMillis();
 
         for (JobVertex jobVertex : topologicallySorted) {
             final ExecutionJobVertex ejv = tasks.get(jobVertex.getID());
-            initializeJobVertex(ejv, createTimestamp, jobManagerJobMetricGroup);
+            initializeJobVertex(ejv, createTimestamp);
         }
     }
 
@@ -886,8 +890,7 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
     public void initializeJobVertex(
             ExecutionJobVertex ejv,
             long createTimestamp,
-            Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos,
-            JobManagerJobMetricGroup jobManagerJobMetricGroup)
+            Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos)
             throws JobException {
 
         checkNotNull(ejv);
@@ -901,9 +904,7 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
                 executionHistorySizeLimit,
                 rpcTimeout,
                 createTimestamp,
-                this.initialAttemptCounts.getAttemptCounts(ejv.getJobVertexId()),
-                coordinatorStore,
-                jobManagerJobMetricGroup);
+                this.initialAttemptCounts.getAttemptCounts(ejv.getJobVertexId()));
 
         ejv.connectToPredecessors(this.intermediateResults);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index b3db9ecc0da..2194b6d59eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -219,17 +219,13 @@ public interface ExecutionGraph extends AccessExecutionGraph {
     @Nonnull
     ComponentMainThreadExecutor getJobMasterMainThreadExecutor();
 
-    default void initializeJobVertex(
-            ExecutionJobVertex ejv,
-            long createTimestamp,
-            JobManagerJobMetricGroup jobManagerJobMetricGroup)
+    default void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp)
             throws JobException {
         initializeJobVertex(
                 ejv,
                 createTimestamp,
                 VertexInputInfoComputationUtils.computeVertexInputInfos(
-                        ejv, getAllIntermediateResults()::get),
-                jobManagerJobMetricGroup);
+                        ejv, getAllIntermediateResults()::get));
     }
 
     /**
@@ -244,8 +240,7 @@ public interface ExecutionGraph extends AccessExecutionGraph {
     void initializeJobVertex(
             ExecutionJobVertex ejv,
             long createTimestamp,
-            Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos,
-            JobManagerJobMetricGroup jobManagerJobMetricGroup)
+            Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos)
             throws JobException;
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 57092514bc4..18131e60337 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -46,7 +46,9 @@ import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
+import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
 import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
+import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.OptionalFailure;
@@ -118,7 +120,7 @@ public class ExecutionJobVertex
     private Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey =
             null;
 
-    @Nullable private Collection<OperatorCoordinatorHolder> operatorCoordinators;
+    private final Collection<OperatorCoordinatorHolder> operatorCoordinators;
 
     @Nullable private InputSplitAssigner splitAssigner;
 
@@ -126,7 +128,9 @@ public class ExecutionJobVertex
     public ExecutionJobVertex(
             InternalExecutionGraphAccessor graph,
             JobVertex jobVertex,
-            VertexParallelismInformation parallelismInfo)
+            VertexParallelismInformation parallelismInfo,
+            CoordinatorStore coordinatorStore,
+            JobManagerJobMetricGroup jobManagerJobMetricGroup)
             throws JobException {
 
         if (graph == null || jobVertex == null) {
@@ -154,15 +158,38 @@ public class ExecutionJobVertex
         // take the sharing group
         this.slotSharingGroup = checkNotNull(jobVertex.getSlotSharingGroup());
         this.coLocationGroup = jobVertex.getCoLocationGroup();
+
+        final List<SerializedValue<OperatorCoordinator.Provider>> coordinatorProviders =
+                getJobVertex().getOperatorCoordinators();
+        if (coordinatorProviders.isEmpty()) {
+            this.operatorCoordinators = Collections.emptyList();
+        } else {
+            final ArrayList<OperatorCoordinatorHolder> coordinators =
+                    new ArrayList<>(coordinatorProviders.size());
+            try {
+                for (final SerializedValue<OperatorCoordinator.Provider> provider :
+                        coordinatorProviders) {
+                    coordinators.add(
+                            createOperatorCoordinatorHolder(
+                                    provider,
+                                    graph.getUserClassLoader(),
+                                    coordinatorStore,
+                                    jobManagerJobMetricGroup));
+                }
+            } catch (Exception | LinkageError e) {
+                IOUtils.closeAllQuietly(coordinators);
+                throw new JobException(
+                        "Cannot instantiate the coordinator for operator " + getName(), e);
+            }
+            this.operatorCoordinators = Collections.unmodifiableList(coordinators);
+        }
     }
 
     protected void initialize(
             int executionHistorySizeLimit,
             Time timeout,
             long createTimestamp,
-            SubtaskAttemptNumberStore initialAttemptCounts,
-            CoordinatorStore coordinatorStore,
-            JobManagerJobMetricGroup jobManagerJobMetricGroup)
+            SubtaskAttemptNumberStore initialAttemptCounts)
             throws JobException {
 
         checkState(parallelismInfo.getParallelism() > 0);
@@ -211,31 +238,6 @@ public class ExecutionJobVertex
             }
         }
 
-        final List<SerializedValue<OperatorCoordinator.Provider>> coordinatorProviders =
-                getJobVertex().getOperatorCoordinators();
-        if (coordinatorProviders.isEmpty()) {
-            this.operatorCoordinators = Collections.emptyList();
-        } else {
-            final ArrayList<OperatorCoordinatorHolder> coordinators =
-                    new ArrayList<>(coordinatorProviders.size());
-            try {
-                for (final SerializedValue<OperatorCoordinator.Provider> provider :
-                        coordinatorProviders) {
-                    coordinators.add(
-                            createOperatorCoordinatorHolder(
-                                    provider,
-                                    graph.getUserClassLoader(),
-                                    coordinatorStore,
-                                    jobManagerJobMetricGroup));
-                }
-            } catch (Exception | LinkageError e) {
-                IOUtils.closeAllQuietly(coordinators);
-                throw new JobException(
-                        "Cannot instantiate the coordinator for operator " + getName(), e);
-            }
-            this.operatorCoordinators = Collections.unmodifiableList(coordinators);
-        }
-
         // set up the input splits, if the vertex has any
         try {
             @SuppressWarnings("unchecked")
@@ -408,6 +410,26 @@ public class ExecutionJobVertex
         return operatorCoordinators;
     }
 
+    public List<SourceCoordinator<?, ?>> getSourceCoordinators() {
+        List<SourceCoordinator<?, ?>> sourceCoordinators = new ArrayList<>();
+        for (OperatorCoordinatorHolder oph : operatorCoordinators) {
+            if (oph.coordinator() instanceof RecreateOnResetOperatorCoordinator) {
+                RecreateOnResetOperatorCoordinator opc =
+                        (RecreateOnResetOperatorCoordinator) oph.coordinator();
+                try {
+                    if (opc.getInternalCoordinator() instanceof SourceCoordinator) {
+                        sourceCoordinators.add(
+                                (SourceCoordinator<?, ?>) opc.getInternalCoordinator());
+                    }
+                } catch (Throwable e) {
+                    throw new RuntimeException(
+                            "Unexpected error occurred when getting source coordinators.", e);
+                }
+            }
+        }
+        return sourceCoordinators;
+    }
+
     int getNumExecutionVertexFinished() {
         return numExecutionVertexFinished;
     }
@@ -642,9 +664,12 @@ public class ExecutionJobVertex
         ExecutionJobVertex createExecutionJobVertex(
                 InternalExecutionGraphAccessor graph,
                 JobVertex jobVertex,
-                VertexParallelismInformation parallelismInfo)
+                VertexParallelismInformation parallelismInfo,
+                CoordinatorStore coordinatorStore,
+                JobManagerJobMetricGroup jobManagerJobMetricGroup)
                 throws JobException {
-            return new ExecutionJobVertex(graph, jobVertex, parallelismInfo);
+            return new ExecutionJobVertex(
+                    graph, jobVertex, parallelismInfo, coordinatorStore, jobManagerJobMetricGroup);
         }
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionJobVertex.java
index 4aaa1e545a2..bbce29f1ffc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionJobVertex.java
@@ -34,9 +34,11 @@ public class SpeculativeExecutionJobVertex extends ExecutionJobVertex {
     public SpeculativeExecutionJobVertex(
             InternalExecutionGraphAccessor graph,
             JobVertex jobVertex,
-            VertexParallelismInformation parallelismInfo)
+            VertexParallelismInformation parallelismInfo,
+            CoordinatorStore coordinatorStore,
+            JobManagerJobMetricGroup jobManagerJobMetricGroup)
             throws JobException {
-        super(graph, jobVertex, parallelismInfo);
+        super(graph, jobVertex, parallelismInfo, coordinatorStore, jobManagerJobMetricGroup);
     }
 
     @Override
@@ -81,9 +83,12 @@ public class SpeculativeExecutionJobVertex extends ExecutionJobVertex {
         ExecutionJobVertex createExecutionJobVertex(
                 InternalExecutionGraphAccessor graph,
                 JobVertex jobVertex,
-                VertexParallelismInformation parallelismInfo)
+                VertexParallelismInformation parallelismInfo,
+                CoordinatorStore coordinatorStore,
+                JobManagerJobMetricGroup jobManagerJobMetricGroup)
                 throws JobException {
-            return new SpeculativeExecutionJobVertex(graph, jobVertex, parallelismInfo);
+            return new SpeculativeExecutionJobVertex(
+                    graph, jobVertex, parallelismInfo, coordinatorStore, jobManagerJobMetricGroup);
         }
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index ec7e35bc066..f3253f147db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.operators.coordination;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.OperatorCoordinatorMetricGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
@@ -117,11 +118,11 @@ public class OperatorCoordinatorHolder
 
     private final IncompleteFuturesTracker unconfirmedEvents;
 
-    private final int operatorParallelism;
     private final int operatorMaxParallelism;
 
     private GlobalFailureHandler globalFailureHandler;
     private ComponentMainThreadExecutor mainThreadExecutor;
+    private int operatorParallelism;
 
     private OperatorCoordinatorHolder(
             final OperatorID operatorId,
@@ -146,11 +147,27 @@ public class OperatorCoordinatorHolder
             GlobalFailureHandler globalFailureHandler,
             ComponentMainThreadExecutor mainThreadExecutor,
             @Nullable CheckpointCoordinator checkpointCoordinator) {
+        lazyInitialize(
+                globalFailureHandler,
+                mainThreadExecutor,
+                checkpointCoordinator,
+                operatorParallelism);
+    }
 
+    public void lazyInitialize(
+            GlobalFailureHandler globalFailureHandler,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            @Nullable CheckpointCoordinator checkpointCoordinator,
+            final int operatorParallelism) {
         this.globalFailureHandler = globalFailureHandler;
         this.mainThreadExecutor = mainThreadExecutor;
-
-        context.lazyInitialize(globalFailureHandler, mainThreadExecutor, checkpointCoordinator);
+        checkState(operatorParallelism != ExecutionConfig.PARALLELISM_DEFAULT);
+        this.operatorParallelism = operatorParallelism;
+        context.lazyInitialize(
+                globalFailureHandler,
+                mainThreadExecutor,
+                checkpointCoordinator,
+                operatorParallelism);
         setupAllSubtaskGateways();
     }
 
@@ -560,7 +577,6 @@ public class OperatorCoordinatorHolder
         private final OperatorID operatorId;
         private final String operatorName;
         private final ClassLoader userCodeClassLoader;
-        private final int operatorParallelism;
         private final CoordinatorStore coordinatorStore;
         private final boolean supportsConcurrentExecutionAttempts;
         private final OperatorCoordinatorMetricGroup metricGroup;
@@ -568,6 +584,7 @@ public class OperatorCoordinatorHolder
         private GlobalFailureHandler globalFailureHandler;
         private Executor schedulerExecutor;
         @Nullable private CheckpointCoordinator checkpointCoordinator;
+        private int operatorParallelism;
 
         private volatile boolean failed;
 
@@ -591,10 +608,12 @@ public class OperatorCoordinatorHolder
         void lazyInitialize(
                 GlobalFailureHandler globalFailureHandler,
                 Executor schedulerExecutor,
-                @Nullable CheckpointCoordinator checkpointCoordinator) {
+                @Nullable CheckpointCoordinator checkpointCoordinator,
+                final int operatorParallelism) {
             this.globalFailureHandler = checkNotNull(globalFailureHandler);
             this.schedulerExecutor = checkNotNull(schedulerExecutor);
             this.checkpointCoordinator = checkpointCoordinator;
+            this.operatorParallelism = operatorParallelism;
         }
 
         void unInitialize() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index 2c82090b5b0..d2d6d5681e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -162,7 +162,6 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator {
 
     // ---------------------
 
-    @VisibleForTesting
     public OperatorCoordinator getInternalCoordinator() throws Exception {
         waitForAllAsyncCallsFinish();
         return coordinator.internalCoordinator;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandler.java
index 4b1e579034e..7896e97fbfa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandler.java
@@ -153,14 +153,16 @@ public class DefaultOperatorCoordinatorHandler implements OperatorCoordinatorHan
     @Override
     public void registerAndStartNewCoordinators(
             Collection<OperatorCoordinatorHolder> coordinators,
-            ComponentMainThreadExecutor mainThreadExecutor) {
+            ComponentMainThreadExecutor mainThreadExecutor,
+            final int parallelism) {
 
         for (OperatorCoordinatorHolder coordinator : coordinators) {
             coordinatorMap.put(coordinator.operatorId(), coordinator);
             coordinator.lazyInitialize(
                     globalFailureHandler,
                     mainThreadExecutor,
-                    executionGraph.getCheckpointCoordinator());
+                    executionGraph.getCheckpointCoordinator(),
+                    parallelism);
         }
         startOperatorCoordinators(coordinators);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OperatorCoordinatorHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OperatorCoordinatorHandler.java
index 021950c9def..2cb281c0252 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OperatorCoordinatorHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OperatorCoordinatorHandler.java
@@ -78,5 +78,6 @@ public interface OperatorCoordinatorHandler {
      */
     void registerAndStartNewCoordinators(
             Collection<OperatorCoordinatorHolder> coordinators,
-            ComponentMainThreadExecutor mainThreadExecutor);
+            ComponentMainThreadExecutor mainThreadExecutor,
+            final int parallelism);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index e4169c6852b..97b4b24f8ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -67,6 +67,8 @@ import org.apache.flink.runtime.scheduler.VertexParallelismStore;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
+import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 
 import org.slf4j.Logger;
@@ -77,7 +79,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -104,6 +108,9 @@ public class AdaptiveBatchScheduler extends DefaultScheduler {
 
     private final HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint;
 
+    private final Map<JobVertexID, CompletableFuture<Integer>>
+            sourceParallelismFuturesByJobVertexId;
+
     public AdaptiveBatchScheduler(
             final Logger log,
             final JobGraph jobGraph,
@@ -172,22 +179,28 @@ public class AdaptiveBatchScheduler extends DefaultScheduler {
         this.blockingResultInfos = new HashMap<>();
 
         this.hybridPartitionDataConsumeConstraint = hybridPartitionDataConsumeConstraint;
+
+        this.sourceParallelismFuturesByJobVertexId = new HashMap<>();
     }
 
     @Override
     protected void startSchedulingInternal() {
-        initializeVerticesIfPossible();
-
-        super.startSchedulingInternal();
+        tryComputeSourceParallelismThenRunAsync(
+                (Void value, Throwable throwable) -> {
+                    initializeVerticesIfPossible();
+                    super.startSchedulingInternal();
+                });
     }
 
     @Override
     protected void onTaskFinished(final Execution execution, final IOMetrics ioMetrics) {
         checkNotNull(ioMetrics);
         updateResultPartitionBytesMetrics(ioMetrics.getResultPartitionBytes());
-        initializeVerticesIfPossible();
-
-        super.onTaskFinished(execution, ioMetrics);
+        tryComputeSourceParallelismThenRunAsync(
+                (Void value, Throwable throwable) -> {
+                    initializeVerticesIfPossible();
+                    super.onTaskFinished(execution, ioMetrics);
+                });
     }
 
     private void updateResultPartitionBytesMetrics(
@@ -257,6 +270,74 @@ public class AdaptiveBatchScheduler extends DefaultScheduler {
                         || hybridPartitionDataConsumeConstraint.isOnlyConsumeFinishedPartition();
     }
 
+    private void tryComputeSourceParallelismThenRunAsync(BiConsumer<Void, Throwable> action) {
+        // Ensure `initializeVerticesIfPossible` is invoked asynchronously after
+        // `computeDynamicSourceParallelism` completes. Any method required to run after
+        // `initializeVerticesIfPossible` should be enqueued within the same asynchronous action to
+        // maintain the correct execution order.
+        FutureUtils.ConjunctFuture<Void> dynamicSourceParallelismFutures =
+                FutureUtils.waitForAll(computeDynamicSourceParallelism());
+        dynamicSourceParallelismFutures
+                .whenCompleteAsync(action, getMainThreadExecutor())
+                .exceptionally(
+                        throwable -> {
+                            log.error("An unexpected error occurred while scheduling.", throwable);
+                            this.handleGlobalFailure(new SuppressRestartsException(throwable));
+                            return null;
+                        });
+    }
+
+    public List<CompletableFuture<Integer>> computeDynamicSourceParallelism() {
+        final List<CompletableFuture<Integer>> dynamicSourceParallelismFutures = new ArrayList<>();
+        for (ExecutionJobVertex jobVertex : getExecutionGraph().getVerticesTopologically()) {
+            List<SourceCoordinator<?, ?>> sourceCoordinators = jobVertex.getSourceCoordinators();
+            if (sourceCoordinators.isEmpty() || jobVertex.isParallelismDecided()) {
+                continue;
+            }
+            if (sourceParallelismFuturesByJobVertexId.containsKey(jobVertex.getJobVertexId())) {
+                dynamicSourceParallelismFutures.add(
+                        sourceParallelismFuturesByJobVertexId.get(jobVertex.getJobVertexId()));
+                continue;
+            }
+
+            // We need to wait for the upstream vertex to complete; otherwise, dynamic filtering
+            // information will be inaccessible during source parallelism inference.
+            Optional<List<BlockingResultInfo>> consumedResultsInfo =
+                    tryGetConsumedResultsInfo(jobVertex);
+            if (consumedResultsInfo.isPresent()) {
+                List<CompletableFuture<Integer>> sourceParallelismFutures =
+                        sourceCoordinators.stream()
+                                .map(
+                                        sourceCoordinator ->
+                                                sourceCoordinator.inferSourceParallelismAsync(
+                                                        vertexParallelismAndInputInfosDecider
+                                                                .computeSourceParallelismUpperBound(
+                                                                        jobVertex.getJobVertexId(),
+                                                                        jobVertex
+                                                                                .getMaxParallelism()),
+                                                        vertexParallelismAndInputInfosDecider
+                                                                .getDataVolumePerTask()))
+                                .collect(Collectors.toList());
+                CompletableFuture<Integer> dynamicSourceParallelismFuture =
+                        mergeDynamicParallelismFutures(sourceParallelismFutures);
+                sourceParallelismFuturesByJobVertexId.put(
+                        jobVertex.getJobVertexId(), dynamicSourceParallelismFuture);
+                dynamicSourceParallelismFutures.add(dynamicSourceParallelismFuture);
+            }
+        }
+
+        return dynamicSourceParallelismFutures;
+    }
+
+    @VisibleForTesting
+    static CompletableFuture<Integer> mergeDynamicParallelismFutures(
+            List<CompletableFuture<Integer>> sourceParallelismFutures) {
+        return sourceParallelismFutures.stream()
+                .reduce(
+                        CompletableFuture.completedFuture(ExecutionConfig.PARALLELISM_DEFAULT),
+                        (a, b) -> a.thenCombine(b, Math::max));
+    }
+
     @VisibleForTesting
     public void initializeVerticesIfPossible() {
         final List<ExecutionJobVertex> newlyInitializedJobVertices = new ArrayList<>();
@@ -278,9 +359,7 @@ public class AdaptiveBatchScheduler extends DefaultScheduler {
                     // ExecutionGraph#initializeJobVertex(ExecutionJobVertex, long) to initialize.
                     // TODO: In the future, if we want to load balance for job vertices whose
                     // parallelism has already been decided, we need to refactor the logic here.
-                    getExecutionGraph()
-                            .initializeJobVertex(
-                                    jobVertex, createTimestamp, jobManagerJobMetricGroup);
+                    getExecutionGraph().initializeJobVertex(jobVertex, createTimestamp);
                     newlyInitializedJobVertices.add(jobVertex);
                 } else {
                     Optional<List<BlockingResultInfo>> consumedResultsInfo =
@@ -296,8 +375,7 @@ public class AdaptiveBatchScheduler extends DefaultScheduler {
                                 .initializeJobVertex(
                                         jobVertex,
                                         createTimestamp,
-                                        parallelismAndInputInfos.getJobVertexInputInfos(),
-                                        jobManagerJobMetricGroup);
+                                        parallelismAndInputInfos.getJobVertexInputInfos());
                         newlyInitializedJobVertices.add(jobVertex);
                     }
                 }
@@ -314,34 +392,51 @@ public class AdaptiveBatchScheduler extends DefaultScheduler {
 
     private ParallelismAndInputInfos tryDecideParallelismAndInputInfos(
             final ExecutionJobVertex jobVertex, List<BlockingResultInfo> inputs) {
-        int parallelism = jobVertex.getParallelism();
+        int vertexInitialParallelism = jobVertex.getParallelism();
         ForwardGroup forwardGroup = forwardGroupsByJobVertexId.get(jobVertex.getJobVertexId());
         if (!jobVertex.isParallelismDecided()
                 && forwardGroup != null
                 && forwardGroup.isParallelismDecided()) {
-            parallelism = forwardGroup.getParallelism();
+            vertexInitialParallelism = forwardGroup.getParallelism();
             log.info(
                     "Parallelism of JobVertex: {} ({}) is decided to be {} according to forward group's parallelism.",
                     jobVertex.getName(),
                     jobVertex.getJobVertexId(),
-                    parallelism);
+                    vertexInitialParallelism);
+        }
+
+        int vertexMinParallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+        if (sourceParallelismFuturesByJobVertexId.containsKey(jobVertex.getJobVertexId())) {
+            int dynamicSourceParallelism = getDynamicSourceParallelism(jobVertex);
+            // If the JobVertex only acts as a source vertex, dynamicSourceParallelism will serve as
+            // the vertex's initial parallelism and will remain unchanged. If the JobVertex is also
+            // a source with upstream inputs, dynamicSourceParallelism will serve as the vertex's
+            // minimum parallelism, with the final parallelism being the maximum of
+            // dynamicSourceParallelism and the vertex's dynamic parallelism according to upstream
+            // inputs.
+            if (!inputs.isEmpty()) {
+                vertexMinParallelism = dynamicSourceParallelism;
+            } else {
+                vertexInitialParallelism = dynamicSourceParallelism;
+            }
         }
 
         final ParallelismAndInputInfos parallelismAndInputInfos =
                 vertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForVertex(
                         jobVertex.getJobVertexId(),
                         inputs,
-                        parallelism,
+                        vertexInitialParallelism,
+                        vertexMinParallelism,
                         jobVertex.getMaxParallelism());
 
-        if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
+        if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
             log.info(
                     "Parallelism of JobVertex: {} ({}) is decided to be {}.",
                     jobVertex.getName(),
                     jobVertex.getJobVertexId(),
                     parallelismAndInputInfos.getParallelism());
         } else {
-            checkState(parallelismAndInputInfos.getParallelism() == parallelism);
+            checkState(parallelismAndInputInfos.getParallelism() == vertexInitialParallelism);
         }
 
         if (forwardGroup != null && !forwardGroup.isParallelismDecided()) {
@@ -351,6 +446,36 @@ public class AdaptiveBatchScheduler extends DefaultScheduler {
         return parallelismAndInputInfos;
     }
 
+    private int getDynamicSourceParallelism(ExecutionJobVertex jobVertex) {
+        CompletableFuture<Integer> dynamicSourceParallelismFuture =
+                sourceParallelismFuturesByJobVertexId.get(jobVertex.getJobVertexId());
+        int dynamicSourceParallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+        if (dynamicSourceParallelismFuture != null) {
+            dynamicSourceParallelism = dynamicSourceParallelismFuture.join();
+            int vertexMaxParallelism = jobVertex.getMaxParallelism();
+            if (dynamicSourceParallelism > vertexMaxParallelism) {
+                log.info(
+                        "The dynamically inferred source parallelism {} is larger than the maximum parallelism {}. "
+                                + "Use {} as the upper bound parallelism of source job vertex {}.",
+                        dynamicSourceParallelism,
+                        vertexMaxParallelism,
+                        vertexMaxParallelism,
+                        jobVertex.getJobVertexId());
+                dynamicSourceParallelism = vertexMaxParallelism;
+            } else if (dynamicSourceParallelism > 0) {
+                log.info(
+                        "Parallelism of JobVertex: {} ({}) is decided to be {} according to dynamic source parallelism inference.",
+                        jobVertex.getName(),
+                        jobVertex.getJobVertexId(),
+                        dynamicSourceParallelism);
+            } else {
+                dynamicSourceParallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+            }
+        }
+
+        return dynamicSourceParallelism;
+    }
+
     private void enrichInputBytesForExecutionVertices(List<ExecutionVertex> executionVertices) {
         for (ExecutionVertex ev : executionVertices) {
             List<IntermediateResult> intermediateResults = ev.getJobVertex().getInputs();
@@ -451,7 +576,7 @@ public class AdaptiveBatchScheduler extends DefaultScheduler {
 
     private void initializeOperatorCoordinatorsFor(ExecutionJobVertex vertex) {
         operatorCoordinatorHandler.registerAndStartNewCoordinators(
-                vertex.getOperatorCoordinators(), getMainThreadExecutor());
+                vertex.getOperatorCoordinators(), getMainThreadExecutor(), vertex.getParallelism());
     }
 
     /**
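
A note on mergeDynamicParallelismFutures above: it folds the per-coordinator futures with
Math::max, starting from ExecutionConfig.PARALLELISM_DEFAULT (-1) as the identity, so a
coordinator that declines to infer a parallelism never lowers the merged result. A
self-contained sketch of that behavior (class and method names are illustrative):

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    public class MergeParallelismDemo {

        // Mirrors ExecutionConfig.PARALLELISM_DEFAULT.
        private static final int PARALLELISM_DEFAULT = -1;

        // Same shape as mergeDynamicParallelismFutures: fold with max over -1.
        static CompletableFuture<Integer> merge(List<CompletableFuture<Integer>> futures) {
            return futures.stream()
                    .reduce(
                            CompletableFuture.completedFuture(PARALLELISM_DEFAULT),
                            (a, b) -> a.thenCombine(b, Math::max));
        }

        public static void main(String[] args) {
            // One coordinator infers 8, another declines (-1): the merged result is 8.
            System.out.println(
                    merge(Arrays.asList(
                                    CompletableFuture.completedFuture(8),
                                    CompletableFuture.completedFuture(PARALLELISM_DEFAULT)))
                            .join());
        }
    }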
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java
index e7326562852..4d5708e60c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java
@@ -106,27 +106,34 @@ public class DefaultVertexParallelismAndInputInfosDecider
             JobVertexID jobVertexId,
             List<BlockingResultInfo> consumedResults,
             int vertexInitialParallelism,
+            int vertexMinParallelism,
             int vertexMaxParallelism) {
         checkArgument(
                 vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
                         || vertexInitialParallelism > 0);
-        checkArgument(vertexMaxParallelism > 0 && vertexMaxParallelism >= vertexInitialParallelism);
+        checkArgument(
+                vertexMinParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                        || vertexMinParallelism > 0);
+        checkArgument(
+                vertexMaxParallelism > 0
+                        && vertexMaxParallelism >= vertexInitialParallelism
+                        && vertexMaxParallelism >= vertexMinParallelism);
 
         if (consumedResults.isEmpty()) {
             // source job vertex
             int parallelism =
                     vertexInitialParallelism > 0
                             ? vertexInitialParallelism
-                            : computeSourceParallelism(jobVertexId, vertexMaxParallelism);
+                            : computeSourceParallelismUpperBound(jobVertexId, vertexMaxParallelism);
             return new ParallelismAndInputInfos(parallelism, Collections.emptyMap());
         } else {
-            int minParallelism = globalMinParallelism;
+            int minParallelism = Math.max(globalMinParallelism, vertexMinParallelism);
             int maxParallelism = globalMaxParallelism;
 
             if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
                     && vertexMaxParallelism < minParallelism) {
                 LOG.info(
-                        "The vertex maximum parallelism {} is smaller than the global minimum parallelism {}. "
+                        "The vertex maximum parallelism {} is smaller than the minimum parallelism {}. "
                                 + "Use {} as the lower bound to decide parallelism of job vertex {}.",
                         vertexMaxParallelism,
                         minParallelism,
@@ -167,11 +174,12 @@ public class DefaultVertexParallelismAndInputInfosDecider
         }
     }
 
-    private int computeSourceParallelism(JobVertexID jobVertexId, int maxParallelism) {
+    @Override
+    public int computeSourceParallelismUpperBound(JobVertexID jobVertexId, int maxParallelism) {
         if (globalDefaultSourceParallelism > maxParallelism) {
             LOG.info(
                     "The global default source parallelism {} is larger than the maximum parallelism {}. "
-                            + "Use {} as the parallelism of source job vertex {}.",
+                            + "Use {} as the upper bound parallelism of source job vertex {}.",
                     globalDefaultSourceParallelism,
                     maxParallelism,
                     maxParallelism,
@@ -182,6 +190,11 @@ public class DefaultVertexParallelismAndInputInfosDecider
         }
     }
 
+    @Override
+    public long getDataVolumePerTask() {
+        return dataVolumePerTask;
+    }
+
     private static boolean areAllInputsAllToAll(List<BlockingResultInfo> consumedResults) {
         return consumedResults.stream().noneMatch(BlockingResultInfo::isPointwise);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/VertexParallelismAndInputInfosDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/VertexParallelismAndInputInfosDecider.java
index b2144a10de7..2ae7f9541b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/VertexParallelismAndInputInfosDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/VertexParallelismAndInputInfosDecider.java
@@ -41,6 +41,7 @@ public interface VertexParallelismAndInputInfosDecider {
      *     number, it will be respected. If it's not set(equals to {@link
      *     ExecutionConfig#PARALLELISM_DEFAULT}), a parallelism will be automatically decided for
      *     the vertex.
+     * @param vertexMinParallelism The min parallelism of the job vertex.
      * @param vertexMaxParallelism The max parallelism of the job vertex.
      * @return the parallelism and vertex input infos.
      */
@@ -48,5 +49,22 @@ public interface VertexParallelismAndInputInfosDecider {
             JobVertexID jobVertexId,
             List<BlockingResultInfo> consumedResults,
             int vertexInitialParallelism,
+            int vertexMinParallelism,
             int vertexMaxParallelism);
+
+    /**
+     * Compute the upper bound of parallelism for the source vertex.
+     *
+     * @param jobVertexId The job vertex id.
+     * @param maxParallelism The max parallelism of the job vertex.
+     * @return the upper bound parallelism for the source vertex.
+     */
+    int computeSourceParallelismUpperBound(JobVertexID jobVertexId, int maxParallelism);
+
+    /**
+     * Get the average data volume (in bytes) that each task instance is expected to process.
+     *
+     * @return the data volume per task in bytes.
+     */
+    long getDataVolumePerTask();
 }
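
To make the initial-vs-minimum distinction in AdaptiveBatchScheduler concrete: for a pure source
vertex the dynamically inferred value becomes the vertex parallelism, while for a source that
also consumes upstream inputs it only acts as a lower bound on the parallelism decided from those
inputs. A simplified illustrative sketch (the real decider additionally balances input data
across subtasks):

    /** Illustrative only: the effective-parallelism rule for dynamically inferred sources. */
    public class EffectiveParallelismRule {

        static int effectiveParallelism(
                boolean hasUpstreamInputs, int dynamicSourceParallelism, int decidedFromInputs) {
            if (!hasUpstreamInputs) {
                // Pure source vertex: the inferred value is the final parallelism.
                return dynamicSourceParallelism;
            }
            // Source with upstream inputs: the inferred value only acts as a lower bound.
            return Math.max(dynamicSourceParallelism, decidedFromInputs);
        }

        public static void main(String[] args) {
            System.out.println(effectiveParallelism(false, 8, 0));  // 8: pure source
            System.out.println(effectiveParallelism(true, 8, 12)); // 12: upstream decision dominates
            System.out.println(effectiveParallelism(true, 8, 4));  // 8: inferred floor applies
        }
    }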
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index b69f8172ab1..c9e6eb8a2a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.source.coordinator;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
+import org.apache.flink.api.connector.source.DynamicFilteringInfo;
+import org.apache.flink.api.connector.source.DynamicParallelismInference;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceSplit;
@@ -653,6 +656,60 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
         }
     }
 
+    private Optional<DynamicFilteringInfo> getSourceDynamicFilteringInfo() {
+        if (coordinatorListeningID != null
+                && coordinatorStore.containsKey(coordinatorListeningID)) {
+            Object event = coordinatorStore.get(coordinatorListeningID);
+            if (event instanceof SourceEventWrapper) {
+                SourceEvent sourceEvent = ((SourceEventWrapper) event).getSourceEvent();
+                if (sourceEvent instanceof DynamicFilteringInfo) {
+                    return Optional.of((DynamicFilteringInfo) sourceEvent);
+                }
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    public CompletableFuture<Integer> inferSourceParallelismAsync(
+            int parallelismInferenceUpperBound, long dataVolumePerTask) {
+        return context.supplyAsync(
+                        () -> {
+                            if (!(source instanceof DynamicParallelismInference)) {
+                                return ExecutionConfig.PARALLELISM_DEFAULT;
+                            }
+
+                            DynamicParallelismInference parallelismInference =
+                                    (DynamicParallelismInference) source;
+                            try {
+                                return parallelismInference.inferParallelism(
+                                        new DynamicParallelismInference.Context() {
+                                            @Override
+                                            public Optional<DynamicFilteringInfo>
+                                                    getDynamicFilteringInfo() {
+                                                return getSourceDynamicFilteringInfo();
+                                            }
+
+                                            @Override
+                                            public int getParallelismInferenceUpperBound() {
+                                                return parallelismInferenceUpperBound;
+                                            }
+
+                                            @Override
+                                            public long getDataVolumePerTask() {
+                                                return dataVolumePerTask;
+                                            }
+                                        });
+                            } catch (Throwable e) {
+                                LOG.error(
+                                        "Unexpected error occurred when dynamically inferring source parallelism.",
+                                        e);
+                                return ExecutionConfig.PARALLELISM_DEFAULT;
+                            }
+                        })
+                .thenApply(result -> (Integer) result);
+    }
+
     /** The watermark element for {@link HeapPriorityQueue}. */
     public static class WatermarkElement extends AbstractHeapPriorityQueueElement
             implements PriorityComparable<WatermarkElement> {
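
Note on the new API: a Source opts into dynamic parallelism inference by
additionally implementing DynamicParallelismInference; the instanceof check
above is the only way the coordinator consults it. As a minimal
connector-side sketch (EstimatedBytesSource and estimateTotalBytes are
illustrative names, not part of this change), an implementation could derive
its parallelism from an input-size estimate and the two hints exposed by the
Context:

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.connector.source.DynamicParallelismInference;

    public class EstimatedBytesSource implements DynamicParallelismInference {
        @Override
        public int inferParallelism(Context context) {
            // Hypothetical estimate; a real connector might sum file sizes.
            long totalBytes = estimateTotalBytes();
            if (totalBytes <= 0) {
                // Let the scheduler fall back to its default decision.
                return ExecutionConfig.PARALLELISM_DEFAULT;
            }
            long perTask = Math.max(1L, context.getDataVolumePerTask());
            long needed = (totalBytes + perTask - 1) / perTask; // ceiling
            return (int) Math.max(
                    1L, Math.min(needed, context.getParallelismInferenceUpperBound()));
        }

        private long estimateTotalBytes() {
            return 0L; // placeholder
        }
    }

A production implementation would also inspect
context.getDynamicFilteringInfo() to shrink the estimate when dynamic
partition pruning applies.
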
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index bc13eb7ed61..ced3d93df0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.source.coordinator;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.connector.source.ReaderInfo;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceSplit;
@@ -60,6 +61,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -70,6 +72,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
+import java.util.function.Supplier;
 
 import static org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.shutdownExecutorForcefully;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -109,10 +112,10 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
     private final SplitAssignmentTracker<SplitT> assignmentTracker;
     private final SourceCoordinatorProvider.CoordinatorExecutorThreadFactory
             coordinatorThreadFactory;
-    private final SubtaskGateways subtaskGateways;
+    private SubtaskGateways subtaskGateways;
     private final String coordinatorThreadName;
     private final boolean supportsConcurrentExecutionAttempts;
-    private final boolean[] subtaskHasNoMoreSplits;
+    private boolean[] subtaskHasNoMoreSplits;
     private volatile boolean closed;
     private volatile TernaryBoolean backlog = TernaryBoolean.UNDEFINED;
 
@@ -155,11 +158,6 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
         this.coordinatorThreadName = coordinatorThreadFactory.getCoordinatorThreadName();
         this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
 
-        final int parallelism = operatorCoordinatorContext.currentParallelism();
-        this.subtaskGateways = new SubtaskGateways(parallelism);
-        this.subtaskHasNoMoreSplits = new boolean[parallelism];
-        Arrays.fill(subtaskHasNoMoreSplits, false);
-
         final Executor errorHandlingCoordinatorExecutor =
                 (runnable) ->
                         coordinatorExecutor.execute(
@@ -180,6 +178,7 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
 
     @Override
     public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
+        checkAndLazyInitialize();
         checkState(
                 !supportsConcurrentExecutionAttempts,
                 "The split enumerator must invoke SplitEnumeratorContext"
@@ -201,6 +200,7 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
 
     @Override
     public void sendEventToSourceReader(int subtaskId, int attemptNumber, SourceEvent event) {
+        checkAndLazyInitialize();
         checkSubtaskIndex(subtaskId);
 
         callInCoordinatorThread(
@@ -216,6 +216,7 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
     }
 
     void sendEventToSourceOperator(int subtaskId, OperatorEvent event) {
+        checkAndLazyInitialize();
         checkSubtaskIndex(subtaskId);
 
         callInCoordinatorThread(
@@ -229,6 +230,7 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
     }
 
     void sendEventToSourceOperatorIfTaskReady(int subtaskId, OperatorEvent event) {
+        checkAndLazyInitialize();
         checkSubtaskIndex(subtaskId);
 
         callInCoordinatorThread(
@@ -389,18 +391,21 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
     // --------- Package private additional methods for the SourceCoordinator ------------
 
     void attemptReady(OperatorCoordinator.SubtaskGateway gateway) {
+        checkAndLazyInitialize();
         checkState(coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
 
         subtaskGateways.registerSubtaskGateway(gateway);
     }
 
     void attemptFailed(int subtaskIndex, int attemptNumber) {
+        checkAndLazyInitialize();
         checkState(coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
 
         subtaskGateways.unregisterSubtaskGateway(subtaskIndex, attemptNumber);
     }
 
     void subtaskReset(int subtaskIndex) {
+        checkAndLazyInitialize();
         checkState(coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
 
         subtaskGateways.reset(subtaskIndex);
@@ -409,6 +414,7 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
     }
 
     boolean hasNoMoreSplits(int subtaskIndex) {
+        checkAndLazyInitialize();
         return subtaskHasNoMoreSplits[subtaskIndex];
     }
 
@@ -527,6 +533,10 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
                 unit);
     }
 
+    CompletableFuture<?> supplyAsync(Supplier<?> task) {
+        return CompletableFuture.supplyAsync(task, coordinatorExecutor);
+    }
+
     // ---------------- private helper methods -----------------
 
     private void checkSubtaskIndex(int subtaskIndex) {
@@ -538,6 +548,16 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
         }
     }
 
+    private void checkAndLazyInitialize() {
+        if (subtaskGateways == null) {
+            final int parallelism = operatorCoordinatorContext.currentParallelism();
+            checkState(parallelism != ExecutionConfig.PARALLELISM_DEFAULT);
+            this.subtaskGateways = new SubtaskGateways(parallelism);
+            this.subtaskHasNoMoreSplits = new boolean[parallelism];
+            Arrays.fill(subtaskHasNoMoreSplits, false);
+        }
+    }
+
     /**
      * A helper method that delegates the callable to the coordinator thread if the current thread
      * is not the coordinator thread, otherwise call the callable right away.
@@ -583,6 +603,7 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
     }
 
     private void assignSplitsToAttempt(int subtaskIndex, int attemptNumber, List<SplitT> splits) {
+        checkAndLazyInitialize();
         if (splits.isEmpty()) {
             return;
         }
@@ -607,6 +628,7 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
     }
 
     private void signalNoMoreSplitsToAttempt(int subtaskIndex, int attemptNumber) {
+        checkAndLazyInitialize();
         checkAttemptReaderReady(subtaskIndex, attemptNumber);
 
         final OperatorCoordinator.SubtaskGateway gateway =
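
Context for the lazy initialization above: with dynamic source parallelism
inference, the vertex parallelism may still be undecided (that is,
ExecutionConfig.PARALLELISM_DEFAULT) when the coordinator context is
constructed, so subtaskGateways and subtaskHasNoMoreSplits can no longer be
sized eagerly in the constructor. checkAndLazyInitialize() allocates them on
first use, and its checkState guards against any access before a concrete
parallelism has been decided.
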
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
index 8aec8f5c520..e574a59fb26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
@@ -430,7 +430,7 @@ class DefaultExecutionGraphConstructionTest {
         eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);
 
         ExecutionJobVertex ejv1 = eg.getJobVertex(v1.getID());
-        eg.initializeJobVertex(ejv1, 0L, JOB_MANAGER_JOB_METRIC_GROUP);
+        eg.initializeJobVertex(ejv1, 0L);
 
         IntermediateResult result =
                 Objects.requireNonNull(eg.getJobVertex(v1.getID())).getProducedDataSets()[0];
@@ -444,7 +444,7 @@ class DefaultExecutionGraphConstructionTest {
         assertThat(partition1.getConsumedPartitionGroups()).isEmpty();
 
         ExecutionJobVertex ejv2 = eg.getJobVertex(v2.getID());
-        eg.initializeJobVertex(ejv2, 0L, JOB_MANAGER_JOB_METRIC_GROUP);
+        eg.initializeJobVertex(ejv2, 0L);
 
         ConsumedPartitionGroup consumedPartitionGroup =
                 partition1.getConsumedPartitionGroups().get(0);
@@ -468,7 +468,7 @@ class DefaultExecutionGraphConstructionTest {
         eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);
 
         ExecutionJobVertex ejv1 = eg.getJobVertex(v1.getID());
-        eg.initializeJobVertex(ejv1, 0L, JOB_MANAGER_JOB_METRIC_GROUP);
+        eg.initializeJobVertex(ejv1, 0L);
 
         IntermediateResult result =
                 Objects.requireNonNull(eg.getJobVertex(v1.getID())).getProducedDataSets()[0];
@@ -487,7 +487,7 @@ class DefaultExecutionGraphConstructionTest {
         assertThat(partition4.getConsumedPartitionGroups()).isEmpty();
 
         ExecutionJobVertex ejv2 = eg.getJobVertex(v2.getID());
-        eg.initializeJobVertex(ejv2, 0L, JOB_MANAGER_JOB_METRIC_GROUP);
+        eg.initializeJobVertex(ejv2, 0L);
 
         ConsumedPartitionGroup consumedPartitionGroup1 =
                 partition1.getConsumedPartitionGroups().get(0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtilTest.java
index dc019fcfa90..9f720ed3890 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtilTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtilTest.java
@@ -100,17 +100,12 @@ class EdgeManagerBuildUtilTest {
         final ExecutionJobVertex consumer = vertexIterator.next();
 
         // initialize producer and consumer
-        eg.initializeJobVertex(
-                producer,
-                1L,
-                Collections.emptyMap(),
-                UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+        eg.initializeJobVertex(producer, 1L, Collections.emptyMap());
         eg.initializeJobVertex(
                 consumer,
                 1L,
                 Collections.singletonMap(
-                        producer.getProducedDataSets()[0].getId(), jobVertexInputInfo),
-                UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+                        producer.getProducedDataSets()[0].getId(), jobVertexInputInfo));
 
         IntermediateResult result =
                 Objects.requireNonNull(eg.getJobVertex(producer.getJobVertexId()))
@@ -175,17 +170,12 @@ class EdgeManagerBuildUtilTest {
         final ExecutionJobVertex consumer = vertexIterator.next();
 
         // initialize producer and consumer
-        eg.initializeJobVertex(
-                producer,
-                1L,
-                Collections.emptyMap(),
-                UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+        eg.initializeJobVertex(producer, 1L, Collections.emptyMap());
         eg.initializeJobVertex(
                 consumer,
                 1L,
                 Collections.singletonMap(
-                        producer.getProducedDataSets()[0].getId(), jobVertexInputInfo),
-                UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+                        producer.getProducedDataSets()[0].getId(), jobVertexInputInfo));
 
         IntermediateResult result =
                 Objects.requireNonNull(eg.getJobVertex(producer.getJobVertexId()))
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
index c601e5e151b..174b27db580 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
@@ -159,9 +159,7 @@ class ExecutionJobVertexTest {
                 1,
                 Time.milliseconds(1L),
                 1L,
-                new DefaultSubtaskAttemptNumberStore(Collections.emptyList()),
-                new CoordinatorStoreImpl(),
-                UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+                new DefaultSubtaskAttemptNumberStore(Collections.emptyList()));
     }
 
     private static ExecutionJobVertex createDynamicExecutionJobVertex() throws Exception {
@@ -192,6 +190,11 @@ class ExecutionJobVertexTest {
         final VertexParallelismInformation vertexParallelismInfo =
                 vertexParallelismStore.getParallelismInfo(jobVertex.getID());
 
-        return new ExecutionJobVertex(eg, jobVertex, vertexParallelismInfo);
+        return new ExecutionJobVertex(
+                eg,
+                jobVertex,
+                vertexParallelismInfo,
+                new CoordinatorStoreImpl(),
+                UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandlerTest.java
index b004a1a7db8..fe36bb358a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandlerTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorResource;
@@ -64,17 +63,17 @@ public class DefaultOperatorCoordinatorHandlerTest {
         ExecutionJobVertex ejv2 = executionGraph.getJobVertex(jobVertices[1].getID());
         executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
-        executionGraph.initializeJobVertex(
-                ejv1, 0L, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+        executionGraph.initializeJobVertex(ejv1, 0L);
 
         DefaultOperatorCoordinatorHandler handler =
                 new DefaultOperatorCoordinatorHandler(executionGraph, throwable -> {});
         assertThat(handler.getCoordinatorMap().keySet(), containsInAnyOrder(operatorId1));
 
-        executionGraph.initializeJobVertex(
-                ejv2, 0L, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+        executionGraph.initializeJobVertex(ejv2, 0L);
         handler.registerAndStartNewCoordinators(
-                ejv2.getOperatorCoordinators(), executionGraph.getJobMasterMainThreadExecutor());
+                ejv2.getOperatorCoordinators(),
+                executionGraph.getJobMasterMainThreadExecutor(),
+                ejv2.getParallelism());
 
         assertThat(
                 handler.getCoordinatorMap().keySet(), containsInAnyOrder(operatorId1, operatorId2));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
index b4612af6f3d..7df0a95bc9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
+import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
 import org.apache.flink.runtime.scheduler.adaptivebatch.SpeculativeScheduler;
 import org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismAndInputInfosDecider;
 import org.apache.flink.runtime.scheduler.strategy.AllFinishedInputConsumableDecider;
@@ -66,6 +67,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Function;
@@ -415,17 +417,36 @@ public class DefaultSchedulerBuilder {
 
     public static VertexParallelismAndInputInfosDecider createCustomParallelismDecider(
             Function<JobVertexID, Integer> parallelismFunction) {
-        return (jobVertexId, consumedResults, vertexInitialParallelism, ignored) -> {
-            int parallelism =
-                    vertexInitialParallelism > 0
-                            ? vertexInitialParallelism
-                            : parallelismFunction.apply(jobVertexId);
-            return new ParallelismAndInputInfos(
-                    parallelism,
-                    consumedResults.isEmpty()
-                            ? Collections.emptyMap()
-                            : VertexInputInfoComputationUtils.computeVertexInputInfos(
-                                    parallelism, consumedResults, true));
+        return new VertexParallelismAndInputInfosDecider() {
+            @Override
+            public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex(
+                    JobVertexID jobVertexId,
+                    List<BlockingResultInfo> consumedResults,
+                    int vertexInitialParallelism,
+                    int vertexMinParallelism,
+                    int vertexMaxParallelism) {
+                int parallelism =
+                        vertexInitialParallelism > 0
+                                ? vertexInitialParallelism
+                                : parallelismFunction.apply(jobVertexId);
+                return new ParallelismAndInputInfos(
+                        parallelism,
+                        consumedResults.isEmpty()
+                                ? Collections.emptyMap()
+                                : VertexInputInfoComputationUtils.computeVertexInputInfos(
+                                        parallelism, consumedResults, true));
+            }
+
+            @Override
+            public int computeSourceParallelismUpperBound(
+                    JobVertexID jobVertexId, int maxParallelism) {
+                return parallelismFunction.apply(jobVertexId);
+            }
+
+            @Override
+            public long getDataVolumePerTask() {
+                return 1;
+            }
         };
     }
 }
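
Since VertexParallelismAndInputInfosDecider now declares three methods (the
vertexMinParallelism argument was added to
decideParallelismAndInputInfosForVertex, and computeSourceParallelismUpperBound
and getDataVolumePerTask are new), the test helper can no longer be a lambda
and is rewritten as an anonymous class.
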
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index a57f648d770..48743fe3998 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -54,7 +54,6 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.StateBackend;
@@ -407,9 +406,7 @@ public class SchedulerTestingUtils {
             JobVertexID jobVertex, ExecutionGraph executionGraph) {
         try {
             executionGraph.initializeJobVertex(
-                    executionGraph.getJobVertex(jobVertex),
-                    System.currentTimeMillis(),
-                    UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+                    executionGraph.getJobVertex(jobVertex), System.currentTimeMillis());
             executionGraph.notifyNewlyInitializedJobVertices(
                     Collections.singletonList(executionGraph.getJobVertex(jobVertex)));
         } catch (JobException exception) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
index cafb634b068..9046d0e61c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ProducerDescriptor;
@@ -153,18 +152,15 @@ public class SsgNetworkMemoryCalculationUtilsTest {
         ExecutionJobVertex map = jobVertices.next();
         ExecutionJobVertex sink = jobVertices.next();
 
-        executionGraph.initializeJobVertex(
-                source, 0L, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+        executionGraph.initializeJobVertex(source, 0L);
         triggerComputeNumOfSubpartitions(source.getProducedDataSets()[0]);
 
         map.setParallelism(5);
-        executionGraph.initializeJobVertex(
-                map, 0L, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+        executionGraph.initializeJobVertex(map, 0L);
         triggerComputeNumOfSubpartitions(map.getProducedDataSets()[0]);
 
         sink.setParallelism(7);
-        executionGraph.initializeJobVertex(
-                sink, 0L, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+        executionGraph.initializeJobVertex(sink, 0L);
 
         assertNetworkMemory(
                 slotSharingGroups,
@@ -229,18 +225,12 @@ public class SsgNetworkMemoryCalculationUtilsTest {
         final ExecutionJobVertex producer = vertexIterator.next();
         final ExecutionJobVertex consumer = vertexIterator.next();
 
-        eg.initializeJobVertex(
-                producer,
-                0L,
-                UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+        eg.initializeJobVertex(producer, 0L);
         final IntermediateResult result = producer.getProducedDataSets()[0];
         triggerComputeNumOfSubpartitions(result);
 
         consumer.setParallelism(decidedConsumerParallelism);
-        eg.initializeJobVertex(
-                consumer,
-                0L,
-                UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+        eg.initializeJobVertex(consumer, 0L);
 
         Map<IntermediateDataSetID, Integer> maxInputChannelNums = new HashMap<>();
         Map<IntermediateDataSetID, ResultPartitionType> inputPartitionTypes = new HashMap<>();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
index 4aa2b11faad..eb603e1ef2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
 import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
@@ -178,13 +177,11 @@ class DefaultExecutionTopologyTest {
         final ExecutionJobVertex ejv1 = executionGraph.getJobVertex(jobVertices[0].getID());
         final ExecutionJobVertex ejv2 = executionGraph.getJobVertex(jobVertices[1].getID());
 
-        executionGraph.initializeJobVertex(
-                ejv1, 0L, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+        executionGraph.initializeJobVertex(ejv1, 0L);
         adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv1));
         assertThat(adapter.getVertices()).hasSize(3);
 
-        executionGraph.initializeJobVertex(
-                ejv2, 0L, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+        executionGraph.initializeJobVertex(ejv2, 0L);
         adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv2));
         assertThat(adapter.getVertices()).hasSize(6);
 
@@ -200,12 +197,10 @@ class DefaultExecutionTopologyTest {
         final ExecutionJobVertex ejv1 = executionGraph.getJobVertex(jobVertices[0].getID());
         final ExecutionJobVertex ejv2 = executionGraph.getJobVertex(jobVertices[1].getID());
 
-        executionGraph.initializeJobVertex(
-                ejv1, 0L, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+        executionGraph.initializeJobVertex(ejv1, 0L);
         adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv1));
 
-        executionGraph.initializeJobVertex(
-                ejv2, 0L, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+        executionGraph.initializeJobVertex(ejv2, 0L);
         assertThatThrownBy(
                         () ->
                                 adapter.notifyExecutionGraphUpdated(
@@ -222,14 +217,12 @@ class DefaultExecutionTopologyTest {
         final ExecutionJobVertex ejv1 = executionGraph.getJobVertex(jobVertices[0].getID());
         final ExecutionJobVertex ejv2 = executionGraph.getJobVertex(jobVertices[1].getID());
 
-        executionGraph.initializeJobVertex(
-                ejv1, 0L, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+        executionGraph.initializeJobVertex(ejv1, 0L);
         adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv1));
         SchedulingPipelinedRegion regionOld =
                 adapter.getPipelinedRegionOfVertex(new ExecutionVertexID(ejv1.getJobVertexId(), 0));
 
-        executionGraph.initializeJobVertex(
-                ejv2, 0L, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+        executionGraph.initializeJobVertex(ejv2, 0L);
         adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv2));
         SchedulingPipelinedRegion regionNew =
                 adapter.getPipelinedRegionOfVertex(new ExecutionVertexID(ejv1.getJobVertexId(), 0));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index 36dcf1d143c..7431658d979 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -961,15 +961,15 @@ public class ExecutingTest extends TestLogger {
             super(
                     new MockInternalExecutionGraphAccessor(),
                     new JobVertex("test"),
-                    new DefaultVertexParallelismInfo(1, 1, max -> Optional.empty()));
+                    new DefaultVertexParallelismInfo(1, 1, max -> Optional.empty()),
+                    new CoordinatorStoreImpl(),
+                    UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
 
             initialize(
                     1,
                     Time.milliseconds(1L),
                     1L,
-                    new DefaultSubtaskAttemptNumberStore(Collections.emptyList()),
-                    new CoordinatorStoreImpl(),
-                    UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+                    new DefaultSubtaskAttemptNumberStore(Collections.emptyList()));
             mockExecutionVertex = executionVertexSupplier.apply(this);
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
index e9932264941..99faf87068e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
@@ -390,8 +390,7 @@ class StateTrackingMockExecutionGraph implements ExecutionGraph {
     public void initializeJobVertex(
             ExecutionJobVertex ejv,
             long createTimestamp,
-            Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos,
-            JobManagerJobMetricGroup jobManagerJobMetricGroup)
+            Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos)
             throws JobException {
         throw new UnsupportedOperationException();
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingOperatorCoordinatorHandler.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingOperatorCoordinatorHandler.java
index a9b5669e98d..2e802cea771 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingOperatorCoordinatorHandler.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingOperatorCoordinatorHandler.java
@@ -69,7 +69,8 @@ class TestingOperatorCoordinatorHandler implements OperatorCoordinatorHandler {
     @Override
     public void registerAndStartNewCoordinators(
             Collection<OperatorCoordinatorHolder> coordinators,
-            ComponentMainThreadExecutor mainThreadExecutor) {
+            ComponentMainThreadExecutor mainThreadExecutor,
+            final int parallelism) {
         throw new UnsupportedOperationException();
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
index bdd592e0517..93e6d2c1773 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.runtime.scheduler.adaptivebatch;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BatchExecutionOptions;
 import org.apache.flink.core.failure.FailureEnricher;
@@ -58,6 +59,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -65,6 +67,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
@@ -75,7 +78,6 @@ import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createFin
 import static org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDeciderTest.createDecider;
 import static org.apache.flink.shaded.guava32.com.google.common.collect.Iterables.getOnlyElement;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link AdaptiveBatchScheduler}. */
 class AdaptiveBatchSchedulerTest {
@@ -106,7 +108,7 @@ class AdaptiveBatchSchedulerTest {
         final SchedulerBase scheduler =
                 createScheduler(jobGraph, Collections.singleton(failureEnricher), restartStrategy);
         // Triggered failure on initializeJobVertex that should be labeled
-        assertThatThrownBy(scheduler::startScheduling).isInstanceOf(IllegalStateException.class);
+        scheduler.startScheduling();
         final Iterable<RootExceptionHistoryEntry> exceptionHistory =
                 scheduler.requestJob().getExceptionHistory();
         final RootExceptionHistoryEntry failure = exceptionHistory.iterator().next();
@@ -351,6 +353,34 @@ class AdaptiveBatchSchedulerTest {
         assertThat(source.getParallelism()).isEqualTo(8);
     }
 
+    @Test
+    void testMergeDynamicParallelismFutures() {
+        List<CompletableFuture<Integer>> sourceParallelismFutures = new ArrayList<>();
+
+        CompletableFuture<Integer> sourceParallelismFuture =
+                CompletableFuture.completedFuture(ExecutionConfig.PARALLELISM_DEFAULT);
+        sourceParallelismFutures.add(sourceParallelismFuture);
+
+        // Testing scenarios without effective parallelism
+        CompletableFuture<Integer> mergedSourceParallelismFuture =
+                AdaptiveBatchScheduler.mergeDynamicParallelismFutures(sourceParallelismFutures);
+        assertThat(mergedSourceParallelismFuture.join())
+                .isEqualTo(ExecutionConfig.PARALLELISM_DEFAULT);
+
+        CompletableFuture<Integer> sourceParallelismFuture1 = CompletableFuture.completedFuture(1);
+        CompletableFuture<Integer> sourceParallelismFuture2 = CompletableFuture.completedFuture(2);
+        CompletableFuture<Integer> sourceParallelismFuture4 = CompletableFuture.completedFuture(4);
+
+        sourceParallelismFutures.add(sourceParallelismFuture1);
+        sourceParallelismFutures.add(sourceParallelismFuture2);
+        sourceParallelismFutures.add(sourceParallelismFuture4);
+
+        // Testing scenarios with multiple sources in ExecutionJobVertex
+        mergedSourceParallelismFuture =
+                AdaptiveBatchScheduler.mergeDynamicParallelismFutures(sourceParallelismFutures);
+        assertThat(mergedSourceParallelismFuture.join()).isEqualTo(4);
+    }
+
     void testUserConfiguredMaxParallelism(
             int globalMinParallelism,
             int globalMaxParallelism,
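
The assertions above pin down the merge semantics: the merged future resolves
to the maximum of the per-source results, and PARALLELISM_DEFAULT (-1) flows
through unchanged when no source reports an effective value. A sketch that
matches exactly these two cases (the actual method lives in
AdaptiveBatchScheduler and may differ in detail):

    import org.apache.flink.api.common.ExecutionConfig;

    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    class ParallelismFutures {
        // Combine all inferred parallelisms and keep the largest one; if only
        // PARALLELISM_DEFAULT (-1) values are present, -1 is what comes out.
        static CompletableFuture<Integer> merge(
                List<CompletableFuture<Integer>> futures) {
            return futures.stream()
                    .reduce(
                            CompletableFuture.completedFuture(
                                    ExecutionConfig.PARALLELISM_DEFAULT),
                            (f1, f2) -> f1.thenCombine(f2, Math::max));
        }
    }
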
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java
index 4ed1a3e5d39..4b0c3a2a7a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java
@@ -298,6 +298,7 @@ class DefaultVertexParallelismAndInputInfosDeciderTest {
                         new JobVertexID(),
                         Collections.singletonList(allToAllBlockingResultInfo),
                         3,
+                        MIN_PARALLELISM,
                         MAX_PARALLELISM);
 
         assertThat(parallelismAndInputInfos.getParallelism()).isEqualTo(3);
@@ -322,6 +323,36 @@ class DefaultVertexParallelismAndInputInfosDeciderTest {
         assertThat(parallelismAndInputInfos.getJobVertexInputInfos()).isEmpty();
     }
 
+    @Test
+    void testDynamicSourceParallelismWithUpstreamInputs() {
+        final DefaultVertexParallelismAndInputInfosDecider decider =
+                createDecider(MIN_PARALLELISM, MAX_PARALLELISM, DATA_VOLUME_PER_TASK);
+
+        AllToAllBlockingResultInfo allToAllBlockingResultInfo =
+                createAllToAllBlockingResultInfo(
+                        new long[] {10L, 15L, 13L, 12L, 1L, 10L, 8L, 20L, 12L, 17L});
+        int dynamicSourceParallelism = 4;
+        ParallelismAndInputInfos parallelismAndInputInfos =
+                decider.decideParallelismAndInputInfosForVertex(
+                        new JobVertexID(),
+                        Collections.singletonList(allToAllBlockingResultInfo),
+                        -1,
+                        dynamicSourceParallelism,
+                        MAX_PARALLELISM);
+
+        assertThat(parallelismAndInputInfos.getParallelism()).isEqualTo(4);
+        assertThat(parallelismAndInputInfos.getJobVertexInputInfos()).hasSize(1);
+
+        checkAllToAllJobVertexInputInfo(
+                Iterables.getOnlyElement(
+                        parallelismAndInputInfos.getJobVertexInputInfos().values()),
+                Arrays.asList(
+                        new IndexRange(0, 1),
+                        new IndexRange(2, 5),
+                        new IndexRange(6, 7),
+                        new IndexRange(8, 9)));
+    }
+
     @Test
     void testEvenlyDistributeDataWithMaxSubpartitionLimitation() {
         long[] subpartitionBytes = new long[1024];
@@ -431,7 +462,7 @@ class DefaultVertexParallelismAndInputInfosDeciderTest {
         final DefaultVertexParallelismAndInputInfosDecider decider =
                 createDecider(minParallelism, maxParallelism, dataVolumePerTask);
         return decider.decideParallelismAndInputInfosForVertex(
-                new JobVertexID(), consumedResults, -1, maxParallelism);
+                new JobVertexID(), consumedResults, -1, minParallelism, maxParallelism);
     }
 
     private AllToAllBlockingResultInfo createAllToAllBlockingResultInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index c0d77a4820c..ea9455ac45f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.source.coordinator;
 
 import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
 import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.DynamicFilteringInfo;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReader;
@@ -536,6 +537,25 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
         assertThat(store.get(listeningID)).isNotNull().isSameAs(coordinator);
     }
 
+    @Test
+    void testInferSourceParallelismAsync() throws Exception {
+        final String listeningID = "testListeningID";
+
+        class TestDynamicFilteringEvent implements SourceEvent, DynamicFilteringInfo {}
+
+        CoordinatorStore store = new CoordinatorStoreImpl();
+        store.putIfAbsent(listeningID, new SourceEventWrapper(new TestDynamicFilteringEvent()));
+        final SourceCoordinator<?, ?> coordinator =
+                new SourceCoordinator<>(
+                        OPERATOR_NAME,
+                        createMockSource(),
+                        context,
+                        store,
+                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                        listeningID);
+        assertThat(coordinator.inferSourceParallelismAsync(2, 1).get()).isEqualTo(2);
+    }
+
     // ------------------------------------------------------------------------
     //  test helpers
     // ------------------------------------------------------------------------
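
The expected value 2 follows from the FactorySource change below in
TestingSplitEnumerator: the mock source's inferParallelism simply returns
context.getParallelismInferenceUpperBound(), i.e. the 2 passed to
inferSourceParallelismAsync(2, 1).
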
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/TestingSplitEnumerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/TestingSplitEnumerator.java
index aba85ac333a..7eba9e64c16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/TestingSplitEnumerator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/TestingSplitEnumerator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.source.coordinator;
 
 import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.DynamicParallelismInference;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReader;
@@ -218,7 +219,7 @@ public class TestingSplitEnumerator<SplitT extends SourceSplit>
 
     @SuppressWarnings("serial")
     static class FactorySource<T, SplitT extends SourceSplit>
-            implements Source<T, SplitT, Set<SplitT>> {
+            implements Source<T, SplitT, Set<SplitT>>, DynamicParallelismInference {
 
         private final SimpleVersionedSerializer<SplitT> splitSerializer;
         private final SimpleVersionedSerializer<Set<SplitT>> checkpointSerializer;
@@ -261,5 +262,10 @@ public class TestingSplitEnumerator<SplitT extends SourceSplit>
         public SimpleVersionedSerializer<Set<SplitT>> getEnumeratorCheckpointSerializer() {
             return checkpointSerializer;
         }
+
+        @Override
+        public int inferParallelism(Context context) {
+            return context.getParallelismInferenceUpperBound();
+        }
     }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringEvent.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringEvent.java
index cb444c2ce18..a575a483319 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringEvent.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringEvent.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.DynamicFilteringInfo;
 import org.apache.flink.api.connector.source.SourceEvent;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -29,7 +30,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * dynamic filtering, via DynamicFilteringDataCollectorCoordinator and SourceCoordinator.
  */
 @PublicEvolving
-public class DynamicFilteringEvent implements SourceEvent {
+public class DynamicFilteringEvent implements SourceEvent, DynamicFilteringInfo {
     private final DynamicFilteringData data;
 
     public DynamicFilteringEvent(DynamicFilteringData data) {
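
DynamicFilteringInfo is the new marker interface in flink-core, which lets
flink-runtime's SourceCoordinator recognize a dynamic filtering event with a
plain instanceof check and without a dependency on flink-table. Inside
inferParallelism(Context), a table connector could recover the payload
roughly as follows (a sketch; it assumes the event's usual getData()
accessor):

    context.getDynamicFilteringInfo()
            .filter(info -> info instanceof DynamicFilteringEvent)
            .map(info -> ((DynamicFilteringEvent) info).getData())
            .ifPresent(data -> {
                // prune partitions/splits using the DynamicFilteringData
            });
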
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
index 3e4d2004ced..5c8c8ba4836 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
@@ -19,8 +19,11 @@
 package org.apache.flink.test.scheduling;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.operators.SlotSharingGroup;
+import org.apache.flink.api.connector.source.DynamicParallelismInference;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
 import org.apache.flink.configuration.BatchExecutionOptions;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
@@ -68,6 +71,16 @@ class AdaptiveBatchSchedulerITCase {
         numberCountResults = new ConcurrentLinkedQueue<>();
     }
 
+    @Test
+    void testScheduling() throws Exception {
+        testSchedulingBase(false);
+    }
+
+    @Test
+    void testSchedulingWithDynamicSourceParallelismInference() throws Exception {
+        testSchedulingBase(true);
+    }
+
     @Test
     void testParallelismOfForwardGroupLargerThanGlobalMaxParallelism() throws Exception {
         final Configuration configuration = createConfiguration();
@@ -117,9 +130,8 @@ class AdaptiveBatchSchedulerITCase {
         env.execute();
     }
 
-    @Test
-    void testScheduling() throws Exception {
-        executeJob();
+    private void testSchedulingBase(boolean useSourceParallelismInference) throws Exception {
+        executeJob(useSourceParallelismInference);
 
         Map<Long, Long> numberCountResultMap =
                 numberCountResults.stream()
@@ -138,7 +150,7 @@ class AdaptiveBatchSchedulerITCase {
         assertThat(numberCountResultMap).isEqualTo(expectedResult);
     }
 
-    private void executeJob() throws Exception {
+    private void executeJob(boolean useSourceParallelismInference) throws Exception {
         final Configuration configuration = createConfiguration();
         configuration.set(ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING, true);
 
@@ -157,17 +169,36 @@ class AdaptiveBatchSchedulerITCase {
             slotSharingGroups.add(group);
         }
 
-        final DataStream<Long> source1 =
-                env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
-                        .setParallelism(SOURCE_PARALLELISM_1)
-                        .name("source1")
-                        .slotSharingGroup(slotSharingGroups.get(0));
-
-        final DataStream<Long> source2 =
-                env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
-                        .setParallelism(SOURCE_PARALLELISM_2)
-                        .name("source2")
-                        .slotSharingGroup(slotSharingGroups.get(1));
+        DataStream<Long> source1;
+        DataStream<Long> source2;
+
+        if (useSourceParallelismInference) {
+            source1 =
+                    env.fromSource(
+                                    new TestingParallelismInferenceNumberSequenceSource(
+                                            0, NUMBERS_TO_PRODUCE - 1, SOURCE_PARALLELISM_1),
+                                    WatermarkStrategy.noWatermarks(),
+                                    "source1")
+                            .slotSharingGroup(slotSharingGroups.get(0));
+            source2 =
+                    env.fromSource(
+                                    new TestingParallelismInferenceNumberSequenceSource(
+                                            0, NUMBERS_TO_PRODUCE - 1, SOURCE_PARALLELISM_2),
+                                    WatermarkStrategy.noWatermarks(),
+                                    "source2")
+                            .slotSharingGroup(slotSharingGroups.get(1));
+        } else {
+            source1 =
+                    env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
+                            .setParallelism(SOURCE_PARALLELISM_1)
+                            .name("source1")
+                            .slotSharingGroup(slotSharingGroups.get(0));
+            source2 =
+                    env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
+                            .setParallelism(SOURCE_PARALLELISM_2)
+                            .name("source2")
+                            .slotSharingGroup(slotSharingGroups.get(1));
+        }
 
         source1.union(source2)
                 .rescale()
@@ -210,4 +241,21 @@ class AdaptiveBatchSchedulerITCase {
             numberCountResults.add(numberCountResult);
         }
     }
+
+    private static class TestingParallelismInferenceNumberSequenceSource
+            extends NumberSequenceSource implements DynamicParallelismInference {
+        private static final long serialVersionUID = 1L;
+        private final int expectedParallelism;
+
+        public TestingParallelismInferenceNumberSequenceSource(
+                long from, long to, int expectedParallelism) {
+            super(from, to);
+            this.expectedParallelism = expectedParallelism;
+        }
+
+        @Override
+        public int inferParallelism(Context context) {
+            return expectedParallelism;
+        }
+    }
 }
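
The testing source above extends NumberSequenceSource only to report a fixed
parallelism through DynamicParallelismInference; because the fromSource()
streams deliberately omit setParallelism(), the scheduler has to obtain
SOURCE_PARALLELISM_1/2 via the new inference path.
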


(flink) 01/02: [hotfix] Remove redundant lazy initialization of LazyInitializedCoordinatorContext.

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit feb5f9267067e549ee40efd691ae756338cc2e0b
Author: sunxia <xi...@gmail.com>
AuthorDate: Tue Jan 9 18:54:10 2024 +0800

    [hotfix] Remove redundant lazy initialization of LazyInitializedCoordinatorContext.
---
 .../flink/runtime/operators/coordination/OperatorCoordinatorHolder.java  | 1 -
 1 file changed, 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index a8ddf937de2..ec7e35bc066 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -149,7 +149,6 @@ public class OperatorCoordinatorHolder
 
         this.globalFailureHandler = globalFailureHandler;
         this.mainThreadExecutor = mainThreadExecutor;
-        context.lazyInitialize(globalFailureHandler, mainThreadExecutor, checkpointCoordinator);
 
         context.lazyInitialize(globalFailureHandler, mainThreadExecutor, checkpointCoordinator);
         setupAllSubtaskGateways();