You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/02/24 17:24:29 UTC
[flink] 02/02: [FLINK-21399][coordination][tests] Provide enough
slots for job deployment
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7c286d78826a3bac7a3a99c0335854292217e231
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Feb 22 08:31:40 2021 +0100
[FLINK-21399][coordination][tests] Provide enough slots for job deployment
---
.../flink/runtime/jobmaster/JobMasterTest.java | 26 +++++++++++++++-------
.../runtime/jobmaster/JobMasterTestUtils.java | 15 ++++++++++++-
2 files changed, 32 insertions(+), 9 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index a1e6255..6f83dc1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -1065,7 +1065,6 @@ public class JobMasterTest extends TestLogger {
}
@Test
- @Category(FailsWithAdaptiveScheduler.class) // FLINK-21399
public void testRequestNextInputSplitWithGlobalFailover() throws Exception {
configuration.setInteger(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
configuration.set(
@@ -1100,7 +1099,7 @@ public class JobMasterTest extends TestLogger {
source.setInvokableClass(AbstractInvokable.class);
final JobGraph inputSplitJobGraph = new JobGraph(source);
- jobGraph.setJobType(JobType.STREAMING);
+ inputSplitJobGraph.setJobType(JobType.STREAMING);
final ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 0));
@@ -1119,6 +1118,10 @@ public class JobMasterTest extends TestLogger {
final JobMasterGateway jobMasterGateway =
jobMaster.getSelfGateway(JobMasterGateway.class);
+ registerSlotsRequiredForJobExecution(jobMasterGateway, parallelism);
+
+ waitUntilAllExecutionsAreScheduledOrDeployed(jobMasterGateway);
+
final JobVertexID sourceId = source.getID();
final List<AccessExecution> executions = getExecutions(jobMasterGateway, sourceId);
@@ -1139,8 +1142,6 @@ public class JobMasterTest extends TestLogger {
allRequestedInputSplits,
containsInAnyOrder(allInputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS)));
- waitUntilAllExecutionsAreScheduled(jobMasterGateway);
-
// fail the first execution to trigger a failover
jobMasterGateway
.updateTaskExecutionState(
@@ -1148,7 +1149,7 @@ public class JobMasterTest extends TestLogger {
.get();
// wait until the job has been recovered
- waitUntilAllExecutionsAreScheduled(jobMasterGateway);
+ waitUntilAllExecutionsAreScheduledOrDeployed(jobMasterGateway);
final ExecutionAttemptID restartedAttemptId =
getFirstExecution(jobMasterGateway, sourceId).getAttemptId();
@@ -1181,8 +1182,8 @@ public class JobMasterTest extends TestLogger {
return () -> getInputSplit(jobMasterGateway, jobVertexID, initialAttemptId);
}
- private void waitUntilAllExecutionsAreScheduled(final JobMasterGateway jobMasterGateway)
- throws Exception {
+ private void waitUntilAllExecutionsAreScheduledOrDeployed(
+ final JobMasterGateway jobMasterGateway) throws Exception {
final Duration duration = Duration.ofMillis(testingTimeout.toMilliseconds());
final Deadline deadline = Deadline.fromNow(duration);
@@ -1191,7 +1192,9 @@ public class JobMasterTest extends TestLogger {
getExecutions(jobMasterGateway).stream()
.allMatch(
execution ->
- execution.getState() == ExecutionState.SCHEDULED),
+ execution.getState() == ExecutionState.SCHEDULED
+ || execution.getState()
+ == ExecutionState.DEPLOYING),
deadline);
}
@@ -1977,4 +1980,11 @@ public class JobMasterTest extends TestLogger {
@Override
public void disposeStorageLocation() throws IOException {}
}
+
+ private static void registerSlotsRequiredForJobExecution(
+ JobMasterGateway jobMasterGateway, int numSlots)
+ throws ExecutionException, InterruptedException {
+ JobMasterTestUtils.registerTaskExecutorAndOfferSlots(
+ rpcService, jobMasterGateway, numSlots, testingTimeout);
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java
index cd9609f..e4d3106 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java
@@ -20,14 +20,18 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -43,7 +47,16 @@ public class JobMasterTestUtils {
throws ExecutionException, InterruptedException {
final TaskExecutorGateway taskExecutorGateway =
- new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+ new TestingTaskExecutorGatewayBuilder()
+ .setCancelTaskFunction(
+ executionAttemptId -> {
+ jobMasterGateway.updateTaskExecutionState(
+ new TaskExecutionState(
+ executionAttemptId, ExecutionState.CANCELED));
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ })
+ .createTestingTaskExecutorGateway();
+
final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
new LocalUnresolvedTaskManagerLocation();