Posted to commits@uima.apache.org by ch...@apache.org on 2015/03/15 15:20:49 UTC
svn commit: r1666803 - in /uima/sandbox/uima-ducc/trunk: src/main/resources/
uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/
uima-ducc-common/src/main/java/org/apache/uima/ducc/common/
uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ uima-ducc...
Author: challngr
Date: Sun Mar 15 14:20:49 2015
New Revision: 1666803
URL: http://svn.apache.org/r1666803
Log:
UIMA-4275 Rework handling of non-preemptive requests.
Modified:
uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties
uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/CliFixups.java
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/NodeConfiguration.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java
Modified: uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties (original)
+++ uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties Sun Mar 15 14:20:49 2015
@@ -346,6 +346,10 @@ ducc.rm.state.update.endpoint=ducc.rm.st
ducc.rm.state.update.endpoint.type=topic
# the frequency, relative to OR publications, at which RM runs a schedule
ducc.rm.state.publish.ratio = 1
+
+# maximum allotment in GB for non-preemptable shares - default is unlimited
+#ducc.rm.global_allotment = 360
+
# Base size of DRAM quantum in GB
ducc.rm.share.quantum = 1
# Implementation class for actual scheduling algorithm
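As a quick usage sketch for deployers: the new cap is enabled by uncommenting the property above in a site-local ducc.properties (the 360 value is illustrative; ducc.rm.use_global_allotment, read by NodepoolScheduler later in this commit, defaults to on):

    # Cap each user's total non-preemptable allocation at 360 GB (illustrative value)
    ducc.rm.global_allotment = 360
    # The companion switch read by NodepoolScheduler; it defaults to true
    ducc.rm.use_global_allotment = true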
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/CliFixups.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/CliFixups.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/CliFixups.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/CliFixups.java Sun Mar 15 14:20:49 2015
@@ -40,6 +40,10 @@ public class CliFixups {
System.out.println("CLI ignored deprecated option: " + arg);
args[i] = null;
if (++i < args.length && !args[i].startsWith("--")) args[i] = null;
+ } else if (arg.equals("--number_of_instances")) {
+ System.out.println("CLI ignored deprecated option: " + arg);
+ args[i] = null;
+ if (++i < args.length && !args[i].startsWith("--")) args[i] = null;
}
}
}
@@ -53,6 +57,9 @@ public class CliFixups {
} else if (key.equals("classpath_order")) {
props.remove(key);
System.out.println("CLI ignored deprecated option: " + key);
+ } else if (key.equals("number_of_instances")) {
+ props.remove(key);
+ System.out.println("CLI ignored deprecated option: " + key);
}
}
}
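The scan-and-null pattern is identical for both newly ignored options; a self-contained sketch of the idea follows (class and method names here are illustrative, not the actual DUCC CLI types):

    import java.util.Arrays;

    // Minimal sketch of the CliFixups pattern above: null out a deprecated
    // option and, if the following token is not itself an option, its value.
    public class DeprecatedOptionScrubber {
        static void scrub(String[] args, String deprecated) {
            for (int i = 0; i < args.length; i++) {
                if (deprecated.equals(args[i])) {
                    System.out.println("CLI ignored deprecated option: " + args[i]);
                    args[i] = null;
                    // consume the option's argument as well, if present
                    if (++i < args.length && args[i] != null && !args[i].startsWith("--")) {
                        args[i] = null;
                    }
                }
            }
        }

        public static void main(String[] unused) {
            String[] args = { "--number_of_instances", "4", "--description", "x" };
            scrub(args, "--number_of_instances");
            System.out.println(Arrays.toString(args)); // [null, null, --description, x]
        }
    }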
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/NodeConfiguration.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/NodeConfiguration.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/NodeConfiguration.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/NodeConfiguration.java Sun Mar 15 14:20:49 2015
@@ -104,7 +104,6 @@ public class NodeConfiguration
defaultFairShareClass.put("use-prediction", ""+SystemPropertyResolver.getBooleanProperty("ducc.rm.prediction", true));
defaultFairShareClass.put("prediction-fudge", ""+SystemPropertyResolver.getIntProperty("ducc.rm.prediction.fudge", 60000));
defaultFairShareClass.put("max-processes", "<optional>");
- defaultFairShareClass.put("max-allotment", Integer.toString(Integer.MAX_VALUE));
defaultFairShareClass.put("nodepool", "<required>");
defaultFairShareClass.put("users", "<optional>");
defaultFairShareClass.put("debug", "fixed");
@@ -124,7 +123,7 @@ public class NodeConfiguration
defaultFixedShareClass.put("priority", "5");
defaultFixedShareClass.put("default", "<optional>");
defaultFixedShareClass.put("max-processes", "<optional>");
- defaultFixedShareClass.put("max-allotment", Integer.toString(Integer.MAX_VALUE));
+ defaultFixedShareClass.put("max-allotment", "<optional>");
defaultFixedShareClass.put("cap", "<optional>");
defaultFixedShareClass.put("nodepool", "<required>");
defaultFixedShareClass.put("users", "<optional>");
@@ -138,7 +137,7 @@ public class NodeConfiguration
defaultReserveClass.put("priority", "1");
defaultReserveClass.put("default", "<optional>");
defaultReserveClass.put("max-machines", "<optional>");
- defaultReserveClass.put("max-allotment", Integer.toString(Integer.MAX_VALUE));
+ defaultReserveClass.put("max-allotment", "<optional>");
defaultReserveClass.put("cap", "<optional>");
defaultReserveClass.put("nodepool", "<required>");
defaultReserveClass.put("users", "<optional>");
@@ -152,6 +151,7 @@ public class NodeConfiguration
defaultUser.put("type", "user");
defaultUser.put("name", "<optional>");
+ defaultUser.put("max-allotment", Integer.toString(Integer.MAX_VALUE));
}
/**
@@ -1172,7 +1172,7 @@ public class NodeConfiguration
for (Object o : cl.keySet() ) {
String k = (String) o;
- if ( k.startsWith("max-allotment.") ) {
+ if ( k.startsWith("max-allotment") ) {
printProperty(k, cl.get(k));
}
}
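With max-allotment now defaulted on the user record rather than on each class, a per-user override would live in the class configuration file; a sketch, assuming the brace-delimited stanza syntax used elsewhere in ducc.classes (user name and value are illustrative):

    User jdoe {
        # allow this user 500 GB of non-preemptable allotment instead of the global cap
        max-allotment = 500
    }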
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java Sun Mar 15 14:20:49 2015
@@ -367,12 +367,15 @@ public class JobManagerConverter
j.setNQuestions(total_work, remaining_work, arith_mean);
// formatSchedulingInfo(job.getDuccId(), si, remaining_work);
-
if ( job instanceof IDuccWorkJob ) {
if ( j.setInitWait( ((IDuccWorkJob) job).isRunnable()) ) {
logger.info(methodName, jobid, "Set Initialized.");
scheduler.signalInitialized(j);
}
+ // UIMA-4275 Avoid race so we don't keep trying to give out new processes
+ if ( ((IDuccWorkJob) job).isCompleting() ) {
+ j.markComplete();
+ }
} else {
j.setInitWait(true); // pop is always ready to go
}
@@ -505,7 +508,7 @@ public class JobManagerConverter
if ( name == null ) {
name = "A Job With No Name.";
}
- String user_name = sti.getUser();
+ String user_name = sti.getUser().trim();
j.setUserName(user_name);
j.setJobName(name);
@@ -622,46 +625,49 @@ public class JobManagerConverter
// }
switch ( job.getDuccType() ) {
+ // UIMA-4275, must enforce max allocations of 1 for Service and Pop
case Service:
case Pop:
+ switch ( rescl.getPolicy() ) {
+ case FAIR_SHARE:
+ refuse(j, "Services and managed reservations are not allowed to be FAIR_SHARE");
+ break;
+
+ case FIXED_SHARE:
+ j.setMaxShares(1);
+ break;
+
+ case RESERVE:
+ j.setMaxShares(1);
+ break;
+ }
+ status = receiveExecutable(j, job, mustRecover); // UIMA-4142, add mustRecover flag
+ logger.trace(methodName, j.getId(), "Service or Pop arrives, accepted:", status);
+ break;
case Job:
// instance and share count are a function of the class
+ max_processes = toInt(si.getSharesMax(), DEFAULT_PROCESSES);
switch ( rescl.getPolicy() ) {
case FAIR_SHARE:
- max_processes = toInt(si.getSharesMax(), DEFAULT_PROCESSES);
- // max_processes = Math.min(rescl.getMaxProcesses(), max_processes);
j.setMaxShares(max_processes);
- j.setNInstances(-1);
break;
case FIXED_SHARE:
- max_processes = toInt(si.getSharesMax(), DEFAULT_INSTANCES);
j.setMaxShares(max_processes);
- j.setNInstances(max_processes);
break;
case RESERVE:
max_machines = toInt(si.getSharesMax(), DEFAULT_INSTANCES);
- j.setMaxShares(max_machines);
- j.setNInstances(max_machines);
+ j.setMaxShares(max_processes);
break;
}
status = receiveExecutable(j, job, mustRecover); // UIMA-4142, add mustRecover flag
- logger.trace(methodName, j.getId(), "Serivce, Pop, or Job arrives, accepted:", status);
+ logger.trace(methodName, j.getId(), "Job arrives, accepted:", status);
break;
case Reservation:
- switch ( rescl.getPolicy() ) {
- case FIXED_SHARE:
- max_machines = toInt(si.getInstancesCount(), DEFAULT_INSTANCES);
- break;
- case RESERVE:
- max_machines = toInt(si.getInstancesCount(), DEFAULT_INSTANCES);
- break;
- }
-
- j.setMaxShares(-1);
- j.setNInstances(max_machines);
+ // UIMA-4275. non-jobs restricted to exactly one allocation per request
+ j.setMaxShares(1);
status = receiveReservation(j, job, mustRecover); // UIMA-4142, add mustRecover flag
logger.trace(methodName, j.getId(), "Reservation arrives, accepted:", status);
@@ -1005,37 +1011,38 @@ public class JobManagerConverter
return ret;
}
- boolean isPendingNonPreemptable(IRmJob j)
- {
- String methodName = "isPendingNonPreemptable";
- // If fair share it definitely isn't any kind of preemptable
- if ( j.getResourceClass().getPolicy() == Policy.FAIR_SHARE) return false;
-
- // otherwise, if the shares it has allocated is < the number it wants, it is in fact
- // pending but not complete.
- logger.trace(methodName, j.getId(), "countNShares", j.countNShares(), "countInstances", j.countInstances(), "isComplete", j.isCompleted());
-
- if ( j.isCompleted() ) {
- return false;
- }
-
- // 2014-02-18 - countTotalAssignments is the total nodes this job ever got - we're not allowed to
- // add more. But if a node dies and the share is canceled, countNShares() CAN return
- // 0, preventing this cutoff check from working, and the job looks "refused" when in
- // fact it's just hungy. Hence, the change from countNShares to countTotalAssignments.
- //
- // Note: The NodePool code that detects dead nodes is responsible for removing dead shares
- // from jobs and should not remove shares from reservations, but it can remove shares
- // from non-preemptables that aren't reservations.
- // UIMA-3613 jrc
- //if ( j.countNShares() == j.countInstances() ) {
- if ( j.countTotalAssignments() == j.countInstances() ) {
- j.markComplete(); // non-preemptable, remember it finally got it's max
- return false;
- }
+ // No longer needed after UIMA-4275
+ // boolean isPendingNonPreemptable(IRmJob j)
+ // {
+ // String methodName = "isPendingNonPreemptable";
+ // // If fair share it definitely isn't any kind of preemptable
+ // if ( j.getResourceClass().getPolicy() == Policy.FAIR_SHARE) return false;
+
+ // // otherwise, if the shares it has allocated is < the number it wants, it is in fact
+ // // pending but not complete.
+ // logger.trace(methodName, j.getId(), "countNShares", j.countNShares(), "countInstances", j.getMaxShares(), "isComplete", j.isCompleted());
+
+ // if ( j.isCompleted() ) {
+ // return false;
+ // }
+
+ // // 2014-02-18 - countTotalAssignments is the total nodes this job ever got - we're not allowed to
+ // // add more. But if a node dies and the share is canceled, countNShares() CAN return
+ // // 0, preventing this cutoff check from working, and the job looks "refused" when in
+ // // fact it's just hungry. Hence, the change from countNShares to countTotalAssignments.
+ // //
+ // // Note: The NodePool code that detects dead nodes is responsible for removing dead shares
+ // // from jobs and should not remove shares from reservations, but it can remove shares
+ // // from non-preemptables that aren't reservations.
+ // // UIMA-3613 jrc
+ // //if ( j.countNShares() == j.countInstances() ) {
+ // if ( j.countTotalAssignments() == j.getMaxShares() ) {
+ // j.markComplete(); // non-preemptable, remember it finally got its max
+ // return false;
+ // }
- return (j.countNShares() < j.countInstances());
- }
+ // return (j.countNShares() < j.getMaxShares());
+ // }
/**
* If no state has changed, we just resend that last one.
@@ -1098,41 +1105,37 @@ public class JobManagerConverter
Map<Share, Share> shares = null;
Map<Share, Share> redrive = null;
- if (isPendingNonPreemptable(j) ) {
- logger.info(methodName, j.getId(), "Delaying publication of expansion because it's not yet complete.");
- } else {
- shares = j.getAssignedShares();
- if ( shares != null ) {
- ArrayList<Share> sorted = new ArrayList<Share>(shares.values());
- Collections.sort(sorted, new RmJob.ShareByInvestmentSorter());
- for ( Share s : sorted ) {
- Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), s.getInitializationTime());
- all_shares.put(s.getId(), r);
- }
- redrive = sanityCheckForOrchestrator(j, shares, expanded.get(j.getId()));
+ shares = j.getAssignedShares();
+ if ( shares != null ) {
+ ArrayList<Share> sorted = new ArrayList<Share>(shares.values());
+ Collections.sort(sorted, new RmJob.ShareByInvestmentSorter());
+ for ( Share s : sorted ) {
+ Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), s.getInitializationTime());
+ all_shares.put(s.getId(), r);
}
+ redrive = sanityCheckForOrchestrator(j, shares, expanded.get(j.getId()));
+ }
- shares = shrunken.get(j.getId());
- if ( shares != null ) {
- for ( Share s : shares.values() ) {
- Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), 0);
- shrunken_shares.put(s.getId(), r);
- }
- }
+ shares = shrunken.get(j.getId());
+ if ( shares != null ) {
+ for ( Share s : shares.values() ) {
+ Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), 0);
+ shrunken_shares.put(s.getId(), r);
+ }
+ }
- shares = expanded.get(j.getId());
- if ( shares != null ) {
- for ( Share s : shares.values() ) {
- Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), 0);
- expanded_shares.put(s.getId(), r);
- }
+ shares = expanded.get(j.getId());
+ if ( shares != null ) {
+ for ( Share s : shares.values() ) {
+ Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), 0);
+ expanded_shares.put(s.getId(), r);
}
+ }
- if ( redrive != null ) {
- for ( Share s : redrive.values() ) {
- Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), 0);
- expanded_shares.put(s.getId(), r);
- }
+ if ( redrive != null ) {
+ for ( Share s : redrive.values() ) {
+ Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), 0);
+ expanded_shares.put(s.getId(), r);
}
}
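The net effect of the reworked type switch earlier in this file is compact; a condensed sketch (simplified from the cases above, omitting the FAIR_SHARE refusal for services and the receive calls, so not the literal DUCC code):

    // UIMA-4275: non-preemptable work other than Jobs gets exactly one allocation.
    switch (job.getDuccType()) {
        case Service:
        case Pop:
        case Reservation:
            j.setMaxShares(1);
            break;
        case Job:
            // instance and share count are a function of the class
            j.setMaxShares(toInt(si.getSharesMax(), DEFAULT_PROCESSES));
            break;
    }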
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java Sun Mar 15 14:20:49 2015
@@ -41,6 +41,8 @@ public interface IRmJob
public DuccId getId();
+ public String getShortType(); // S, R, M, J - service, reservation, managed-reservation, job
+
public long getFriendlyId();
public String getName();
@@ -244,8 +246,8 @@ public interface IRmJob
public ResourceClass getResourceClass();
- public int countInstances();
- public void setNInstances(int m);
+ //public int countInstances();
+ //public void setNInstances(int m);
public int nThreads();
public void setThreads(int threads);
@@ -273,4 +275,9 @@ public interface IRmJob
public boolean isService(); // UIMA-4142
public boolean isInitialized();
+
+ // Total number of shares to account to me - either actually assigned, or
+ // counted afresh in the current scheduling cycle, for allotments
+ public int countOccupancy(); // UIMA-4275
+
}
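A plausible body for the new method, as it might look in RmJob (an assumption for illustration; the implementation itself is not shown in this diff):

    // Assumed sketch: occupancy for allotment accounting is the larger of what
    // the job actually holds and what this scheduling cycle has counted out to it.
    public int countOccupancy()    // UIMA-4275
    {
        return Math.max(countNShares(), countNSharesGiven());
    }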
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java Sun Mar 15 14:20:49 2015
@@ -81,7 +81,6 @@ class NodePool
int nSharesByOrder[]; // shares of each size for each share order [ 0 21 9 5 4 ] - collective N Shares for each order
//int nFreeSharesByOrder[]; // for each order, the theoretical number of shares to give away [ 0 1 0 3 16 ] - free Q shares per order
- int machinesToPreempt[]; // number of full machines to preempt for reservations, by order
int nPendingByOrder[]; // number of N-shares with pending evictions
//int neededByOrder[]; // for each order, how many N-shares do I want to add?
@@ -271,17 +270,12 @@ class NodePool
/**
* Counts just local, for reservations.
*/
- int countFreeMachines(int order, boolean enforce)
+ int countFreeMachines(int order)
{
int cnt = 0;
HashMap<Node, Machine> mlist = null;
- if ( enforce ) {
- mlist = machinesByOrder.get(order);
- } else {
- mlist = allMachines;
- }
-
+ mlist = machinesByOrder.get(order);
if ( mlist == null ) return 0;
for ( Machine m : mlist.values() ) {
@@ -297,7 +291,7 @@ class NodePool
*/
int countAllFreeMachines()
{
- int count = countFreeMachines(0, false); // don't care about the order
+ int count = countFreeMachines(0); // don't care about the order
for ( NodePool np : children.values() ) {
count += np.countAllFreeMachines();
}
@@ -719,12 +713,13 @@ class NodePool
void accountForShares(HashMap<Share, Share> shares)
{
+ if ( shares == null ) return;
+
for ( Share s : shares.values() ) {
int order = s.getShareOrder();
Machine m = s.getMachine();
rearrangeVirtual(m, order);
}
- //calcNSharesByOrder();
}
/**
@@ -750,8 +745,6 @@ class NodePool
nPendingByOrder = new int[maxorder + 1];
- machinesToPreempt = new int[maxorder + 1];
-
// UIMA-4142 Must set vMachinesByOrder and virtualMachinesByOrder independently of
// machinesByOrder because blacklisting can cause v_order != r_order
// during reset.
@@ -1119,8 +1112,18 @@ class NodePool
*/
/**
+ * A quick check to see if there are any machines of the right size. We make a more
+ * comprehensive check to see if they're usable in countOutMachinesByOrder later.
+ */
+ int countReservables(IRmJob j)
+ {
+ int order = j.getShareOrder();
+ if ( ! machinesByOrder.containsKey(order) ) return 0;
+ return machinesByOrder.get(order).size();
+ }
+
+ /**
* Adjust counts for something that takes full machines, like a reservation.
- * If "enforce" is set the machine order must match, otherwise we just do best effort to match.
*
* This is intended for use by reservations only; as such it does NOT recurse into child nodepools.
*
@@ -1129,24 +1132,20 @@ class NodePool
*
* @returns number of machines given
*/
- int countFreeableMachines(IRmJob j, boolean enforce)
+ int countOutMachinesByOrder(IRmJob j, int needed)
{
- String methodName = "countFreeableMachines";
-
- logger.info(methodName, j.getId(), "Enter nodepool", id, "with enforce", enforce, "preemptables.size() =", preemptables.size());
- int needed = j.countInstances();
+ String methodName = "countOutMachinesByOrder";
+
int order = j.getShareOrder();
+ int given = 0;
ArrayList<Machine> machs = new ArrayList<Machine>();
- if ( enforce ) {
- if ( machinesByOrder.containsKey(order) ) {
- machs.addAll(machinesByOrder.get(order).values());
- } else {
- return 0;
- }
+ if ( machinesByOrder.containsKey(order) ) {
+ machs.addAll(machinesByOrder.get(order).values());
} else {
- machs.addAll(allMachines.values());
+ return 0; // oops, nothing here
}
+
StringBuffer sb = new StringBuffer("Machines to search:");
for ( Machine m : machs ) {
sb.append(" ");
@@ -1156,7 +1155,6 @@ class NodePool
Collections.sort(machs, new MachineByAscendingOrderSorter());
- int given = 0; // total to give, free or freeable
Iterator<Machine> iter = machs.iterator();
ArrayList<Machine> pables = new ArrayList<Machine>();
@@ -1174,37 +1172,22 @@ class NodePool
}
if ( m.isFree() ) {
- logger.info(methodName, j.getId(), "Giving", m.getId(), "because it is free");
+ logger.info(methodName, j.getId(), m.getId(), "is free for reservations.");
+ nMachinesByOrder[m.getShareOrder()]--;
given++;
continue;
}
if ( m.isFreeable() ) {
- logger.info(methodName, j.getId(), "Giving", m.getId(), "because it is freeable");
+ logger.info(methodName, j.getId(), "Setting up", m.getId(), "to clear for reservation");
+ nMachinesByOrder[m.getShareOrder()]--;
given++;
- pables.add(m);
+ preemptables.put(m.key(), m);
+ continue;
} else {
logger.info(methodName, j.getId(), "Bypass because machine", m.getId(), "is not freeable");
}
}
-
- if ( given < needed ) {
- return 0;
- }
-
- // Remember how many full machines we need to free up when we get to preemption stage.
- if ( enforce ) {
- machinesToPreempt[0] += preemptables.size();
- } else {
- machinesToPreempt[order] += preemptables.size();
- }
-
- for ( Machine m : pables ) {
- logger.info(methodName, j.getId(), "Setting up", m.getId(), "to clear for reservation");
- preemptables.put(m.key(), m);
- nMachinesByOrder[m.getShareOrder()]--;
- }
-
calcNSharesByOrder();
return given;
}
@@ -1276,13 +1259,12 @@ class NodePool
}
/**
- * We need to make enough space for 'cnt' full machines. If enforce is true the machines need
- * to be of the indicated order; otherwise we just nuke any old thing.
+ * We need to make enough space for 'cnt' full machines.
*
* Returns number of machines that are freeable, up to 'needed', or 0, if we can't get enough.
* If we return 0, we must refuse the reservation.
*/
- protected int setupPreemptions(int needed, int order, boolean enforce)
+ protected int setupPreemptions(int needed, int order)
{
String methodName = "setupPreemptions";
int given = 0;
@@ -1292,9 +1274,10 @@ class NodePool
while ( iter.hasNext() && (given < needed) ) {
Machine m = iter.next();
int o = m.getShareOrder();
- if ( enforce && ( order != o) ) {
+ if ( order != o ) {
continue;
}
+ logger.info(methodName, null, "Clearing", m.getId(), "from preemptable list for reservations.");
HashMap<Share, Share> shares = m.getActiveShares();
for ( Share s : shares.values() ) {
if ( s.isPreemptable() ) {
@@ -1306,36 +1289,35 @@ class NodePool
// log its state to try to figure out why it didn't evict
if ( ! (s.isEvicted() || s.isPurged() ) ) {
IRmJob j = s.getJob();
- logger.warn(methodName, j.getId(), "Found non-preemptable share", s.getId(), "fixed:", s.isFixed(), "j.NShares", j.countNShares(), "j.NSharesGiven", j.countNSharesGiven());
+ logger.info(methodName, j.getId(), "Found non-preemptable share", s.getId(), "fixed:", s.isFixed(),
+ "j.NShares", j.countNShares(), "j.NSharesGiven", j.countNSharesGiven());
}
}
}
given++;
iter.remove();
- logger.info(methodName, null, "Remove", m.getId(), "from preemptables list");
}
return given;
}
/**
- * Here we have to dig around and find either a fully free machine, or a machine that we
+ * Here we have to dig around and find either fully free machines, or machines that we
* can preempt to fully free them.
- *
- * If 'enforce' is set memory must match, otherwise we can pick anything from the pool.
- *
- * This routine should NEVER fail because we already counted to be sure we had enough. The
- * caller should probably throw if it does not get exactly what it wants.
*/
- void allocateForReservation(IRmJob job, ResourceClass rc)
+ void findMachines(IRmJob job, ResourceClass rc)
{
- String methodName = "allocateForReservation";
- ArrayList<Machine> answer = new ArrayList<Machine>();
+ String methodName = "findMachines";
ArrayList<Machine> machs;
int order = job.getShareOrder();
- int needed = job.countInstances();
- boolean enforce = rc.enforceMemory();
+
+ int counted = job.countNSharesGiven(); // allotment from the counter
+ int current = job.countNShares(); // currently allocated, plus pending, less those removed by earlier preemption
+ int needed = (counted - current);
+
+ logger.info(methodName, job.getId(), "counted", counted, "current", current, "needed", needed, "order", order);
+ if ( needed <= 0 ) return;
//
// Build up 'machs' array, containing all candidate machines, sorted by
@@ -1345,56 +1327,27 @@ class NodePool
// Free machines always sort to the front of the list of course.
//
- int cnt = countFreeMachines(order, enforce);
+ int cnt = countFreeMachines(order);
if ( cnt < needed ) {
- logger.info(methodName, job.getId(), "Reservation waiting on evictions. Have", cnt, "free, needed", needed);
- setupPreemptions(needed-cnt, order, enforce); // if returns 0, must refuse the job
- return;
+ // Get the preemptions started
+ logger.info(methodName, job.getId(), "Setup preemptions. Have", cnt, "free machines, needed", needed);
+ setupPreemptions(needed-cnt, order); // if returns 0, must refuse the job
}
- //
- // If we get here we MUST have enough machines because we would have refused the job otherwise, or else
- // started preemptions.
- //
- if ( enforce ) { // enforcing order, only look in the right subset of machines
- if ( ! machinesByOrder.containsKey(order) ) { // hosed if this happens
- throw new SchedInternalError(job.getId(), "Scheduling counts are wrong - machinesByOrder does not match nMachinesByOrder");
- }
- machs = sortedForReservation(machinesByOrder.get(order));
- } else { // no enforcement - anything in the pool is fair game
- machs = sortedForReservation(allMachines);
+ // something awful happened if we throw here.
+ if ( ! machinesByOrder.containsKey(order) ) { // hosed if this happens
+ throw new SchedInternalError(job.getId(), "Scheduling counts are wrong - machinesByOrder does not match nMachinesByOrder");
}
+ machs = sortedForReservation(machinesByOrder.get(order));
- if ( machs.size() < needed ) { // totally completely hosed if this happens. but let's do the sanity check.
- throw new SchedInternalError(job.getId(), "Scheduling counts are wrong - can't schedule reservation");
- }
-
- // machs is all candidate machines, ordered by empty, then most preferable, according to the eviction policy.
- // We don't have to preempt explicitly, the counting should have worked this out. Only thing to watch is that
- // when we do preemption in fair-share that we free up sufficient machines fully.
- for ( Machine m : machs ) {
- if ( m.isFree() ) {
- answer.add(m);
- needed--;
- }
-
- if ( needed == 0 ) {
- break;
- }
- }
-
- if ( needed == 0 ) { // all-or-nothing - don't return anything if we're waiting for preemption
- for ( Machine mm : answer ) {
+ // Machs is all candidate machines, ordered by empty, then most preferable, according to the eviction policy.
+ for ( Machine mm : machs ) {
+ if ( mm.isFree() ) {
Share s = new Share(mm, job, mm.getShareOrder());
s.setFixed();
connectShare(s, mm, job, mm.getShareOrder());
- //job.assignShare(s);
- //mm.assignShare(s);
- //allShares.put(s, s);
- //rearrangeVirtual(mm, order);
+ if ( --needed == 0 ) break;
}
- } else {
- throw new SchedInternalError(job.getId(), "Thought we had enough machines for reservation, but we don't");
}
}
@@ -1584,7 +1537,7 @@ class NodePool
int order = j.getShareOrder();
int given = 0;
- logger.trace(methodName, j.getId(), "counted", counted, "current", current, "needed", needed, "order", order, "given", given);
+ logger.info(methodName, j.getId(), "counted", counted, "current", current, "needed", needed, "order", order, "given", given);
if ( needed > 0 ) {
whatof: {
@@ -1655,6 +1608,8 @@ class NodePool
sb.append(getId());
sb.append(" Expansions in this order: ");
for ( IRmJob j : jobs ) {
+ if ( j.isCompleted() ) continue; // deal with races while job is completing
+
j.undefer();
sb.append(j.getId());
sb.append(":");
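As a worked example of the counting in findMachines (values assumed for illustration): if the counting phase granted a reservation 3 machines (counted = 3) and 1 is already allocated (current = 1), then needed = 2; with only 1 free machine of the right order, setupPreemptions(1, order) is asked to clear one more, and the remaining share is connected on a later pass once the machine comes free.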
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java Sun Mar 15 14:20:49 2015
@@ -55,12 +55,12 @@ public class NodepoolScheduler
int fragmentationThreshold = 2;
boolean do_defragmentation = true;
+ boolean use_global_allotment = true;
+ int global_allotment = Integer.MAX_VALUE;
+ int scheduling_quantum;
NodepoolScheduler()
{
- fragmentationThreshold = SystemPropertyResolver.getIntProperty("ducc.rm.fragmentation.threshold",
- fragmentationThreshold);
- do_defragmentation = SystemPropertyResolver.getBooleanProperty("ducc.rm.defragmentation", do_defragmentation);
}
public void setClasses(Map<ResourceClass, ResourceClass> prclasses)
@@ -106,6 +106,11 @@ public class NodepoolScheduler
classes[ndx++] = sorter.get(k);
}
+ fragmentationThreshold = SystemPropertyResolver.getIntProperty("ducc.rm.fragmentation.threshold", fragmentationThreshold);
+ scheduling_quantum = SystemPropertyResolver.getIntProperty("ducc.rm.share.quantum", scheduling_quantum);
+ do_defragmentation = SystemPropertyResolver.getBooleanProperty("ducc.rm.defragmentation", do_defragmentation);
+ use_global_allotment = SystemPropertyResolver.getBooleanProperty("ducc.rm.use_global_allotment", use_global_allotment);
+ global_allotment = SystemPropertyResolver.getIntProperty("ducc.rm.global_allotment", global_allotment);
}
public void setNodePool(NodePool np)
@@ -118,6 +123,95 @@ public class NodepoolScheduler
this.evictionPolicy = ep;
}
+ /**
+ * Check the allotment for the user, given that we want to allocate
+ * one new process for job j
+ */
+ boolean validSingleAllotment(IRmJob j)
+ {
+ String methodName = "validSingleAllotment";
+ //
+ // Original design and implementation for UIMA-4275: class based. Subsequent discussion resulted in
+ // changing to a single global allotment. I'll leave the class-based allotment in for now because
+ // I suspect it will re-raise its ugly head. (jrc)
+ //
+
+ if ( use_global_allotment ) {
+ User u = j.getUser();
+ int lim = u.getOverrideLimit();
+ if ( lim < 0 ) {
+ lim = global_allotment;
+ }
+
+ int shares = u.countNPShares();
+ long sharesInGB = ((shares + j.getShareOrder()) * scheduling_quantum);
+ if ( sharesInGB > lim ) {
+ schedulingUpdate.defer(j, "Deferred because allotment of " + lim + "GB is exceeded by user " + j.getUserName());
+ logger.info(methodName, j.getId(), "Deferred because allotment of " + lim + "GB is exceeded by user " + j.getUserName());
+ return false;
+ }
+ } else {
+ ResourceClass rc = j.getResourceClass();
+ if ( rc.allotmentExceeded(j) ) {
+ schedulingUpdate.defer(j, "Deferred because allotment of " + rc.getAllotment(j) + "GB is exceeded by user " + j.getUserName());
+ logger.info(methodName, j.getId(), "Deferred because allotment of " + rc.getAllotment(j) + "GB is exceeded by user " + j.getUserName());
+ return false;
+ }
+ }
+ return true;
+ }
+
+ int getAllotmentForJob(IRmJob j)
+ {
+ String methodName = "getAllotmentForJob";
+ User u = j.getUser();
+
+ // Let the job ask for the world. This accounts for init cap, prediction, number usable, etc
+ int order = j.getShareOrder();
+ int wanted = j.getJobCap(); // in nshares
+ logger.info(methodName, j.getId(), "Job cap", nSharesToString(wanted, order));
+
+ // Find out how many qshares we're allowed
+ int allotment_in_gb = u.getOverrideLimit();
+ if ( allotment_in_gb < 0 ) {
+ allotment_in_gb = global_allotment;
+ }
+ int user_allotment = allotment_in_gb / scheduling_quantum; // qshares
+
+ // How many qshares we, the user, have used
+ int allocated = u.countNPShares();
+ logger.info(methodName, j.getId(), "Current NP allocation for user", allocated, "qshares", (allocated * scheduling_quantum), "GB",
+ "user_allotment", user_allotment, "user_allotment in GB", allotment_in_gb );
+
+ // This is how many QShares we get to allocate for the job
+ int additional = Math.max(0, user_allotment - allocated);
+ int additional_processes = additional / order;
+ logger.info(methodName, j.getId(), "Additional shares allowed for request:", nSharesToString(additional_processes, order));
+
+ // No shares, so we show deferred
+ if ( (additional_processes == 0) ) {
+ if (j.countNShares() == 0) {
+ // over allotment, never had anything, can't get anything, so is deferred
+ schedulingUpdate.defer(j, "Deferred because allotment of " + allotment_in_gb + "GB is exceeded.");
+ logger.info(methodName, j.getId(), "Deferred because allotment of", allotment_in_gb, "GB is exceeded.");
+ } else {
+ logger.info(methodName, j.getId(), "Allotment of", allotment_in_gb, "GB caps request. Return with", allocated, "qshares allocated.");
+ }
+ return j.countNShares();
+ }
+
+ int allowed = j.countNShares() + additional_processes;
+
+ if ( allowed < wanted ) {
+ logger.info(methodName, j.getId(), "Capping job on allotment: ", allotment_in_gb + " GB. Remaining allowed nshares [",
+ allowed, "] wanted [", wanted, "]");
+ }
+
+ logger.info(methodName, j.getId(), "Allowed", nSharesToString(allowed, order), "wanted", nSharesToString(wanted, order));
+ return Math.min(wanted, allowed);
+ }
+
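As a worked example of the qshare arithmetic in getAllotmentForJob (all values assumed for illustration): with a 1 GB scheduling quantum, a 360 GB allotment, a job of share order 30 (30 GB processes), and 120 qshares already held by the user, user_allotment = 360 / 1 = 360 qshares, additional = max(0, 360 - 120) = 240 qshares, and additional_processes = 240 / 30 = 8, so up to 8 more processes may be counted out for this request.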
private void reworknShares(int[] vshares, int[] nshares)
{
// now redo nshares
@@ -1014,7 +1108,7 @@ public class NodepoolScheduler
logger.debug(methodName, null, "NP[", np.getId(), "Expand needy reservations.", listJobSet(reservations));
for ( IRmJob j : reservations ) {
ResourceClass rc = j.getResourceClass();
- np.allocateForReservation(j, rc);
+ np.findMachines(j, rc);
}
Collections.sort(fixed_share_jobs, new JobByTimeSorter());
@@ -1137,7 +1231,7 @@ public class NodepoolScheduler
* Make sure there are enough shares to allocate either directly, or through preemption,
* and count them out.
*/
- protected void howMuchFixed(ArrayList<ResourceClass> rcs)
+ void howMuchFixed(ArrayList<ResourceClass> rcs)
{
String methodName = "howMuchFixedShare";
@@ -1154,97 +1248,124 @@ public class NodepoolScheduler
for ( ResourceClass rc : rcs ) {
HashMap<IRmJob, IRmJob> jobs = rc.getAllJobs();
total_jobs += jobs.size();
- if ( jobs.size() == 0 ) {
- logger.info(methodName, null, "No jobs to schedule in class ", rc.getName());
- } else {
- StringBuffer buf = new StringBuffer();
- for ( IRmJob j : jobs.values() ) {
- buf.append(" ");
- buf.append(j.getId());
- }
- logger.info(methodName, null, "Scheduling jobs in class:", rc.getName(), buf.toString());
- }
}
if ( total_jobs == 0 ) {
return;
}
for ( ResourceClass rc : rcs ) {
- ArrayList<IRmJob> jobs = rc.getAllJobsSorted(new JobByTimeSorter());
+ ArrayList<IRmJob> jobs = rc.getAllJobsSorted(new JobByTimeSorter());
for ( IRmJob j : jobs ) {
- j.clearShares(); // reset shares assigned at start of each schedling cycle
- j.undefer(); // in case it works this time!
- }
+ logger.info(methodName, j.getId(), "Scheduling job to class:", rc.getName());
- NodePool np = rc.getNodepool();
+ j.clearShares(); // reset virtual shares at start of each scheduling cycle
+ j.undefer(); // in case it works this time!
+ switch ( j.getDuccType() ) {
+ case Job:
+ countFixedForJob(j, rc);
+ break;
+ case Service:
+ case Pop:
+ case Reservation:
+ default:
+ countSingleFixedProcess(j, rc);
+ break;
+ }
+ }
+ }
+ }
- for ( IRmJob j : jobs ) {
+
+ void countFixedForJob(IRmJob j, ResourceClass rc)
+ {
+ String methodName = "countFixedForJob";
- int n_instances = j.countInstances(); // n-shrares; virtual shares
+ logger.info(methodName, j.getId(), "Counting shares for", j.getShortType() + "." + j.getId());
- if ( j.countNShares() > 0 ) { // all-or-nothing check
- // already accounted for as well, since it is a non-preemptable share
- logger.info(methodName, j.getId(), "[stable]", "requested", n_instances, "assigned", j.countNShares(), "processes, ",
- (j.countNShares() * j.getShareOrder()), "QS");
- int[] gbo = NodePool.makeArray();
+ // Allowed something if we get here. Must see if we have something to give.
+ NodePool np = rc.getNodepool();
- gbo[j.getShareOrder()] = j.countNShares(); // must set the allocation so eviction works right
+ int order = j.getShareOrder();
+ int available = np.countLocalNSharesByOrder(order);
+ logger.info(methodName, j.getId(), "available shares of order", order, "in np:", available);
- j.setGivenByOrder(gbo);
- continue;
- }
+ if ( available == 0 ) {
+ if (j.countNShares() == 0) {
+ schedulingUpdate.defer(j, "Deferred because insufficient resources are available.");
+ logger.info(methodName, j.getId(), "Deferring, insufficient shares available. NP", np.getId(),
+ "available[", np.countNSharesByOrder(order), "]");
+ } else {
+ logger.info(methodName, j.getId(), "Nodepool is out of shares: NP", np.getId(),
+ "available[", np.countNSharesByOrder(order), "]");
+ }
+ return;
+ }
- if ( j.isCompleted() ) { // allocation once filled? then we're done
- continue; // UIMA-3614, don't refuse, just stop allocating
- }
+ int granted = getAllotmentForJob(j); // in nshares, processes
- int order = j.getShareOrder();
+ //
+ // The job passes; give the job a count
+ //
+ logger.info(methodName, j.getId(), "+++++ nodepool", np.getId(), "class", rc.getName(), "order", order, "shares", nSharesToString(granted, order));
+ int[] gbo = NodePool.makeArray();
+ gbo[order] = granted; // what we get
+ j.setGivenByOrder(gbo);
+
+ // The difference between what we pass to 'what of', and what we already have. The shares we already have are accounted
+ // for in a special step at the start of the scheduling round.
+ np.countOutNSharesByOrder(order, granted - j.countNShares());
+ }
- // Don't schedule non-preemptable shares over subpools
- if ( np.countLocalShares() < n_instances ) {
- schedulingUpdate.defer(j, "Job deferred because insufficient resources are availble for this class.");
-
- logger.warn(methodName, j.getId(), "1 Deferring sixed share job because nodepool " + np.getId()
- + " has insufficient space left. Available["
- + np.countLocalShares()
- + "] requested[" + n_instances + "]");
- continue;
- }
-
- //
- // Now see if we have sufficient shares in the system for this allocation.
- //
- if ( np.countNSharesByOrder(order) < n_instances ) { // countSharesByOrder is N shares, as is minshares
- schedulingUpdate.defer(j, "Job deferred because insufficient resources are availble.");
- logger.warn(methodName, j.getId(), "2 Deferring fixed share job, insufficient shares available. Available[" + np.countNSharesByOrder(order) + "] requested[" + n_instances + "]");
- continue;
- }
+ void countSingleFixedProcess(IRmJob j, ResourceClass rc)
+ {
+ String methodName = "countSingleFixedProcess";
- //
- // Make sure this allocation does not blow the class cap.
- //
- if ( rc.allotmentExceeded(j) ) {
- schedulingUpdate.defer(j, "Job deferred because allotment of " + rc.getAllotment(j) + "GB is exceeded by user " + j.getUserName());
- continue;
- }
- //
- // The job passes. Assign it a count and get on with life ...
- //
- logger.info(methodName, j.getId(), "+++++ nodepool", np.getId(), "class", rc.getName(), "order", order, "shares", nSharesToString(n_instances, order));
- int[] gbo = NodePool.makeArray();
- gbo[order] = n_instances;
- j.setGivenByOrder(gbo);
+ logger.info(methodName, j.getId(), "Counting shares for", j.getShortType() + "." + j.getId(), "in class", rc.getName());
+ NodePool np = rc.getNodepool();
- np.countOutNSharesByOrder(order, n_instances);
- }
+ if ( j.countNShares() > 0 ) { // only 1 allowed, UIMA-4275
+ // already accounted for as well, since it is a non-preemptable share
+ logger.info(methodName, j.getId(), "[stable]", "assigned", j.countNShares(), "processes, ",
+ (j.countNShares() * j.getShareOrder()), "QS");
+ int[] gbo = NodePool.makeArray();
+
+ gbo[j.getShareOrder()] = 1; // must set the allocation so eviction works right
+ j.setGivenByOrder(gbo);
+ return;
+ }
+
+ int order = j.getShareOrder();
+
+ //
+ // Now see if we have sufficient shares in the nodepool for this allocation.
+ //
+ if ( np.countLocalNSharesByOrder(order) == 0 ) {
+ schedulingUpdate.defer(j, "Deferred because insufficient resources are available.");
+ logger.info(methodName, j.getId(), "Deferring, insufficient shares available. NP", np.getId(), "available[", np.countNSharesByOrder(order), "]");
+ return;
}
+
+ //
+ // Make sure this allocation does not blow the allotment cap.
+ //
+ if ( ! validSingleAllotment(j) ) return; // this method will defer the job and log it
+
+ //
+ // The job passes. Assign it a count and get on with life ...
+ //
+ logger.info(methodName, j.getId(), "+++++ nodepool", np.getId(), "class", rc.getName(), "order", order, "shares", nSharesToString(1, order));
+ int[] gbo = NodePool.makeArray();
+ gbo[order] = 1;
+ j.setGivenByOrder(gbo);
+
+ np.countOutNSharesByOrder(order, 1);
}
/**
- * All-or-nothing makes this easy. If there are free shares of the right order just assign them. Otherwise
+ * If there are free shares of the right order just assign them. Otherwise
* the counts will cause evictions in lower-priority code so we just wait.
*/
protected void whatOfFixedShare(ArrayList<ResourceClass> rcs)
@@ -1255,12 +1376,7 @@ public class NodepoolScheduler
NodePool np = rc.getNodepool();
for ( IRmJob j : jobs ) {
-
- if ( j.countNShares() > 0 ) { // all or nothing - if we have any, we're fully satisfied
- continue;
- }
-
- if ( j.isCompleted() ) { // UIMA-3614 - may have bene purged, don't give it more
+ if ( j.countNShares() == j.countNSharesGiven() ) { // got what we need, we're done
continue;
}
@@ -1268,18 +1384,14 @@ public class NodepoolScheduler
continue;
}
- if ( j.isDeferred() ) { // UIMA-4275 - still waiting for an allocation
+ if ( j.isDeferred() ) { // UIMA-4275 - still waiting for an allocation
continue;
}
int order = j.getShareOrder();
int count = j.countNSharesGiven();
- int avail = np.countNSharesByOrder(order);
- if ( avail >= count ) {
- if ( np.findShares(j) != count ) {
- throw new SchedInternalError(j.getId(), "Can't get enough shares but counts say there should be plenty.");
- }
+ if ( np.findShares(j) > 0 ) { // UIMA-4275, no longer require full allocation, we'll take what we can
//
// Need to fix the shares here, if any, because the findShares() code is same for fixed and fair share so it
// won't have done that yet.
@@ -1287,15 +1399,16 @@ public class NodepoolScheduler
for ( Share s : j.getPendingShares().values() ) {
s.setFixed();
}
- logger.info(methodName, j.getId(), "Assign:", nSharesToString(count, j.getShareOrder()));
- } else {
- j.setReason("Waiting for preemptions.");
+ logger.info(methodName, j.getId(), "Assign:", nSharesToString(count, order));
}
-
//
- // If nothing assigned we're waiting on preemptions which will occur naturally, or by forcible eviction of squatters.
+ // If nothing assigned we're waiting on preemptions which will occur naturally, or by forcible eviction of squatters,
+ // or defrag.
//
+ if ( j.countNShares() == 0 ) {
+ j.setReason("Waiting for preemptions.");
+ }
}
}
}
@@ -1307,7 +1420,7 @@ public class NodepoolScheduler
// ==========================================================================================
private void howMuchReserve(ArrayList<ResourceClass> rcs)
{
- String methodName = "howMuchToreserve";
+ String methodName = "howMuchReserve";
if ( logger.isTrace() ) {
logger.trace(methodName, null, "Calculating counts for RESERVATION for these classes:");
@@ -1318,114 +1431,118 @@ public class NodepoolScheduler
}
}
+ int total_jobs = 0;
for ( ResourceClass rc : rcs ) {
+ HashMap<IRmJob, IRmJob> jobs = rc.getAllJobs();
+ total_jobs += jobs.size();
+ }
+ if ( total_jobs == 0 ) {
+ return;
+ }
- // Get jobs into order by submission time - new ones ones may just be out of luck
+ for ( ResourceClass rc : rcs ) {
ArrayList<IRmJob> jobs = rc.getAllJobsSorted(new JobByTimeSorter());
- NodePool np = rc.getNodepool();
- // Find out what is given out already, for class caps. These are already accounted for
- // in the global counts.
- Iterator<IRmJob> jlist = jobs.iterator();
- while ( jlist.hasNext() ) {
- IRmJob j = jlist.next();
- j.undefer();
+ // Now pick up the work that can be scheduled, if any
+ for ( IRmJob j : jobs) {
+ j.clearShares(); // reset shares assigned at start of each scheduling cycle
+ j.undefer(); // in case it works this time!
- if ( np == null ) { // oops - no nodes here yet, must refuse all jobs
-
- schedulingUpdate.defer(j, "Reservation deferred because insufficient resources are available.");
- logger.warn(methodName, j.getId(), "Job scheduled to class "
- + rc.getName()
- + " but associated nodepool has no resources");
- continue;
+ switch ( j.getDuccType() ) {
+ case Job:
+ countReservationForJob(j, rc);
+ break;
+ case Service:
+ case Pop:
+ case Reservation:
+ default:
+ countSingleReservation(j, rc);
+ break;
}
+ }
+ }
+ }
- j.setReservation(); // in case it's a new reservation
+ void countReservationForJob(IRmJob j, ResourceClass rc)
+ {
+ String methodName = "countReservationForJob";
- int nshares = j.countNShares(); // for reservation each share is one full machine
- if ( nshares > 0 ) { // if it has any, it has all of them, so take off list
- int[] gbo = NodePool.makeArray(); // needed to for defrag
- // gbo[j.getShareOrder()] = j.countInstances();
- gbo[j.getShareOrder()] = j.countNShares(); // UIMA-3614 - may be < Instances if machine is purged
- j.setGivenByOrder(gbo);
-
- jlist.remove();
- continue;
- }
+ logger.info(methodName, j.getId(), "Counting full machines for", j.getShortType() + "." + j.getId());
- if ( j.isCompleted() ) { // maybe nothibng left, but it once had stuff
- jlist.remove(); // don't try to reallocate. UIMA-3614
- continue;
- }
- }
+ // Allowed something if we get here. Must see if we have something to give.
+ NodePool np = rc.getNodepool();
- if ( np == null ) { // no np. jobs have been deferred, cannot continue.
- return;
+ // Are there any machines of the right size in the NP that can be reserved for this job?
+ int available = np.countReservables(j);
+ if ( available == 0 ) {
+ if (j.countNShares() == 0) {
+ schedulingUpdate.defer(j, "Deferred because there are no hosts of the correct size.");
+ logger.info(methodName, j.getId(), "Deferred because no hosts of correct size. NP", np.getId());
+
+ } else {
+ logger.info(methodName, j.getId(), "Nodepool is out of shares: NP", np.getId());
}
+ return;
+ }
- // Now pick up the jobs that can still be scheduled, if any
- jlist = jobs.iterator();
+ int granted = getAllotmentForJob(j);
- while ( jlist.hasNext() ) {
- IRmJob j = jlist.next();
- logger.info(methodName, j.getId(), "Scheduling (reserve) job in class ", rc.getName());
+ //
+ // The job passes; give the job a count
+ //
+ int order = j.getShareOrder();
+ logger.info(methodName, j.getId(), "+++++ nodepool", np.getId(), "class", rc.getName(), "order", order, "shares", nSharesToString(granted, order));
+ int[] gbo = NodePool.makeArray();
+ gbo[order] = granted; // what we are allowed
+ j.setGivenByOrder(gbo);
- if ( j.countNShares() > 0 ) {
- logger.info(methodName, j.getId(), "Already scheduled with ", j.countInstances(), "shares");
- continue;
- }
+ np.countOutNSharesByOrder(order, granted - j.countNShares());
+ }
- int order = j.getShareOrder(); // memory, coverted to order, so we can find stuff
- int nrequested = j.countInstances(); // in machines
-
- if ( np.countLocalMachines() == 0 ) {
- schedulingUpdate.defer(j, "Reservation deferred because resources are exhausted.");
- logger.warn(methodName, j.getId(), "Job asks for "
- + nrequested
- + " reserved machines but reservable resources are exhausted for nodepool "
- + np.getId());
- continue;
- }
+ void countSingleReservation(IRmJob j, ResourceClass rc)
+ {
+ String methodName = "countSingleReservation";
- if ( rc.allotmentExceeded(j) ) { // Does it blow the configured limit for this class?
- schedulingUpdate.defer(j, "Reservation deferred because allotment of " + rc.getAllotment(j) + "GB is exceeded by user " + j.getUserName());
- continue;
- }
-
- logger.info(methodName, j.getId(), "Job is granted " + nrequested + " machines for reservation.");
- //j.addQShares(nrequested * order);
- int[] gbo = NodePool.makeArray();
- gbo[order] = nrequested;
- j.setGivenByOrder(gbo);
-
- int given = 0;
- if ( rc.enforceMemory() ) {
- given = np.countFreeableMachines(j, true);
- } else {
- given = np.countFreeableMachines(j, false);
- }
+ logger.info(methodName, j.getId(), "Counting shares for", j.getShortType() + "." + j.getId(), "in class", rc.getName());
+ NodePool np = rc.getNodepool();
- // The counts worked out but for some reason we can't find / evict enough space
- if ( given == 0 ) {
- schedulingUpdate.defer(j, "Reservation is deferred because insufficient resources are available.");
- if ( rc.enforceMemory() ) {
- logger.warn(methodName, j.getId(), "Reservation deferred: asks for "
- + nrequested
- + " reserved machines with exactly "
- + j.getShareOrder()
- + " shares but there are insufficient freeable machines.");
- } else {
- logger.warn(methodName, j.getId(), "Reservation deferred: ask for "
- + nrequested
- + " reserved machines with at least "
- + j.getShareOrder()
- + " shares but there are insufficient freeable machines.");
- }
- continue;
- }
- }
+ if ( j.countNShares() > 0 ) {
+ logger.info(methodName, j.getId(), "[stable]", "assigned", j.countNShares(), "processes, ",
+ (j.countNShares() * j.getShareOrder()), "QS");
+
+ int[] gbo = NodePool.makeArray();
+
+ gbo[j.getShareOrder()] = 1; // UIMA-4275 - only one
+ j.setGivenByOrder(gbo);
+ return;
+ }
+
+ if ( np.countReservables(j) == 0 ) {
+ schedulingUpdate.defer(j, "Deferred because requested memory " + j.getMemory() + " does not match any machine.");
+ logger.warn(methodName, j.getId(), "Deferred because requested memory " + j.getMemory() + " does not match any machine.");
+ return;
+ }
+
+ int order = j.getShareOrder(); // memory, converted to order, so we can find stuff
+
+ if ( np.countLocalMachines() == 0 ) {
+ schedulingUpdate.defer(j, "Deferred because resources are exhausted.");
+ logger.warn(methodName, j.getId(), "Deferred because resources are exhausted in nodepool " + np.getId());
+ return;
}
+
+ //
+ // Make sure this allocation does not blow the allotment cap.
+ //
+ if ( ! validSingleAllotment(j) ) return; // defers and logs
+
+ logger.info(methodName, j.getId(), "Request is granted a machine for reservation.");
+ int[] gbo = NodePool.makeArray();
+ gbo[order] = 1;
+ j.setGivenByOrder(gbo);
+
+ np.countOutNSharesByOrder(order, 1);
}
/**
@@ -1447,16 +1564,8 @@ public class NodepoolScheduler
continue;
}
- if ( j.countNShares() > 0 ) { // shares already allocated, nothing to do (all-or-nothing policy in effect)
- continue;
- }
-
- if ( j.isCompleted() ) { // UIMA-3614 - may have bene purged, don't give it more
- continue;
- }
-
try {
- np.allocateForReservation(j, rc);
+ np.findMachines(j, rc);
} catch (Exception e) {
logger.error(methodName, j.getId(), "Reservation issues:", e);
continue;
@@ -1469,9 +1578,7 @@ public class NodepoolScheduler
if ( j.countNShares() == 0 ) {
j.setReason("Waiting for preemptions.");
}
-
}
-
}
}
@@ -1486,27 +1593,13 @@ public class NodepoolScheduler
break;
case FIXED_SHARE:
- {
- NodePool np = rc.getNodepool();
- HashMap<IRmJob, IRmJob> jobs = rc.getAllJobs();
- for ( IRmJob j : jobs.values() ) {
- if ( j.countNShares() > 0 ) { // all-or-nothing - if there's anything, it's fully scheduled.
- HashMap<Share, Share> shares = j.getAssignedShares();
- np.accountForShares(shares);
- }
- }
- }
- break;
-
case RESERVE:
{
NodePool np = rc.getNodepool();
HashMap<IRmJob, IRmJob> jobs = rc.getAllJobs();
for ( IRmJob j : jobs.values() ) {
- if ( j.countNShares() > 0 ) { // all-or-nothing - if there's anything, it's fully scheduled.
- HashMap<Share, Share> shares = j.getAssignedShares();
- np.accountForShares(shares);
- }
+ HashMap<Share, Share> shares = j.getAssignedShares();
+ np.accountForShares(shares);
}
}
break;
@@ -1522,7 +1615,6 @@ public class NodepoolScheduler
for (ResourceClass rc : resourceClasses.values() ) {
if ( rc.getPolicy() == Policy.FAIR_SHARE ) {
NodePool np = rc.getNodepool();
- // NodePool check = getNodepool(rc);
HashMap<IRmJob, IRmJob> jobs = rc.getAllJobs();
for ( IRmJob j : jobs.values() ) {
HashMap<Share, Share> shares = j.getAssignedShares();
@@ -1767,25 +1859,24 @@ public class NodepoolScheduler
for ( Machine m : machines.values() ) {
if ( m.getShareOrder() < orderNeeded ) { // nope, too small
- logger.trace(methodName, nj.getId(), "Bypass ", m.getId(), ": too small for request of order", orderNeeded);
+ logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": too small for request of order", orderNeeded);
continue;
}
// if the job is a reservation the machine size has to match, and the machine must be clearable
- if ( nj.isReservation() ) {
+ if ( nj.getSchedulingPolicy() == Policy.RESERVE ) {
if ( m.getShareOrder() != orderNeeded ) {
- logger.trace(methodName, nj.getId(), "Bypass ", m.getId(), ": reservation requires exact match for order", orderNeeded);
+ logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": RESERVE policy requires exact match for order", orderNeeded);
continue;
}
// machine must be clearable as well
Collection<Share> shares = m.getActiveShares().values();
for ( Share s : shares ) {
if ( ! candidateJobs.containsKey(s.getJob()) ) {
- logger.trace(methodName, nj.getId(), "Bypass ", m.getId(), ": for reservation, machine contains non-candidate job", s.getJob().getId());
+ logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": for reservation, machine contains non-candidate job", s.getJob().getId());
continue machine_loop;
}
- }
-
+ }
}
Map<Share, Share> as = m.getActiveShares(); // everything allocated here
@@ -1808,13 +1899,6 @@ public class NodepoolScheduler
// Now eligibleMachines is the set of candidate machines for defrag
- // All-or-nothing policy, can we satisfy the reservation with defrag? If not, we're done.
- if ( nj.isReservation() && ( eligibleMachines.size() < needed ) ) {
- // if we can't clear enough for the reservation we have to wait. Very unlikely, but not impossible.
- logger.info(methodName, nj.getId(), "Found insufficient machines (", eligibleMachines.size(), "for reservation. Not clearing.");
- return 0;
- }
-
logger.info(methodName, nj.getId(), "Found", eligibleMachines.size(), "machines to be searched in this order:");
StringBuffer buf = new StringBuffer();
for ( Machine m : eligibleMachines.keySet() ) {
@@ -1825,7 +1909,7 @@ public class NodepoolScheduler
// first part done, we know where to look.
- // Now just bop through the machines until either we can't find anything, or we find everything.
+ // Now just bop through the machines to see if we can get anything for this specific job (nj)
int given_per_round = 0;
do {
int g = 0;
@@ -2002,10 +2086,13 @@ public class NodepoolScheduler
logger.debug(methodName, nj.getId(), "Job", j.getId(), "is a candidate with processes[", nshares, "] qshares[", qshares, "]");
rich_candidates.put(j, j);
}
- }
+ } // End search for candidate donors
- HashMap<User, TreeMap<IRmJob, IRmJob>> jobs_by_user = new HashMap<User, TreeMap<IRmJob, IRmJob>>(); // use this to track where the wealth originatse
- TreeMap<User, User> users_by_wealth = new TreeMap<User, User>(new UserByWealthSorter()); // orders users by wealth
+ //
+ // Here we start looking at 'needy' jobs and trying to match them against the candidates
+ //
+ HashMap<User, TreeMap<IRmJob, IRmJob>> jobs_by_user = new HashMap<User, TreeMap<IRmJob, IRmJob>>(); // use this to track where the wealth originates
+ TreeMap<User, User> users_by_wealth = new TreeMap<User, User>(new UserByWealthSorter()); // orders users by wealth
collectWealth(rich_candidates, users_by_wealth, jobs_by_user);
@@ -2042,6 +2129,7 @@ public class NodepoolScheduler
}
}
logger.info(methodName, nj.getId(), "Could not get enough from the rich. Asked for", needy.get(nj), "still needing", needed);
+ nj.setReason("Waiting for defragmentation.");
}
}
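
The donor search above walks users richest-first. A compact sketch of that ordering, with a plain qshares field standing in for the real wealth accounting behind UserByWealthSorter:

    import java.util.*;

    class WealthOrderSketch {
        static class User {
            final String name; final int qshares;
            User(String n, int q) { name = n; qshares = q; }
        }

        public static void main(String[] args) {
            TreeMap<User, User> byWealth = new TreeMap<User, User>(new Comparator<User>() {
                public int compare(User a, User b) {
                    if (a == b) return 0;
                    int d = b.qshares - a.qshares;                   // descending: richest first
                    return (d != 0) ? d : a.name.compareTo(b.name);  // keep distinct users distinct
                }
            });
            for (User u : new User[] { new User("ann", 12), new User("bob", 40) })
                byWealth.put(u, u);
            for (User u : byWealth.keySet())
                System.out.println(u.name + " " + u.qshares);  // bob 40, then ann 12
        }
    }
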
@@ -2168,17 +2256,7 @@ public class NodepoolScheduler
continue;
}
- int counted = 0;
- switch ( rc.getPolicy() ) {
- case FAIR_SHARE:
- counted = j.countNSharesGiven(); // fair share allocation, accounting for
- // ramp-up, various caps, etc. could be more, could be less than
- // the "pure" fair share.
- break;
- default:
- counted = j.countInstances(); // fixed, all, or nothing
- break;
- }
+ int counted = j.countNSharesGiven(); // accounting for ramp-up, various caps, etc.
int current = j.countNShares(); // currently allocated, plus pending, less those removed by earlier preemption
int needed = counted - current; // could go negative if it's evicting
@@ -2197,8 +2275,8 @@ public class NodepoolScheduler
must_defrag = true;
}
} else { // if not fair-share, must always try to defrag if needed
- if ( j.isCompleted() ) continue; // non-preemptable, this means it has at least once received
- // its full allocation, and therefore cannot be needy
+ // UIMA-4275 We rely on quotas to keep 'needed' under control
if ( needed > 0 ) {
jobmap.put(j, needed);
must_defrag = true;
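
The effect of the simplification above is that the needy test is now uniform across policies. Roughly, with the two counters reduced to plain ints:

    class NeedySketch {
        // counted: shares the scheduler has decided the job should have
        // current: shares it holds, plus pending, less earlier preemptions
        static int needed(int counted, int current) {
            return counted - current;  // negative while the job is evicting
        }

        public static void main(String[] args) {
            System.out.println(needed(4, 1));  //  3 -> needy, defrag candidate
            System.out.println(needed(2, 3));  // -1 -> shrinking, not needy
        }
    }

Quotas (UIMA-4275) are what keep 'counted', and therefore 'needed', bounded for non-preemptable work.
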
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java Sun Mar 15 14:20:49 2015
@@ -288,6 +288,10 @@ public class ResourceClass
* See if the total memory for job 'j' plus the occupancy of the 'jobs' exceeds 'max'
* Returns 'true' if occupancy is exceeded, else returns 'false'
* UIMA-4275
+ *
+ * NOTE: After discussion this may not be the ideal place for this check; a global
+ * cap on all NPShares was preferred. I suspect, though, that we will eventually
+ * need the finer granularity of class-based caps, so the logic is staying here for now
*/
private boolean occupancyExceeded(int max, IRmJob j, Map<IRmJob, IRmJob> jobs)
{
@@ -302,7 +306,7 @@ public class ResourceClass
// Then multiply by the scheduling quantum to convert to GB
occupancy += ( job.countNSharesGiven() * job.getShareOrder() * share_quantum ); // convert to GB
}
- int requested = j.getMemory() * j.countInstances();
+ int requested = j.getMemory() * j.getMaxShares();
if ( max - ( occupancy + requested ) < 0 ) {
return true;
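
A worked rendering of the check, with the accessors flattened into parameters (names illustrative, not the real API):

    class AllotmentSketch {
        // occupancyGB: GB already counted out to this user's jobs, i.e.
        //              sum over jobs of nsharesGiven * shareOrder * quantum
        // requestGB:   memoryGB * maxShares for the incoming request
        static boolean occupancyExceeded(int maxGB, int occupancyGB, int requestGB) {
            return maxGB - (occupancyGB + requestGB) < 0;
        }

        public static void main(String[] args) {
            // Two jobs of 4 quantum shares each at a 15GB quantum occupy 120GB;
            // a 2-share x 30GB request against a 160GB cap: 120 + 60 > 160.
            System.out.println(occupancyExceeded(160, 120, 60));  // true -> deny/defer
        }
    }
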
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java Sun Mar 15 14:20:49 2015
@@ -50,7 +50,9 @@ public class RmJob
protected ResourceClass resource_class; // The actual class, assigned as job is received in scheduler.
protected int user_priority; // user "priority", really apportionment
- protected int n_machines; // RESERVE: minimum machines to allocate
+ // @deprecated
+ // protected int n_machines; // RESERVE: minimum machines to allocate
+
protected int max_shares; // FAIR_SHARE: maximum N shares to allocate
protected boolean is_reservation = false;
@@ -267,7 +269,7 @@ public class RmJob
public int queryDemand()
{
if ( getSchedulingPolicy() == Policy.FAIR_SHARE ) return getJobCap();
- return countInstances();
+ return max_shares;
}
/**
@@ -337,6 +339,18 @@ public class RmJob
}
+ // UIMA-4275
+ public int countOccupancy()
+ {
+ if ( (given_by_order == null) || (given_by_order[share_order] == 0) ) {
+ // must use current allocation because we haven't been counted yet
+ return countNShares() * share_order;
+ } else {
+ // the new allocation may not match the current one, so use the new count
+ return given_by_order[share_order] * share_order;
+ }
+ }
+
public int countNSharesGiven()
{
if ( given_by_order == null) { return 0; }
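
In quantum shares, countOccupancy() reduces to whichever allocation count is live. A toy rendering, with the fields passed as parameters:

    class OccupancySketch {
        static int countOccupancy(int givenAtOrder, int currentNShares, int shareOrder) {
            // Nothing counted out this cycle: fall back to current holdings;
            // otherwise the new count is authoritative.
            int n = (givenAtOrder == 0) ? currentNShares : givenAtOrder;
            return n * shareOrder;
        }

        public static void main(String[] args) {
            System.out.println(countOccupancy(0, 3, 2));  // 6 qshares, from holdings
            System.out.println(countOccupancy(4, 3, 2));  // 8 qshares, from the new count
        }
    }
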
@@ -813,13 +827,13 @@ public class RmJob
*
* @Deprecated. Keeping until I can nuke or update the simple fair share code.
*/
- public void shrinkTo(int k)
- {
+ //public void shrinkTo(int k)
+ //{
// int count = assignedShares.size() - k;
// for ( int i = 0; i < count; i++ ) {
// pendingRemoves.add(assignedShares.get(i));
// }
- }
+ //}
/**
* Waiting for somebody to deal with my shrinkage?
@@ -1022,7 +1036,14 @@ public class RmJob
return;
}
- if ( getSchedulingPolicy() != Policy.FAIR_SHARE ) return;
+ if ( isCompleted() ) {
+ // job is finishing up and will relinquish all shares soon, let's avoid complicating the
+ // world and just wait for it to happen.
+ job_cap = countNShares();
+ return;
+ }
+
+ // if ( getSchedulingPolicy() != Policy.FAIR_SHARE ) return;
int c = nquestions_remaining / threads;
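
A reduced view of the cap logic this hunk adds (fields flattened to parameters; this is not the full getJobCap computation):

    class JobCapSketch {
        static int cap(boolean completed, int currentShares, int computedCap) {
            // A completing job will relinquish its shares soon; freezing the
            // cap at current holdings lets it drain without churning the world.
            return completed ? currentShares : computedCap;
        }

        public static void main(String[] args) {
            System.out.println(cap(true, 2, 8));   // 2: let it finish
            System.out.println(cap(false, 2, 8));  // 8: normal computed cap
        }
    }
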
@@ -1166,14 +1187,16 @@ public class RmJob
return resource_class;
}
- public int countInstances() {
- return n_machines;
- }
+ // @deprecated
+ //public int countInstances() {
+ // return n_machines;
+ //}
- public void setNInstances(int m)
- {
- this.n_machines = m;
- }
+ // @deprecated
+ // public void setNInstances(int m)
+ // {
+ // this.n_machines = m;
+ // }
public int nThreads() {
return threads;
@@ -1268,7 +1291,7 @@ public class RmJob
return buf.toString();
}
- String getShortType()
+ public String getShortType()
{
String st = "?";
switch ( ducc_type ) {
@@ -1313,7 +1336,7 @@ public class RmJob
shares, share_order, (shares * share_order), // 5 6 7
0, memory, // 8 9
0, 0, 0, // 10 11 12
- n_machines); // 13
+ max_shares); // 13
} else {
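
With n_machines retired, demand for non-fair-share work is expressed uniformly in max_shares. Schematically (jobCap and maxShares stand in for the real fields):

    class DemandSketch {
        enum Policy { FAIR_SHARE, FIXED_SHARE, RESERVE }

        static int queryDemand(Policy p, int jobCap, int maxShares) {
            // Fair-share demand is the computed cap; FIXED_SHARE and
            // RESERVE alike now report max_shares.
            return (p == Policy.FAIR_SHARE) ? jobCap : maxShares;
        }

        public static void main(String[] args) {
            System.out.println(queryDemand(Policy.RESERVE, 0, 2));     // 2
            System.out.println(queryDemand(Policy.FAIR_SHARE, 9, 0));  // 9
        }
    }
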
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Sun Mar 15 14:20:49 2015
@@ -614,14 +614,21 @@ public class Scheduler
String val = ((String) dp.get(l)).trim();
int lim = Integer.parseInt( val ); // verified parsable int during parsing
- String[] tmp = ((String)l).split("\\."); // max_allotment.classname
+
+
User user = users.get(n);
if (user == null) {
user = new User(n);
users.put(n, user);
}
- ResourceClass rc = resourceClassesByName.get(tmp[1]);
- user.overrideLimit(rc, lim); // constrain allotment for this class to value in l
+
+ if ( ((String)l).contains(".") ) { // the key, not the value, carries the optional class qualifier
+ String[] tmp = ((String)l).split("\\."); // max_allotment.classname
+ ResourceClass rc = resourceClassesByName.get(tmp[1]);
+ user.overrideLimit(rc, lim); // constrain allotment for this class to value in l
+ } else {
+ user.overrideGlobalLimit(lim);
+ }
}
}
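
It is the key, not the value, that may carry the class qualifier; a sketch of the dispatch as I read it:

    class AllotmentKeySketch {
        static void apply(String key, int lim) {
            if (key.contains(".")) {                  // max_allotment.classname
                String className = key.split("\\.")[1];
                System.out.println("class limit " + className + " = " + lim);
            } else {                                  // bare max_allotment
                System.out.println("global NP limit = " + lim);
            }
        }

        public static void main(String[] args) {
            apply("max_allotment.urgent", 200);  // per-class override
            apply("max_allotment", 360);         // global override
        }
    }
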
@@ -735,9 +742,9 @@ public class Scheduler
HashMap<Share, Share> sharesE = j.getAssignedShares();
HashMap<Share, Share> sharesN = j.getPendingShares();
- if ( sharesE.size() == j.countInstances() ) {
+ if ( sharesE.size() == j.getMaxShares() ) {
logger.trace(methodName, j.getId(), "reserve_stable", sharesE.size(), "machines");
- } else if ( sharesN.size() == j.countInstances() ) { // reservation is complete but not yet confirmed?
+ } else if ( sharesN.size() == j.getMaxShares() ) { // reservation is complete but not yet confirmed?
logger.trace(methodName, j.getId(), "reserve_adding", sharesN.size(), "machines");
for ( Share s : sharesN.values()) {
logger.trace(methodName, j.getId(), " reserve_expanding ", s.toString());
@@ -745,7 +752,7 @@ public class Scheduler
jmu.addShares(j, sharesN);
j.promoteShares();
} else {
- logger.trace(methodName, j.getId(), "reserve_pending", j.countInstances(), "machines");
+ logger.trace(methodName, j.getId(), "reserve_pending", j.getMaxShares(), "machines");
}
logger.trace(methodName, j.getId(), "<<<<<<<<<<");
}
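
The three reservation states above key off the share counts. Schematically, with the collection sizes as plain ints:

    class ReserveStateSketch {
        static String state(int assigned, int pending, int maxShares) {
            if (assigned == maxShares) return "reserve_stable";  // fully placed
            if (pending == maxShares)  return "reserve_adding";  // placed, awaiting promotion
            return "reserve_pending";                            // still waiting for machines
        }

        public static void main(String[] args) {
            System.out.println(state(2, 0, 2));  // reserve_stable
            System.out.println(state(0, 2, 2));  // reserve_adding
            System.out.println(state(0, 0, 2));  // reserve_pending
        }
    }
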
@@ -1201,7 +1208,7 @@ public class Scheduler
np.getOnlineByOrder(onlineMachines);
for ( int i = 1; i < freeMachines.length; i++ ) {
- freeMachines[i] += np.countFreeMachines(i, true);
+ freeMachines[i] += np.countFreeMachines(i);
}
int[] t = np.cloneVMachinesByOrder();
@@ -1486,8 +1493,6 @@ public class Scheduler
logger.info(methodName, j.getId(), "Set fixed bit for FIXED job");
s.setShareOrder(share_order);
s.setFixed();
- j.markComplete(); // in recovery: if there are any shares at all for this job
- // we know it once had all its shares so its allocation is complete
if ( !compatibleNodepool(s, j) ) { // UIMA-4142
if ( j.isService() ) {
sharesToShrink.add(s); // nodepool reconfig snafu, SM will reallocate the process
@@ -1499,8 +1504,6 @@ public class Scheduler
case RESERVE:
logger.info(methodName, j.getId(), "Set fixed bit for RESERVE job");
s.setFixed();
- j.markComplete(); // in recovery: if there are any shares at all for this job
- // we know it once had all its shares so its allocation is complete
if ( j.isService() && !compatibleNodepool(s, j) ) { // UIMA-4142
sharesToShrink.add(s); // nodepool reconfig snafu, SM will reallocate the process
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java Sun Mar 15 14:20:49 2015
@@ -22,6 +22,8 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
+import org.apache.uima.ducc.rm.scheduler.SchedConstants.Policy;
+
public class User
implements IEntity
{
@@ -32,6 +34,7 @@ public class User
private Map<Integer, Map<IRmJob, IRmJob>> jobsByOrder = new HashMap<Integer, Map<IRmJob, IRmJob>>();
private Map<ResourceClass, Integer> classLimits = new HashMap<ResourceClass, Integer>(); // UIMA-4275
+ private int globalLimit = -1; // -1 means use the system-wide default limit
//private int user_shares; // number of shares to apportion to jobs in this user in current epoch
private int pure_fair_share; // uncapped un-bonused counts
@@ -51,12 +54,41 @@ public class User
return 0;
}
- // UIMA-4275
+ // UIMA-4275, class-based limit
void overrideLimit(ResourceClass rc, int lim)
{
classLimits.put(rc, lim);
}
+ // UIMA-4275 Global NPshare limit override from registry
+ void overrideGlobalLimit(int lim)
+ {
+ globalLimit = lim;
+ }
+
+ // UIMA-4275 Get the override on the global limit
+ int getOverrideLimit()
+ {
+ return globalLimit;
+ }
+
+
+ // UIMA-4275, count all non-preemptable shares for this user, in quantum shares
+ int countNPShares()
+ {
+ int occupancy = 0;
+ for ( IRmJob j : jobs.values() ) {
+ if ( j.getSchedulingPolicy() != Policy.FAIR_SHARE ) {
+ // countOccupancy() converts the job's allocation to quantum shares:
+ // if a new count was made this cycle it uses that, otherwise it
+ // falls back to the shares currently held, times the share order.
+ occupancy += ( j.countOccupancy() );
+ }
+ }
+ return occupancy;
+ }
+
int getClassLimit(ResourceClass rc)
{
if ( classLimits.containsKey(rc) ) return classLimits.get(rc);
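
Putting the pieces together, enforcement of the global NP allotment presumably looks like the following; the GB conversion and the default are illustrative (the real check lives in the scheduler, with ducc.rm.global_allotment supplying the default):

    class NPAllotmentSketch {
        static boolean withinAllotment(int userNPQShares, int quantumGB, int requestGB,
                                       int userOverrideGB, int systemDefaultGB) {
            int capGB = (userOverrideGB >= 0) ? userOverrideGB : systemDefaultGB;
            int occupiedGB = userNPQShares * quantumGB;  // qshares -> GB
            return occupiedGB + requestGB <= capGB;
        }

        public static void main(String[] args) {
            // 20 qshares at a 15GB quantum = 300GB held; a 90GB request
            // against the 360GB default cap overshoots by 30GB.
            System.out.println(withinAllotment(20, 15, 90, -1, 360));  // false -> wait
        }
    }
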