You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/23 23:49:40 UTC
[3/6] git commit: Improve error message when scheduler cannot find a
slot for immediate scheduling.
Improve error message when scheduler cannot find a slot for immediate scheduling.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b87f2fa2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b87f2fa2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b87f2fa2
Branch: refs/heads/master
Commit: b87f2fa2a06841973c5aaf424e8984bda24a9276
Parents: 1ddec93
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 23 19:54:01 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 23 19:54:01 2014 +0200
----------------------------------------------------------------------
.../scheduler/NoResourceAvailableException.java | 11 +++++++++--
.../runtime/jobmanager/scheduler/Scheduler.java | 20 +++++++++++++++++---
2 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b87f2fa2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
index c1c3f94..11fec72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
@@ -23,16 +23,23 @@ import org.apache.flink.runtime.JobException;
public class NoResourceAvailableException extends JobException {
private static final long serialVersionUID = -2249953165298717803L;
+
+ private static final String BASE_MESSAGE = "Not enough free slots available to run the job. "
+ + "You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.";
public NoResourceAvailableException() {
- super("Not enough free slots available to run the job. "
- + "You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.");
+ super(BASE_MESSAGE);
}
public NoResourceAvailableException(ScheduledUnit unit) {
super("No resource available to schedule unit " + unit
+ ". You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.");
}
+
+ NoResourceAvailableException(int numInstances, int numSlotsTotal) {
+ super(String.format("%s Resources available to scheduler: Number of instances=%d, total number of slots=%d",
+ BASE_MESSAGE, numInstances, numSlotsTotal));
+ }
public NoResourceAvailableException(String message) {
super(message);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b87f2fa2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index a3b8471..9ef30b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -108,6 +108,20 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
return count;
}
+ public int getTotalNumberOfSlots() {
+ int count = 0;
+
+ synchronized (globalLock) {
+ for (Instance instance : allInstances) {
+ if (instance.isAlive()) {
+ count += instance.getTotalNumberOfSlots();
+ }
+ }
+ }
+
+ return count;
+ }
+
// --------------------------------------------------------------------------------------------
// Scheduling
// --------------------------------------------------------------------------------------------
@@ -198,7 +212,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
if (slotFromGroup == null) {
// both null
if (constraint == null || constraint.isUnassigned()) {
- throw new NoResourceAvailableException();
+ throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots());
} else {
throw new NoResourceAvailableException("Could not allocate a slot on instance " +
constraint.getLocation() + ", as required by the co-location constraint.");
@@ -271,7 +285,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
return future;
}
else {
- throw new NoResourceAvailableException(task);
+ throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots());
}
}
}
@@ -439,7 +453,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
throw new RuntimeException(locality.name());
}
}
-
+
// --------------------------------------------------------------------------------------------
// Instance Availability
// --------------------------------------------------------------------------------------------