You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2021/02/23 19:29:52 UTC

[flink] branch master updated: [FLINK-21101][Azure] Add nightly profile to run all tests with the adaptive scheduler

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dc0c416  [FLINK-21101][Azure] Add nightly profile to run all tests with the adaptive scheduler
dc0c416 is described below

commit dc0c416a580a3837139ccf3d24331f6fd9c53f36
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Thu Feb 18 16:52:21 2021 +0100

    [FLINK-21101][Azure] Add nightly profile to run all tests with the adaptive scheduler
---
 .../flink/connector/file/sink/FileSinkITBase.java  |  3 +
 .../file/src/FileSourceTextLinesITCase.java        |  3 +
 flink-end-to-end-tests/run-nightly-tests.sh        | 97 ++++++++++++----------
 .../flink/runtime/jobmaster/JobMasterTest.java     | 10 +++
 .../runtime/stream/table/TableSinkITCase.scala     |  6 +-
 .../junit/FailsWithAdaptiveScheduler.java          | 22 +++++
 .../EventTimeAllWindowCheckpointingITCase.java     |  3 +
 .../EventTimeWindowCheckpointingITCase.java        |  3 +
 .../test/checkpointing/LocalRecoveryITCase.java    |  3 +
 .../ProcessingTimeWindowCheckpointingITCase.java   |  3 +
 .../flink/test/checkpointing/SavepointITCase.java  |  3 +
 .../UnalignedCheckpointCompatibilityITCase.java    |  3 +
 .../checkpointing/UnalignedCheckpointITCase.java   |  3 +
 .../checkpointing/UnalignedCheckpointTestBase.java |  3 +
 .../test/streaming/runtime/TimestampITCase.java    |  3 +
 pom.xml                                            | 18 ++++
 tools/azure-pipelines/build-apache-repo.yml        | 11 +++
 tools/ci/test_controller.sh                        | 10 ++-
 18 files changed, 162 insertions(+), 45 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java
index 6c424ec..35ea76c 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java
@@ -27,10 +27,12 @@ import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runners.Parameterized;
 
@@ -61,6 +63,7 @@ public abstract class FileSinkITBase extends TestLogger {
     }
 
     @Test
+    @Category(FailsWithAdaptiveScheduler.class) // FLINK-21400
     public void testFileSink() throws Exception {
         String path = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
 
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
index 09e6539..0e1a5bc 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -32,12 +32,14 @@ import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.FunctionWithException;
 
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
@@ -164,6 +166,7 @@ public class FileSourceTextLinesITCase extends TestLogger {
      * record format (text lines) and restarts TaskManager.
      */
     @Test
+    @Category(FailsWithAdaptiveScheduler.class)
     public void testContinuousTextFileSourceWithTaskManagerFailover() throws Exception {
         testContinuousTextFileSource(FailoverType.TM);
     }
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 3bb4ec4..eb0f8cd 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -57,6 +57,11 @@ if [ ! -z "$TF_BUILD" ] ; then
 	on_exit run_on_exit
 fi
 
+if [[ ${PROFILE} == *"enable-adaptive-scheduler"* ]]; then
+	echo "Enabling adaptive scheduler properties"
+	export JVM_ARGS="${JVM_ARGS:+${JVM_ARGS} }-Dflink.tests.enable-adaptive-scheduler=true" # append: keep JVM args already set by the caller
+fi
+
 FLINK_DIR="`( cd \"$FLINK_DIR\" && pwd -P)`" # absolutized and normalized
 
 echo "flink-end-to-end-test directory: $END_TO_END_DIR"
@@ -89,36 +94,40 @@ printf "========================================================================
 # Checkpointing tests
 ################################################################################
 
-run_test "Resuming Savepoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file true"
-run_test "Resuming Savepoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file false"
-run_test "Resuming Savepoint (file, async, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file true"
-run_test "Resuming Savepoint (file, sync, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file false"
-run_test "Resuming Savepoint (file, async, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file true"
-run_test "Resuming Savepoint (file, sync, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file false"
-run_test "Resuming Savepoint (rocks, no parallelism change, heap timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 rocks false heap"
-run_test "Resuming Savepoint (rocks, scale up, heap timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks false heap"
-run_test "Resuming Savepoint (rocks, scale down, heap timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks false heap"
-run_test "Resuming Savepoint (rocks, no parallelism change, rocks timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 rocks false rocks"
-run_test "Resuming Savepoint (rocks, scale up, rocks timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks false rocks"
-run_test "Resuming Savepoint (rocks, scale down, rocks timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks false rocks"
-
-run_test "Resuming Externalized Checkpoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file true true" "skip_check_exceptions"
-run_test "Resuming Externalized Checkpoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file false true" "skip_check_exceptions"
-run_test "Resuming Externalized Checkpoint (file, async, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 file true true" "skip_check_exceptions"
-run_test "Resuming Externalized Checkpoint (file, sync, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 file false true" "skip_check_exceptions"
-run_test "Resuming Externalized Checkpoint (file, async, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 file true true" "skip_check_exceptions"
-run_test "Resuming Externalized Checkpoint (file, sync, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 file false true" "skip_check_exceptions"
-run_test "Resuming Externalized Checkpoint (rocks, non-incremental, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true false" "skip_check_exceptions"
-run_test "Resuming Externalized Checkpoint (rocks, incremental, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true true" "skip_check_exceptions"
-run_test "Resuming Externalized Checkpoint (rocks, non-incremental, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks true false" "skip_check_exceptions"
-run_test "Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks true true" "skip_check_exceptions"
-run_test "Resuming Externalized Checkpoint (rocks, non-incremental, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 rocks true false" "skip_check_exceptions"
-run_test "Resuming Externalized Checkpoint (rocks, incremental, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 rocks true true" "skip_check_exceptions"
-
-run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file true false true" "skip_check_exceptions"
-run_test "Resuming Externalized Checkpoint after terminal failure (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file false false true" "skip_check_exceptions"
-run_test "Resuming Externalized Checkpoint after terminal failure (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true false true" "skip_check_exceptions"
-run_test "Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true true true" "skip_check_exceptions"
+
+# Remove this condition once FLINK-21333 is done
+if [[ ${PROFILE} != *"enable-adaptive-scheduler"* ]]; then
+	run_test "Resuming Savepoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file true"
+	run_test "Resuming Savepoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file false"
+	run_test "Resuming Savepoint (file, async, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file true"
+	run_test "Resuming Savepoint (file, sync, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file false"
+	run_test "Resuming Savepoint (file, async, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file true"
+	run_test "Resuming Savepoint (file, sync, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file false"
+	run_test "Resuming Savepoint (rocks, no parallelism change, heap timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 rocks false heap"
+	run_test "Resuming Savepoint (rocks, scale up, heap timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks false heap"
+	run_test "Resuming Savepoint (rocks, scale down, heap timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks false heap"
+	run_test "Resuming Savepoint (rocks, no parallelism change, rocks timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 rocks false rocks"
+	run_test "Resuming Savepoint (rocks, scale up, rocks timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks false rocks"
+	run_test "Resuming Savepoint (rocks, scale down, rocks timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks false rocks"
+
+	run_test "Resuming Externalized Checkpoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file true true" "skip_check_exceptions"
+	run_test "Resuming Externalized Checkpoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file false true" "skip_check_exceptions"
+	run_test "Resuming Externalized Checkpoint (file, async, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 file true true" "skip_check_exceptions"
+	run_test "Resuming Externalized Checkpoint (file, sync, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 file false true" "skip_check_exceptions"
+	run_test "Resuming Externalized Checkpoint (file, async, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 file true true" "skip_check_exceptions"
+	run_test "Resuming Externalized Checkpoint (file, sync, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 file false true" "skip_check_exceptions"
+	run_test "Resuming Externalized Checkpoint (rocks, non-incremental, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true false" "skip_check_exceptions"
+	run_test "Resuming Externalized Checkpoint (rocks, incremental, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true true" "skip_check_exceptions"
+	run_test "Resuming Externalized Checkpoint (rocks, non-incremental, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks true false" "skip_check_exceptions"
+	run_test "Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks true true" "skip_check_exceptions"
+	run_test "Resuming Externalized Checkpoint (rocks, non-incremental, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 rocks true false" "skip_check_exceptions"
+	run_test "Resuming Externalized Checkpoint (rocks, incremental, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 rocks true true" "skip_check_exceptions"
+
+	run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file true false true" "skip_check_exceptions"
+	run_test "Resuming Externalized Checkpoint after terminal failure (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file false false true" "skip_check_exceptions"
+	run_test "Resuming Externalized Checkpoint after terminal failure (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true false true" "skip_check_exceptions"
+	run_test "Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true true true" "skip_check_exceptions"
+fi
 
 run_test "RocksDB Memory Management end-to-end test" "$END_TO_END_DIR/test-scripts/test_rocksdb_state_memory_control.sh"
 
@@ -127,7 +136,7 @@ run_test "RocksDB Memory Management end-to-end test" "$END_TO_END_DIR/test-scrip
 ################################################################################
 
 # These tests are known to fail on JDK11. See FLINK-13719
-if [[ ${PROFILE} != *"jdk11"* ]]; then
+if [[ ${PROFILE} != *"jdk11"* && ${PROFILE} != *"enable-adaptive-scheduler"* ]]; then
 	run_test "Wordcount on Docker test (custom fs plugin)" "$END_TO_END_DIR/test-scripts/test_docker_embedded_job.sh dummy-fs"
 
 	run_test "Run Kubernetes test" "$END_TO_END_DIR/test-scripts/test_kubernetes_embedded_job.sh"
@@ -185,12 +194,14 @@ run_test "Batch SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_sq
 run_test "Streaming SQL end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh old" "skip_check_exceptions"
 run_test "Streaming SQL end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh blink" "skip_check_exceptions"
 
-run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh local StreamingFileSink" "skip_check_exceptions"
-run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 StreamingFileSink" "skip_check_exceptions"
-run_test "New File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh local FileSink" "skip_check_exceptions"
-run_test "New File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 FileSink" "skip_check_exceptions"
+if [[ ${PROFILE} != *"enable-adaptive-scheduler"* ]]; then # FLINK-21400
+  run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh local StreamingFileSink" "skip_check_exceptions"
+  run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 StreamingFileSink" "skip_check_exceptions"
+  run_test "New File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh local FileSink" "skip_check_exceptions"
+  run_test "New File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 FileSink" "skip_check_exceptions"
 
-run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
+  run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
+fi
 
 run_test "Netty shuffle direct memory consumption end-to-end test" "$END_TO_END_DIR/test-scripts/test_netty_shuffle_memory_control.sh"
 
@@ -236,12 +247,14 @@ fi
 # Sticky Scheduling
 ################################################################################
 
-run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 file false false" "skip_check_exceptions"
-run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 file false true" "skip_check_exceptions"
-run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false false" "skip_check_exceptions"
-run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true false" "skip_check_exceptions"
-run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false true" "skip_check_exceptions"
-run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true true" "skip_check_exceptions"
+if [[ ${PROFILE} != *"enable-adaptive-scheduler"* ]]; then #FLINK-21450
+	run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 file false false" "skip_check_exceptions"
+	run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 file false true" "skip_check_exceptions"
+	run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false false" "skip_check_exceptions"
+	run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true false" "skip_check_exceptions"
+	run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false true" "skip_check_exceptions"
+	run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true true" "skip_check_exceptions"
+fi
 
 printf "\n[PASS] All bash e2e-tests passed\n"
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index ca04421..f0d2637 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -74,6 +74,7 @@ import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTr
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -127,6 +128,7 @@ import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.ClassLoaderUtils;
+import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -141,6 +143,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nonnull;
@@ -762,6 +765,7 @@ public class JobMasterTest extends TestLogger {
      * submission.
      */
     @Test
+    @Category(FailsWithAdaptiveScheduler.class) // FLINK-21398
     public void testRestoringFromSavepoint() throws Exception {
 
         // create savepoint data
@@ -803,6 +807,7 @@ public class JobMasterTest extends TestLogger {
      * allowed.
      */
     @Test
+    @Category(FailsWithAdaptiveScheduler.class) // FLINK-21398
     public void testRestoringModifiedJobFromSavepoint() throws Exception {
 
         // create savepoint data
@@ -860,6 +865,7 @@ public class JobMasterTest extends TestLogger {
 
     /** Tests that an existing checkpoint will have precedence over an savepoint. */
     @Test
+    @Category(FailsWithAdaptiveScheduler.class) // FLINK-21398
     public void testCheckpointPrecedesSavepointRecovery() throws Exception {
 
         // create savepoint data
@@ -1045,6 +1051,7 @@ public class JobMasterTest extends TestLogger {
      * if this execution fails.
      */
     @Test
+    @Category(FailsWithAdaptiveScheduler.class) // FLINK-21450
     public void testRequestNextInputSplitWithLocalFailover() throws Exception {
 
         configuration.setString(
@@ -1058,6 +1065,7 @@ public class JobMasterTest extends TestLogger {
     }
 
     @Test
+    @Category(FailsWithAdaptiveScheduler.class) // FLINK-21399
     public void testRequestNextInputSplitWithGlobalFailover() throws Exception {
         configuration.setInteger(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
         configuration.set(
@@ -1092,6 +1100,7 @@ public class JobMasterTest extends TestLogger {
         source.setInvokableClass(AbstractInvokable.class);
 
         final JobGraph inputSplitJobGraph = new JobGraph(source);
+        inputSplitJobGraph.setJobType(JobType.STREAMING); // type the graph created above
 
         final ExecutionConfig executionConfig = new ExecutionConfig();
         executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 0));
@@ -1926,6 +1935,7 @@ public class JobMasterTest extends TestLogger {
     private JobGraph createJobGraphFromJobVerticesWithCheckpointing(
             SavepointRestoreSettings savepointRestoreSettings, JobVertex... jobVertices) {
         final JobGraph jobGraph = new JobGraph(jobVertices);
+        jobGraph.setJobType(JobType.STREAMING);
 
         // enable checkpointing which is required to resume from a savepoint
         final CheckpointCoordinatorConfiguration checkpoinCoordinatorConfiguration =
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index e2fa85c..bc52144 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -26,15 +26,15 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory.changelog
 import org.apache.flink.table.planner.runtime.utils.StreamingTestBase
 import org.apache.flink.table.planner.runtime.utils.TestData.{data1, nullData4, smallTupleData3, tupleData3, tupleData5}
 import org.apache.flink.table.utils.LegacyRowResource
+import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler
 import org.apache.flink.util.ExceptionUtils
-
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
+import org.junit.experimental.categories.Category
 import org.junit.{Rule, Test}
 
 import java.lang.{Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
 import java.util.concurrent.atomic.AtomicInteger
-
 import scala.collection.JavaConversions._
 import scala.util.{Failure, Success, Try}
 
@@ -691,6 +691,7 @@ class TableSinkITCase extends StreamingTestBase {
   }
 
   @Test
+  @Category(Array(classOf[FailsWithAdaptiveScheduler])) // FLINK-21403
   def testParallelismWithSinkFunction(): Unit = {
     val negativeParallelism = -1
     val validParallelism = 1
@@ -732,6 +733,7 @@ class TableSinkITCase extends StreamingTestBase {
   }
 
   @Test
+  @Category(Array(classOf[FailsWithAdaptiveScheduler])) // FLINK-21403
   def testParallelismWithOutputFormat(): Unit = {
     val negativeParallelism = -1
     val oversizedParallelism = Int.MaxValue
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/FailsWithAdaptiveScheduler.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/FailsWithAdaptiveScheduler.java
new file mode 100644
index 0000000..6d7215e
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/FailsWithAdaptiveScheduler.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+/** Marker for explicitly ignoring a test which fails with the adaptive scheduler. */
+public interface FailsWithAdaptiveScheduler {}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index f5c1299..f97e880 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -37,11 +37,13 @@ import org.apache.flink.test.checkpointing.utils.FailingSource;
 import org.apache.flink.test.checkpointing.utils.IntType;
 import org.apache.flink.test.checkpointing.utils.ValidatingSink;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -53,6 +55,7 @@ import static org.junit.Assert.fail;
  * <p>This is a version of {@link EventTimeWindowCheckpointingITCase} for All-Windows.
  */
 @SuppressWarnings("serial")
+@Category(FailsWithAdaptiveScheduler.class) // FLINK-21400
 public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 
     private static final int PARALLELISM = 4;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 12bdeea..12f8cbd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -49,6 +49,7 @@ import org.apache.flink.test.checkpointing.utils.FailingSource;
 import org.apache.flink.test.checkpointing.utils.IntType;
 import org.apache.flink.test.checkpointing.utils.ValidatingSink;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
@@ -58,6 +59,7 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
@@ -84,6 +86,7 @@ import static org.junit.Assert.fail;
  */
 @SuppressWarnings("serial")
 @RunWith(Parameterized.class)
+@Category(FailsWithAdaptiveScheduler.class) // FLINK-21400
 public class EventTimeWindowCheckpointingITCase extends TestLogger {
 
     private static final int MAX_MEM_STATE_SIZE = 20 * 1024 * 1024;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
index 80a18be..29f1ff8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
@@ -20,11 +20,13 @@ package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -46,6 +48,7 @@ import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingIT
  * EventTimeWindowCheckpointingITCase}.
  */
 @RunWith(Parameterized.class)
+@Category(FailsWithAdaptiveScheduler.class) // FLINK-21400
 public class LocalRecoveryITCase extends TestLogger {
 
     @Rule public TestName testName = new TestName();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ProcessingTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ProcessingTimeWindowCheckpointingITCase.java
index b9177c5..0146fc0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ProcessingTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ProcessingTimeWindowCheckpointingITCase.java
@@ -38,11 +38,13 @@ import org.apache.flink.test.checkpointing.utils.FailingSource;
 import org.apache.flink.test.checkpointing.utils.IntType;
 import org.apache.flink.test.checkpointing.utils.ValidatingSink;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.Map;
 
@@ -56,6 +58,7 @@ import static org.junit.Assert.fail;
  * handled correctly.
  */
 @SuppressWarnings("serial")
+@Category(FailsWithAdaptiveScheduler.class) // FLINK-21400
 public class ProcessingTimeWindowCheckpointingITCase extends TestLogger {
 
     private static final int PARALLELISM = 4;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index a29c62b..ffef666 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -63,6 +63,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.testutils.EntropyInjectingTestFileSystem;
+import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
@@ -74,6 +75,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -440,6 +442,7 @@ public class SavepointITCase extends TestLogger {
     }
 
     @Test
+    @Category(FailsWithAdaptiveScheduler.class) // FLINK-21333
     public void testStopSavepointWithBoundedInput() throws Exception {
         final int numTaskManagers = 2;
         final int numSlotsPerTaskManager = 2;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
index 1281e8f..8285833 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.test.checkpointing.utils.AccumulatingIntegerSink;
 import org.apache.flink.test.checkpointing.utils.CancellingIntegerSource;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -38,6 +39,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -69,6 +71,7 @@ import static org.junit.Assert.assertEquals;
  * enabled/disabled).
  */
 @RunWith(Parameterized.class)
+@Category(FailsWithAdaptiveScheduler.class) // FLINK-21333
 public class UnalignedCheckpointCompatibilityITCase extends TestLogger {
 
     @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index 6afb602..03f2b6d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -37,9 +37,11 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.Collector;
 
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -95,6 +97,7 @@ import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterable
  * </ul>
  */
 @RunWith(Parameterized.class)
+@Category(FailsWithAdaptiveScheduler.class) // FLINK-21400
 public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
 
     @Parameterized.Parameters(name = "{0}")
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index 2e7fd36..ae059a3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -54,11 +54,13 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
 import org.junit.Rule;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ErrorCollector;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -85,6 +87,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.fail;
 
 /** Base class for tests related to unaligned checkpoints. */
+@Category(FailsWithAdaptiveScheduler.class) // FLINK-21400
 public abstract class UnalignedCheckpointTestBase extends TestLogger {
     protected static final Logger LOG = LoggerFactory.getLogger(UnalignedCheckpointTestBase.class);
     protected static final String NUM_OUTPUTS = "outputs";
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 113ac56..c26a7b7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -50,6 +50,7 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindo
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -57,6 +58,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -159,6 +161,7 @@ public class TimestampITCase extends TestLogger {
     }
 
     @Test
+    @Category(FailsWithAdaptiveScheduler.class) // FLINK-21333
     public void testWatermarkPropagationNoFinalWatermarkOnStop() throws Exception {
 
         // for this test to work, we need to be sure that no other jobs are being executed
diff --git a/pom.xml b/pom.xml
index 595d2ad..8310016 100644
--- a/pom.xml
+++ b/pom.xml
@@ -938,6 +938,24 @@ under the License.
 		</profile>
 
 		<profile>
+			<id>enable-adaptive-scheduler</id>
+			<build>
+				<plugins>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-surefire-plugin</artifactId>
+						<configuration>
+							<systemProperties>
+								<flink.tests.enable-adaptive-scheduler>true</flink.tests.enable-adaptive-scheduler>
+							</systemProperties>
+							<excludedGroups>org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler</excludedGroups>
+						</configuration>
+					</plugin>
+				</plugins>
+			</build>
+		</profile>
+
+		<profile>
 			<id>java11</id>
 			<activation>
 				<jdk>[11,)</jdk>
diff --git a/tools/azure-pipelines/build-apache-repo.yml b/tools/azure-pipelines/build-apache-repo.yml
index 7197d8e..1fc0f9e 100644
--- a/tools/azure-pipelines/build-apache-repo.yml
+++ b/tools/azure-pipelines/build-apache-repo.yml
@@ -157,6 +157,17 @@ stages:
           run_end_to_end: true
           container: flink-build-container
           jdk: jdk11
+      - template: jobs-template.yml
+        parameters:
+          stage_name: cron_adaptive_scheduler
+          test_pool_definition:
+            name: Default
+          e2e_pool_definition:
+            vmImage: 'ubuntu-16.04'
+          environment: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11 -Penable-adaptive-scheduler"
+          run_end_to_end: true
+          container: flink-build-container
+          jdk: jdk8
       - job: docs_404_check # run on a MSFT provided machine
         pool:
           vmImage: 'ubuntu-16.04'
diff --git a/tools/ci/test_controller.sh b/tools/ci/test_controller.sh
index 5a20fd9..470f5d3 100755
--- a/tools/ci/test_controller.sh
+++ b/tools/ci/test_controller.sh
@@ -106,10 +106,18 @@ if [ $STAGE == $STAGE_PYTHON ]; then
 else
 	MVN_TEST_OPTIONS="-Dflink.tests.with-openssl"
 	if [ $STAGE = $STAGE_LEGACY_SLOT_MANAGEMENT ]; then
+		if [[ ${PROFILE} == *"enable-adaptive-scheduler"* ]]; then
+			echo "Skipping legacy slot management test stage in adaptive scheduler job"
+			exit 0
+		fi
 		MVN_TEST_OPTIONS="$MVN_TEST_OPTIONS -Dflink.tests.disable-declarative"
 	fi
 	if [ $STAGE = $STAGE_FINEGRAINED_RESOURCE_MANAGEMENT ]; then
-	  MVN_TEST_OPTIONS="$MVN_TEST_OPTIONS -Dflink.tests.enable-fine-grained"
+		if [[ ${PROFILE} == *"enable-adaptive-scheduler"* ]]; then
+			echo "Skipping fine grained resource management test stage in adaptive scheduler job"
+			exit 0
+		fi
+		MVN_TEST_OPTIONS="$MVN_TEST_OPTIONS -Dflink.tests.enable-fine-grained"
 	fi
 	MVN_TEST_MODULES=$(get_test_modules_for_stage ${STAGE})