You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2013/03/19 15:31:02 UTC
svn commit: r1458311 - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler:
NodepoolScheduler.java Scheduler.java
Author: challngr
Date: Tue Mar 19 14:31:02 2013
New Revision: 1458311
URL: http://svn.apache.org/r1458311
Log:
UIMA-2667
Rewrite the 'takeFromTheRich' routine for defragmentation.
Also fixes a 1-line bug in the counting method apportion_qshares, a loop that terminated incorrectly,
causing a divisor to go negative, and hence class/user/job counts to go negative, under high load.
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java?rev=1458311&r1=1458310&r2=1458311&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java Tue Mar 19 14:31:02 2013
@@ -459,6 +459,7 @@ public class NodepoolScheduler
allweights -= e.getShareWeight();
}
}
+ if ( allweights <=0 ) break; // JRC JRC
}
// Remove entities that have everything they want
@@ -1529,113 +1530,137 @@ public class NodepoolScheduler
* ALL the user's jobs, we will have culled everything that makes no sense to
* take from in the caller.
*
- * @return THe number of processes recovered.
+ * @return The number of processes we found space for. Note this could be different from the number
+ * of processes evicted, if it took more than one eviction to make space. Also, we may have
+ * evicted a process smaller than is needed, because there was already some free space on
+ * the machine.
*/
int takeFromTheRich(IRmJob nj, int needed,
TreeMap<User, User> users_by_wealth,
HashMap<User, TreeMap<IRmJob, IRmJob>> jobs_by_user)
{
String methodName = "takeFromTheRich";
- int given = 0;
-
- logger.debug(methodName, nj.getId(), "needed[", needed, "]");
+ // 1. Collect all machines that have shares, which if evicted, would make enough space
+ // - in compatible NP
+ // - g + sum(shares belonging to rich users on the machine);
+ // 2. Order the machines by
+ // a) richest user
+ // b) largest machine
+ // 3. Pick next machine,
+ // - clear enough shares
+ // - remove machine from list
+ // - update wealth
+ // 4. Repeat at 2 until
+ // a) have given what is needed
+ // b) nothing left to give
+
+ // Map<Share, Share> exemptShares = new HashMap<Share, Share>(); // not eligible for various reasons
+ Map<IRmJob, IRmJob> candidateJobs = new HashMap<IRmJob, IRmJob>();
+ Map<Machine, Machine> eligibleMachines = new TreeMap<Machine, Machine>(new EligibleMachineSorter());
- TreeMap<IRmJob, IRmJob> job_set = new TreeMap<IRmJob, IRmJob>(new JobByShareSorter()); // the collection of rich users jobs to take from
- Map<Share, Share> shares = new TreeMap<Share, Share>(new FinalEvictionSorter());
- Map<Share, Share> removed = new HashMap<Share, Share>();
-
- List<User> allUsers = new ArrayList<User>(); // for debug
-
- for ( User next_user : users_by_wealth.keySet() ) {
- job_set.putAll(jobs_by_user.get(next_user)); // on each iter, the set of jobs with candidates grows
-
- allUsers.add(next_user);
- logger.debug(methodName, nj.getId(), "Donating users:", allUsers);
- logger.debug(methodName, nj.getId(), "Donating jobs:", listJobSet(job_set));
-
- Map<IRmJob, IRmJob> donorJobs = new HashMap<IRmJob, IRmJob>();
- List<Share> donorShares = new ArrayList<Share>();
- boolean shares_found = false;
- do {
- shares_found = false;
- IRmJob rich_j = job_set.firstKey(); // de rrrichest kind
-
- logger.debug(methodName, nj.getId(), "Inspecting job", rich_j.getId());
- //
- // First lets see if something is pending and we can just reassign it. Nobody knows
- // about this share but RM yet, so it's safe to do this.
- //
- shares.putAll(rich_j.getPendingShares()); // each new job makes the candidate set richer
- shares.putAll(rich_j.getAssignedShares());
- removed.putAll(rich_j.getPendingRemoves());
-
- for ( Share s : shares.keySet() ) {
+ for ( TreeMap<IRmJob, IRmJob> jobs : jobs_by_user.values() ) {
+ candidateJobs.putAll(jobs);
+ }
- if ( removed.containsKey(s) ) continue; // already gone, ignore it
-
- if ( ! compatibleNodepools(s, nj) ) {
- logger.trace(methodName, nj.getId(), "Bypassing pending share", s.toString(), "becaose of incompatible nodepool");
- continue;
- }
- logger.trace(methodName, nj.getId(), "Pending share", s.toString(), "is compatible with class", nj.getResourceClass().getName());
+ int given = 0;
+ int orderNeeded = nj.getShareOrder();
+
+ ResourceClass cl = nj.getResourceClass();
+ String npname = cl.getNodepoolName();
+ NodePool np = globalNodepool.getSubpool(npname);
+ Map<Node, Machine> machines = np.getAllMachines(); // everything here is a candidate, nothing else is
+
+ for ( Machine m : machines.values() ) {
+ if ( m.getShareOrder() < orderNeeded ) {
+ // logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": too small for request of order", orderNeeded);
+ continue;
+ }
+ Map<Share, Share> as = m.getActiveShares();
+ int g = m.getVirtualShareOrder();
+ for ( Share s : as.values() ) {
+ IRmJob j = s.getJob();
+ if ( s.isForceable() && candidateJobs.containsKey(j) ) {
+ g += j.getShareOrder();
+ }
+ }
+ if ( g >= orderNeeded ) {
+ logger.trace(methodName, nj.getId(), "Candidate machine:", m.getId());
+ eligibleMachines.put(m, m);
+ } else {
+ // (a) the share is not forceable (non-preemptable, or already being removed), or
+ // (b) the share is not owned by a rich job
+ logger.trace(methodName, nj.getId(), "Not a candidate, insufficient rich jobs:", m.getId());
+ }
+ }
+ logger.debug(methodName, nj.getId(), "Found", eligibleMachines.size(), "machines to be searched in this order:");
+ for ( Machine m : eligibleMachines.keySet() ) {
+ logger.debug(methodName, nj.getId(), "Eligible machine:", m.getId());
+ }
+ // first part done
+
+ // Now just bop through the machines until either we can't find anything, or we find everything.
+ int given_per_round = 0;
+ do {
+ int g = 0;
+ for ( Machine m : eligibleMachines.keySet() ) {
+ HashMap<Share, Share> sh = m.getActiveShares();
+ g = m.getVirtualShareOrder();
+ List<Share> potentialShares = new ArrayList<Share>();
+ for ( Share s : sh.values() ) {
+ IRmJob j = s.getJob();
+ User u = j.getUser();
- Machine m = s.getMachine();
- // clear everything from this machine from jobs in the current job set
- if ( m.getShareOrder() < nj.getShareOrder() ) {
- logger.debug(methodName, nj.getId(), "Bypassing pending share", s.toString(), "because machine size[", m.getShareOrder(), "< job order[", nj.getShareOrder());
- continue; // job will never fit here
- }
- Map<Share, Share> activeShares = m.getActiveShares(); // shares on this machine
- List<Share> candidates = new ArrayList<Share>();
- int total_available = m.getVirtualShareOrder(); // start with free space
-
- logger.debug(methodName, nj.getId(), "Machine", m.getId(), "activeShares", activeShares.keySet(), "total_available", total_available);
- for ( Share as : activeShares.values() ) {
- IRmJob tentative = as.getJob();
- if ( job_set.containsKey(tentative) ) { // share belong to a rich j?
- donorJobs.put(tentative, tentative);
- candidates.add(as); // yes it's a candidate
- total_available += as.getShareOrder(); // add in shares that might work
- //logger.debug(methodName, nj.getId(), "share", as.getId(), "in job", tentative.getId(), "is a candidate");
- } else {
- //logger.debug(methodName, nj.getId(), "share", as.getId(), "in job", tentative.getId(), "is not a candidate");
- }
- }
-
- int nj_shares = total_available / nj.getShareOrder(); // figure how many nj-sized shares we can get
- nj_shares = Math.min(needed, nj_shares); // cap on actual needed
- logger.debug(methodName, nj.getId(), "Machine", m.getId(), "total_available", total_available, "nj_shares", nj_shares);
- if ( nj_shares > 0 ) { // if it works, pull candidates until we have enough
- int g = m.getVirtualShareOrder();
- for ( Share cs : candidates ) {
- if ( clearShare(cs, nj) ) removed.put(cs, cs); // return TRUE if evicted, and FALSE if reassigned to nj
- User u = rich_j.getUser();
- u.subtractWealth(s.getShareOrder());
- donorShares.add(cs);
- g += cs.getShareOrder();
- shares_found = true;
- if ( (g / nj.getShareOrder() ) >= nj_shares ) {
- break;
+ if ( s.isForceable() ) {
+ TreeMap<IRmJob, IRmJob> potentialJobs = jobs_by_user.get(u);
+ if ( (potentialJobs != null) && ( potentialJobs.containsKey(j) ) ) {
+ g += s.getShareOrder();
+ if ( s.getShareOrder() == orderNeeded ) {
+ potentialShares.add(0, s); // exact matches first
+ } else {
+ potentialShares.add(s);
}
}
}
- given += nj_shares;
- needed -= nj_shares;
- if ( needed == 0 ) return given;
+ if ( g >= orderNeeded ) break;
}
+
+ if ( g >= orderNeeded ) {
+ // found enough on this machine for 1 share!
+ logger.debug(methodName, nj.getId(), "Clearing shares: g[", g, "], orderNeeded[", orderNeeded, "]");
+ g = m.getVirtualShareOrder(); // reset
+ for ( Share s : potentialShares ) {
+ IRmJob j = s.getJob();
+ User u = j.getUser();
+
+ g += s.getShareOrder();
+ given_per_round++;
+ clearShare(s, nj);
+ u.subtractWealth(s.getShareOrder());
+ logger.debug(methodName, nj.getId(), "Clearing share", s, "order[", s.getShareOrder(),
+ "]: g[", g, "], orderNeeded[", orderNeeded, "]");
+ if ( g >= orderNeeded) break; // inner loop, could break on exact match without giving everything away
+ }
+ break; // outer loop, if anything was found
+ }
+ }
- // TODO: This can't happen inside the loop
- for ( IRmJob j : donorJobs.values() ) { // rebalance tree on job wealth after possible removal
- job_set.remove(j);
- job_set.put(j, j);
+ if ( given_per_round > 0 ) {
+ // Must reorder the eligible list to get the "next" best candidate. We could try to remove
+ // machines that were exhausted above ...
+ Map<Machine, Machine> tmp = new HashMap<Machine, Machine>();
+ tmp.putAll(eligibleMachines);
+ eligibleMachines.clear();
+ for ( Machine m : tmp.keySet() ) {
+ eligibleMachines.put(m, m);
}
- for ( Share s : donorShares ) { // given away, and removed from consideration
- shares.remove(s);
- }
- } while (shares_found && ( given < needed) );
- }
+ // and also must track how many processes we made space for
+ given = given + (g / orderNeeded); // at least one, or else we have a bug
+ logger.debug(methodName, nj.getId(), "LOOPEND: given[", given, "] g[", g, "] orderNeeded[", orderNeeded, "]");
+ }
+ } while ( (given_per_round > 0) && ( given < needed ));
+
return given;
}
@@ -2281,31 +2306,70 @@ public class NodepoolScheduler
// This is a sorter for a tree map so we have to be sure not to return equality unless the objects
// are the same objects.
//
- static private class FinalEvictionSorter
- implements Comparator<Share>
+// static private class FinalEvictionSorter
+// implements Comparator<Share>
+// {
+//
+// public int compare(Share s1, Share s2)
+// {
+// if ( s1 == s2 ) return 0;
+//
+// // pending shares first, no point expanding them if we don't have to
+// if ( s1.isPending() && s2.isPending() ) return -1;
+// if ( s1.isPending() ) return -1;
+// if (s2.isPending() ) return 1;
+//
+// // Shares on machines with more space first, deal with defrag, which is why we're here
+// int vso1 = s1.getMachine().countFreedUpShares();
+// int vso2 = s2.getMachine().countFreedUpShares();
+//
+// if ( vso1 != vso2 ) {
+// return vso2 - vso1; // (more space first)
+// }
+//
+// // All else being equal, use investment
+// int inv = (int) (s1.getInvestment() - s2.getInvestment());
+// if ( inv == 0 ) return -1; // careful not to return 0
+// return inv;
+// }
+// }
+
+ //
+ // Sort machines for defrag.
+ // a) machines with richest users first
+ // b) largest machine second
+ //
+ static private class EligibleMachineSorter
+ implements Comparator<Machine>
{
- public int compare(Share s1, Share s2)
+ public int compare(Machine m1, Machine m2)
{
- if ( s1 == s2 ) return 0;
+ if ( m1 == m2 ) return 0;
- // pending shares first, no point expanding them if we don't have to
- if ( s1.isPending() && s2.isPending() ) return -1;
- if ( s1.isPending() ) return -1;
- if (s2.isPending() ) return 1;
+ int m1wealth = 0;
+ int m2wealth = 0;
+ Map<Share, Share> sh1 = m1.getActiveShares();
+ for ( Share s : sh1.values() ) {
+ IRmJob j = s.getJob();
+ User u = j.getUser();
+ m1wealth = Math.max(m1wealth, u.getShareWealth());
+ }
+
+ Map<Share, Share> sh2 = m2.getActiveShares();
+ for ( Share s : sh2.values() ) {
+ IRmJob j = s.getJob();
+ User u = j.getUser();
+ m2wealth = Math.max(m2wealth, u.getShareWealth());
+ }
- // Shares on machines with more space first, deal with defrag, which is why we're here
- int vso1 = s1.getMachine().countFreedUpShares();
- int vso2 = s2.getMachine().countFreedUpShares();
+ if ( m1wealth != m2wealth ) return m2wealth - m1wealth; // richest user first
- if ( vso1 != vso2 ) {
- return vso2 - vso1; // (more space first)
- }
+ long m1mem = m1.getMemory();
+ long m2mem = m2.getMemory();
- // All else being equal, use investment
- int inv = (int) (s1.getInvestment() - s2.getInvestment());
- if ( inv == 0 ) return -1; // careful not to return 0
- return inv;
+ if ( m1mem == m2mem ) return -1; // for tree map, must not return 0 unless same object
+ return (int) (m2mem - m1mem); // largest machine first.
}
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1458311&r1=1458310&r2=1458311&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Tue Mar 19 14:31:02 2013
@@ -303,16 +303,20 @@ public class Scheduler
* We only get one such name, so we give up the search if we find
* it.
*/
+ static String cached_domain = null;
private String getDomainName()
{
String methodName = "getDomainName";
+
+ if ( cached_domain != null ) return cached_domain;
try {
NodeIdentity ni = new NodeIdentity();
for ( IIdentity id : ni.getNodeIdentities()) {
String n = id.getName();
int ndx = n.indexOf(".");
if ( ndx > 0 ) {
- return n.substring(ndx + 1);
+ cached_domain = n.substring(ndx + 1);
+ return cached_domain;
}
}
} catch (Exception e) {