You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/24 10:02:25 UTC

[GitHub] [flink] wanglijie95 opened a new pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

wanglijie95 opened a new pull request #18462:
URL: https://github.com/apache/flink/pull/18462


   ## What is the purpose of the change
   Introduce AdaptiveBatchScheduler, which can automatically decide parallelisms of job vertices for batch jobs.
   
   ## Brief change log
   ba370fbb3323668cbd9cd5b81858ebe09a82c785 Introduce AdaptiveBatchScheduler.
   4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155  AdaptiveBatchScheduler updates JobGraph and JsonPlan each time the parallelism is changed
   
   
   
   ## Verifying this change
   UT `AdaptiveBatchSchedulerTest`
   ITCase `AdaptiveBatchSchedulerITCase`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (**no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**)
     - The serializers: (**no**)
     - The runtime per-record code paths (performance sensitive): (**no**)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**no**)
     - The S3 file system connector: (**no**)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes**)
     - If yes, how is the feature documented? (**JavaDocs**)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] zhuzhurk commented on a change in pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #18462:
URL: https://github.com/apache/flink/pull/18462#discussion_r791884986



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
##########
@@ -190,10 +191,24 @@ private void maybeSetParallelism(final ExecutionJobVertex jobVertex) {
                     jobVertex.getName(),
                     parallelism);
 
-            jobVertex.setParallelism(parallelism);
+            changeJobVertexParallelism(jobVertex, parallelism);
         }
     }
 
+    private void changeJobVertexParallelism(ExecutionJobVertex jobVertex, int parallelism) {
+        // update PlanJson

Review comment:
       Would you add some comments for why we need to update the plan json?  e.g. "it is needed to enable REST APIs to return the latest parallelism of job vertices."

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##########
@@ -676,7 +727,7 @@ private void notifyCoordinatorOfCancellation(ExecutionVertex vertex) {
 
         @Override
         public ResourceProfile getResourceProfile(final ExecutionVertexID executionVertexId) {
-            return getExecutionVertex(executionVertexId).getResourceProfile();
+            return getExecutionJobVertex(executionVertexId.getJobVertexId()).getResourceProfile();

Review comment:
       What's this change for?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -762,6 +762,11 @@ public void setInternalTaskFailuresListener(
     //  Actions
     // --------------------------------------------------------------------------------------------
 
+    @Override
+    public void notifyNewJobVertexInitialized(List<ExecutionJobVertex> vertices) {

Review comment:
       maybe `notifyNewlyInitializedJobVertices()`? because the job vertices are just newly initialized rather than newly added.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperations;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+import org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.util.SlotSelectionStrategyUtils;
+import org.apache.flink.util.clock.SystemClock;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Factory for {@link AdaptiveBatchScheduler}. */
+public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory {
+
+    @Override
+    public SchedulerNG createInstance(
+            Logger log,
+            JobGraph jobGraph,
+            Executor ioExecutor,
+            Configuration jobMasterConfiguration,
+            SlotPoolService slotPoolService,
+            ScheduledExecutorService futureExecutor,
+            ClassLoader userCodeLoader,
+            CheckpointRecoveryFactory checkpointRecoveryFactory,
+            Time rpcTimeout,
+            BlobWriter blobWriter,
+            JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            Time slotRequestTimeout,
+            ShuffleMaster<?> shuffleMaster,
+            JobMasterPartitionTracker partitionTracker,
+            ExecutionDeploymentTracker executionDeploymentTracker,
+            long initializationTimestamp,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            FatalErrorHandler fatalErrorHandler,
+            JobStatusListener jobStatusListener)
+            throws Exception {
+
+        checkState(
+                jobGraph.getJobType() == JobType.BATCH,
+                "Adaptive batch scheduler only supports batch jobs");
+        checkIsAllBlockingGraph(jobGraph);
+
+        final SlotPool slotPool =
+                slotPoolService
+                        .castInto(SlotPool.class)
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "The DefaultScheduler requires a SlotPool."));
+
+        final SlotSelectionStrategy slotSelectionStrategy =
+                SlotSelectionStrategyUtils.selectSlotSelectionStrategy(
+                        JobType.BATCH, jobMasterConfiguration);
+        final PhysicalSlotRequestBulkChecker bulkChecker =
+                PhysicalSlotRequestBulkCheckerImpl.createFromSlotPool(
+                        slotPool, SystemClock.getInstance());
+        final PhysicalSlotProvider physicalSlotProvider =
+                new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
+        final ExecutionSlotAllocatorFactory allocatorFactory =
+                new SlotSharingExecutionSlotAllocatorFactory(
+                        physicalSlotProvider, false, bulkChecker, slotRequestTimeout);
+
+        final RestartBackoffTimeStrategy restartBackoffTimeStrategy =
+                RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(
+                                jobGraph.getSerializedExecutionConfig()
+                                        .deserializeValue(userCodeLoader)
+                                        .getRestartStrategy(),
+                                jobMasterConfiguration,
+                                jobGraph.isCheckpointingEnabled())
+                        .create();
+        log.info(
+                "Using restart back off time strategy {} for {} ({}).",
+                restartBackoffTimeStrategy,
+                jobGraph.getName(),
+                jobGraph.getJobID());
+
+        final ExecutionGraphFactory executionGraphFactory =
+                new DefaultExecutionGraphFactory(
+                        jobMasterConfiguration,
+                        userCodeLoader,
+                        executionDeploymentTracker,
+                        futureExecutor,
+                        ioExecutor,
+                        rpcTimeout,
+                        jobManagerJobMetricGroup,
+                        blobWriter,
+                        shuffleMaster,
+                        partitionTracker,
+                        true);
+
+        return new AdaptiveBatchScheduler(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                bulkChecker::start,
+                new ScheduledExecutorServiceAdapter(futureExecutor),
+                userCodeLoader,
+                new CheckpointsCleaner(),
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                new VertexwiseSchedulingStrategy.Factory(),
+                FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
+                restartBackoffTimeStrategy,
+                new DefaultExecutionVertexOperations(),
+                new ExecutionVertexVersioner(),
+                allocatorFactory,
+                initializationTimestamp,
+                mainThreadExecutor,
+                jobStatusListener,
+                executionGraphFactory,
+                shuffleMaster,
+                rpcTimeout,
+                DefaultVertexParallelismDecider.from(jobMasterConfiguration),
+                jobMasterConfiguration.getInteger(
+                        JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM));
+    }
+
+    private void checkIsAllBlockingGraph(final JobGraph jobGraph) {
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            for (IntermediateDataSet dataSet : jobVertex.getProducedDataSets()) {
+                checkState(
+                        dataSet.getResultType().isBlocking(),
+                        "Adaptive batch scheduler currently only supports ALL_EDGES_BLOCKING jobs.");

Review comment:
       Let's refine the message to point users to the config `execution.batch-shuffle-mode` and the required value should be `ALL_EXCHANGES_BLOCKING`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtils.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SlotSelectionStrategyUtils {

Review comment:
       java docs is needed

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalResult;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalVertex;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexOperations;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This scheduler decides the parallelism of JobVertex according to the data volume it consumes. A
+ * dynamically built up ExecutionGraph is used for this purpose.
+ */
+public class AdaptiveBatchScheduler extends DefaultScheduler implements SchedulerOperations {
+
+    private final Logger log;

Review comment:
       It is not needed because there is a `DefaultScheduler#log` already.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperations;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+import org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.util.SlotSelectionStrategyUtils;
+import org.apache.flink.util.clock.SystemClock;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Factory for {@link AdaptiveBatchScheduler}. */
+public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory {
+
+    @Override
+    public SchedulerNG createInstance(
+            Logger log,
+            JobGraph jobGraph,
+            Executor ioExecutor,
+            Configuration jobMasterConfiguration,
+            SlotPoolService slotPoolService,
+            ScheduledExecutorService futureExecutor,
+            ClassLoader userCodeLoader,
+            CheckpointRecoveryFactory checkpointRecoveryFactory,
+            Time rpcTimeout,
+            BlobWriter blobWriter,
+            JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            Time slotRequestTimeout,
+            ShuffleMaster<?> shuffleMaster,
+            JobMasterPartitionTracker partitionTracker,
+            ExecutionDeploymentTracker executionDeploymentTracker,
+            long initializationTimestamp,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            FatalErrorHandler fatalErrorHandler,
+            JobStatusListener jobStatusListener)
+            throws Exception {
+
+        checkState(
+                jobGraph.getJobType() == JobType.BATCH,
+                "Adaptive batch scheduler only supports batch jobs");
+        checkIsAllBlockingGraph(jobGraph);
+
+        final SlotPool slotPool =
+                slotPoolService
+                        .castInto(SlotPool.class)
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "The DefaultScheduler requires a SlotPool."));
+
+        final SlotSelectionStrategy slotSelectionStrategy =
+                SlotSelectionStrategyUtils.selectSlotSelectionStrategy(
+                        JobType.BATCH, jobMasterConfiguration);
+        final PhysicalSlotRequestBulkChecker bulkChecker =
+                PhysicalSlotRequestBulkCheckerImpl.createFromSlotPool(
+                        slotPool, SystemClock.getInstance());
+        final PhysicalSlotProvider physicalSlotProvider =
+                new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
+        final ExecutionSlotAllocatorFactory allocatorFactory =
+                new SlotSharingExecutionSlotAllocatorFactory(
+                        physicalSlotProvider, false, bulkChecker, slotRequestTimeout);
+
+        final RestartBackoffTimeStrategy restartBackoffTimeStrategy =
+                RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(
+                                jobGraph.getSerializedExecutionConfig()
+                                        .deserializeValue(userCodeLoader)
+                                        .getRestartStrategy(),
+                                jobMasterConfiguration,
+                                jobGraph.isCheckpointingEnabled())
+                        .create();
+        log.info(
+                "Using restart back off time strategy {} for {} ({}).",
+                restartBackoffTimeStrategy,
+                jobGraph.getName(),
+                jobGraph.getJobID());
+
+        final ExecutionGraphFactory executionGraphFactory =
+                new DefaultExecutionGraphFactory(
+                        jobMasterConfiguration,
+                        userCodeLoader,
+                        executionDeploymentTracker,
+                        futureExecutor,
+                        ioExecutor,
+                        rpcTimeout,
+                        jobManagerJobMetricGroup,
+                        blobWriter,
+                        shuffleMaster,
+                        partitionTracker,
+                        true);
+
+        return new AdaptiveBatchScheduler(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                bulkChecker::start,
+                new ScheduledExecutorServiceAdapter(futureExecutor),
+                userCodeLoader,
+                new CheckpointsCleaner(),
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                new VertexwiseSchedulingStrategy.Factory(),
+                FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
+                restartBackoffTimeStrategy,
+                new DefaultExecutionVertexOperations(),
+                new ExecutionVertexVersioner(),
+                allocatorFactory,
+                initializationTimestamp,
+                mainThreadExecutor,
+                jobStatusListener,
+                executionGraphFactory,
+                shuffleMaster,
+                rpcTimeout,
+                DefaultVertexParallelismDecider.from(jobMasterConfiguration),
+                jobMasterConfiguration.getInteger(
+                        JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM));
+    }
+
+    private void checkIsAllBlockingGraph(final JobGraph jobGraph) {

Review comment:
       maybe `checkAllExchangesBlocking`?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalResult;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalVertex;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexOperations;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This scheduler decides the parallelism of JobVertex according to the data volume it consumes. A
+ * dynamically built up ExecutionGraph is used for this purpose.
+ */
+public class AdaptiveBatchScheduler extends DefaultScheduler implements SchedulerOperations {
+
+    private final Logger log;
+
+    private final DefaultLogicalTopology logicalTopology;
+
+    private final VertexParallelismDecider vertexParallelismDecider;
+
+    AdaptiveBatchScheduler(
+            final Logger log,
+            final JobGraph jobGraph,
+            final Executor ioExecutor,
+            final Configuration jobMasterConfiguration,
+            final Consumer<ComponentMainThreadExecutor> startUpAction,
+            final ScheduledExecutor delayExecutor,
+            final ClassLoader userCodeLoader,
+            final CheckpointsCleaner checkpointsCleaner,
+            final CheckpointRecoveryFactory checkpointRecoveryFactory,
+            final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            final SchedulingStrategyFactory schedulingStrategyFactory,
+            final FailoverStrategy.Factory failoverStrategyFactory,
+            final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+            final ExecutionVertexOperations executionVertexOperations,
+            final ExecutionVertexVersioner executionVertexVersioner,
+            final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+            long initializationTimestamp,
+            final ComponentMainThreadExecutor mainThreadExecutor,
+            final JobStatusListener jobStatusListener,
+            final ExecutionGraphFactory executionGraphFactory,
+            final ShuffleMaster<?> shuffleMaster,
+            final Time rpcTimeout,
+            final VertexParallelismDecider vertexParallelismDecider,
+            int defaultMaxParallelism)
+            throws Exception {
+
+        super(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                startUpAction,
+                delayExecutor,
+                userCodeLoader,
+                checkpointsCleaner,
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                schedulingStrategyFactory,
+                failoverStrategyFactory,
+                restartBackoffTimeStrategy,
+                executionVertexOperations,
+                executionVertexVersioner,
+                executionSlotAllocatorFactory,
+                initializationTimestamp,
+                mainThreadExecutor,
+                jobStatusListener,
+                executionGraphFactory,
+                shuffleMaster,
+                rpcTimeout,
+                computeVertexParallelismStoreForDynamicGraph(
+                        jobGraph.getVertices(), defaultMaxParallelism));
+
+        this.log = log;
+
+        this.logicalTopology = DefaultLogicalTopology.fromJobGraph(jobGraph);
+
+        this.vertexParallelismDecider = vertexParallelismDecider;
+    }
+
+    @Override
+    public void startSchedulingInternal() {
+        initializeVerticesIfPossible();
+
+        super.startSchedulingInternal();
+    }
+
+    @Override
+    protected void updateTaskExecutionStateInternal(
+            final ExecutionVertexID executionVertexId,
+            final TaskExecutionStateTransition taskExecutionState) {
+
+        initializeVerticesIfPossible();
+
+        super.updateTaskExecutionStateInternal(executionVertexId, taskExecutionState);
+    }
+
+    private void initializeVerticesIfPossible() {
+        final List<ExecutionJobVertex> newlyInitializedJobVertices = new ArrayList<>();
+        try {
+            final long createTimestamp = System.currentTimeMillis();
+            for (ExecutionJobVertex jobVertex : getExecutionGraph().getVerticesTopologically()) {
+                maybeSetParallelism(jobVertex);
+            }
+            for (ExecutionJobVertex jobVertex : getExecutionGraph().getVerticesTopologically()) {
+                if (canInitialize(jobVertex)) {
+                    getExecutionGraph().initializeJobVertex(jobVertex, createTimestamp);
+                    newlyInitializedJobVertices.add(jobVertex);
+                }
+            }
+        } catch (JobException ex) {
+            log.error("Unexpected error occurred when initializing ExecutionJobVertex", ex);
+            failJob(ex, System.currentTimeMillis());
+        }
+
+        if (newlyInitializedJobVertices.size() > 0) {
+            updateTopology(newlyInitializedJobVertices);
+        }
+    }
+
+    private void maybeSetParallelism(final ExecutionJobVertex jobVertex) {
+        if (jobVertex.isParallelismDecided()) {
+            return;
+        }
+
+        List<BlockingResultInfo> consumedResultsInfo = tryGetConsumedResultsInfo(jobVertex);
+        if (consumedResultsInfo != null) {
+            int parallelism =
+                    vertexParallelismDecider.decideParallelismForVertex(consumedResultsInfo);
+
+            log.info(
+                    "JobVertex: {} parallelism is decided as: {}.",
+                    jobVertex.getName(),
+                    parallelism);
+
+            jobVertex.setParallelism(parallelism);
+        }
+    }
+
+    /** Get information of consumable results. */
+    @Nullable
+    private List<BlockingResultInfo> tryGetConsumedResultsInfo(final ExecutionJobVertex jobVertex) {

Review comment:
       According to previous discussion in Flink dev ML, it's better to use `Optional` as return value than `Nullable` ones.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalResult;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalVertex;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexOperations;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This scheduler decides the parallelism of JobVertex according to the data volume it consumes. A
+ * dynamically built up ExecutionGraph is used for this purpose.
+ */
+public class AdaptiveBatchScheduler extends DefaultScheduler implements SchedulerOperations {
+
+    private final Logger log;
+
+    private final DefaultLogicalTopology logicalTopology;
+
+    private final VertexParallelismDecider vertexParallelismDecider;
+
+    AdaptiveBatchScheduler(
+            final Logger log,
+            final JobGraph jobGraph,
+            final Executor ioExecutor,
+            final Configuration jobMasterConfiguration,
+            final Consumer<ComponentMainThreadExecutor> startUpAction,
+            final ScheduledExecutor delayExecutor,
+            final ClassLoader userCodeLoader,
+            final CheckpointsCleaner checkpointsCleaner,
+            final CheckpointRecoveryFactory checkpointRecoveryFactory,
+            final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            final SchedulingStrategyFactory schedulingStrategyFactory,
+            final FailoverStrategy.Factory failoverStrategyFactory,
+            final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+            final ExecutionVertexOperations executionVertexOperations,
+            final ExecutionVertexVersioner executionVertexVersioner,
+            final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+            long initializationTimestamp,
+            final ComponentMainThreadExecutor mainThreadExecutor,
+            final JobStatusListener jobStatusListener,
+            final ExecutionGraphFactory executionGraphFactory,
+            final ShuffleMaster<?> shuffleMaster,
+            final Time rpcTimeout,
+            final VertexParallelismDecider vertexParallelismDecider,
+            int defaultMaxParallelism)
+            throws Exception {
+
+        super(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                startUpAction,
+                delayExecutor,
+                userCodeLoader,
+                checkpointsCleaner,
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                schedulingStrategyFactory,
+                failoverStrategyFactory,
+                restartBackoffTimeStrategy,
+                executionVertexOperations,
+                executionVertexVersioner,
+                executionSlotAllocatorFactory,
+                initializationTimestamp,
+                mainThreadExecutor,
+                jobStatusListener,
+                executionGraphFactory,
+                shuffleMaster,
+                rpcTimeout,
+                computeVertexParallelismStoreForDynamicGraph(
+                        jobGraph.getVertices(), defaultMaxParallelism));
+
+        this.log = log;
+
+        this.logicalTopology = DefaultLogicalTopology.fromJobGraph(jobGraph);
+
+        this.vertexParallelismDecider = vertexParallelismDecider;
+    }
+
+    @Override
+    public void startSchedulingInternal() {
+        initializeVerticesIfPossible();
+
+        super.startSchedulingInternal();
+    }
+
+    @Override
+    protected void updateTaskExecutionStateInternal(
+            final ExecutionVertexID executionVertexId,
+            final TaskExecutionStateTransition taskExecutionState) {
+
+        initializeVerticesIfPossible();
+
+        super.updateTaskExecutionStateInternal(executionVertexId, taskExecutionState);
+    }
+
+    private void initializeVerticesIfPossible() {
+        final List<ExecutionJobVertex> newlyInitializedJobVertices = new ArrayList<>();
+        try {
+            final long createTimestamp = System.currentTimeMillis();
+            for (ExecutionJobVertex jobVertex : getExecutionGraph().getVerticesTopologically()) {
+                maybeSetParallelism(jobVertex);
+            }
+            for (ExecutionJobVertex jobVertex : getExecutionGraph().getVerticesTopologically()) {
+                if (canInitialize(jobVertex)) {
+                    getExecutionGraph().initializeJobVertex(jobVertex, createTimestamp);
+                    newlyInitializedJobVertices.add(jobVertex);
+                }
+            }
+        } catch (JobException ex) {
+            log.error("Unexpected error occurred when initializing ExecutionJobVertex", ex);
+            failJob(ex, System.currentTimeMillis());
+        }
+
+        if (newlyInitializedJobVertices.size() > 0) {
+            updateTopology(newlyInitializedJobVertices);
+        }
+    }
+
+    private void maybeSetParallelism(final ExecutionJobVertex jobVertex) {
+        if (jobVertex.isParallelismDecided()) {
+            return;
+        }
+
+        List<BlockingResultInfo> consumedResultsInfo = tryGetConsumedResultsInfo(jobVertex);
+        if (consumedResultsInfo != null) {
+            int parallelism =
+                    vertexParallelismDecider.decideParallelismForVertex(consumedResultsInfo);
+
+            log.info(
+                    "JobVertex: {} parallelism is decided as: {}.",

Review comment:
       maybe refine the log a bit?
   
   "Parallelism of JobVertex: {} ({}) is decided to be {}.", jobVertex.getName(), jobVertex.getJobVertexId(), parallelism
   
   ID can be useful to distinguish vertices with the same name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18462:
URL: https://github.com/apache/flink/pull/18462#issuecomment-1019928259


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30018",
       "triggerID" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30018) 
   * a7758e78986cbf9857722ec0428b87e1c69172c7 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wanglijie95 commented on a change in pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on a change in pull request #18462:
URL: https://github.com/apache/flink/pull/18462#discussion_r794441542



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtilTest.java
##########
@@ -162,20 +171,35 @@ public void testOneInputSplitsIntoTwo() {
         v2.getProducedDataSets().get(0).getConsumer().setForward(true);
         v2.getProducedDataSets().get(1).getConsumer().setForward(true);
 
-        Set<ForwardRegion> regions = computePipelinedRegions(v1, v2, v3, v4);
+        Set<ForwardGroup> groups = computeForwardGroups(v1, v2, v3, v4);
 
-        checkRegionSize(regions, 2, 3, 1);
+        checkGroupSize(groups, 1, 3);
     }
 
-    private static Set<ForwardRegion> computePipelinedRegions(JobVertex... vertices) {
+    private static Set<ForwardGroup> computeForwardGroups(JobVertex... vertices) throws Exception {
+        Arrays.asList(vertices).forEach(vertex -> vertex.setInvokableClass(NoOpInvokable.class));
+        ExecutionGraph executionGraph = createDynamicGraph(vertices);
         return new HashSet<>(
-                ForwardRegionComputeUtil.computeForwardRegions(Arrays.asList(vertices)).values());
+                ForwardGroupComputeUtil.computeForwardGroups(
+                                Arrays.asList(vertices), executionGraph::getJobVertex)
+                        .values());
     }
 
-    private static void checkRegionSize(
-            Set<ForwardRegion> regions, int numOfRegions, int... sizes) {
-        assertEquals(numOfRegions, regions.size());
+    private static void checkGroupSize(Set<ForwardGroup> groups, int numOfGroups, int... sizes) {
+        assertEquals(numOfGroups, groups.size());
         containsInAnyOrder(
-                regions.stream().map(ForwardRegion::size).collect(Collectors.toList()), sizes);
+                groups.stream().map(ForwardGroup::size).collect(Collectors.toList()), sizes);
+    }
+
+    private static DefaultExecutionGraph createDynamicGraph(JobVertex... vertices)

Review comment:
       Yes, but the forward groups will only be used in AdaptiveBatchScheduler(dynamic graph)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #18462:
URL: https://github.com/apache/flink/pull/18462#issuecomment-1019928259


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wanglijie95 commented on a change in pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on a change in pull request #18462:
URL: https://github.com/apache/flink/pull/18462#discussion_r794442557



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardregion/ForwardRegionComputeUtilTest.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.forwardregion;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+
+/** Unit tests for {@link ForwardRegionComputeUtil}. */
+public class ForwardRegionComputeUtilTest {

Review comment:
       fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] zhuzhurk commented on a change in pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #18462:
URL: https://github.com/apache/flink/pull/18462#discussion_r794401127



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/SchedulingPipelinedRegionComputeUtil.java
##########
@@ -67,13 +67,13 @@
      * Tarjan's strongly connected components algorithm</a>. For more details please see <a
      * href="https://issues.apache.org/jira/browse/FLINK-17330">FLINK-17330</a>.
      */
-    private static Set<Set<SchedulingExecutionVertex>> mergeRegionsOnCycles(
+    private static Set<Set<SchedulingExecutionVertex>> mergeGroupsOnCycles(

Review comment:
       I would keep the name unchanged because it is part of `SchedulingPipelinedRegionComputeUtil`.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtilTest.java
##########
@@ -162,20 +171,35 @@ public void testOneInputSplitsIntoTwo() {
         v2.getProducedDataSets().get(0).getConsumer().setForward(true);
         v2.getProducedDataSets().get(1).getConsumer().setForward(true);
 
-        Set<ForwardRegion> regions = computePipelinedRegions(v1, v2, v3, v4);
+        Set<ForwardGroup> groups = computeForwardGroups(v1, v2, v3, v4);
 
-        checkRegionSize(regions, 2, 3, 1);
+        checkGroupSize(groups, 1, 3);
     }
 
-    private static Set<ForwardRegion> computePipelinedRegions(JobVertex... vertices) {
+    private static Set<ForwardGroup> computeForwardGroups(JobVertex... vertices) throws Exception {
+        Arrays.asList(vertices).forEach(vertex -> vertex.setInvokableClass(NoOpInvokable.class));
+        ExecutionGraph executionGraph = createDynamicGraph(vertices);
         return new HashSet<>(
-                ForwardRegionComputeUtil.computeForwardRegions(Arrays.asList(vertices)).values());
+                ForwardGroupComputeUtil.computeForwardGroups(
+                                Arrays.asList(vertices), executionGraph::getJobVertex)
+                        .values());
     }
 
-    private static void checkRegionSize(
-            Set<ForwardRegion> regions, int numOfRegions, int... sizes) {
-        assertEquals(numOfRegions, regions.size());
+    private static void checkGroupSize(Set<ForwardGroup> groups, int numOfGroups, int... sizes) {
+        assertEquals(numOfGroups, groups.size());
         containsInAnyOrder(
-                regions.stream().map(ForwardRegion::size).collect(Collectors.toList()), sizes);
+                groups.stream().map(ForwardGroup::size).collect(Collectors.toList()), sizes);
+    }
+
+    private static DefaultExecutionGraph createDynamicGraph(JobVertex... vertices)

Review comment:
       Looks to me the test does not require a dynamic graph?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] zhuzhurk commented on a change in pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #18462:
URL: https://github.com/apache/flink/pull/18462#discussion_r794325891



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/** Test for {@link AdaptiveBatchScheduler}. */
+public class AdaptiveBatchSchedulerTest extends TestLogger {
+
+    private static final int SOURCE_PARALLELISM_1 = 6;
+    private static final int SOURCE_PARALLELISM_2 = 4;
+
+    private static final ComponentMainThreadExecutor mainThreadExecutor =
+            ComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+    @Test
+    public void testAdaptiveBatchScheduler() throws Exception {
+        final JobVertex source1 = createJobVertex("source1", SOURCE_PARALLELISM_1);
+        final JobVertex source2 = createJobVertex("source2", SOURCE_PARALLELISM_2);
+        final JobVertex sink = createJobVertex("sink", -1);
+        sink.connectNewDataSetAsInput(
+                source1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+        sink.connectNewDataSetAsInput(
+                source2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+
+        Configuration configuration = new Configuration();
+        configuration.set(
+                JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.AdaptiveBatch);
+
+        final JobGraph jobGraph = new JobGraph(new JobID(), "test job", source1, source2, sink);
+
+        final AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder schedulerBuilder =
+                (AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder)
+                        new AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder(
+                                        jobGraph, mainThreadExecutor)
+                                .setJobMasterConfiguration(configuration);
+        schedulerBuilder.setJobVertexParallelismDecider((ignored) -> 10);
+
+        final AdaptiveBatchScheduler scheduler = schedulerBuilder.build();
+        final DefaultExecutionGraph graph = (DefaultExecutionGraph) scheduler.getExecutionGraph();
+        final ExecutionJobVertex sinkExecutionJobVertex = graph.getJobVertex(sink.getID());
+
+        scheduler.startScheduling();
+        assertThat(sinkExecutionJobVertex.getParallelism(), is(-1));
+
+        // trigger source1 finished.
+        transitionExecutionsState(scheduler, ExecutionState.FINISHED, source1);
+        assertThat(sinkExecutionJobVertex.getParallelism(), is(-1));
+
+        // trigger source2 finished.
+        transitionExecutionsState(scheduler, ExecutionState.FINISHED, source2);
+        assertThat(sinkExecutionJobVertex.getParallelism(), is(10));
+
+        // check that the jobGraph is updated
+        assertThat(sink.getParallelism(), is(10));
+    }
+
+    @Test
+    public void testDecideParallelismForForwardTarget() throws Exception {
+        final JobVertex source1 = createJobVertex("source1", SOURCE_PARALLELISM_1);
+        final JobVertex source2 = createJobVertex("source2", SOURCE_PARALLELISM_2);
+        final JobVertex sink = createJobVertex("sink", -1);
+        sink.connectNewDataSetAsInput(
+                source1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+        sink.connectNewDataSetAsInput(
+                source2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+        source1.getProducedDataSets().get(0).getConsumer().setForward(true);

Review comment:
       The scheduler creation of the two tests are almost the same except for these 2 lines, can we extract them into two methods `createJobGraph(boolean withForwardEdge)` and `createScheduler(JobGraph)` to reduce duplication?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18462:
URL: https://github.com/apache/flink/pull/18462#issuecomment-1019928259


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30018",
       "triggerID" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30361",
       "triggerID" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2dbbb86c848f3283f8351fcf016944f9138abca5",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30387",
       "triggerID" : "2dbbb86c848f3283f8351fcf016944f9138abca5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "452eb5b9035acbe052290af017632157406a2ba2",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30406",
       "triggerID" : "452eb5b9035acbe052290af017632157406a2ba2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a7758e78986cbf9857722ec0428b87e1c69172c7 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30361) 
   * 2dbbb86c848f3283f8351fcf016944f9138abca5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30387) 
   * 452eb5b9035acbe052290af017632157406a2ba2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30406) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18462:
URL: https://github.com/apache/flink/pull/18462#issuecomment-1019928259


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30018",
       "triggerID" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30361",
       "triggerID" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2dbbb86c848f3283f8351fcf016944f9138abca5",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "2dbbb86c848f3283f8351fcf016944f9138abca5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a7758e78986cbf9857722ec0428b87e1c69172c7 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30361) 
   * 2dbbb86c848f3283f8351fcf016944f9138abca5 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] zhuzhurk commented on a change in pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #18462:
URL: https://github.com/apache/flink/pull/18462#discussion_r794289920



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
##########
@@ -253,11 +252,11 @@ private void changeJobVertexParallelism(ExecutionJobVertex jobVertex, int parall
                         BlockingResultInfo.createFromIntermediateResult(intermediateResult));
             } else {
                 // not all inputs consumable, return null

Review comment:
       the comment is outdated

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
##########
@@ -173,12 +175,17 @@ public SchedulerNG createInstance(
                         JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM));
     }
 
-    private void checkIsAllBlockingGraph(final JobGraph jobGraph) {
+    private void checkAllExchangesBlocking(final JobGraph jobGraph) {
         for (JobVertex jobVertex : jobGraph.getVertices()) {
             for (IntermediateDataSet dataSet : jobVertex.getProducedDataSets()) {
                 checkState(
                         dataSet.getResultType().isBlocking(),
-                        "Adaptive batch scheduler currently only supports ALL_EDGES_BLOCKING jobs.");
+                        String.format(
+                                "At the moment, fine-grained resource management requires batch workloads "

Review comment:
       fine-grained resource management -> adaptive batch scheduler

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardregion/ForwardRegion.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.forwardregion;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** A forward region is a set of vertices connected via forward edges. */

Review comment:
       maybe add more docs like "Parallelisms of all job vertices in the same {@link ForwardRegion} must be the same."

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalResult;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalVertex;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexOperations;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+import org.apache.flink.runtime.scheduler.adaptivebatch.forwardregion.ForwardRegion;
+import org.apache.flink.runtime.scheduler.adaptivebatch.forwardregion.ForwardRegionComputeUtil;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This scheduler decides the parallelism of JobVertex according to the data volume it consumes. A
+ * dynamically built up ExecutionGraph is used for this purpose.
+ */
+public class AdaptiveBatchScheduler extends DefaultScheduler implements SchedulerOperations {
+
+    private final DefaultLogicalTopology logicalTopology;
+
+    private final VertexParallelismDecider vertexParallelismDecider;
+
+    private final Map<JobVertexID, ForwardRegion> forwardRegionsByJobVertexId;
+
+    AdaptiveBatchScheduler(
+            final Logger log,
+            final JobGraph jobGraph,
+            final Executor ioExecutor,
+            final Configuration jobMasterConfiguration,
+            final Consumer<ComponentMainThreadExecutor> startUpAction,
+            final ScheduledExecutor delayExecutor,
+            final ClassLoader userCodeLoader,
+            final CheckpointsCleaner checkpointsCleaner,
+            final CheckpointRecoveryFactory checkpointRecoveryFactory,
+            final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            final SchedulingStrategyFactory schedulingStrategyFactory,
+            final FailoverStrategy.Factory failoverStrategyFactory,
+            final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+            final ExecutionVertexOperations executionVertexOperations,
+            final ExecutionVertexVersioner executionVertexVersioner,
+            final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+            long initializationTimestamp,
+            final ComponentMainThreadExecutor mainThreadExecutor,
+            final JobStatusListener jobStatusListener,
+            final ExecutionGraphFactory executionGraphFactory,
+            final ShuffleMaster<?> shuffleMaster,
+            final Time rpcTimeout,
+            final VertexParallelismDecider vertexParallelismDecider,
+            int defaultMaxParallelism)
+            throws Exception {
+
+        super(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                startUpAction,
+                delayExecutor,
+                userCodeLoader,
+                checkpointsCleaner,
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                schedulingStrategyFactory,
+                failoverStrategyFactory,
+                restartBackoffTimeStrategy,
+                executionVertexOperations,
+                executionVertexVersioner,
+                executionSlotAllocatorFactory,
+                initializationTimestamp,
+                mainThreadExecutor,
+                jobStatusListener,
+                executionGraphFactory,
+                shuffleMaster,
+                rpcTimeout,
+                computeVertexParallelismStoreForDynamicGraph(
+                        jobGraph.getVertices(), defaultMaxParallelism));
+
+        this.logicalTopology = DefaultLogicalTopology.fromJobGraph(jobGraph);
+
+        this.vertexParallelismDecider = vertexParallelismDecider;
+
+        this.forwardRegionsByJobVertexId =
+                ForwardRegionComputeUtil.computeForwardRegions(jobGraph.getVertices());
+    }
+
+    @Override
+    public void startSchedulingInternal() {
+        initializeVerticesIfPossible();
+
+        super.startSchedulingInternal();
+    }
+
+    @Override
+    protected void updateTaskExecutionStateInternal(
+            final ExecutionVertexID executionVertexId,
+            final TaskExecutionStateTransition taskExecutionState) {
+
+        initializeVerticesIfPossible();
+
+        super.updateTaskExecutionStateInternal(executionVertexId, taskExecutionState);
+    }
+
+    private void initializeVerticesIfPossible() {
+        final List<ExecutionJobVertex> newlyInitializedJobVertices = new ArrayList<>();
+        try {
+            final long createTimestamp = System.currentTimeMillis();
+            for (ExecutionJobVertex jobVertex : getExecutionGraph().getVerticesTopologically()) {
+                maybeSetParallelism(jobVertex);
+            }
+            for (ExecutionJobVertex jobVertex : getExecutionGraph().getVerticesTopologically()) {
+                if (canInitialize(jobVertex)) {
+                    getExecutionGraph().initializeJobVertex(jobVertex, createTimestamp);
+                    newlyInitializedJobVertices.add(jobVertex);
+                }
+            }
+        } catch (JobException ex) {
+            log.error("Unexpected error occurred when initializing ExecutionJobVertex", ex);
+            failJob(ex, System.currentTimeMillis());
+        }
+
+        if (newlyInitializedJobVertices.size() > 0) {
+            updateTopology(newlyInitializedJobVertices);
+        }
+    }
+
+    private void maybeSetParallelism(final ExecutionJobVertex jobVertex) {
+        if (jobVertex.isParallelismDecided()) {
+            return;
+        }
+
+        Optional<List<BlockingResultInfo>> consumedResultsInfo =
+                tryGetConsumedResultsInfo(jobVertex);
+        if (!consumedResultsInfo.isPresent()) {
+            return;
+        }
+
+        ForwardRegion forwardRegion = forwardRegionsByJobVertexId.get(jobVertex.getJobVertexId());
+        int parallelism;
+        if (forwardRegion.isParallelismDecided()) {
+            parallelism = forwardRegion.getParallelism();
+            log.info(
+                    "Parallelism of JobVertex: {} ({}) is decided to be {} according to forward region's parallelism.",
+                    jobVertex.getName(),
+                    jobVertex.getJobVertexId(),
+                    parallelism);
+
+        } else {
+            parallelism =
+                    vertexParallelismDecider.decideParallelismForVertex(consumedResultsInfo.get());
+            forwardRegion.setParallelism(parallelism);
+
+            log.info(
+                    "Parallelism of JobVertex: {} ({}) is decided to be {}.",
+                    jobVertex.getName(),
+                    jobVertex.getJobVertexId(),
+                    parallelism);
+        }
+
+        changeJobVertexParallelism(jobVertex, parallelism);
+    }
+
+    private void changeJobVertexParallelism(ExecutionJobVertex jobVertex, int parallelism) {
+        // update PlanJson, it's needed to enable REST APIs to return the latest parallelism of job

Review comment:
       `PlanJson,` -> `the JSON plan.`

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardregion/ForwardRegion.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.forwardregion;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** A forward region is a set of vertices connected via forward edges. */
+public class ForwardRegion {
+
+    private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+
+    private final Set<JobVertexID> jobVertexIds = new HashSet<>();
+
+    public ForwardRegion(final Set<JobVertex> jobVertices) {

Review comment:
       maybe receive `Set<ExecutionJobVertex>` as the param and use `ExecutionJobVertex#isParallelismDecided()` in the logic below? It will be more consistent with the scheduler's view. 

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
##########
@@ -173,12 +175,17 @@ public SchedulerNG createInstance(
                         JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM));
     }
 
-    private void checkIsAllBlockingGraph(final JobGraph jobGraph) {
+    private void checkAllExchangesBlocking(final JobGraph jobGraph) {
         for (JobVertex jobVertex : jobGraph.getVertices()) {
             for (IntermediateDataSet dataSet : jobVertex.getProducedDataSets()) {
                 checkState(
                         dataSet.getResultType().isBlocking(),
-                        "Adaptive batch scheduler currently only supports ALL_EDGES_BLOCKING jobs.");
+                        String.format(
+                                "At the moment, fine-grained resource management requires batch workloads "
+                                        + "to be executed with types of all edges being BLOCKING. "
+                                        + "To do that, you need to configure '%s' to '%s'.",
+                                ExecutionOptions.BATCH_SHUFFLE_MODE,

Review comment:
       ExecutionOptions.BATCH_SHUFFLE_MODE  -> ExecutionOptions.BATCH_SHUFFLE_MODE.key()

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardregion/ForwardRegionComputeUtil.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.forwardregion;
+
+import org.apache.flink.runtime.executiongraph.RegionComputeUtil;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Common utils for computing forward regions. */
+public class ForwardRegionComputeUtil {
+
+    public static Map<JobVertexID, ForwardRegion> computeForwardRegions(
+            final Iterable<JobVertex> topologicallySortedVertices) {
+
+        final Map<JobVertex, Set<JobVertex>> vertexToRegion = new IdentityHashMap<>();
+
+        // iterate all the vertices which are topologically sorted
+        for (JobVertex vertex : topologicallySortedVertices) {
+            Set<JobVertex> currentRegion = new HashSet<>();
+            currentRegion.add(vertex);
+            vertexToRegion.put(vertex, currentRegion);
+
+            for (JobEdge input : getForwardInputs(vertex)) {
+                final JobVertex producerVertex = input.getSource().getProducer();
+                final Set<JobVertex> producerRegion = vertexToRegion.get(producerVertex);
+
+                if (producerRegion == null) {
+                    throw new IllegalStateException(
+                            "Producer task "
+                                    + producerVertex.getID()
+                                    + " forward region is null"
+                                    + " while calculating forward region for the consumer task "
+                                    + vertex.getID()
+                                    + ". This should be a forward region building bug.");
+                }
+
+                if (currentRegion != producerRegion) {
+                    currentRegion =
+                            RegionComputeUtil.mergeRegions(
+                                    currentRegion, producerRegion, vertexToRegion);
+                }
+            }
+        }
+
+        Set<ForwardRegion> forwardRegions =
+                RegionComputeUtil.uniqueRegions(vertexToRegion).stream()
+                        .map(ForwardRegion::new)
+                        .collect(Collectors.toSet());

Review comment:
       I would suggest to exclude regions which only has one vertex, which means it is actually in no forward region. This allows use to avoid keeping unnecessary information.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18462:
URL: https://github.com/apache/flink/pull/18462#issuecomment-1019928259


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30018",
       "triggerID" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30361",
       "triggerID" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2dbbb86c848f3283f8351fcf016944f9138abca5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30387",
       "triggerID" : "2dbbb86c848f3283f8351fcf016944f9138abca5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "452eb5b9035acbe052290af017632157406a2ba2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30406",
       "triggerID" : "452eb5b9035acbe052290af017632157406a2ba2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d6151cbb9fe8146686df7b9eea917cc57a0559fe",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30445",
       "triggerID" : "d6151cbb9fe8146686df7b9eea917cc57a0559fe",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d6151cbb9fe8146686df7b9eea917cc57a0559fe Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30445) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18462:
URL: https://github.com/apache/flink/pull/18462#issuecomment-1019928259


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30018",
       "triggerID" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30361",
       "triggerID" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30018) 
   * a7758e78986cbf9857722ec0428b87e1c69172c7 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30361) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18462:
URL: https://github.com/apache/flink/pull/18462#issuecomment-1019928259


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30018",
       "triggerID" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30361",
       "triggerID" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a7758e78986cbf9857722ec0428b87e1c69172c7 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30361) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18462:
URL: https://github.com/apache/flink/pull/18462#issuecomment-1019928259


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30018",
       "triggerID" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30018) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18462:
URL: https://github.com/apache/flink/pull/18462#issuecomment-1019928259


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30018",
       "triggerID" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30361",
       "triggerID" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2dbbb86c848f3283f8351fcf016944f9138abca5",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30387",
       "triggerID" : "2dbbb86c848f3283f8351fcf016944f9138abca5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a7758e78986cbf9857722ec0428b87e1c69172c7 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30361) 
   * 2dbbb86c848f3283f8351fcf016944f9138abca5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30387) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wanglijie95 commented on a change in pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on a change in pull request #18462:
URL: https://github.com/apache/flink/pull/18462#discussion_r794448485



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtilTest.java
##########
@@ -162,20 +171,35 @@ public void testOneInputSplitsIntoTwo() {
         v2.getProducedDataSets().get(0).getConsumer().setForward(true);
         v2.getProducedDataSets().get(1).getConsumer().setForward(true);
 
-        Set<ForwardRegion> regions = computePipelinedRegions(v1, v2, v3, v4);
+        Set<ForwardGroup> groups = computeForwardGroups(v1, v2, v3, v4);
 
-        checkRegionSize(regions, 2, 3, 1);
+        checkGroupSize(groups, 1, 3);
     }
 
-    private static Set<ForwardRegion> computePipelinedRegions(JobVertex... vertices) {
+    private static Set<ForwardGroup> computeForwardGroups(JobVertex... vertices) throws Exception {
+        Arrays.asList(vertices).forEach(vertex -> vertex.setInvokableClass(NoOpInvokable.class));
+        ExecutionGraph executionGraph = createDynamicGraph(vertices);
         return new HashSet<>(
-                ForwardRegionComputeUtil.computeForwardRegions(Arrays.asList(vertices)).values());
+                ForwardGroupComputeUtil.computeForwardGroups(
+                                Arrays.asList(vertices), executionGraph::getJobVertex)
+                        .values());
     }
 
-    private static void checkRegionSize(
-            Set<ForwardRegion> regions, int numOfRegions, int... sizes) {
-        assertEquals(numOfRegions, regions.size());
+    private static void checkGroupSize(Set<ForwardGroup> groups, int numOfGroups, int... sizes) {
+        assertEquals(numOfGroups, groups.size());
         containsInAnyOrder(
-                regions.stream().map(ForwardRegion::size).collect(Collectors.toList()), sizes);
+                groups.stream().map(ForwardGroup::size).collect(Collectors.toList()), sizes);
+    }
+
+    private static DefaultExecutionGraph createDynamicGraph(JobVertex... vertices)

Review comment:
       The `ForwardGroup` is only used in dyanmic graph, so I think using dynamic graph here is better.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18462:
URL: https://github.com/apache/flink/pull/18462#issuecomment-1019928259


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30018",
       "triggerID" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30361",
       "triggerID" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2dbbb86c848f3283f8351fcf016944f9138abca5",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30387",
       "triggerID" : "2dbbb86c848f3283f8351fcf016944f9138abca5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "452eb5b9035acbe052290af017632157406a2ba2",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30406",
       "triggerID" : "452eb5b9035acbe052290af017632157406a2ba2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2dbbb86c848f3283f8351fcf016944f9138abca5 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30387) 
   * 452eb5b9035acbe052290af017632157406a2ba2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30406) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18462:
URL: https://github.com/apache/flink/pull/18462#issuecomment-1019928259


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30018",
       "triggerID" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30361",
       "triggerID" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2dbbb86c848f3283f8351fcf016944f9138abca5",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30387",
       "triggerID" : "2dbbb86c848f3283f8351fcf016944f9138abca5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "452eb5b9035acbe052290af017632157406a2ba2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "452eb5b9035acbe052290af017632157406a2ba2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a7758e78986cbf9857722ec0428b87e1c69172c7 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30361) 
   * 2dbbb86c848f3283f8351fcf016944f9138abca5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30387) 
   * 452eb5b9035acbe052290af017632157406a2ba2 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18462:
URL: https://github.com/apache/flink/pull/18462#issuecomment-1019928259


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30018",
       "triggerID" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30361",
       "triggerID" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2dbbb86c848f3283f8351fcf016944f9138abca5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30387",
       "triggerID" : "2dbbb86c848f3283f8351fcf016944f9138abca5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "452eb5b9035acbe052290af017632157406a2ba2",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30406",
       "triggerID" : "452eb5b9035acbe052290af017632157406a2ba2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 452eb5b9035acbe052290af017632157406a2ba2 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30406) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18462:
URL: https://github.com/apache/flink/pull/18462#issuecomment-1019928259


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30018",
       "triggerID" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30361",
       "triggerID" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2dbbb86c848f3283f8351fcf016944f9138abca5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30387",
       "triggerID" : "2dbbb86c848f3283f8351fcf016944f9138abca5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "452eb5b9035acbe052290af017632157406a2ba2",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30406",
       "triggerID" : "452eb5b9035acbe052290af017632157406a2ba2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d6151cbb9fe8146686df7b9eea917cc57a0559fe",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30445",
       "triggerID" : "d6151cbb9fe8146686df7b9eea917cc57a0559fe",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 452eb5b9035acbe052290af017632157406a2ba2 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30406) 
   * d6151cbb9fe8146686df7b9eea917cc57a0559fe Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30445) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #18462:
URL: https://github.com/apache/flink/pull/18462#issuecomment-1019925646


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155 (Mon Jan 24 10:07:18 UTC 2022)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18462:
URL: https://github.com/apache/flink/pull/18462#issuecomment-1019928259


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30018",
       "triggerID" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30018) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] zhuzhurk commented on a change in pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #18462:
URL: https://github.com/apache/flink/pull/18462#discussion_r794327849



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardregion/ForwardRegionComputeUtilTest.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.forwardregion;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+
+/** Unit tests for {@link ForwardRegionComputeUtil}. */
+public class ForwardRegionComputeUtilTest {

Review comment:
       must `extends TestLogger`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18462:
URL: https://github.com/apache/flink/pull/18462#issuecomment-1019928259


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30018",
       "triggerID" : "4c28b1cd8efefa96ef9c0d7d4e425ed1a4ca8155",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30361",
       "triggerID" : "a7758e78986cbf9857722ec0428b87e1c69172c7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2dbbb86c848f3283f8351fcf016944f9138abca5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30387",
       "triggerID" : "2dbbb86c848f3283f8351fcf016944f9138abca5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "452eb5b9035acbe052290af017632157406a2ba2",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30406",
       "triggerID" : "452eb5b9035acbe052290af017632157406a2ba2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d6151cbb9fe8146686df7b9eea917cc57a0559fe",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d6151cbb9fe8146686df7b9eea917cc57a0559fe",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 452eb5b9035acbe052290af017632157406a2ba2 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30406) 
   * d6151cbb9fe8146686df7b9eea917cc57a0559fe UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] zhuzhurk closed pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
zhuzhurk closed pull request #18462:
URL: https://github.com/apache/flink/pull/18462


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] zhuzhurk commented on pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on pull request #18462:
URL: https://github.com/apache/flink/pull/18462#issuecomment-1024812248


   Squashing and merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org