You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/23 23:49:40 UTC

[3/6] git commit: Improve error message when scheduler cannot find a slot for immediate scheduling.

Improve error message when scheduler cannot find a slot for immediate scheduling.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b87f2fa2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b87f2fa2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b87f2fa2

Branch: refs/heads/master
Commit: b87f2fa2a06841973c5aaf424e8984bda24a9276
Parents: 1ddec93
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 23 19:54:01 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 23 19:54:01 2014 +0200

----------------------------------------------------------------------
 .../scheduler/NoResourceAvailableException.java | 11 +++++++++--
 .../runtime/jobmanager/scheduler/Scheduler.java | 20 +++++++++++++++++---
 2 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b87f2fa2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
index c1c3f94..11fec72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
@@ -23,16 +23,23 @@ import org.apache.flink.runtime.JobException;
 public class NoResourceAvailableException extends JobException {
 
 	private static final long serialVersionUID = -2249953165298717803L;
+	
+	private static final String BASE_MESSAGE = "Not enough free slots available to run the job. "
+			+ "You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.";
 
 	public NoResourceAvailableException() {
-		super("Not enough free slots available to run the job. "
-				+ "You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.");
+		super(BASE_MESSAGE);
 	}
 	
 	public NoResourceAvailableException(ScheduledUnit unit) {
 		super("No resource available to schedule unit " + unit
 				+ ". You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.");
 	}
+	
+	NoResourceAvailableException(int numInstances, int numSlotsTotal) {
+		super(String.format("%s Resources available to scheduler: Number of instances=%d, total number of slots=%d", 
+				BASE_MESSAGE, numInstances, numSlotsTotal));
+	}
 
 	public NoResourceAvailableException(String message) {
 		super(message);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b87f2fa2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index a3b8471..9ef30b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -108,6 +108,20 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 		return count;
 	}
 	
+	public int getTotalNumberOfSlots() {
+		int count = 0;
+		
+		synchronized (globalLock) {
+			for (Instance instance : allInstances) {
+				if (instance.isAlive()) {
+					count += instance.getTotalNumberOfSlots();
+				}
+			}
+		}
+		
+		return count;
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  Scheduling
 	// --------------------------------------------------------------------------------------------
@@ -198,7 +212,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 						if (slotFromGroup == null) {
 							// both null
 							if (constraint == null || constraint.isUnassigned()) {
-								throw new NoResourceAvailableException();
+								throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots());
 							} else {
 								throw new NoResourceAvailableException("Could not allocate a slot on instance " + 
 											constraint.getLocation() + ", as required by the co-location constraint.");
@@ -271,7 +285,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 					return future;
 				}
 				else {
-					throw new NoResourceAvailableException(task);
+					throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots());
 				}
 			}
 		}
@@ -439,7 +453,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 			throw new RuntimeException(locality.name());
 		}
 	}
-
+	
 	// --------------------------------------------------------------------------------------------
 	//  Instance Availability
 	// --------------------------------------------------------------------------------------------