Posted to commits@uima.apache.org by ch...@apache.org on 2013/03/19 15:31:02 UTC

svn commit: r1458311 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler: NodepoolScheduler.java Scheduler.java

Author: challngr
Date: Tue Mar 19 14:31:02 2013
New Revision: 1458311

URL: http://svn.apache.org/r1458311
Log:
UIMA-2667
Rewrite the 'takeFromTheRich' routine for defragmentation. 
Also fixes a one-line bug in the counting method apportion_qshares: a loop terminated incorrectly,
causing a divisor to go negative, and hence class/user/job counts to go negative, under high load.
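
For context, here is a minimal, hypothetical sketch of the guard pattern the apportion_qshares fix
adds; the names (apportion, caps, weights) are illustrative only and are not the actual DUCC code.
The idea is to stop apportioning as soon as the sum of remaining weights is exhausted, so the weight
sum used as a divisor can never go to zero or negative:

    class ApportionSketch {
        // Hand out 'shares' to entities in proportion to their weights, capped per entity.
        static int[] apportion(int shares, int[] caps, int[] weights) {
            int n = caps.length;
            int[] given = new int[n];
            int allweights = 0;
            for ( int w : weights ) allweights += w;

            boolean done = (allweights <= 0);
            while ( shares > 0 && !done ) {
                done = true;                                             // flips back if anything is handed out
                for ( int i = 0; i < n && shares > 0; i++ ) {
                    if ( given[i] >= caps[i] ) continue;                 // entity already has all it can use
                    int fair = Math.max(1, shares * weights[i] / allweights);
                    int take = Math.min(fair, Math.min(shares, caps[i] - given[i]));
                    given[i] += take;
                    shares   -= take;
                    done      = false;
                    if ( given[i] >= caps[i] ) {
                        allweights -= weights[i];                        // satisfied entity drops out of the weighting
                        if ( allweights <= 0 ) { done = true; break; }   // the guard: nothing left to weight against
                    }
                }
            }
            return given;
        }
    }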

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java?rev=1458311&r1=1458310&r2=1458311&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java Tue Mar 19 14:31:02 2013
@@ -459,6 +459,7 @@ public class NodepoolScheduler
                         allweights -= e.getShareWeight();
                     }
                 }                
+                if ( allweights <= 0 ) break;   // all remaining weight is consumed; stop before the divisor goes non-positive
             }
 
             // Remove entities that have everything they want
@@ -1529,113 +1530,137 @@ public class NodepoolScheduler
      *                        ALL the user's jobs, we will have culled everything that makes no sense to
      *                        take from in the caller.
      *
-     * @return THe number of processes recovered.
+     * @return The number of processes we found space for.  Note this could be different from the number
+     *         of processes evicted, if it took more than one eviction to make space.  Also, we may have
+     *         evicted a process smaller than needed, because there was already some free space on
+     *         the machine.
      */
     int takeFromTheRich(IRmJob nj, int needed,
                             TreeMap<User, User> users_by_wealth,
                             HashMap<User, TreeMap<IRmJob, IRmJob>> jobs_by_user)
     {
     	String methodName = "takeFromTheRich";
-        int given = 0;
-
-        logger.debug(methodName, nj.getId(), "needed[", needed, "]");
+        // 1. Collect all machines that have shares, which if evicted, would make enough space
+        //    - in compatible NP
+        //    - g + sum(shares belonging to rich users on the machine);
+        // 2. Order the machines by
+        //    a) richest user
+        //    b) largest machine
+        // 3. Pick next machine,
+        //    - clear enough shares
+        //    - remove machine from list
+        //    - update wealth
+        // 4. Repeat at 2 until
+        //    a) have given what is needed
+        //    b) nothing left to give
+
+        // Map<Share, Share>     exemptShares = new HashMap<Share, Share>(); // not eligible for various reasons
+        Map<IRmJob, IRmJob>   candidateJobs = new HashMap<IRmJob, IRmJob>();
+        Map<Machine, Machine> eligibleMachines = new TreeMap<Machine, Machine>(new EligibleMachineSorter());
 
-        TreeMap<IRmJob, IRmJob> job_set  = new TreeMap<IRmJob, IRmJob>(new JobByShareSorter());    // the collection of rich users jobs to take from
-        Map<Share, Share>   shares   = new TreeMap<Share, Share>(new FinalEvictionSorter());                
-        Map<Share, Share>   removed  = new HashMap<Share, Share>();
-
-        List<User> allUsers = new ArrayList<User>();                                           // for debug
-
-        for ( User next_user : users_by_wealth.keySet() ) {
-            job_set.putAll(jobs_by_user.get(next_user));                                       // on each iter, the set of jobs with candidates grows
-
-            allUsers.add(next_user);
-            logger.debug(methodName, nj.getId(), "Donating users:", allUsers);
-            logger.debug(methodName, nj.getId(), "Donating jobs:", listJobSet(job_set));
-
-            Map<IRmJob, IRmJob> donorJobs   = new HashMap<IRmJob, IRmJob>();
-            List<Share>         donorShares = new ArrayList<Share>();
-            boolean shares_found = false;
-            do {
-                shares_found = false;
-                IRmJob rich_j = job_set.firstKey();                                             // de rrrichest kind
-                
-                logger.debug(methodName, nj.getId(), "Inspecting job", rich_j.getId());
-                //
-                // First lets see if something is pending and we can just reassign it.  Nobody knows
-                // about this share but RM yet, so it's safe to do this.
-                //
-                shares.putAll(rich_j.getPendingShares());                                       // each new job makes the candidate set richer
-                shares.putAll(rich_j.getAssignedShares());
-                removed.putAll(rich_j.getPendingRemoves());
-
-                for ( Share s : shares.keySet() ) {
+        for ( TreeMap<IRmJob, IRmJob> jobs : jobs_by_user.values() ) {
+            candidateJobs.putAll(jobs);
+        }
 
-                    if ( removed.containsKey(s) ) continue;                                    // already gone, ignore it
-                    
-                    if ( ! compatibleNodepools(s, nj) ) {
-                        logger.trace(methodName, nj.getId(), "Bypassing pending share", s.toString(), "becaose of incompatible nodepool");
-                        continue;
-                    }
-                    logger.trace(methodName, nj.getId(), "Pending share", s.toString(), "is compatible with class", nj.getResourceClass().getName());
+        int given = 0;
+        int orderNeeded = nj.getShareOrder();
+        
+        ResourceClass cl = nj.getResourceClass();
+        String npname = cl.getNodepoolName();
+        NodePool np = globalNodepool.getSubpool(npname);
+        Map<Node, Machine> machines = np.getAllMachines();          // everything here is a candidate, nothing else is
+
+        for ( Machine m : machines.values() ) {
+            if ( m.getShareOrder() < orderNeeded ) {
+                // logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": too small for request of order", orderNeeded);
+                continue;
+            }
+            Map<Share, Share> as = m.getActiveShares();
+            int g = m.getVirtualShareOrder();
+            for ( Share s : as.values() ) {
+                IRmJob j = s.getJob();
+                if ( s.isForceable() && candidateJobs.containsKey(j) ) {
+                    g += j.getShareOrder();
+                }
+            }
+            if ( g >= orderNeeded ) {
+                logger.trace(methodName, nj.getId(), "Candidate machine:", m.getId());
+                eligibleMachines.put(m, m);
+            } else {
+                // (a) the share is not forceable (non-preemptable, or already being removed), or
+                // (b) the share is not owned by a rich job
+                logger.trace(methodName, nj.getId(), "Not a candidate, insufficient rich jobs:", m.getId());
+            }
+        }
+        logger.debug(methodName, nj.getId(), "Found", eligibleMachines.size(), "machines to be searched in this order:");
+        for ( Machine m : eligibleMachines.keySet() ) {
+            logger.debug(methodName, nj.getId(), "Eligible machine:", m.getId());
+        }
+        // first part done
+
+        // Now just bop through the machines until either we can't find anything, or we find everything.
+        int given_per_round = 0;
+        do {
+            int g = 0;
+            for ( Machine m : eligibleMachines.keySet() ) {
+                HashMap<Share, Share> sh = m.getActiveShares();
+                g = m.getVirtualShareOrder();
+                List<Share> potentialShares     = new ArrayList<Share>();
+                for ( Share s : sh.values() ) {
+                    IRmJob j = s.getJob();
+                    User u = j.getUser();
                     
-                    Machine m = s.getMachine();
-                    // clear everything from this machine from jobs in the current job set
-                    if ( m.getShareOrder() < nj.getShareOrder() ) {
-                        logger.debug(methodName, nj.getId(), "Bypassing pending share", s.toString(), "because machine size[", m.getShareOrder(), "< job order[", nj.getShareOrder());
-                        continue;                  // job will never fit here
-                    }
-                    Map<Share, Share> activeShares = m.getActiveShares();                    // shares on this machine
-                    List<Share> candidates = new ArrayList<Share>();
-                    int total_available = m.getVirtualShareOrder();                          // start with free space
-
-                    logger.debug(methodName, nj.getId(), "Machine", m.getId(), "activeShares", activeShares.keySet(), "total_available", total_available); 
-                    for ( Share as : activeShares.values() ) {
-                        IRmJob tentative = as.getJob();
-                        if ( job_set.containsKey(tentative) ) {                              // share belong to a rich j?
-                            donorJobs.put(tentative, tentative);
-                            candidates.add(as);                                              // yes it's a candidate
-                            total_available += as.getShareOrder();                           // add in shares that might work
-                            //logger.debug(methodName, nj.getId(), "share", as.getId(), "in job", tentative.getId(), "is a candidate");
-                        } else {
-                            //logger.debug(methodName, nj.getId(), "share", as.getId(), "in job", tentative.getId(), "is not a candidate");
-                        }
-                    }
-
-                    int nj_shares = total_available / nj.getShareOrder();                    // figure how many nj-sized shares we can get
-                    nj_shares = Math.min(needed, nj_shares);                                 // cap on actual needed
-                    logger.debug(methodName, nj.getId(), "Machine", m.getId(), "total_available", total_available, "nj_shares", nj_shares);
-                    if ( nj_shares > 0 ) {                                                   // if it works, pull candidates until we have enough
-                        int g = m.getVirtualShareOrder();
-                        for ( Share cs : candidates ) {
-                            if ( clearShare(cs, nj) ) removed.put(cs, cs);                      // return TRUE if evicted, and FALSE if reassigned to nj
-                            User u = rich_j.getUser();
-                            u.subtractWealth(s.getShareOrder());
-                            donorShares.add(cs);
-                            g += cs.getShareOrder();
-                            shares_found = true;
-                            if ( (g / nj.getShareOrder() ) >= nj_shares ) {
-                                break;
+                    if ( s.isForceable() ) {
+                        TreeMap<IRmJob, IRmJob> potentialJobs = jobs_by_user.get(u);
+                        if ( (potentialJobs != null) && ( potentialJobs.containsKey(j) ) ) {
+                            g += s.getShareOrder();
+                            if ( s.getShareOrder() == orderNeeded ) {
+                                potentialShares.add(0, s);    // exact matches first
+                            } else {
+                                potentialShares.add(s);
                             }
                         }
                     }
-                    given += nj_shares;
-                    needed -= nj_shares;
-                    if ( needed == 0 ) return given;
+                    if ( g >= orderNeeded ) break;
                 }
+                
+                if ( g >= orderNeeded ) {
+                    // found enough on this machine for 1 share!
+                    logger.debug(methodName, nj.getId(), "Clearing shares: g[", g, "], orderNeeded[", orderNeeded, "]");
+                    g = m.getVirtualShareOrder();             // reset
+                    for ( Share s : potentialShares ) {
+                        IRmJob j = s.getJob();
+                        User u = j.getUser();
+
+                        g += s.getShareOrder();
+                        given_per_round++;
+                        clearShare(s, nj);
+                        u.subtractWealth(s.getShareOrder());
+                        logger.debug(methodName, nj.getId(), "Clearing share", s, "order[", s.getShareOrder(),
+                                     "]: g[", g, "], orderNeeded[", orderNeeded, "]");
+                        if ( g >= orderNeeded) break; // inner loop, could break on exact match without giving everything away
+                    }
+                    break;                            // outer loop, if anything was found
+                }       
+            }
 
-                // TODO: This can't happen inside the loop
-                for ( IRmJob j : donorJobs.values() ) {                                         // rebalance tree on job wealth after possible removal
-                    job_set.remove(j);
-                    job_set.put(j, j);
+            if ( given_per_round > 0 ) {
+                // Must reorder the eligible list to get the "next" best candidate.  We could try to remove 
+                // machines that were exhausted above ...
+                Map<Machine, Machine> tmp = new HashMap<Machine, Machine>();
+                tmp.putAll(eligibleMachines);
+                eligibleMachines.clear();
+                for ( Machine m : tmp.keySet() ) {
+                    eligibleMachines.put(m, m);
                 }
 
-                for ( Share s : donorShares ) {                                        // given away, and removed from consideration
-                    shares.remove(s);
-                }
-            } while (shares_found && ( given < needed) );
-        }                        
+                // and also must track how many processes we made space for
+                given = given + (g / orderNeeded);    // at least one, or else we have a bug
+                logger.debug(methodName, nj.getId(), "LOOPEND: given[", given, "] g[", g, "] orderNeeded[", orderNeeded, "]");
+            }
 
+        } while ( (given_per_round > 0) && ( given < needed ));
+        
         return given;
     }
 
@@ -2281,31 +2306,70 @@ public class NodepoolScheduler
     // This is a sorter for a tree map so we have to be sure not to return equality unless the objects
     // are the same objects.
     //
-    static private class FinalEvictionSorter
-        implements Comparator<Share>
+//    static private class FinalEvictionSorter
+//        implements Comparator<Share>
+//    {
+//        
+//        public int compare(Share s1, Share s2)
+//        {
+//            if ( s1 == s2 ) return 0;
+//
+//            // pending shares first, no point expanding them if we don't have to
+//            if ( s1.isPending() && s2.isPending() ) return -1;            
+//            if ( s1.isPending() ) return -1;
+//            if  (s2.isPending() ) return 1;
+//
+//            // Shares on machines with more space first, deal with defrag, which is why we're here
+//            int vso1 = s1.getMachine().countFreedUpShares();
+//            int vso2 = s2.getMachine().countFreedUpShares();
+//
+//            if ( vso1 != vso2 ) {
+//                return vso2 - vso1;      // (more space first)
+//            }
+//
+//            // All else being equal, use investment
+//            int inv =  (int) (s1.getInvestment() - s2.getInvestment());  
+//            if ( inv == 0 ) return -1;                  // careful not to return 0
+//            return inv;
+//        }
+//    }
+
+    //
+    // Sort machines for defrag.
+    // a) machines with richest users first
+    // b) largest machine second
+    //
+    static private class EligibleMachineSorter
+        implements Comparator<Machine>
     {
         
-        public int compare(Share s1, Share s2)
+        public int compare(Machine m1, Machine m2)
         {
-            if ( s1 == s2 ) return 0;
+            if ( m1 == m2 ) return 0;       
 
-            // pending shares first, no point expanding them if we don't have to
-            if ( s1.isPending() && s2.isPending() ) return -1;            
-            if ( s1.isPending() ) return -1;
-            if  (s2.isPending() ) return 1;
+            int m1wealth = 0;
+            int m2wealth = 0;
+            Map<Share, Share> sh1 = m1.getActiveShares();
+            for ( Share s : sh1.values() ) {
+                IRmJob j = s.getJob();
+                User u = j.getUser();
+                m1wealth = Math.max(m1wealth, u.getShareWealth());
+            }
+ 
+            Map<Share, Share> sh2 = m2.getActiveShares();
+            for ( Share s : sh2.values() ) {
+                IRmJob j = s.getJob();
+                User u = j.getUser();
+                m2wealth = Math.max(m2wealth, u.getShareWealth());
+            }
 
-            // Shares on machines with more space first, deal with defrag, which is why we're here
-            int vso1 = s1.getMachine().countFreedUpShares();
-            int vso2 = s2.getMachine().countFreedUpShares();
+            if ( m1wealth != m2wealth ) return m2wealth - m1wealth;       // richest user first
 
-            if ( vso1 != vso2 ) {
-                return vso2 - vso1;      // (more space first)
-            }
+            long m1mem = m1.getMemory();
+            long m2mem = m2.getMemory();
 
-            // All else being equal, use investment
-            int inv =  (int) (s1.getInvestment() - s2.getInvestment());  
-            if ( inv == 0 ) return -1;                  // careful not to return 0
-            return inv;
+            if ( m1mem == m2mem ) return -1;       // for tree map, must not return 0 unless same object
+            return (int) (m2mem - m1mem);          // largest machine first.
         }
     }
 

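One subtle point in the new EligibleMachineSorter above is the comment "must not return 0 unless
same object": eligibleMachines is a TreeMap keyed by this comparator, so two distinct machines that
compare equal would collapse into a single entry. A small, self-contained sketch of that constraint
follows; the Box class and its values are illustrative only, not DUCC code:

    import java.util.Comparator;
    import java.util.TreeMap;

    class TreeMapAsSetSketch {
        static class Box {
            final String id;
            final int size;
            Box(String id, int size) { this.id = id; this.size = size; }
        }

        public static void main(String[] args) {
            // Largest first, but never return 0 for two distinct objects: a TreeMap
            // would otherwise treat them as the same key and keep only one of them.
            Comparator<Box> bySizeThenId = new Comparator<Box>() {
                public int compare(Box a, Box b) {
                    if ( a == b ) return 0;
                    if ( a.size != b.size ) return b.size - a.size;   // largest first
                    return a.id.compareTo(b.id);                      // stable tie-break, never 0
                }
            };

            TreeMap<Box, Box> set = new TreeMap<Box, Box>(bySizeThenId);
            Box b1 = new Box("m1", 64);
            Box b2 = new Box("m2", 64);                               // same size as b1
            set.put(b1, b1);
            set.put(b2, b2);
            System.out.println(set.size());                           // 2: both kept despite the tie
        }
    }
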
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1458311&r1=1458310&r2=1458311&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Tue Mar 19 14:31:02 2013
@@ -303,16 +303,20 @@ public class Scheduler
      * We only get one such name, so we give up the search if we find
      * it.
      */
+    static String cached_domain = null;
     private String getDomainName()
     {
     	String methodName = "getDomainName";
+
+        if ( cached_domain != null ) return cached_domain;
         try {
 			NodeIdentity ni   = new NodeIdentity();
 			for ( IIdentity id : ni.getNodeIdentities()) {
 			    String n = id.getName();
 			    int ndx = n.indexOf(".");
 			    if ( ndx > 0 ) {
-			        return n.substring(ndx + 1);
+			        cached_domain =  n.substring(ndx + 1);
+                    return cached_domain;
 			    }
 			}
 		} catch (Exception e) {