You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/16 08:20:30 UTC

[GitHub] tillrohrmann closed pull request #7115: [FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out

tillrohrmann closed pull request #7115: [FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out
URL: https://github.com/apache/flink/pull/7115
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would otherwise not show it once the pull request is merged):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index cbf51037b8b..e3b501e52e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -41,6 +41,7 @@
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -427,7 +428,18 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
 			deploymentFuture.whenComplete(
 				(Void ignored, Throwable failure) -> {
 					if (failure != null) {
-						markFailed(ExceptionUtils.stripCompletionException(failure));
+						final Throwable stripCompletionException = ExceptionUtils.stripCompletionException(failure);
+						final Throwable schedulingFailureCause;
+
+						if (stripCompletionException instanceof TimeoutException) {
+							schedulingFailureCause = new NoResourceAvailableException(
+								"Could not allocate enough slots within timeout of " + allocationTimeout + " to run the job. " +
+									"Please make sure that the cluster has enough resources.");
+						} else {
+							schedulingFailureCause = stripCompletionException;
+						}
+
+						markFailed(schedulingFailureCause);
 					}
 				});
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3b55e009116..56315e07146 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -911,7 +911,7 @@ public void scheduleForExecution() throws JobException {
 				final CompletableFuture<Void> schedulingJobVertexFuture = ejv.scheduleAll(
 					slotProvider,
 					allowQueuedScheduling,
-					LocationPreferenceConstraint.ALL,// since it is an input vertex, the input based location preferences should be empty
+					LocationPreferenceConstraint.ALL, // since it is an input vertex, the input based location preferences should be empty
 					Collections.emptySet());
 
 				schedulingFutures.add(schedulingJobVertexFuture);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index c108eaee36a..585e1badf47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -101,7 +101,42 @@ public void runJobWithMultipleRpcServices() throws Exception {
 	}
 
 	@Test
-	public void testHandleJobsWhenNotEnoughSlot() throws Exception {
+	public void testHandleStreamingJobsWhenNotEnoughSlot() throws Exception {
+		try {
+			setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode.EAGER);
+			fail("Job should fail.");
+		} catch (JobExecutionException e) {
+			assertTrue(findThrowableWithMessage(e, "Job execution failed.").isPresent());
+			assertTrue(findThrowable(e, NoResourceAvailableException.class).isPresent());
+			assertTrue(findThrowableWithMessage(e, "Slots required: 2, slots allocated: 1").isPresent());
+		}
+	}
+
+	@Test
+	public void testHandleBatchJobsWhenNotEnoughSlot() throws Exception {
+		try {
+			setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode.LAZY_FROM_SOURCES);
+			fail("Job should fail.");
+		} catch (JobExecutionException e) {
+			assertTrue(findThrowableWithMessage(e, "Job execution failed.").isPresent());
+			assertTrue(findThrowable(e, NoResourceAvailableException.class).isPresent());
+			assertTrue(findThrowableWithMessage(e, "Could not allocate enough slots").isPresent());
+		}
+	}
+
+	private void setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode scheduleMode) throws Exception {
+		final JobVertex vertex = new JobVertex("Test Vertex");
+		vertex.setParallelism(2);
+		vertex.setMaxParallelism(2);
+		vertex.setInvokableClass(BlockingNoOpInvokable.class);
+
+		final JobGraph jobGraph = new JobGraph("Test Job", vertex);
+		jobGraph.setScheduleMode(scheduleMode);
+
+		runHandleJobsWhenNotEnoughSlots(jobGraph);
+	}
+
+	private void runHandleJobsWhenNotEnoughSlots(final JobGraph jobGraph) throws Exception {
 		final Configuration configuration = getDefaultConfiguration();
 		configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 100L);
 
@@ -114,24 +149,7 @@ public void testHandleJobsWhenNotEnoughSlot() throws Exception {
 		try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
 			miniCluster.start();
 
-			final JobVertex vertex = new JobVertex("Test Vertex");
-			vertex.setParallelism(2);
-			vertex.setMaxParallelism(2);
-			vertex.setInvokableClass(BlockingNoOpInvokable.class);
-
-			final JobGraph jobGraph = new JobGraph("Test Job", vertex);
-			jobGraph.setScheduleMode(ScheduleMode.EAGER);
-
-			try {
-				miniCluster.executeJobBlocking(jobGraph);
-
-				fail("Job should fail.");
-			} catch (JobExecutionException e) {
-				assertTrue(findThrowableWithMessage(e, "Job execution failed.").isPresent());
-
-				assertTrue(findThrowable(e, NoResourceAvailableException.class).isPresent());
-				assertTrue(findThrowableWithMessage(e, "Slots required: 2, slots allocated: 1").isPresent());
-			}
+			miniCluster.executeJobBlocking(jobGraph);
 		}
 	}
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services