You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@flink.apache.org by ch...@apache.org on 2021/02/24 17:24:29 UTC

[flink] 02/02: [FLINK-21399][coordination][tests] Provide enough slots for job deployment

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7c286d78826a3bac7a3a99c0335854292217e231
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Feb 22 08:31:40 2021 +0100

    [FLINK-21399][coordination][tests] Provide enough slots for job deployment
---
 .../flink/runtime/jobmaster/JobMasterTest.java     | 26 +++++++++++++++-------
 .../runtime/jobmaster/JobMasterTestUtils.java      | 15 ++++++++++++-
 2 files changed, 32 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index a1e6255..6f83dc1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -1065,7 +1065,6 @@ public class JobMasterTest extends TestLogger {
     }
 
     @Test
-    @Category(FailsWithAdaptiveScheduler.class) // FLINK-21399
     public void testRequestNextInputSplitWithGlobalFailover() throws Exception {
         configuration.setInteger(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
         configuration.set(
@@ -1100,7 +1099,7 @@ public class JobMasterTest extends TestLogger {
         source.setInvokableClass(AbstractInvokable.class);
 
         final JobGraph inputSplitJobGraph = new JobGraph(source);
-        jobGraph.setJobType(JobType.STREAMING);
+        inputSplitJobGraph.setJobType(JobType.STREAMING);
 
         final ExecutionConfig executionConfig = new ExecutionConfig();
         executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 0));
@@ -1119,6 +1118,10 @@ public class JobMasterTest extends TestLogger {
             final JobMasterGateway jobMasterGateway =
                     jobMaster.getSelfGateway(JobMasterGateway.class);
 
+            registerSlotsRequiredForJobExecution(jobMasterGateway, parallelism);
+
+            waitUntilAllExecutionsAreScheduledOrDeployed(jobMasterGateway);
+
             final JobVertexID sourceId = source.getID();
 
             final List<AccessExecution> executions = getExecutions(jobMasterGateway, sourceId);
@@ -1139,8 +1142,6 @@ public class JobMasterTest extends TestLogger {
                     allRequestedInputSplits,
                     containsInAnyOrder(allInputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS)));
 
-            waitUntilAllExecutionsAreScheduled(jobMasterGateway);
-
             // fail the first execution to trigger a failover
             jobMasterGateway
                     .updateTaskExecutionState(
@@ -1148,7 +1149,7 @@ public class JobMasterTest extends TestLogger {
                     .get();
 
             // wait until the job has been recovered
-            waitUntilAllExecutionsAreScheduled(jobMasterGateway);
+            waitUntilAllExecutionsAreScheduledOrDeployed(jobMasterGateway);
 
             final ExecutionAttemptID restartedAttemptId =
                     getFirstExecution(jobMasterGateway, sourceId).getAttemptId();
@@ -1181,8 +1182,8 @@ public class JobMasterTest extends TestLogger {
         return () -> getInputSplit(jobMasterGateway, jobVertexID, initialAttemptId);
     }
 
-    private void waitUntilAllExecutionsAreScheduled(final JobMasterGateway jobMasterGateway)
-            throws Exception {
+    private void waitUntilAllExecutionsAreScheduledOrDeployed(
+            final JobMasterGateway jobMasterGateway) throws Exception {
         final Duration duration = Duration.ofMillis(testingTimeout.toMilliseconds());
         final Deadline deadline = Deadline.fromNow(duration);
 
@@ -1191,7 +1192,9 @@ public class JobMasterTest extends TestLogger {
                         getExecutions(jobMasterGateway).stream()
                                 .allMatch(
                                         execution ->
-                                                execution.getState() == ExecutionState.SCHEDULED),
+                                                execution.getState() == ExecutionState.SCHEDULED
+                                                        || execution.getState()
+                                                                == ExecutionState.DEPLOYING),
                 deadline);
     }
 
@@ -1977,4 +1980,11 @@ public class JobMasterTest extends TestLogger {
         @Override
         public void disposeStorageLocation() throws IOException {}
     }
+
+    private static void registerSlotsRequiredForJobExecution(
+            JobMasterGateway jobMasterGateway, int numSlots)
+            throws ExecutionException, InterruptedException {
+        JobMasterTestUtils.registerTaskExecutorAndOfferSlots(
+                rpcService, jobMasterGateway, numSlots, testingTimeout);
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java
index cd9609f..e4d3106 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java
@@ -20,14 +20,18 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
 
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -43,7 +47,16 @@ public class JobMasterTestUtils {
             throws ExecutionException, InterruptedException {
 
         final TaskExecutorGateway taskExecutorGateway =
-                new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+                new TestingTaskExecutorGatewayBuilder()
+                        .setCancelTaskFunction(
+                                executionAttemptId -> {
+                                    jobMasterGateway.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    executionAttemptId, ExecutionState.CANCELED));
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .createTestingTaskExecutorGateway();
+
         final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
                 new LocalUnresolvedTaskManagerLocation();