Posted to commits@uima.apache.org by ch...@apache.org on 2015/03/15 15:20:49 UTC

svn commit: r1666803 - in /uima/sandbox/uima-ducc/trunk: src/main/resources/ uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/ uima-ducc-common/src/main/java/org/apache/uima/ducc/common/ uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ uima-ducc...

Author: challngr
Date: Sun Mar 15 14:20:49 2015
New Revision: 1666803

URL: http://svn.apache.org/r1666803
Log:
UIMA-4275 Rework handling of non-preemptive requests.

Modified:
    uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties
    uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/CliFixups.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/NodeConfiguration.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java

Modified: uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties (original)
+++ uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties Sun Mar 15 14:20:49 2015
@@ -346,6 +346,10 @@ ducc.rm.state.update.endpoint=ducc.rm.st
 ducc.rm.state.update.endpoint.type=topic
 # the frequency, relative to OR publications, at which RM runs a schedule
 ducc.rm.state.publish.ratio = 1
+
+# maximum allotment in GB for Non-preemptable shares - default is unlimited
+#ducc.rm.global_allotment = 360
+
 # Base size of DRAM quantum in GB
 ducc.rm.share.quantum = 1
 # Implementation class for actual scheduling algorithm

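The allotment is specified in GB and converted to quantum shares using ducc.rm.share.quantum, so with the 1 GB quantum above an allotment of 360 GB corresponds to 360 quantum shares. A minimal sketch of that arithmetic, with illustrative values only (this is not DUCC code):

    // Illustrative only: how a GB allotment translates into additional processes,
    // assuming the 1 GB scheduling quantum configured above.
    public class AllotmentSketch {
        public static void main(String[] args) {
            int allotmentGb       = 360;   // ducc.rm.global_allotment (example value)
            int schedulingQuantum = 1;     // ducc.rm.share.quantum, in GB
            int shareOrder        = 30;    // quantum shares per process (a 30 GB process)
            int alreadyAllocated  = 300;   // quantum shares this user already holds

            int userAllotment       = allotmentGb / schedulingQuantum;               // 360 qshares
            int additionalQShares   = Math.max(0, userAllotment - alreadyAllocated); // 60 qshares
            int additionalProcesses = additionalQShares / shareOrder;                // 2 processes

            System.out.println("additional processes allowed: " + additionalProcesses);
        }
    }
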
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/CliFixups.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/CliFixups.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/CliFixups.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/CliFixups.java Sun Mar 15 14:20:49 2015
@@ -40,6 +40,10 @@ public class CliFixups {
                 System.out.println("CLI ignored deprecated option: " + arg);
                 args[i] = null;
                 if (++i < args.length && !args[i].startsWith("--")) args[i] = null; 
+            } else if (arg.equals("--number_of_instances")) {
+                System.out.println("CLI ignored deprecated option: " + arg);
+                args[i] = null;
+                if (++i < args.length && !args[i].startsWith("--")) args[i] = null; 
             }
         }
     }
@@ -53,6 +57,9 @@ public class CliFixups {
             } else if (key.equals("classpath_order")) {
                 props.remove(key);
                 System.out.println("CLI ignored deprecated option: " + key);
+            } else if (key.equals("number_of_instances")) {
+                props.remove(key);
+                System.out.println("CLI ignored deprecated option: " + key);
             }
         }
     }

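The new --number_of_instances case follows the pattern already used for the other deprecated options: blank the option, and its value if one follows, in both the argument array and the properties form. A self-contained sketch of that pattern (ScrubDemo is a hypothetical illustration, not part of the DUCC CLI):

    public class ScrubDemo {
        // Null out a deprecated "--option value" pair so later parsing ignores it.
        static void scrubOption(String[] args, String deprecated) {
            for (int i = 0; i < args.length; i++) {
                if (deprecated.equals(args[i])) {
                    System.out.println("CLI ignored deprecated option: " + deprecated);
                    args[i] = null;                       // drop the option itself
                    // drop its value too, unless the next token is another --option
                    if (++i < args.length && args[i] != null && !args[i].startsWith("--")) args[i] = null;
                }
            }
        }

        public static void main(String[] a) {
            String[] args = { "--number_of_instances", "4", "--description", "demo" };
            scrubOption(args, "--number_of_instances");
            System.out.println(java.util.Arrays.toString(args)); // [null, null, --description, demo]
        }
    }
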
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/NodeConfiguration.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/NodeConfiguration.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/NodeConfiguration.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/NodeConfiguration.java Sun Mar 15 14:20:49 2015
@@ -104,7 +104,6 @@ public class NodeConfiguration
         defaultFairShareClass.put("use-prediction", ""+SystemPropertyResolver.getBooleanProperty("ducc.rm.prediction", true));
         defaultFairShareClass.put("prediction-fudge", ""+SystemPropertyResolver.getIntProperty("ducc.rm.prediction.fudge", 60000));
         defaultFairShareClass.put("max-processes", "<optional>");
-        defaultFairShareClass.put("max-allotment", Integer.toString(Integer.MAX_VALUE));
         defaultFairShareClass.put("nodepool", "<required>");
         defaultFairShareClass.put("users", "<optional>");
         defaultFairShareClass.put("debug", "fixed");
@@ -124,7 +123,7 @@ public class NodeConfiguration
         defaultFixedShareClass.put("priority", "5");
         defaultFixedShareClass.put("default", "<optional>");
         defaultFixedShareClass.put("max-processes", "<optional>");
-        defaultFixedShareClass.put("max-allotment", Integer.toString(Integer.MAX_VALUE));
+        defaultFixedShareClass.put("max-allotment", "<optional>");
         defaultFixedShareClass.put("cap", "<optional>");
         defaultFixedShareClass.put("nodepool", "<required>");
         defaultFixedShareClass.put("users", "<optional>");
@@ -138,7 +137,7 @@ public class NodeConfiguration
         defaultReserveClass.put("priority", "1");
         defaultReserveClass.put("default", "<optional>");
         defaultReserveClass.put("max-machines", "<optional>");
-        defaultReserveClass.put("max-allotment", Integer.toString(Integer.MAX_VALUE));
+        defaultReserveClass.put("max-allotment", "<optional>");
         defaultReserveClass.put("cap", "<optional>");
         defaultReserveClass.put("nodepool", "<required>");
         defaultReserveClass.put("users", "<optional>");
@@ -152,6 +151,7 @@ public class NodeConfiguration
 
         defaultUser.put("type", "user");
         defaultUser.put("name", "<optional>");
+        defaultUser.put("max-allotment", Integer.toString(Integer.MAX_VALUE));
      }
 
     /**
@@ -1172,7 +1172,7 @@ public class NodeConfiguration
 
         for (Object o : cl.keySet() ) {
         		String k = (String) o;
-            if ( k.startsWith("max-allotment.") ) {
+            if ( k.startsWith("max-allotment") ) {
                 printProperty(k, cl.get(k));
             }
         }

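With this change max-allotment becomes optional on the class templates and instead defaults to Integer.MAX_VALUE (effectively unlimited) on the user template; the broadened startsWith("max-allotment") also prints both the plain and the per-user forms of the key. A hedged sketch of how such a per-user default might be resolved (UserLimits is an invented example, not DUCC code):

    import java.util.Properties;

    public class UserLimits {
        static int maxAllotmentGb(Properties user) {
            // mirrors the user template above: unlimited unless the configuration overrides it
            String v = user.getProperty("max-allotment", Integer.toString(Integer.MAX_VALUE));
            return Integer.parseInt(v.trim());
        }

        public static void main(String[] args) {
            Properties u = new Properties();
            System.out.println(maxAllotmentGb(u));      // 2147483647 (unlimited)
            u.setProperty("max-allotment", "360");
            System.out.println(maxAllotmentGb(u));      // 360
        }
    }
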
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java Sun Mar 15 14:20:49 2015
@@ -367,12 +367,15 @@ public class JobManagerConverter
             j.setNQuestions(total_work, remaining_work, arith_mean);
 
             // formatSchedulingInfo(job.getDuccId(), si, remaining_work);
-
             if ( job instanceof IDuccWorkJob ) {
                 if ( j.setInitWait( ((IDuccWorkJob) job).isRunnable()) ) {
                     logger.info(methodName, jobid, "Set Initialized.");
                     scheduler.signalInitialized(j);
                 }
+                // UIMA-4275 Avoid race so we don't keep trying to give out new processes
+                if ( ((IDuccWorkJob) job).isCompleting() ) {
+                    j.markComplete();
+                }
             } else {
                 j.setInitWait(true);                           // pop is always ready to go
             }            
@@ -505,7 +508,7 @@ public class JobManagerConverter
         if ( name == null ) {
             name = "A Job With No Name.";
         }
-        String user_name  = sti.getUser();
+        String user_name  = sti.getUser().trim();
         j.setUserName(user_name);
         j.setJobName(name);
 
@@ -622,46 +625,49 @@ public class JobManagerConverter
 //         }
 
         switch ( job.getDuccType() ) {
+          // UIMA-4275, must enforce max allocations of 1 for Service and Pop
           case Service:
           case Pop:
+              switch ( rescl.getPolicy() ) {
+                  case FAIR_SHARE:
+                      refuse(j, "Services and managed reservations are not allowed to be FAIR_SHARE");
+                      break;
+                      
+                  case FIXED_SHARE:
+                      j.setMaxShares(1);
+                      break;
+                      
+                  case RESERVE:
+                      j.setMaxShares(1);
+                      break;
+              }
+              status = receiveExecutable(j, job, mustRecover); // UIMA-4142, add mustRecover flag
+              logger.trace(methodName, j.getId(), "Service or Pop arrives, accepted:", status);
+              break;
           case Job:              
               // instance and share count are a function of the class
+              max_processes    = toInt(si.getSharesMax(), DEFAULT_PROCESSES);
               switch ( rescl.getPolicy() ) {
                   case FAIR_SHARE:
-                      max_processes    = toInt(si.getSharesMax(), DEFAULT_PROCESSES);
-                      // max_processes    = Math.min(rescl.getMaxProcesses(), max_processes);
                       j.setMaxShares(max_processes);
-                      j.setNInstances(-1);
                       break;
                       
                   case FIXED_SHARE:
-                      max_processes   = toInt(si.getSharesMax(), DEFAULT_INSTANCES);
                       j.setMaxShares(max_processes);
-                      j.setNInstances(max_processes);
                       break;
                       
                   case RESERVE:
                       max_machines   = toInt(si.getSharesMax(), DEFAULT_INSTANCES);
-                      j.setMaxShares(max_machines);
-                      j.setNInstances(max_machines);
+                      j.setMaxShares(max_processes);
                       break;
               }
               
               status = receiveExecutable(j, job, mustRecover); // UIMA-4142, add mustRecover flag
-              logger.trace(methodName, j.getId(), "Serivce, Pop, or Job arrives, accepted:", status);
+              logger.trace(methodName, j.getId(), "Job arrives, accepted:", status);
               break;
           case Reservation:
-              switch ( rescl.getPolicy() ) {
-                  case FIXED_SHARE:
-                      max_machines   = toInt(si.getInstancesCount(), DEFAULT_INSTANCES);
-                      break;
-                  case RESERVE:
-                      max_machines   = toInt(si.getInstancesCount(), DEFAULT_INSTANCES);
-                      break;
-              }
-                            
-              j.setMaxShares(-1);
-              j.setNInstances(max_machines);
+              // UIMA-4275. non-jobs restricted to exactly one allocation per request 
+              j.setMaxShares(1);
 
               status = receiveReservation(j, job, mustRecover);  // UIMA-4142, add mustRecover flag
               logger.trace(methodName, j.getId(), "Reservation arrives, accepted:", status);
@@ -1005,37 +1011,38 @@ public class JobManagerConverter
         return ret;
     }
 
-    boolean isPendingNonPreemptable(IRmJob j) 
-    {
-    	String methodName = "isPendingNonPreemptable";
-        // If fair share it definitely isn't any kind of preemptable
-        if ( j.getResourceClass().getPolicy() == Policy.FAIR_SHARE) return false;
-
-        // otherwise, if the shares it has allocated is < the number it wants, it is in fact
-        // pending but not complete.
-        logger.trace(methodName, j.getId(), "countNShares", j.countNShares(), "countInstances", j.countInstances(), "isComplete", j.isCompleted());
-
-        if ( j.isCompleted() ) {
-            return false;
-        }
-
-        // 2014-02-18 - countTotalAssignments is the total nodes this job ever got - we're not allowed to
-        //              add more.  But if a node dies and the share is canceled, countNShares() CAN return 
-        //              0, preventing this cutoff check from working, and the job looks "refused" when in
-        //              fact it's just hungy.  Hence, the change from countNShares to countTotalAssignments. 
-        //                
-        //              Note: The NodePool code that detects dead nodes is responsible for removing dead shares
-        //              from jobs and should not remove shares from reservations, but it can remove shares
-        //              from non-preemptables that aren't reservations.        
-        //              UIMA-3613 jrc
-        //if ( j.countNShares() == j.countInstances() ) {
-        if ( j.countTotalAssignments() == j.countInstances() ) {
-            j.markComplete();                  // non-preemptable, remember it finally got it's max
-            return false;
-        }
+    // No longer needed after UIMA-4275
+    // boolean isPendingNonPreemptable(IRmJob j) 
+    // {
+    // 	String methodName = "isPendingNonPreemptable";
+    //     // If fair share it definitely isn't any kind of preemptable
+    //     if ( j.getResourceClass().getPolicy() == Policy.FAIR_SHARE) return false;
+
+    //     // otherwise, if the shares it has allocated is < the number it wants, it is in fact
+    //     // pending but not complete.
+    //     logger.trace(methodName, j.getId(), "countNShares", j.countNShares(), "countInstances", j.getMaxShares(), "isComplete", j.isCompleted());
+
+    //     if ( j.isCompleted() ) {
+    //         return false;
+    //     }
+
+    //     // 2014-02-18 - countTotalAssignments is the total nodes this job ever got - we're not allowed to
+    //     //              add more.  But if a node dies and the share is canceled, countNShares() CAN return 
+    //     //              0, preventing this cutoff check from working, and the job looks "refused" when in
+    //     //              fact it's just hungy.  Hence, the change from countNShares to countTotalAssignments. 
+    //     //                
+    //     //              Note: The NodePool code that detects dead nodes is responsible for removing dead shares
+    //     //              from jobs and should not remove shares from reservations, but it can remove shares
+    //     //              from non-preemptables that aren't reservations.        
+    //     //              UIMA-3613 jrc
+    //     //if ( j.countNShares() == j.countInstances() ) {
+    //     if ( j.countTotalAssignments() == j.getMaxShares() ) {
+    //         j.markComplete();                  // non-preemptable, remember it finally got it's max
+    //         return false;
+    //     }
 
-        return (j.countNShares() < j.countInstances());
-    }
+    //     return (j.countNShares() < j.getMaxShares());
+    // }
 
     /**
      * If no state has changed, we just resend that last one.
@@ -1098,41 +1105,37 @@ public class JobManagerConverter
                 Map<Share, Share> shares = null;
                 Map<Share, Share> redrive = null;
 
-                if (isPendingNonPreemptable(j) ) {                
-                    logger.info(methodName, j.getId(), "Delaying publication of expansion because it's not yet complete.");
-                } else {
-                    shares = j.getAssignedShares();
-                    if ( shares != null ) {
-                        ArrayList<Share> sorted = new ArrayList<Share>(shares.values());
-                        Collections.sort(sorted, new RmJob.ShareByInvestmentSorter());
-                        for ( Share s : sorted ) {
-                            Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), s.getInitializationTime());
-                            all_shares.put(s.getId(), r);
-                        }
-                        redrive = sanityCheckForOrchestrator(j, shares, expanded.get(j.getId()));
+                shares = j.getAssignedShares();
+                if ( shares != null ) {
+                    ArrayList<Share> sorted = new ArrayList<Share>(shares.values());
+                    Collections.sort(sorted, new RmJob.ShareByInvestmentSorter());
+                    for ( Share s : sorted ) {
+                        Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), s.getInitializationTime());
+                        all_shares.put(s.getId(), r);
                     }
+                    redrive = sanityCheckForOrchestrator(j, shares, expanded.get(j.getId()));
+                }
                     
-                    shares = shrunken.get(j.getId());
-                    if ( shares != null ) {
-                        for ( Share s : shares.values() ) {
-                            Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), 0);
-                            shrunken_shares.put(s.getId(), r);
-                        }
-                    }                                        
+                shares = shrunken.get(j.getId());
+                if ( shares != null ) {
+                    for ( Share s : shares.values() ) {
+                        Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), 0);
+                        shrunken_shares.put(s.getId(), r);
+                    }
+                }                                        
                     
-                    shares = expanded.get(j.getId());
-                    if ( shares != null ) {                    
-                        for ( Share s : shares.values() ) {
-                            Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), 0);
-                            expanded_shares.put(s.getId(), r);
-                        }
+                shares = expanded.get(j.getId());
+                if ( shares != null ) {                    
+                    for ( Share s : shares.values() ) {
+                        Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), 0);
+                        expanded_shares.put(s.getId(), r);
                     }
+                }
                     
-                    if ( redrive != null ) {
-                        for ( Share s : redrive.values() ) {
-                            Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), 0);
-                            expanded_shares.put(s.getId(), r);
-                        }
+                if ( redrive != null ) {
+                    for ( Share s : redrive.values() ) {
+                        Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), 0);
+                        expanded_shares.put(s.getId(), r);
                     }
                 }
                 

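The substantive change above is that services, POPs, and reservations are now capped at exactly one allocation per request, while jobs keep their requested process cap, and FAIR_SHARE is refused for services and managed reservations. A condensed, hypothetical restatement of that mapping (WorkType and MaxSharesSketch are invented names, not DUCC code):

    public class MaxSharesSketch {
        enum WorkType { Job, Service, Pop, Reservation }

        static int maxSharesFor(WorkType type, int requestedMax) {
            switch (type) {
                case Job: return requestedMax;   // jobs keep their requested process cap
                default:  return 1;              // services, POPs, reservations: exactly one
            }
        }

        public static void main(String[] args) {
            System.out.println(maxSharesFor(WorkType.Service, 10));   // 1
            System.out.println(maxSharesFor(WorkType.Job, 10));       // 10
        }
    }
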
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java Sun Mar 15 14:20:49 2015
@@ -41,6 +41,8 @@ public interface IRmJob
 
     public DuccId getId();
     
+    public String getShortType();  // S, R, M, J - service, reservation, managed-reservation, job
+
     public long getFriendlyId();
 
     public String getName();
@@ -244,8 +246,8 @@ public interface IRmJob
 
     public ResourceClass getResourceClass();
     
-    public int countInstances();
-    public void setNInstances(int m);
+    //public int countInstances();
+    //public void setNInstances(int m);
 
     public int  nThreads();
     public void setThreads(int threads);
@@ -273,4 +275,9 @@ public interface IRmJob
     public boolean isService();                   // UIMA-4142
 
     public boolean isInitialized();
+    
+    // Total number of shares to account to me - either actually assigned, or
+    // counted afresh in the current scheduling cycle, for allotments
+    public int countOccupancy();                  // UIMA-4275
+
 }

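countOccupancy() is only declared here; the RmJob implementation is not part of the excerpt above. As a hedged illustration of the intent described in the comment, a caller might turn occupancy into a GB figure for allotment checks roughly as follows (everything except the two method names is invented for the example):

    import java.util.List;

    public class OccupancySketch {
        interface Work { int countOccupancy(); int getShareOrder(); }

        // Sum occupancy (in quantum shares) across non-preemptable work, then scale to GB.
        static int npGigabytes(List<Work> work, int quantumGb) {
            int qshares = 0;
            for (Work w : work) qshares += w.countOccupancy() * w.getShareOrder();
            return qshares * quantumGb;
        }

        public static void main(String[] args) {
            Work service = new Work() {         // one 30 GB service process
                public int countOccupancy() { return 1; }
                public int getShareOrder()  { return 30; }
            };
            Work reservation = new Work() {     // two 15 GB allocations
                public int countOccupancy() { return 2; }
                public int getShareOrder()  { return 15; }
            };
            System.out.println(npGigabytes(List.of(service, reservation), 1) + " GB");  // 60 GB
        }
    }
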
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java Sun Mar 15 14:20:49 2015
@@ -81,7 +81,6 @@ class NodePool
     int nSharesByOrder[];         // shares of each size for each share order                         [ 0 21 9 5  4 ] - collective N Shares for each order
     //int nFreeSharesByOrder[];     // for each order, the theoretical number of shares to give away  [ 0  1 0 3 16 ] - free Q shares per order
 
-    int machinesToPreempt[];     // number of full machines to preempt for reservations, by order
     int nPendingByOrder[];        // number of N-shares with pending evictions
 
     //int neededByOrder[];         // for each order, how many N-shares do I want to add?
@@ -271,17 +270,12 @@ class NodePool
     /**
      * Counts just local, for reservations.
      */
-    int countFreeMachines(int order, boolean enforce)
+    int countFreeMachines(int order)
     {
         int cnt = 0;
 
         HashMap<Node, Machine> mlist = null;
-        if ( enforce ) {
-            mlist = machinesByOrder.get(order);
-        } else {
-            mlist = allMachines;
-        }
-
+        mlist = machinesByOrder.get(order);
         if ( mlist == null ) return 0;
 
         for ( Machine m : mlist.values() ) {
@@ -297,7 +291,7 @@ class NodePool
      */
     int countAllFreeMachines()
     {
-        int count = countFreeMachines(0, false);  // don't care about the order
+        int count = countFreeMachines(0);  // don't care about the order
         for ( NodePool np : children.values() ) {
             count += np.countAllFreeMachines();
         }
@@ -719,12 +713,13 @@ class NodePool
 
     void accountForShares(HashMap<Share, Share> shares)
     {
+        if ( shares == null ) return;
+
         for ( Share s : shares.values() ) {
             int order = s.getShareOrder();
             Machine m = s.getMachine();
             rearrangeVirtual(m, order);
         }
-        //calcNSharesByOrder();
     }
 
     /**
@@ -750,8 +745,6 @@ class NodePool
 
         nPendingByOrder = new int[maxorder + 1];
 
-        machinesToPreempt  = new int[maxorder + 1];
-
         // UIMA-4142 Must set vMachinesByOrder and virtualMachinesByOrder independently of
         //           machinesByOrder because blacklisting can cause v_order != r_order
         //           during reset.
@@ -1119,8 +1112,18 @@ class NodePool
      */
 
     /**
+     * A quick check to see if there are any machines of the right size. We make a more
+     * comprehensive check to see if they're usable in countFreeableMachines later.
+     */
+    int countReservables(IRmJob j)
+    {
+        int order = j.getShareOrder();
+        if ( ! machinesByOrder.containsKey(order) ) return 0;
+        return machinesByOrder.get(order).size();
+    }
+
+    /**
      * Adjust counts for something that takes full machines, like a reservation.
-     * If "enforce" is set the machine order must match, otherwise we just do best effort to match.
      *
      * This is intended for use by reservations only; as such it does NOT recurse into child nodepools.
      *
@@ -1129,24 +1132,20 @@ class NodePool
      *
      * @returns number of machines given
      */
-    int countFreeableMachines(IRmJob j, boolean enforce)
+    int countOutMachinesByOrder(IRmJob j, int needed)
     {
-        String methodName = "countFreeableMachines";
-
-        logger.info(methodName, j.getId(), "Enter nodepool", id, "with enforce", enforce, "preemptables.size() =", preemptables.size());
-        int needed = j.countInstances();
+        String methodName = "countOutMachinesByOrder";
+        
         int order = j.getShareOrder();
+        int given = 0;       
 
         ArrayList<Machine>  machs = new ArrayList<Machine>();
-        if ( enforce ) {
-        	if ( machinesByOrder.containsKey(order) ) {
-        		machs.addAll(machinesByOrder.get(order).values());
-        	} else {
-        		return 0;
-        	}
+        if ( machinesByOrder.containsKey(order) ) {
+            machs.addAll(machinesByOrder.get(order).values());
         } else {
-            machs.addAll(allMachines.values());
+            return 0;    // oops, nothing here
         }
+
         StringBuffer sb = new StringBuffer("Machines to search:");
         for ( Machine m : machs ) {
             sb.append(" ");
@@ -1156,7 +1155,6 @@ class NodePool
 
         Collections.sort(machs, new MachineByAscendingOrderSorter());
 
-        int given = 0;           // total to give, free or freeable
         Iterator<Machine> iter = machs.iterator();
         ArrayList<Machine> pables = new ArrayList<Machine>();
         
@@ -1174,37 +1172,22 @@ class NodePool
             }
 
             if ( m.isFree() ) {
-                logger.info(methodName, j.getId(), "Giving", m.getId(), "because it is free");
+                logger.info(methodName, j.getId(), m.getId(), "is free for reservations.");
+                nMachinesByOrder[m.getShareOrder()]--;
                 given++;
                 continue;
             }
 
             if ( m.isFreeable() ) {
-                logger.info(methodName, j.getId(), "Giving", m.getId(), "because it is freeable");
+                logger.info(methodName, j.getId(), "Setting up", m.getId(), "to clear for reservation");
+                nMachinesByOrder[m.getShareOrder()]--;
                 given++;
-                pables.add(m);
+                preemptables.put(m.key(), m);
+                continue;
             } else {
                 logger.info(methodName, j.getId(), "Bypass because machine", m.getId(), "is not freeable");
             }
         }
-
-        if ( given < needed ) {
-            return 0;
-        }
-
-        // Remember how many full machines we need to free up when we get to preemption stage.
-        if ( enforce ) {
-            machinesToPreempt[0] += preemptables.size();
-        } else {
-            machinesToPreempt[order] += preemptables.size();
-        }
-
-        for ( Machine m : pables ) {
-            logger.info(methodName, j.getId(), "Setting up", m.getId(), "to clear for reservation");
-            preemptables.put(m.key(), m);
-            nMachinesByOrder[m.getShareOrder()]--;
-        }
-
         calcNSharesByOrder();
         return given;
     }
@@ -1276,13 +1259,12 @@ class NodePool
     }
 
     /**
-     * We need to make enough space for 'cnt' full machines.  If enforce is true the machines need
-     * to be of the indicated order; otherwise we just nuke any old thing.
+     * We need to make enough space for 'cnt' full machines.
      *
      * Returns number of machines that are freeable, up to 'needed', or 0, if we can't get enough.
      * If we return 0, we must refuse the reservation.
      */
-    protected int setupPreemptions(int needed, int order, boolean enforce)
+    protected int setupPreemptions(int needed, int order)
     {
         String methodName = "setupPreemptions";
         int given = 0;
@@ -1292,9 +1274,10 @@ class NodePool
         while ( iter.hasNext() && (given < needed) ) {
             Machine m = iter.next();
             int o = m.getShareOrder();
-            if ( enforce && ( order != o) ) {
+            if ( order != o ) {
                 continue;
             }
+            logger.info(methodName, null, "Clearing", m.getId(), "from preemptable list for reservations.");
             HashMap<Share, Share> shares = m.getActiveShares();
             for ( Share s : shares.values() ) {
                 if ( s.isPreemptable() ) {
@@ -1306,36 +1289,35 @@ class NodePool
                     // log its state to try to figure out why it didn't evict
                     if ( ! (s.isEvicted() || s.isPurged() ) ) {
                         IRmJob j = s.getJob();                    
-                        logger.warn(methodName, j.getId(), "Found non-preemptable share", s.getId(), "fixed:", s.isFixed(), "j.NShares", j.countNShares(), "j.NSharesGiven", j.countNSharesGiven());
+                        logger.info(methodName, j.getId(), "Found non-preemptable share", s.getId(), "fixed:", s.isFixed(), 
+                                    "j.NShares", j.countNShares(), "j.NSharesGiven", j.countNSharesGiven());
                     }
                 }
             }
             given++;
             iter.remove();
-            logger.info(methodName, null, "Remove", m.getId(), "from preemptables list");
         }
        
         return given;
     }
 
     /**
-     * Here we have to dig around and find either a fully free machine, or a machine that we
+     * Here we have to dig around and find either fully free machines, or machines that we
      * can preempt to fully free it.
-     *
-     * If 'enforce' is set memory must match, otherwise we can pick anything from the pool.
-     *
-     * This routine should NEVER fail because we already counted to be sure we had enough. The
-     * caller should probably throw if it does not get exactly what it wants.
      */
-    void  allocateForReservation(IRmJob job, ResourceClass rc)
+    void  findMachines(IRmJob job, ResourceClass rc)
     {
-    	String methodName = "allocateForReservation";
-        ArrayList<Machine> answer = new ArrayList<Machine>();
+    	String methodName = "findMachines";        
         ArrayList<Machine> machs;
 
         int order = job.getShareOrder();
-        int needed = job.countInstances();
-        boolean enforce = rc.enforceMemory();
+
+        int counted = job.countNSharesGiven();      // allotment from the counter
+        int current = job.countNShares();           // currently allocated, plus pending, less those removed by earlier preemption
+        int needed = (counted - current);
+
+        logger.info(methodName, job.getId(), "counted", counted, "current", current, "needed", needed, "order", order);
+        if ( needed <= 0 ) return;
 
         //
         // Build up 'machs' array, containing all candidate machines, sorted by 
@@ -1345,56 +1327,27 @@ class NodePool
         // Free machines always sort to the front of the list of course.
         //
 
-        int cnt = countFreeMachines(order, enforce);
+        int cnt = countFreeMachines(order);
         if ( cnt < needed ) {
-            logger.info(methodName, job.getId(), "Reservation waiting on evictions.  Have", cnt, "free, needed", needed);
-            setupPreemptions(needed-cnt, order, enforce);  // if returns 0, must refuse the job
-            return;
+            // Get the preemptions started
+            logger.info(methodName, job.getId(), "Setup preemptions.  Have", cnt, "free machines, needed", needed);
+            setupPreemptions(needed-cnt, order);  // if returns 0, must refuse the job
         }
 
-        // 
-        // If we get here we MUST have enough machines because we would have refused the job otherwise, or else
-        // started preemptions.
-        //
-        if ( enforce ) {                               // enforcing order, only look in the right subset of machines
-            if ( ! machinesByOrder.containsKey(order) ) {       // hosed if this happens
-                throw new SchedInternalError(job.getId(), "Scheduling counts are wrong - machinesByOrder does not match nMachinesByOrder");
-            }
-            machs = sortedForReservation(machinesByOrder.get(order));
-        } else {                                       // no enforcement - anything in the pool is fair game
-            machs = sortedForReservation(allMachines);
+        // something awful happened if we throw here.
+        if ( ! machinesByOrder.containsKey(order) ) {       // hosed if this happens
+            throw new SchedInternalError(job.getId(), "Scheduling counts are wrong - machinesByOrder does not match nMachinesByOrder");
         }
+        machs = sortedForReservation(machinesByOrder.get(order));
 
-        if ( machs.size() < needed ) {                // totally completely hosed if this happens. but let's do the sanity check.
-            throw new SchedInternalError(job.getId(), "Scheduling counts are wrong - can't schedule reservation");
-        }
-
-        // machs is all candidate machines, ordered by empty, then most preferable, according to the eviction policy.
-        // We don't have to preempt explicitly, the counting should have worked this out.  Only thing to watch is that
-        // when we do preemption in fair-share that we free up sufficient machines fully.
-        for ( Machine m : machs ) {
-            if ( m.isFree() ) {
-                answer.add(m);
-                needed--;
-            } 
-
-            if ( needed == 0 ) {
-                break;
-            }
-        }
-
-        if ( needed == 0 ) {               // all-or-nothing - don't return anything if we're waiting for preemption
-            for ( Machine mm : answer ) {
+        // Machs is all candidate machines, ordered by empty, then most preferable, according to the eviction policy.
+        for ( Machine mm : machs ) {
+            if ( mm.isFree() ) {
                 Share s = new Share(mm, job, mm.getShareOrder());
                 s.setFixed();
                 connectShare(s, mm, job, mm.getShareOrder());
-                //job.assignShare(s);
-                //mm.assignShare(s);
-                //allShares.put(s, s);
-                //rearrangeVirtual(mm, order);
+                if ( --needed == 0 ) break;
             }
-        } else {
-            throw new SchedInternalError(job.getId(), "Thought we had enough machines for reservation, but we don't");
         }
 
     }
@@ -1584,7 +1537,7 @@ class NodePool
         int order = j.getShareOrder();
         int given = 0;        
 
-        logger.trace(methodName, j.getId(), "counted", counted, "current", current, "needed", needed, "order", order, "given", given);
+        logger.info(methodName, j.getId(), "counted", counted, "current", current, "needed", needed, "order", order, "given", given);
 
         if ( needed > 0 ) {
             whatof: {
@@ -1655,6 +1608,8 @@ class NodePool
         sb.append(getId());
         sb.append(" Expansions in this order: ");
         for ( IRmJob j : jobs ) {
+            if ( j.isCompleted() ) continue;  // deal with races while job is completing
+
             j.undefer();
             sb.append(j.getId());
             sb.append(":");

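findMachines() above replaces the all-or-nothing allocateForReservation(): it computes how many machines are still needed, starts preemptions for any shortfall, and hands out whatever free machines of the right order exist right now. A hedged numeric sketch of that bookkeeping, with made-up values:

    public class FindMachinesSketch {
        public static void main(String[] args) {
            int counted = 3;                    // j.countNSharesGiven(): allotment from the counter
            int current = 1;                    // j.countNShares(): already allocated
            int needed  = counted - current;    // 2 more machines wanted
            int free    = 1;                    // countFreeMachines(order): free machines of this order

            int toPreempt = Math.max(0, needed - free);   // 1 machine must be cleared by preemption
            int assignNow = Math.min(needed, free);       // 1 share can be connected immediately

            System.out.println("needed=" + needed + " preempt=" + toPreempt + " assignNow=" + assignNow);
        }
    }
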
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java Sun Mar 15 14:20:49 2015
@@ -55,12 +55,12 @@ public class NodepoolScheduler
 
     int fragmentationThreshold = 2;
     boolean do_defragmentation = true;
+    boolean use_global_allotment = true;
+    int global_allotment = Integer.MAX_VALUE;
+    int scheduling_quantum;
 
     NodepoolScheduler()   
     {
-        fragmentationThreshold = SystemPropertyResolver.getIntProperty("ducc.rm.fragmentation.threshold", 
-                                                                       fragmentationThreshold);
-        do_defragmentation = SystemPropertyResolver.getBooleanProperty("ducc.rm.defragmentation", do_defragmentation);
     }
 
     public void setClasses(Map<ResourceClass, ResourceClass> prclasses)
@@ -106,6 +106,11 @@ public class NodepoolScheduler
             classes[ndx++] = sorter.get(k);
         }
 
+        fragmentationThreshold = SystemPropertyResolver.getIntProperty("ducc.rm.fragmentation.threshold", fragmentationThreshold);
+        scheduling_quantum = SystemPropertyResolver.getIntProperty("ducc.rm.share.quantum", scheduling_quantum);
+        do_defragmentation = SystemPropertyResolver.getBooleanProperty("ducc.rm.defragmentation", do_defragmentation);
+        use_global_allotment = SystemPropertyResolver.getBooleanProperty("ducc.rm.use_global_allotment",  use_global_allotment);
+        global_allotment = SystemPropertyResolver.getIntProperty("ducc.rm.global_allotment", global_allotment);
     }
 
     public void setNodePool(NodePool np)
@@ -118,6 +123,95 @@ public class NodepoolScheduler
         this.evictionPolicy = ep;
     }
 
+    /**
+     * Check the allotment for the user, given that we want to allocate
+     *    - nprocs new processes for
+     *    - job j
+     */
+    boolean validSingleAllotment(IRmJob j)
+    {
+        String methodName = "validSingleAllotment";
+        //
+        // Original design and implementation for UIMA-4275: class based.  Subsequent discussion resulted in
+        // changing to a single global allotment.  I'll leave the class-based allotment in for now because
+        // I suspect it will re-raise its ugly head. (jrc)
+        //
+
+        if ( use_global_allotment ) {
+            User u = j.getUser();
+            int lim = u.getOverrideLimit();
+            if ( lim < 0 ) {
+                lim = global_allotment;
+            }
+
+            int shares = u.countNPShares();
+            long sharesInGB = ((shares + j.getShareOrder()) * scheduling_quantum);
+            if ( sharesInGB > lim ) {
+                schedulingUpdate.defer(j, "Deferred because allotment of " + lim + "GB is exceeded by user " + j.getUserName());
+                logger.info(methodName, j.getId(), "Deferred because allotment of " + lim + "GB is exceeded by user " + j.getUserName());
+                return false;
+            }
+        } else {
+            ResourceClass rc = j.getResourceClass();
+            if ( rc.allotmentExceeded(j) ) {
+                schedulingUpdate.defer(j, "Deferred because allotment of " + rc.getAllotment(j) + "GB is exceeded by user " + j.getUserName());
+                logger.info(methodName, j.getId(), "Deferred because allotment of " + rc.getAllotment(j) + "GB is exceeded by user " + j.getUserName());
+                return false;
+            }
+        } 
+        return true; 
+    }
+
+    int getAllotmentForJob(IRmJob j)
+    {
+    	String methodName = "getAllotmentForJob";
+        User u = j.getUser();
+        
+        // Let the job ask for the world.  This accounts for init cap, prediction, number usable, etc
+        int order = j.getShareOrder();
+        int wanted = j.getJobCap();                // in nshares
+        logger.info(methodName, j.getId(), "Job cap", nSharesToString(wanted, order));
+        
+        // Find out how many qshares we're allowed
+        int allotment_in_gb = u.getOverrideLimit();
+        if ( allotment_in_gb < 0 ) {
+            allotment_in_gb = global_allotment;
+        }
+        int user_allotment = allotment_in_gb / scheduling_quantum;     // qshares
+
+        // How many qshares we, the user, have used
+        int allocated = u.countNPShares();
+        logger.info(methodName, j.getId(), "Current NP allocation for user", allocated, "qshares", (allocated * scheduling_quantum), "GB",
+                    "user_allotment", user_allotment, "user_allotment in GB", allotment_in_gb );
+
+        // This is how many QShares we get to allocate for the job
+        int additional = Math.max(0, user_allotment - allocated);
+        int additional_processes = additional / order;
+        logger.info(methodName, j.getId(), "Additional shares allowed for request:", nSharesToString(additional_processes, order));
+
+        // No shares, so we show deferred
+        if ( (additional_processes == 0) ) {
+            if (j.countNShares() == 0)  {
+                // over allotment, never had anything, can get anything so is deferred
+                schedulingUpdate.defer(j, "Deferred because allotment of " + allotment_in_gb + "GB is exceeded.");
+                logger.info(methodName, j.getId(), "Deferred because allotment of",  allotment_in_gb, "GB is exceeded.");
+            } else {
+                logger.info(methodName, j.getId(), "Allotment of", allotment_in_gb, "GB caps request. Return with", allocated, "qshares allocated.");
+            }
+            return j.countNShares();
+        }
+
+        int allowed = j.countNShares() + additional_processes;
+
+        if ( allowed < wanted ) {
+            logger.info(methodName, j.getId(), "Capping job on allotment: ", allotment_in_gb + " GB. Remaining allowed nshares [",
+                        allowed, "] wanted [", wanted, "]");
+        }
+
+        logger.info(methodName, j.getId(), "Allowed", nSharesToString(allowed, order), "wanted", nSharesToString(wanted, order));
+        return Math.min(wanted, allowed);
+    }
+
     private void reworknShares(int[] vshares, int[] nshares)
     {
         // now redo nshares
@@ -1014,7 +1108,7 @@ public class NodepoolScheduler
         logger.debug(methodName, null, "NP[", np.getId(), "Expand needy reservations.", listJobSet(reservations));
         for ( IRmJob j : reservations ) {
             ResourceClass rc = j.getResourceClass();
-            np.allocateForReservation(j, rc);
+            np.findMachines(j, rc);
         }
 
         Collections.sort(fixed_share_jobs, new JobByTimeSorter());
@@ -1137,7 +1231,7 @@ public class NodepoolScheduler
      * Make sure there are enough shares to allocate either directly, or through preemption,
      * and count them out.
      */
-    protected void howMuchFixed(ArrayList<ResourceClass> rcs)
+    void howMuchFixed(ArrayList<ResourceClass> rcs)
     {
     	String methodName = "howMuchFixedShare";
 
@@ -1154,97 +1248,124 @@ public class NodepoolScheduler
         for ( ResourceClass rc : rcs ) {
             HashMap<IRmJob, IRmJob> jobs = rc.getAllJobs();
             total_jobs += jobs.size();
-            if ( jobs.size() == 0 ) {
-                logger.info(methodName, null, "No jobs to schedule in class ", rc.getName());
-            } else {
-                StringBuffer buf = new StringBuffer();
-                for ( IRmJob j : jobs.values() ) {
-                    buf.append(" ");
-                    buf.append(j.getId());
-                }
-                logger.info(methodName, null, "Scheduling jobs in class:", rc.getName(), buf.toString());
-            }
         }
         if ( total_jobs == 0 ) {
             return;
         }
 
         for ( ResourceClass rc : rcs ) {
-            ArrayList<IRmJob> jobs = rc.getAllJobsSorted(new JobByTimeSorter());
+            ArrayList<IRmJob> jobs = rc.getAllJobsSorted(new JobByTimeSorter());            
 
             for ( IRmJob j : jobs ) {
-                j.clearShares();                               // reset shares assigned at start of each schedling cycle
-                j.undefer();                                   // in case it works this time!
-            }
+                logger.info(methodName, j.getId(), "Scheduling job to class:", rc.getName());
 
-            NodePool np = rc.getNodepool();
+                j.clearShares();                               // reset virtual shares at start of each scheduling cycle
+                j.undefer();                                   // in case it works this time!
 
+                switch ( j.getDuccType() ) {
+                    case Job:
+                        countFixedForJob(j, rc);
+                        break;
+                    case Service:
+                    case Pop:
+                    case Reservation:
+                    default:
+                        countSingleFixedProcess(j, rc);
+                        break;
+                }
+            }            
+        }
+    }
 
-            for ( IRmJob j : jobs ) {
+    
+    void countFixedForJob(IRmJob j, ResourceClass rc)
+    {
+        String methodName = "countFixedForJob";
 
-                int n_instances = j.countInstances();               // n-shrares; virtual shares 
+        logger.info(methodName, j.getId(), "Counting shares for", j.getShortType() + "." + j.getId());
 
-                if ( j.countNShares() > 0 ) {                       // all-or-nothing check
-                    // already accounted for as well, since it is a non-preemptable share
-                    logger.info(methodName, j.getId(), "[stable]", "requested", n_instances, "assigned", j.countNShares(), "processes, ", 
-                                (j.countNShares() * j.getShareOrder()), "QS");
-                    int[] gbo = NodePool.makeArray();
+        // Allowed something if we get here.  Must see if we have something to give.
+        NodePool np = rc.getNodepool();
 
-                    gbo[j.getShareOrder()] = j.countNShares();    // must set the allocation so eviction works right
+        int order = j.getShareOrder();
+        int available = np.countLocalNSharesByOrder(order);
+        logger.info(methodName, j.getId(), "available shares of order", order, "in np:", available);
 
-                    j.setGivenByOrder(gbo);
-                    continue;
-                }
+        if ( available == 0 ) {
+            if (j.countNShares() == 0)  {
+                schedulingUpdate.defer(j, "Deferred because insufficient resources are available.");
+                logger.info(methodName, j.getId(), "Deferring, insufficient shares available. NP", np.getId(), 
+                            "available[", np.countNSharesByOrder(order), "]");
+            } else {
+                logger.info(methodName, j.getId(), "Nodepool is out of shares: NP", np.getId(), 
+                            "available[", np.countNSharesByOrder(order), "]");
+            }
+            return;
+        }
 
-                if ( j.isCompleted() ) {                        // allocation once filled? then we're done
-                    continue;                                   // UIMA-3614, don't refuse, just stop allocating 
-                }
+        int granted = getAllotmentForJob(j); // in nshares, processes
 
-                int order = j.getShareOrder();
+        //
+        // The job passes; give the job a count
+        //
+        logger.info(methodName, j.getId(), "+++++ nodepool", np.getId(), "class", rc.getName(), "order", order, "shares", nSharesToString(granted, order));
+        int[] gbo = NodePool.makeArray();
+        gbo[order] = granted;                      // what we get
+        j.setGivenByOrder(gbo);
+        
+        // The difference between what we pass to 'what of', and what we already have.  The shares we already have are accounted
+        // for in a special step at the start of the scheduling round.
+        np.countOutNSharesByOrder(order, granted - j.countNShares());
+    }
 
-                // Don't schedule non-preemptable shares over subpools
-                if ( np.countLocalShares() < n_instances ) {
-                    schedulingUpdate.defer(j, "Job deferred because insufficient resources are availble for this class.");
-
-                    logger.warn(methodName, j.getId(), "1 Deferring sixed share job because nodepool " + np.getId() 
-                                            + " has insufficient space left. Available[" 
-                                            + np.countLocalShares() 
-                                            + "] requested[" + n_instances + "]");
-                    continue;
-                }
-             
-                //
-                // Now see if we have sufficient shares in the system for this allocation.
-                //
-                if ( np.countNSharesByOrder(order) < n_instances ) {     // countSharesByOrder is N shares, as is minshares
-                    schedulingUpdate.defer(j, "Job deferred  because insufficient resources are availble.");
-                    logger.warn(methodName, j.getId(), "2 Deferring fixed share job, insufficient shares available. Available[" + np.countNSharesByOrder(order) + "] requested[" + n_instances + "]");
-                    continue;
-                }
+    void countSingleFixedProcess(IRmJob j, ResourceClass rc)
+    {
+        String methodName = "countSingleFixedProcess";
 
-                //
-                // Make sure this allocation does not blow the class cap.
-                //
-                if ( rc.allotmentExceeded(j) ) {
-                    schedulingUpdate.defer(j, "Job deferred because allotment of " + rc.getAllotment(j) + "GB is exceeded by user " + j.getUserName());
-                    continue;
-                }
 
-                //
-                // The job passes.  Assign it a count and get on with life ...
-                //
-                logger.info(methodName, j.getId(), "+++++ nodepool", np.getId(), "class", rc.getName(), "order", order, "shares", nSharesToString(n_instances, order));
-                int[] gbo = NodePool.makeArray();
-                gbo[order] = n_instances;
-                j.setGivenByOrder(gbo);
+        logger.info(methodName, j.getId(), "Counting shares for", j.getShortType() + "." + j.getId(), "in class", rc.getName());
+        NodePool np = rc.getNodepool();
 
-                np.countOutNSharesByOrder(order, n_instances);
-            }
+        if ( j.countNShares() > 0 ) {                  // only 1 allowed, UIMA-4275
+            // already accounted for as well, since it is a non-preemptable share
+            logger.info(methodName, j.getId(), "[stable]", "assigned", j.countNShares(), "processes, ", 
+                        (j.countNShares() * j.getShareOrder()), "QS");
+            int[] gbo = NodePool.makeArray();
+            
+            gbo[j.getShareOrder()] = 1;                // must set the allocation so eviction works right
+            j.setGivenByOrder(gbo);
+            return;
+        }
+        
+        int order = j.getShareOrder();
+        
+        //
+        // Now see if we have sufficient shares in the nodepool for this allocation.
+        //
+        if ( np.countLocalNSharesByOrder(order) == 0 ) {
+            schedulingUpdate.defer(j, "Deferred because insufficient resources are available.");
+            logger.info(methodName, j.getId(), "Deferring, insufficient shares available. NP", np.getId(), "available[", np.countNSharesByOrder(order), "]");
+            return;
         }
+        
+        //
+        // Make sure this allocation does not blow the allotment cap.
+        //
+        if ( ! validSingleAllotment(j) ) return;        // this method will defer the job and log it
+        
+        //
+        // The job passes.  Assign it a count and get on with life ...
+        //
+        logger.info(methodName, j.getId(), "+++++ nodepool", np.getId(), "class", rc.getName(), "order", order, "shares", nSharesToString(1, order));
+        int[] gbo = NodePool.makeArray();
+        gbo[order] = 1;
+        j.setGivenByOrder(gbo);
+        
+        np.countOutNSharesByOrder(order, 1);
     }
 
     /**
-     * All-or-nothing makes this easy.  If there are free shares of the right order just assign them.  Otherwise
+     * If there are free shares of the right order just assign them.  Otherwise
      * the counts will cause evictions in lower-priority code so we just wait.
      */
     protected void whatOfFixedShare(ArrayList<ResourceClass> rcs)
@@ -1255,12 +1376,7 @@ public class NodepoolScheduler
 
             NodePool np = rc.getNodepool();
             for ( IRmJob j : jobs ) {
-
-                if ( j.countNShares() > 0 ) {               // all or nothing - if we have any, we're fully satisfied
-                    continue;
-                }
-
-                if ( j.isCompleted() ) {                    // UIMA-3614 - may have bene purged, don't give it more
+                if ( j.countNShares() == j.countNSharesGiven() ) {  // got what we need, we're done
                     continue;
                 }
 
@@ -1268,18 +1384,14 @@ public class NodepoolScheduler
                     continue;
                 }
 
-                if ( j.isDeferred() ) {                    // UIMA-4275 - still waiting for an allocation
+                if ( j.isDeferred() ) {                     // UIMA-4275 - still waiting for an allocation
                     continue;
                 }
 
                 int order = j.getShareOrder();
                 int count = j.countNSharesGiven();
-                int avail = np.countNSharesByOrder(order);
 
-                if ( avail >= count ) {
-                    if ( np.findShares(j) != count ) {
-                        throw new SchedInternalError(j.getId(), "Can't get enough shares but counts say there should be plenty.");
-                    }
+                if ( np.findShares(j) > 0 ) {               // UIMA-4275, no longer require full allocation, we'll take what we can
                     //
                     // Need to fix the shares here, if any, because the findShares() code is same for fixed and fair share so it
                     // won't have done that yet.
@@ -1287,15 +1399,16 @@ public class NodepoolScheduler
                     for ( Share s : j.getPendingShares().values() ) {
                         s.setFixed();
                     }
-                    logger.info(methodName, j.getId(), "Assign:", nSharesToString(count, j.getShareOrder()));
-                } else {
-                    j.setReason("Waiting for preemptions.");
+                    logger.info(methodName, j.getId(), "Assign:", nSharesToString(count, order));
                 }
 
-
                 // 
-                // If nothing assigned we're waiting on preemptions which will occur naturally, or by forcible eviction of squatters.
+                // If nothing assigned we're waiting on preemptions which will occur naturally, or by forcible eviction of squatters,
+                // or defrag.
                 //
+                if ( j.countNShares() == 0 ) {
+                    j.setReason("Waiting for preemptions.");
+                }                
             }
         }
     }
@@ -1307,7 +1420,7 @@ public class NodepoolScheduler
     // ==========================================================================================
 	private void howMuchReserve(ArrayList<ResourceClass> rcs)
     {
-        String methodName = "howMuchToreserve";
+        String methodName = "howMuchReserve";
 
         if ( logger.isTrace() ) {
             logger.trace(methodName, null, "Calculating counts for RESERVATION for these classes:");
@@ -1318,114 +1431,118 @@ public class NodepoolScheduler
             }
         }
 
+        int total_jobs = 0;
         for ( ResourceClass rc : rcs ) {
+            HashMap<IRmJob, IRmJob> jobs = rc.getAllJobs();
+            total_jobs += jobs.size();
+        }
+        if ( total_jobs == 0 ) {
+            return;
+        }
 
-            // Get jobs into order by submission time - new ones ones may just be out of luck
+        for ( ResourceClass rc : rcs ) {
             ArrayList<IRmJob> jobs = rc.getAllJobsSorted(new JobByTimeSorter());
 
-            NodePool np = rc.getNodepool();
 
-            // Find out what is given out already, for class caps.  These are already accounted for
-            // in the global counts.
-            Iterator<IRmJob> jlist = jobs.iterator();
-            while ( jlist.hasNext() ) {
-                IRmJob j = jlist.next();
-                j.undefer();
+            // Now pick up the work that can be scheduled, if any
+            for ( IRmJob j : jobs) {
+                j.clearShares();                               // reset shares assigned at start of each scheduling cycle
+                j.undefer();                                   // in case it works this time!
 
-                if ( np == null ) {                      // oops - no nodes here yet, must refuse all jobs
-                    
-                    schedulingUpdate.defer(j, "Reservation deferred because insufficient resources are available.");
-                    logger.warn(methodName, j.getId(), "Job scheduled to class "
-                                                       + rc.getName()
-                                                       + " but associated nodepool has no resources");
-                    continue;
+                switch ( j.getDuccType() ) {
+                    case Job:
+                        countReservationForJob(j, rc);
+                        break;
+                    case Service:
+                    case Pop:
+                    case Reservation:
+                    default:
+                        countSingleReservation(j, rc);
+                        break;
                 }
+            }            
+        }
+    }
 
-                j.setReservation();                     // in case it's a new reservation
+    void countReservationForJob(IRmJob j, ResourceClass rc)
+    {
+        String methodName = "countReservationForJob";
 
-                int nshares = j.countNShares();         // for reservation each share is one full machine
-                if ( nshares > 0 ) {                    // if it has any, it has all of them, so take off list
-                    int[] gbo = NodePool.makeArray();   // needed to for defrag 
-                    // gbo[j.getShareOrder()] = j.countInstances();
-                    gbo[j.getShareOrder()] = j.countNShares();  // UIMA-3614 - may be < Instances if machine is purged
-                    j.setGivenByOrder(gbo);
-                    
-                    jlist.remove();
-                    continue;
-                } 
+        logger.info(methodName, j.getId(), "Counting full machines for", j.getShortType() + "." + j.getId());
 
-                if ( j.isCompleted() ) {                // maybe nothibng left, but it once had stuff
-                    jlist.remove();                     // don't try to reallocate.  UIMA-3614
-                    continue;
-                }
-            }
+        // If we get here the job is allowed something.  Now see whether we have anything to give.
+        NodePool np = rc.getNodepool();
 
-            if ( np == null ) {                         // no np. jobs have been deferred, cannot continue.
-                return;
+        // Are there any machines of the right size in the NP that can be reserved for this job?
+        int available = np.countReservables(j);
+        if ( available == 0 ) {
+            if (j.countNShares() == 0)  {
+                schedulingUpdate.defer(j, "Deferred because there are no hosts of the correct size.");
+                logger.info(methodName, j.getId(), "Deferred because no hosts of correct size. NP", np.getId());
+                           
+            } else {
+                logger.info(methodName, j.getId(), "Nodepool is out of shares: NP", np.getId());
             }
+            return;
+        }
 
-            // Now pick up the jobs that can still be scheduled, if any
-            jlist = jobs.iterator();
+        int granted = getAllotmentForJob(j);
 
-            while ( jlist.hasNext() ) {
-                IRmJob j = jlist.next();
-                logger.info(methodName, j.getId(), "Scheduling (reserve) job in class ", rc.getName());
+        //
+        // The job passes; give the job a count
+        //
+        int order = j.getShareOrder();
+        logger.info(methodName, j.getId(), "+++++ nodepool", np.getId(), "class", rc.getName(), "order", order, "shares", nSharesToString(granted, order));
+        int[] gbo = NodePool.makeArray();
+        gbo[order] = granted;                      // what we are allowed
+        j.setGivenByOrder(gbo);
 
-                if ( j.countNShares() > 0 ) {
-                    logger.info(methodName, j.getId(), "Already scheduled with ", j.countInstances(),  "shares");
-                    continue;
-                }
+        np.countOutNSharesByOrder(order, granted - j.countNShares());
+    }
 
-                int order      = j.getShareOrder();     // memory, coverted to order, so we can find stuff
-                int nrequested = j.countInstances();     // in machines                
-                
-                if ( np.countLocalMachines() == 0 ) {
-                    schedulingUpdate.defer(j, "Reservation deferred because resources are exhausted."); 
-                    logger.warn(methodName, j.getId(), "Job asks for " 
-                                            + nrequested 
-                                            + " reserved machines but reservable resources are exhausted for nodepool "
-                                            + np.getId());
-                    continue;
-                }
+    void countSingleReservation(IRmJob j, ResourceClass rc)
+    {
+        String methodName = "countSingleReservation";
 
-                if ( rc.allotmentExceeded(j) ) {               // Does it blow the configured limit for this class?
-                    schedulingUpdate.defer(j, "Reservation deferred because allotment of " + rc.getAllotment(j) + "GB is exceeded by user " + j.getUserName());
-                    continue;
-                }
-                
-                logger.info(methodName, j.getId(), "Job is granted " + nrequested + " machines for reservation.");
-                //j.addQShares(nrequested * order);
-                int[] gbo = NodePool.makeArray();
-                gbo[order] = nrequested;
-                j.setGivenByOrder(gbo);
-
-                int given = 0;
-                if ( rc.enforceMemory() ) { 
-                    given = np.countFreeableMachines(j, true);
-                } else {
-                    given = np.countFreeableMachines(j, false);
-                }           
+        logger.info(methodName, j.getId(), "Counting shares for", j.getShortType() + "." + j.getId(), "in class", rc.getName());
+        NodePool np = rc.getNodepool();
 
-                // The counts worked out but for some reason we can't find / evict enough space
-                if ( given == 0 ) {
-                    schedulingUpdate.defer(j, "Reservation is deferred because insufficient resources are available.");
-                    if ( rc.enforceMemory() ) {
-                        logger.warn(methodName, j.getId(), "Reservation deferred: asks for " 
-                                                + nrequested 
-                                                + " reserved machines with exactly "
-                                                + j.getShareOrder()  
-                                                + " shares but there are insufficient freeable machines.");
-                    } else {
-                        logger.warn(methodName, j.getId(), "Reservation deferred: ask for " 
-                                                + nrequested 
-                                                + " reserved machines with at least "
-                                                + j.getShareOrder()  
-                                                + " shares but there are insufficient freeable machines.");
-                    }
-                    continue;
-                }
-            }
+        if ( j.countNShares() > 0 ) {
+            logger.info(methodName, j.getId(), "[stable]", "assigned", j.countNShares(), "processes, ", 
+                        (j.countNShares() * j.getShareOrder()), "QS");
+
+            int[] gbo = NodePool.makeArray();
+
+            gbo[j.getShareOrder()] = 1;         // UIMA-4275 - only one
+            j.setGivenByOrder(gbo);            
+            return;
+        }
+
+        if ( np.countReservables(j) == 0 ) {
+            schedulingUpdate.defer(j, "Deferred because requested memory " + j.getMemory() + " does not match any machine."); 
+            logger.warn(methodName, j.getId(), "Deferred because requested memory " + j.getMemory() + " does not match any machine.");
+            return;
+        }
+
+        int order      = j.getShareOrder();     // memory, converted to order, so we can find stuff
+        
+        if ( np.countLocalMachines() == 0 ) {
+            schedulingUpdate.defer(j, "Deferred because resources are exhausted."); 
+            logger.warn(methodName, j.getId(), "Deferred because resources are exhausted in nodepool " + np.getId());
+            return;
         }
+
+        //
+        // Make sure this allocation does not blow the allotment cap.
+        //
+        if ( ! validSingleAllotment(j) ) return;   // defers and logs 
+                
+        logger.info(methodName, j.getId(), "Request is granted a machine for reservation.");
+        int[] gbo = NodePool.makeArray();
+        gbo[order] = 1;
+        j.setGivenByOrder(gbo);
+
+        np.countOutNSharesByOrder(order, 1);
     }
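
The two counting methods above record grants in an array indexed by share order, where a request's order is its memory divided by the share quantum, and then charge the nodepool via countOutNSharesByOrder(). A small illustration of that arithmetic follows; it assumes a 1 GB quantum and uses hypothetical numbers and simplified helpers, not the NodePool implementation.

    // Editorial sketch: how a grant is recorded by share order (assumes a 1 GB quantum).
    class ReservationCountSketch {
        static final int MAX_ORDER = 64;                 // hypothetical ceiling, large enough for the example

        static int[] makeArray() { return new int[MAX_ORDER + 1]; }   // index = share order

        public static void main(String[] args) {
            int quantum = 1;                             // GB per quantum share (assumption)
            int memory  = 48;                            // GB asked for by one reservation
            int order   = memory / quantum;              // 48 GB -> order 48
            int granted = 1;                             // RESERVE now counts out one machine per request

            int[] gbo = makeArray();
            gbo[order] = granted;                        // what j.setGivenByOrder(gbo) records

            int[] freeByOrder = makeArray();
            freeByOrder[order] = 3;                      // pretend the nodepool has 3 free 48 GB machines
            freeByOrder[order] -= granted;               // effect of countOutNSharesByOrder(order, granted)

            System.out.println("order=" + order + " granted=" + gbo[order]
                               + " remaining=" + freeByOrder[order]);   // order=48 granted=1 remaining=2
        }
    }
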
 
     /**
@@ -1447,16 +1564,8 @@ public class NodepoolScheduler
                     continue;
                 }
 
-                if ( j.countNShares() > 0 ) {            // shares already allocated, nothing to do (all-or-nothing policy in effect)
-                    continue;
-                }
-
-                if ( j.isCompleted() ) {                 // UIMA-3614 - may have bene purged, don't give it more
-                    continue;
-                }
-
                 try {
-                    np.allocateForReservation(j, rc);
+                    np.findMachines(j, rc);
                 } catch (Exception e) {
                     logger.error(methodName, j.getId(), "Reservation issues:", e);
                     continue;
@@ -1469,9 +1578,7 @@ public class NodepoolScheduler
                 if ( j.countNShares() == 0 ) {
                     j.setReason("Waiting for preemptions.");
                 }
-
             }
-
         }
     }
 
@@ -1486,27 +1593,13 @@ public class NodepoolScheduler
                    break;
 
                case FIXED_SHARE: 
-                   {
-                       NodePool np = rc.getNodepool();
-                       HashMap<IRmJob, IRmJob> jobs = rc.getAllJobs();
-                       for ( IRmJob j : jobs.values() ) {
-                           if ( j.countNShares() > 0 ) {   // all-or-nothing - if there's anything, it's fully scheduled.
-                               HashMap<Share, Share> shares = j.getAssignedShares();
-                               np.accountForShares(shares);
-                           }
-                       }
-                   }
-                   break;                
-
                case RESERVE:               
                    {
                        NodePool np = rc.getNodepool();
                        HashMap<IRmJob, IRmJob> jobs = rc.getAllJobs();
                        for ( IRmJob j : jobs.values() ) {
-                           if ( j.countNShares() > 0 ) {   // all-or-nothing - if there's anything, it's fully scheduled.
-                               HashMap<Share, Share> shares = j.getAssignedShares();
-                               np.accountForShares(shares);
-                           }
+                           HashMap<Share, Share> shares = j.getAssignedShares();
+                           np.accountForShares(shares);
                        }
                    }
                    break;                
@@ -1522,7 +1615,6 @@ public class NodepoolScheduler
         for (ResourceClass rc : resourceClasses.values() ) {
             if ( rc.getPolicy() == Policy.FAIR_SHARE ) {
                 NodePool np = rc.getNodepool();
-                // NodePool check = getNodepool(rc);
                 HashMap<IRmJob, IRmJob> jobs = rc.getAllJobs();
                 for ( IRmJob j : jobs.values() ) {
                     HashMap<Share, Share> shares = j.getAssignedShares();
@@ -1767,25 +1859,24 @@ public class NodepoolScheduler
             for ( Machine m : machines.values() ) {
 
                 if ( m.getShareOrder() < orderNeeded ) {                // nope, too small
-                    logger.trace(methodName, nj.getId(), "Bypass ", m.getId(), ": too small for request of order", orderNeeded); 
+                    logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": too small for request of order", orderNeeded); 
                     continue;
                 }
 
                // if the job is a reservation the machine size has to match and the machine must be clearable
-                if ( nj.isReservation() ) {
+                if ( nj.getSchedulingPolicy() == Policy.RESERVE ) {
                     if ( m.getShareOrder() != orderNeeded ) {
-                        logger.trace(methodName, nj.getId(), "Bypass ", m.getId(), ": reservation requires exact match for order", orderNeeded);
+                        logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": RESERVE policy requires exact match for order", orderNeeded);
                         continue;
                     }
                     // machine must be clearable as well
                     Collection<Share> shares = m.getActiveShares().values();
                     for ( Share s : shares ) {
                         if ( ! candidateJobs.containsKey(s.getJob()) ) {
-                            logger.trace(methodName, nj.getId(), "Bypass ", m.getId(), ": for reservation, machine contains non-candidate job", s.getJob().getId());
+                            logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": for reservation, machine contains non-candidate job", s.getJob().getId());
                             continue machine_loop;
                         }
-                    }
-                
+                    }                
                 }
 
                Map<Share, Share> as = m.getActiveShares();            // everything allocated here
@@ -1808,13 +1899,6 @@ public class NodepoolScheduler
         
         // Now eligibleMachines is the set of candidate machines for defrag
 
-        // All-or-nothing policy, can we satisfy the reservation with defrag?  If not, we're done.
-        if ( nj.isReservation() && ( eligibleMachines.size() < needed ) ) {
-            // if we can't clear enough for the reservation we have to wait.  Very unlikely, but not impossible.
-            logger.info(methodName, nj.getId(), "Found insufficient machines (", eligibleMachines.size(), "for reservation. Not clearing.");
-            return 0;
-        }
-
         logger.info(methodName, nj.getId(), "Found", eligibleMachines.size(), "machines to be searched in this order:");
         StringBuffer buf = new StringBuffer();
         for ( Machine m : eligibleMachines.keySet() ) {
@@ -1825,7 +1909,7 @@ public class NodepoolScheduler
 
         // first part done, we know where to look.
 
-        // Now just bop through the machines until either we can't find anything, or we find everything.
+        // Now just bop through the machines to see if we can get anything for this specific job (nj)
         int given_per_round = 0;
         do {
             int g = 0;
@@ -2002,10 +2086,13 @@ public class NodepoolScheduler
                     logger.debug(methodName, nj.getId(), "Job", j.getId(), "is a candidate with processes[", nshares, "] qshares[", qshares, "]");
                     rich_candidates.put(j, j);
                 }
-            }
+            } // End search for candidate donors
 
-            HashMap<User, TreeMap<IRmJob, IRmJob>> jobs_by_user = new HashMap<User, TreeMap<IRmJob, IRmJob>>();  // use this to track where the wealth originatse
-            TreeMap<User, User> users_by_wealth = new TreeMap<User, User>(new UserByWealthSorter()); // orders users by wealth
+            //
+            // Here we start looking at 'needy' and trying to match them against the candidates
+            //
+            HashMap<User, TreeMap<IRmJob, IRmJob>> jobs_by_user = new HashMap<User, TreeMap<IRmJob, IRmJob>>();  // use this to track where the wealth originates
+            TreeMap<User, User> users_by_wealth = new TreeMap<User, User>(new UserByWealthSorter());             // orders users by wealth
 
             collectWealth(rich_candidates, users_by_wealth, jobs_by_user);
 
@@ -2042,6 +2129,7 @@ public class NodepoolScheduler
                 }
             }
             logger.info(methodName, nj.getId(), "Could not get enough from the rich. Asked for", needy.get(nj), "still needing", needed);
+            nj.setReason("Waiting for defragmentation.");
         }
     }
 
@@ -2168,17 +2256,7 @@ public class NodepoolScheduler
                     continue;
                 }
 
-                int counted = 0;
-                switch ( rc.getPolicy() ) {
-                    case FAIR_SHARE:
-                        counted = j.countNSharesGiven();       // fair share allocation, accounting for
-                                                               // ramp-up, various caps, etc.  could be more, could be less than
-                                                               // the "pure" fair share.
-                        break;
-                    default:
-                        counted = j.countInstances();          // fixed, all, or nothing
-                        break;
-                }
+                int counted = j.countNSharesGiven();          // accounting for ramp-up, various caps, etc. 
 
                 int current = j.countNShares();                // currently allocated, plus pending, less those removed by earlier preemption
                 int needed = counted - current;                // could go negative if its evicting
@@ -2197,8 +2275,8 @@ public class NodepoolScheduler
                         must_defrag = true;
                     }                    
                 } else {                                       // if not fair-share, must always try to defrag if needed
-                    if ( j.isCompleted() ) continue;           // non-preemptable, this means it has at least once received
                                                                // its full allocation, and therefore cannot be needy
+                    // UIMA-4275 We rely on quotas to keep 'needed' under control
                     if ( needed > 0 ) {
                         jobmap.put(j, needed);   
                         must_defrag = true;

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java Sun Mar 15 14:20:49 2015
@@ -288,6 +288,10 @@ public class ResourceClass
      * See if the total memory for job 'j' plus the occupancy of the 'jobs' exceeds 'max'
      * Returns 'true' if occupancy is exceeded, else returns 'false'
      * UIMA-4275
+     *
+     * NOTE: After discussion this may not be the ideal place for this; a global cap on all
+     *       NPShares was favored instead.  But I suspect it will come back to haunt us that
+     *       we need the finer granularity of class-based caps, so the logic is staying here for now.
      */
     private boolean occupancyExceeded(int max, IRmJob j, Map<IRmJob, IRmJob> jobs)
     {
@@ -302,7 +306,7 @@ public class ResourceClass
             // Then multiply by the scheduling quantum to convert to GB
             occupancy += ( job.countNSharesGiven()  * job.getShareOrder() * share_quantum ); // convert to GB
         }
-        int requested = j.getMemory() * j.countInstances();
+        int requested = j.getMemory() * j.getMaxShares();
         
         if ( max - ( occupancy + requested ) < 0 ) {
             return true;
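
The allotment test above is plain arithmetic: sum nshares_given * share_order * quantum for the work already counted, add memory * max_shares for the new request, and refuse if the class allotment is exceeded. The worked sketch below uses assumed numbers (1 GB quantum, 100 GB allotment, illustrative occupancy); it only restates the formula, it is not the ResourceClass code.

    // Editorial sketch of the allotment arithmetic in occupancyExceeded (assumes a 1 GB quantum).
    class AllotmentSketch {
        public static void main(String[] args) {
            int shareQuantum = 1;                        // GB per quantum share (assumption)
            int max          = 100;                      // configured class allotment in GB (assumption)

            // occupancy of work already counted: nshares_given * share_order * quantum
            int existingGiven = 2, existingOrder = 30;
            int occupancy = existingGiven * existingOrder * shareQuantum;   // 60 GB

            // the new request: memory * max_shares
            int memory = 30, maxShares = 2;
            int requested = memory * maxShares;                             // 60 GB

            boolean exceeded = (max - (occupancy + requested)) < 0;         // 100 - 120 < 0 -> true
            System.out.println("occupancy=" + occupancy + " requested=" + requested
                               + " exceeded=" + exceeded);
        }
    }
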

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java Sun Mar 15 14:20:49 2015
@@ -50,7 +50,9 @@ public class RmJob
     protected ResourceClass resource_class;           // The actual class, assigned as job is received in scheduler.
     protected int    user_priority;                   // user "priority", really apportionment 
 
-    protected int n_machines;                         // RESERVE:     minimum machines to allocate
+    // @deprecated
+    // protected int n_machines;                         // RESERVE:     minimum machines to allocate
+
     protected int max_shares;                         // FAIR_SHARE:  maximum N shares to allocate
     protected boolean is_reservation = false;
 
@@ -267,7 +269,7 @@ public class RmJob
     public int queryDemand()
     {
         if ( getSchedulingPolicy() == Policy.FAIR_SHARE ) return getJobCap();
-        return countInstances();
+        return max_shares;
     }
 
     /**
@@ -337,6 +339,18 @@ public class RmJob
     }
 
 
+    // UIMA-4275
+    public int countOccupancy()
+    {
+        if ( (given_by_order == null) || (given_by_order[share_order] == 0) ) {
+            // must use current allocation because we haven't been counted yet
+            return countNShares() * share_order;
+        } else {
+            // new allocation may not match current, so we use that one
+            return given_by_order[share_order] * share_order;
+        }
+    }
+
     public int countNSharesGiven()
     {
         if ( given_by_order == null) { return 0; }
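
countOccupancy() prefers the allocation counted out in the current scheduling cycle (given_by_order) and falls back to the shares the job currently holds when it has not been counted yet. A short sketch of the two branches, with simplified fields standing in for the RmJob state:

    // Editorial sketch of the two branches of countOccupancy() (simplified fields, not RmJob).
    class OccupancySketch {
        int shareOrder;
        int currentNShares;        // stands in for countNShares()
        int[] givenByOrder;        // null until the current scheduling cycle has counted the job

        int countOccupancy() {
            if (givenByOrder == null || givenByOrder[shareOrder] == 0) {
                return currentNShares * shareOrder;          // not counted yet: use what it holds now
            }
            return givenByOrder[shareOrder] * shareOrder;    // counted: the new allocation wins
        }

        public static void main(String[] args) {
            OccupancySketch j = new OccupancySketch();
            j.shareOrder = 4;
            j.currentNShares = 2;
            System.out.println(j.countOccupancy());          // 8 quantum shares (current allocation)

            j.givenByOrder = new int[8];
            j.givenByOrder[4] = 3;
            System.out.println(j.countOccupancy());          // 12 quantum shares (new allocation)
        }
    }
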
@@ -813,13 +827,13 @@ public class RmJob
      *
      * @Deprecated.  Keeping until I can nuke or update the simple fair share code.
      */
-    public void shrinkTo(int k)
-    {	
+    //public void shrinkTo(int k)
+    //{	
 //         int count = assignedShares.size() - k;
 //         for ( int i = 0; i < count; i++ ) {
 //             pendingRemoves.add(assignedShares.get(i));
 //         }
-    }
+    //}
 
     /**
      * Waiting for somebody to deal with my shrinkage?
@@ -1022,7 +1036,14 @@ public class RmJob
             return;
         }
 
-        if ( getSchedulingPolicy() != Policy.FAIR_SHARE ) return;
+        if ( isCompleted() ) {
+            // job is finishing up and will relinquish all shares soon, let's avoid complicating the
+            // world and just wait for it to happen.
+            job_cap = countNShares();
+            return;
+        }
+
+        // if ( getSchedulingPolicy() != Policy.FAIR_SHARE ) return;
 
         int c = nquestions_remaining / threads;
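
The cap change above pins a completing job to the shares it already holds, so the scheduler stops counting out new work for it while it winds down; for running jobs the cap is still derived from remaining work. The illustration below abbreviates the running-job formula to remaining questions divided by threads; it is not the full getJobCap logic.

    // Editorial sketch: a completing job keeps a cap equal to what it still holds (simplified fields).
    class JobCapSketch {
        boolean completed;
        int held;                              // countNShares()
        int questionsRemaining, threads;

        int jobCap() {
            if (completed) return held;                           // wind down with what it has
            return Math.max(1, questionsRemaining / threads);     // abbreviated demand-driven cap
        }

        public static void main(String[] args) {
            JobCapSketch j = new JobCapSketch();
            j.held = 5; j.questionsRemaining = 400; j.threads = 4;
            System.out.println(j.jobCap());                       // 100 while running
            j.completed = true;
            System.out.println(j.jobCap());                       // 5 once it is completing
        }
    }
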
 
@@ -1166,14 +1187,16 @@ public class RmJob
     	return resource_class;
     }
     
-    public int countInstances() {
-        return n_machines;
-    }
+    // @deprecated
+    //public int countInstances() {
+    //    return n_machines;
+    //}
 
-    public void setNInstances(int m)
-    {
-        this.n_machines = m;
-    }
+    // @deprecated
+    // public void setNInstances(int m)
+    // {
+    //    this.n_machines = m;
+    // }
 
     public int nThreads() {
         return threads;
@@ -1268,7 +1291,7 @@ public class RmJob
         return buf.toString();
     }
 
-    String getShortType()
+    public String getShortType()
     {
         String st = "?";
         switch ( ducc_type ) {
@@ -1313,7 +1336,7 @@ public class RmJob
                                  shares, share_order, (shares * share_order),      // 5 6 7
                                  0, memory,                                        // 8 9
                                  0, 0, 0,                                          // 10 11 12
-                                 n_machines);                                      // 13
+                                 max_shares);                                      // 13
             
                                  
         } else {   

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Sun Mar 15 14:20:49 2015
@@ -614,14 +614,21 @@ public class Scheduler
                 String val = ((String) dp.get(l)).trim();
 
                 int lim = Integer.parseInt( val ); // verified parsable int during parsing
-                String[] tmp = ((String)l).split("\\.");                // max_allotment.classname
+
+
                 User user = users.get(n);
                 if (user == null) {
                     user = new User(n);
                     users.put(n, user);
                 }
-                ResourceClass rc = resourceClassesByName.get(tmp[1]);
-                user.overrideLimit(rc, lim);   // constrain allotment for this class to value in l
+
+                if ( val.contains(".") ) {
+                    String[] tmp = ((String)l).split("\\.");                // max_allotment.classname
+                    ResourceClass rc = resourceClassesByName.get(tmp[1]);
+                    user.overrideLimit(rc, lim);   // constrain allotment for this class to value in l
+                } else {
+                    user.overrideGlobalLimit(lim);
+                }
             }
         }
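
The intent of the hunk above is that a bare registry key overrides a user's global non-preemptable allotment while a dot-qualified key (max_allotment.classname, per the inline comment) overrides a single class, so the dispatch must look at the key rather than the numeric value. A minimal sketch of that key-based dispatch follows; the property names are illustrative assumptions, not documented DUCC syntax.

    // Editorial sketch of the key-based dispatch; keys modeled on the "max_allotment.classname"
    // comment above, with an invented class name, not taken from DUCC documentation.
    import java.util.LinkedHashMap;
    import java.util.Map;

    class UserLimitSketch {
        public static void main(String[] args) {
            Map<String, String> props = new LinkedHashMap<String, String>();
            props.put("max_allotment", "200");            // global NP allotment override, in GB
            props.put("max_allotment.reserve", "80");     // override for one class only

            for (Map.Entry<String, String> e : props.entrySet()) {
                String key = e.getKey();
                int lim = Integer.parseInt(e.getValue().trim());
                if (key.contains(".")) {                  // dispatch on the key, not the value
                    String className = key.split("\\.")[1];
                    System.out.println("class limit  " + className + " = " + lim);
                } else {
                    System.out.println("global limit = " + lim);
                }
            }
        }
    }
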
 
@@ -735,9 +742,9 @@ public class Scheduler
             HashMap<Share, Share> sharesE = j.getAssignedShares();
             HashMap<Share, Share> sharesN = j.getPendingShares();        	
 
-            if ( sharesE.size() == j.countInstances() ) {
+            if ( sharesE.size() == j.getMaxShares() ) {
                 logger.trace(methodName, j.getId(), "reserve_stable", sharesE.size(), "machines");
-            } else  if ( sharesN.size() == j.countInstances() ) {           // reservation is complete but not yet confirmed?
+            } else  if ( sharesN.size() == j.getMaxShares() ) {           // reservation is complete but not yet confirmed?
                 logger.trace(methodName, j.getId(), "reserve_adding", sharesN.size(), "machines");
                 for ( Share s : sharesN.values()) {
                     logger.trace(methodName, j.getId(), "    reserve_expanding ", s.toString());
@@ -745,7 +752,7 @@ public class Scheduler
                 jmu.addShares(j, sharesN);                
                 j.promoteShares();
             } else {
-                logger.trace(methodName, j.getId(), "reserve_pending", j.countInstances(), "machines");
+                logger.trace(methodName, j.getId(), "reserve_pending", j.getMaxShares(), "machines");
             }
             logger.trace(methodName, j.getId(), "<<<<<<<<<<");
         }
@@ -1201,7 +1208,7 @@ public class Scheduler
             np.getOnlineByOrder(onlineMachines);
 
             for ( int i = 1; i < freeMachines.length; i++ ) {
-                freeMachines[i] += np.countFreeMachines(i, true);
+                freeMachines[i] += np.countFreeMachines(i);
             }
             
             int[] t = np.cloneVMachinesByOrder();
@@ -1486,8 +1493,6 @@ public class Scheduler
                     logger.info(methodName, j.getId(), "Set fixed bit for FIXED job");
                     s.setShareOrder(share_order);
                     s.setFixed();
-                    j.markComplete();            // in recovery: if there are any shares at all for this job
-                                                 // we know it once had all its shares so its allocation is complete
                     if ( !compatibleNodepool(s, j) ) {            // UIMA-4142
                         if ( j.isService() ) {
                             sharesToShrink.add(s);   // nodepool reconfig snafu, SM will reallocate the process
@@ -1499,8 +1504,6 @@ public class Scheduler
                 case RESERVE:
                     logger.info(methodName, j.getId(), "Set fixed bit for RESERVE job");
                     s.setFixed();
-                    j.markComplete();            // in recovery: if there are any shares at all for this job
-                                                 // we know it once had all its shares so its allocation is complete
                     if ( j.isService() && !compatibleNodepool(s, j) ) {       // UIMA-4142
                         sharesToShrink.add(s);   // nodepool reconfig snafu, SM will reallocate the process
                     }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java?rev=1666803&r1=1666802&r2=1666803&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java Sun Mar 15 14:20:49 2015
@@ -22,6 +22,8 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.uima.ducc.rm.scheduler.SchedConstants.Policy;
+
 public class User
     implements IEntity
 {
@@ -32,6 +34,7 @@ public class User
     private Map<Integer, Map<IRmJob, IRmJob>> jobsByOrder = new HashMap<Integer, Map<IRmJob, IRmJob>>();
 
     private Map<ResourceClass, Integer> classLimits = new HashMap<ResourceClass, Integer>(); // UIMA-4275
+    private int globalLimit = -1;  // -1 means no per-user override; use the scheduler's global limit
 
     //private int user_shares;       // number of shares to apportion to jobs in this user in current epoch
     private int pure_fair_share;   // uncapped un-bonused counts
@@ -51,12 +54,41 @@ public class User
         return 0;
     }
 
-    // UIMA-4275
+    // UIMA-4275, class-based limit
     void overrideLimit(ResourceClass rc, int lim)
     {
         classLimits.put(rc, lim);
     }
 
+    // UIMA-4275 Global NPshare limit override from registry
+    void overrideGlobalLimit(int lim)
+    {
+        globalLimit = lim;
+    }
+    
+    // UIMA-4275 Get the override on the global limit
+    int getOverrideLimit()
+    {
+        return globalLimit;
+    }
+
+
+    // UIMA-4275, count all non-preemptable shares for this user, in quantum shares
+    int countNPShares()
+    {
+        int occupancy = 0;
+        for ( IRmJob j : jobs.values() ) {
+            if ( j.getSchedulingPolicy() != Policy.FAIR_SHARE ) {
+                // countOccupancy() reports the job's potential occupancy in quantum shares:
+                // the newly counted allocation (nshares_given * share_order) if it exists,
+                // otherwise the currently assigned allocation (nshares * share_order).
+                occupancy += ( j.countOccupancy() );
+            }
+        }
+        return occupancy;
+    }
+
     int getClassLimit(ResourceClass rc)
     {
         if ( classLimits.containsKey(rc) ) return classLimits.get(rc);
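
countNPShares() above sums countOccupancy() over every non-FAIR_SHARE job the user owns, in quantum shares, and getOverrideLimit() returns -1 when no per-user override was registered. The enforcement path that compares the two is not part of this diff, so the sketch below is only a guess at how a caller might combine them; the default, quantum, and occupancy numbers are hypothetical.

    // Editorial sketch only: how a caller might combine countNPShares() with the per-user override.
    // The real enforcement (e.g. getAllotmentForJob / validSingleAllotment) is not shown in this
    // diff, so the shape of the comparison below is an assumption.
    class NpAllotmentSketch {
        static int countNPShares(int[] occupancies) {     // stands in for User.countNPShares()
            int total = 0;
            for (int o : occupancies) total += o;
            return total;
        }

        public static void main(String[] args) {
            int schedulerDefaultGB = 300;                 // hypothetical site-wide default
            int userOverrideGB     = -1;                  // User.getOverrideLimit(): -1 means use the default
            int quantumGB          = 1;                   // assumed share quantum

            int limitGB  = (userOverrideGB >= 0) ? userOverrideGB : schedulerDefaultGB;
            int usedGB   = countNPShares(new int[] { 48, 30, 30 }) * quantumGB;   // 108 GB of NP work
            int newReqGB = 48;

            System.out.println("within allotment: " + (usedGB + newReqGB <= limitGB));   // true
        }
    }
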