Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:43:38 UTC

svn commit: r1427967 [5/5] - in /uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src: main/ main/java/ main/java/org/ main/java/org/apache/ main/java/org/apache/uima/ main/java/org/apache/uima/ducc/ main/java/org/apache/uima/ducc/rm/ main/java/org/apache/uim...

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,1139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.uima.ducc.common.IIdentity;
+import org.apache.uima.ducc.common.Node;
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.Pair;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccProperties;
+import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
+import org.apache.uima.ducc.common.utils.Version;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.common.utils.id.DuccIdFactory;
+
+
+/**
+ * This class orchestrates scheduling:
+ * - Receives requests from clients (job manager, service manager, etc.) for resources
+ * - Forwards requests and current state to the pluggable scheduling implementation
+ * - Receives a schedule, updates state, and sends responses to requestors
+ * - Maintains state as needed (work item life cycle, etc.)
+ */
+public class Scheduler
+//    extends Thread
+    implements ISchedulerMain,
+    	SchedConstants
+{
+    IJobManager jobManager;
+    DuccLogger     logger = DuccLogger.getLogger(Scheduler.class, COMPONENT_NAME);
+
+    boolean done = false;
+    // Boolean force_epoch = false;
+    String ducc_home;
+    // Integer epoch = 5;                                                 // scheduling epoch, seconds
+
+    NodePool nodepool;
+
+    //
+    // Fair-share and fixed-share use shares only, not machines
+    //
+    HashMap<DuccId, Share> busyShares        = new HashMap<DuccId, Share>(); // Running "fair" share jobs
+
+    // incoming reports of machines that are now free
+    HashMap<DuccId, Pair<IRmJob, Share>> vacatedShares= new HashMap<DuccId, Pair<IRmJob, Share>>();
+    // boolean growthOccurred = false;                                           // don't care which grew, just that something grew
+
+    ArrayList<IRmJob>        incomingJobs    = new ArrayList<IRmJob>();       // coming in from external world but not added to our queues yet
+    ArrayList<IRmJob>        recoveredJobs   = new ArrayList<IRmJob>();       // coming in from external world but we don't know about them (hopefully
+                                                                              //    because we crashed, and not for more nefarious reasons)
+    ArrayList<IRmJob>        completedJobs   = new ArrayList<IRmJob>();       // signaled complete from outside but not yet dealt with
+    ArrayList<IRmJob>        initializedJobs = new ArrayList<IRmJob>();       // Init is complete so we can begin full (un)fair share allocation
+
+    //HashMap<Node, Node> incomingNodes  = new HashMap<Node, Node>();         // node updates
+    HashMap<Node, Node> deadNodes      = new HashMap<Node, Node>();           // missed too many heartbeats
+    HashMap<Node, Node> allNodes       = new HashMap<Node, Node>();           // the guys we know
+
+    HashMap<String, User>    users     = new HashMap<String, User>();         // Active users - those with a job in the system
+    //HashMap<DuccId, IRmJob>    runningJobs = new HashMap<DuccId, IRmJob>();
+
+    HashMap<DuccId, IRmJob>  allJobs = new HashMap<DuccId, IRmJob>();
+
+    HashMap<ResourceClass, ResourceClass> resourceClasses = new HashMap<ResourceClass, ResourceClass>();
+    HashMap<String, ResourceClass> resourceClassesByName = new HashMap<String, ResourceClass>();
+
+    String defaultClassName = null;
+    int defaultNThreads = 1;
+    int defaultNTasks = 10;
+    int defaultMemory = 16;
+
+    // these two are initialized in init()
+    String schedImplName;
+    IScheduler scheduler;
+
+    long share_quantum    = 16;             // 16 GB, held in KB - smallest share size
+    long share_free_dram  = 0;              // 0  GB, held in KB - minimum memory left free after shares are allocated
+    long dramOverride     = 0;              // if > 0, use this instead of amount reported by agents (modeling and testing)
+
+    EvictionPolicy evictionPolicy = EvictionPolicy.SHRINK_BY_MACHINE;
+
+//     int nodeMetricsUpdateRate = 30000;
+//     int startupCountdown = 0;       // update each epoch.  only schedule when it's > nodeStability
+    int nodeStability = 3;
+    boolean stability = false;
+
+    // static boolean expandByDoubling = true;
+    // static int initializationCap = 2;      // Max allocation until we know initialization works in
+                                           // units of *processes*, not shares (i.e. N-shares).
+
+    //
+    // Version
+    //    0 - major version
+    //    6 - minor version
+    //    3 - ptf - forced eviction under fragmentation.
+    //    4 - defrag code complete
+    //  beta - not yet "real"!
+    //
+    final static int rmversion_major = 0;
+    final static int rmversion_minor = 6;
+    final static int rmversion_ptf   = 4;  
+    final static String rmversion_string = "beta";
+
+    boolean initialized = false;           // we refuse node updates until this is true
+    public Scheduler()
+    {
+    }
+
+    public void init()
+        throws Exception
+    {
+        String methodName = "init";
+        //setName("Scheduler");
+
+        DuccLogger.setUnthreaded();
+
+        String ep         = SystemPropertyResolver.getStringProperty("ducc.rm.eviction.policy", "SHRINK_BY_MACHINE");
+        evictionPolicy    = EvictionPolicy.valueOf(ep);        
+
+        nodepool          = new NodePool(null, evictionPolicy, 0);   // global nodepool
+        share_quantum     = SystemPropertyResolver.getLongProperty("ducc.rm.share.quantum", share_quantum) * 1024 * 1024;        // GB -> KB
+        share_free_dram   = SystemPropertyResolver.getLongProperty("ducc.rm.reserved.dram", share_free_dram) * 1024 * 1024;   // GB -> KB
+        ducc_home         = SystemPropertyResolver.getStringProperty("DUCC_HOME");
+
+        // some defaults, for jobs that don't specify them
+        defaultNTasks     = SystemPropertyResolver.getIntProperty("ducc.rm.default.tasks", 10); 
+        defaultNThreads   = SystemPropertyResolver.getIntProperty("ducc.rm.default.threads", 1);
+        defaultMemory     = SystemPropertyResolver.getIntProperty("ducc.rm.default.memory", 16);      // in GB
+        // expandByDoubling  = RmUtil.getBooleanProperty("ducc.rm.expand.by.doubling", true);
+
+        nodeStability     = SystemPropertyResolver.getIntProperty("ducc.rm.node.stability", 3);        // number of node metrics updates to wait for before scheduling
+                                                                                  // 0 means, just jump right in and don't wait
+        // initializationCap = RmUtil.getIntProperty("ducc.rm.initialization.cap", 2);
+
+        dramOverride = SystemPropertyResolver.getLongProperty("ducc.rm.override.dram", 0);
+        if ( dramOverride > 0 ) {
+            dramOverride = dramOverride * (1024 * 1024);         // convert to KB
+        }
+
+        try {
+            schedImplName = SystemPropertyResolver.getStringProperty("ducc.rm.scheduler", "org.apache.uima.ducc.rm.rm.ClassBasedScheduler");
+            @SuppressWarnings("unchecked")
+			Class<IScheduler> cl = (Class<IScheduler>) Class.forName(schedImplName);
+            scheduler = (IScheduler) cl.newInstance();
+        } catch (ClassNotFoundException e) {
+            throw new SchedulingException(null, "Cannot find class " + schedImplName);
+        } catch (InstantiationException e) {
+            throw new SchedulingException(null, "Cannot instantiate class " + schedImplName);            
+        } catch (IllegalAccessException e) {
+            throw new SchedulingException(null, "Cannot instantiate class " + schedImplName + ": can't access constructor.");            
+        }
+
+        String class_definitions = SystemPropertyResolver.getStringProperty("ducc.rm.class.definitions", "scheduler.classes");
+        try {
+            initClasses(class_definitions);
+        } catch ( Exception e ) {
+            logger.error(methodName, null, e);
+            throw e;
+        }
+
+        // we share most of the state with the actual scheduling code - no need to keep passing it around
+        // TODO: Make sure these are all serialized correctly
+        scheduler.setEvictionPolicy(evictionPolicy);
+        scheduler.setClasses(resourceClasses);
+        scheduler.setNodePool(nodepool);
+
+        logger.info(methodName, null, "Scheduler running with share quantum           : ", (share_quantum / (1024*1024)), " GB");
+        logger.info(methodName, null, "                       reserved DRAM           : ", (share_free_dram / (1024*1024)), " GB");
+        logger.info(methodName, null, "                       DRAM override           : ", (dramOverride / (1024*1024)), " GB");
+        logger.info(methodName, null, "                       scheduler               : ", schedImplName);
+        logger.info(methodName, null, "                       default threads         : ", defaultNThreads);
+        logger.info(methodName, null, "                       default tasks           : ", defaultNTasks);
+        logger.info(methodName, null, "                       default memory          : ", defaultMemory);
+        logger.info(methodName, null, "                       class definition file   : ", class_definitions);
+        logger.info(methodName, null, "                       RM:OR scheduling ratio  : ", SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.ratio", 
+                                                                                                                                 DEFAULT_SCHEDULING_RATIO) + ":1");
+        logger.info(methodName, null, "                       eviction policy         : ", evictionPolicy);
+        logger.info(methodName, null, "                       use prediction          : ", SystemPropertyResolver.getBooleanProperty("ducc.rm.prediction", true));
+        logger.info(methodName, null, "                       prediction fudge factor : ", SystemPropertyResolver.getIntProperty("ducc.rm.prediction.fudge", 10000));
+        logger.info(methodName, null, "                       node stability          : ", nodeStability);
+        logger.info(methodName, null, "                       metrics update rate     : ", SystemPropertyResolver.getIntProperty("ducc.agent.node.metrics.publish.rate", 
+                                                                                                                                 DEFAULT_NODE_METRICS_RATE));
+        logger.info(methodName, null, "                       initialization cap      : ", SystemPropertyResolver.getIntProperty("ducc.rm.initialization.cap"));
+        logger.info(methodName, null, "                       expand by doubling      : ", SystemPropertyResolver.getBooleanProperty("ducc.rm.expand.by.doubling", true));
+        logger.info(methodName, null, "                       fragmentation threshold : ", SystemPropertyResolver.getIntProperty("ducc.rm.fragmentation.threshold", 2));
+        logger.info(methodName, null, "                       do defragmentation      : ", SystemPropertyResolver.getBooleanProperty("ducc.rm.defragmentation", true));
+        logger.info(methodName, null, "                       DUCC home               : ", System.getProperty("DUCC_HOME"));
+        logger.info(methodName, null, "                       ActiveMQ URL            : ", SystemPropertyResolver.getStringProperty("ducc.broker.url"));
+        logger.info(methodName, null, "                       JVM                     : ", System.getProperty("java.vendor") +
+                                                                                      " "+ System.getProperty("java.version"));
+        logger.info(methodName, null, "                       JAVA_HOME               : ", System.getProperty("java.home"));
+        logger.info(methodName, null, "                       JVM Path                : ", System.getProperty("ducc.jvm"));
+        logger.info(methodName, null, "                       JMX URL                 : ", System.getProperty("ducc.jmx.url"));
+        logger.info(methodName, null, "                       OS Architecture         : ", System.getProperty("os.arch"));
+        logger.info(methodName, null, "                       DUCC Version            : ", Version.version());
+        logger.info(methodName, null, "                       RM Version              : ", ""+ rmversion_major   + "." 
+                                                                                             + rmversion_minor   + "." 
+                                                                                             + rmversion_ptf     + "-" 
+                                                                                             + rmversion_string);
+        initialized = true;
+    }
+
+    public Machine getMachine(NodeIdentity ni)
+    {
+    	return nodepool.getMachine(ni);        
+    }
+
+    public void setJobManager(IJobManager jobmanager)
+    {
+        this.jobManager = jobmanager;
+    }
+
+    public String getDefaultClassName()
+    {
+    	return defaultClassName;
+    }
+
+    public int getDefaultNThreads()
+    {
+    	return defaultNThreads;
+    }
+
+    public int getDefaultNTasks()
+    {
+    	return defaultNTasks;
+    }
+
+    public int getDefaultMemory()
+    {
+    	return defaultMemory;
+    }
+
+    public ResourceClass getResourceClass(String name)
+    {
+        return resourceClassesByName.get(name);
+    }
+
+    public IRmJob getJob(DuccId id)
+    {
+        return allJobs.get(id);
+    }
+
+    public Share getShare(DuccId id)
+    {
+        return busyShares.get(id);
+    }
+
+//    public static int getInitializationCap()
+//    {
+//        return initializationCap;
+//    }
+//
+//    public static boolean isExpandByDoubling()
+//    {
+//        return expandByDoubling;
+//    }
+
+    /**
+     * Calculate the share order, given some memory size in GB (as from a job spec).
+     */
+    int calcShareOrder(long mem)
+    {
+        // Calculate its share order
+        mem = mem * 1024 * 1024;                 // GB -> KB, to match share_quantum units
+        
+        int share_order = (int) (mem / share_quantum);               // liberal calc, round UP
+        if ( (mem % share_quantum) > 0 ) {
+            share_order++;
+        }
+        return share_order;
+    }
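+    // Worked example of the arithmetic above (values illustrative): with the
+    // default 16 GB quantum, a 20 GB request divides to 1 with a nonzero
+    // remainder and rounds up to share order 2; an exact 32 GB request
+    // yields order 2 with no rounding.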
+
+    /**
+     * Use the NodeIdentity to infer the domain name.
+     *
+     * Iterate through the possible names - if one of them contains a '.',
+     * we assume everything following it is the domain name.  We only get
+     * one such name, so we stop searching as soon as we find it.
+     */
+    private String getDomainName()
+    {
+    	String methodName = "getDomainName";
+        try {
+			NodeIdentity ni   = new NodeIdentity();
+			for ( IIdentity id : ni.getNodeIdentities()) {
+			    String n = id.getName();
+			    int ndx = n.indexOf(".");
+			    if ( ndx > 0 ) {
+			        return n.substring(ndx + 1);
+			    }
+			}
+		} catch (Exception e) {
+			logger.warn(methodName, null, "Cannot create my own node identity:", e);
+		}
+        return null;  // bad configuration if this happens; some names may not match nodepools,
+                      // and there is nothing to do about it.
+    }
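+    // Example (hostnames illustrative): if this node reports the identities
+    // "rm01.example.com" and "rm01", the first name containing a '.' wins and
+    // the inferred domain is "example.com"; with no dotted name we return null.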
+
+    Map<String, String> readNodepoolFile(String npfile)
+    {
+        String my_domain = getDomainName();
+        String ducc_home = System.getProperty("DUCC_HOME");
+        npfile = ducc_home + "/resources/" + npfile;
+
+        Map<String, String> response = new HashMap<String, String>();
+
+        try {
+            BufferedReader br = new BufferedReader(new FileReader(npfile));
+            String node = "";
+            while ( (node = br.readLine()) != null ) {
+                int ndx = node.indexOf("#");
+                if ( ndx >= 0 ) {
+                    node = node.substring(0, ndx);
+                }
+                node = node.trim();
+                if (node.equals("") ) {
+                    continue;
+                }
+
+                if ( node.startsWith("import") ) {
+                    String[] tmp = node.split("\\s");
+                    response.putAll(readNodepoolFile(tmp[1]));
+                    continue;
+                }
+                response.put(node, node);
+
+                // include fully and non-fully qualified names to allow sloppiness of config
+                ndx = node.indexOf(".");
+                String dnode;
+                if ( ndx >= 0 ) {
+                    dnode = node.substring(0, ndx);
+                    response.put(dnode, dnode);
+                } else if ( my_domain != null ) {
+                    dnode = node + "." + my_domain;
+                    response.put(dnode, dnode);
+                }
+            }
+            br.close();                        
+            
+        } catch (FileNotFoundException e) {
+            throw new SchedulingException(null, "Cannot open NodePool file \"" + npfile + "\": file not found.");
+        } catch (IOException e) {
+            throw new SchedulingException(null, "Cannot read NodePool file \"" + npfile + "\": I/O Error.");
+        }
+                
+        return response;
+    }
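+    // A nodepool file this parser accepts might look like the following
+    // (node and file names illustrative):
+    //
+    //     # comments run from '#' to end of line
+    //     import gpu-pool.nodes        # recursively merge another file
+    //     node01                       # short name; "node01.<domain>" is added too
+    //     node02.example.com           # long name; short "node02" is added too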
+
+    void initClasses(String filename)
+        throws Exception
+    {
+        String methodName = "initClasses";
+        DuccProperties props = new DuccProperties();
+        props.load(ducc_home + "/resources/" + filename);
+
+        defaultClassName = props.getProperty("scheduling.default.name");
+
+        // read in nodepools
+        String npn = props.getProperty("scheduling.nodepool");
+        if ( npn != null ) {
+            String[] npnames = npn.split(" ");
+            for ( String nodepoolName : npnames ) {
+                int nporder = props.getIntProperty("scheduling.nodepool." + nodepoolName + ".order", 100);                
+                String npfile = props.getProperty("scheduling.nodepool." + nodepoolName).trim();
+                Map<String,String> npnodes = readNodepoolFile(npfile);                
+                nodepool.createSubpool(nodepoolName, npnodes, nporder);                    
+//                 } catch (FileNotFoundException e) {
+//                     throw new SchedulingException(null, "Cannot open NodePool file \"" + npfile + "\": file not found.");
+//                 } catch (IOException e) {
+//                     throw new SchedulingException(null, "Cannot read NodePool file \"" + npfile + "\": I/O Error.");
+//                 }
+            }
+        }
+        
+        // read in the class definitions
+        String cn = props.getProperty("scheduling.class_set");
+        if ( cn == null ) {
+            throw new SchedulingException(null, "No class definitions found, scheduler cannot start.");
+        }
+        
+        String[] classNames = cn.split("\\s+");
+        logger.info(methodName, null, "Classes:");
+        logger.info(methodName, null, ResourceClass.getHeader());
+        logger.info(methodName, null, ResourceClass.getDashes());
+        for ( String n : classNames ) {
+        	n = n.trim();
+            ResourceClass rc = new ResourceClass(n); //, nodepool.getMachinesByName(), nodepool.getMachinesByIp());
+            rc.init(props);
+            resourceClasses.put(rc, rc);
+            resourceClassesByName.put(n, rc);
+            logger.info(methodName, null, rc.toString());
+        }
+    }
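+    // A minimal class-definition file consistent with the keys read above
+    // (names and values illustrative; per-class keys are read by ResourceClass.init):
+    //
+    //     scheduling.nodepool                 = jobdriver
+    //     scheduling.nodepool.jobdriver       = jobdriver.nodes
+    //     scheduling.nodepool.jobdriver.order = 100
+    //     scheduling.class_set                = normal reserve
+    //     scheduling.default.name             = normal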
+
+    void initClassesX(String filename)
+        throws Exception
+    {
+        String methodName = "initClassesX";
+        DuccProperties props = new DuccProperties();
+        props.load(ducc_home + "/resources/" + filename);
+
+        defaultClassName = props.getProperty("scheduling.default.name");
+        String my_domain = getDomainName();
+
+        // read in nodepools
+        String npn = props.getProperty("scheduling.nodepool");
+        if ( npn != null ) {
+            String[] npnames = npn.split(" ");
+            for ( String nodepoolName : npnames ) {
+                int nporder = props.getIntProperty("scheduling.nodepool." + nodepoolName + ".order", 100);                
+                String npfile = props.getProperty("scheduling.nodepool." + nodepoolName).trim();
+                try {
+                    String ducc_home = System.getProperty("DUCC_HOME");
+                    npfile = ducc_home + "/resources/" + npfile;
+                    BufferedReader br = new BufferedReader(new FileReader(npfile));
+                    String node = "";
+                    HashMap<String, String> npnodes = new HashMap<String, String>();
+                    while ( (node = br.readLine()) != null ) {
+                        int ndx = node.indexOf("#");
+                        if ( ndx >= 0 ) {
+                            node = node.substring(0, ndx);
+                        }
+                        node = node.trim();
+                        if (node.equals("") ) {
+                            continue;
+                        }
+
+                        npnodes.put(node, node);
+
+                        // include fully and non-fully qualified names to allow sloppiness of config
+                        ndx = node.indexOf(".");
+                        String dnode;
+                        if ( ndx >= 0 ) {
+                            dnode = node.substring(0, ndx);
+                            npnodes.put(dnode, dnode);
+                        } else if ( my_domain != null ) {
+                            dnode = node + "." + my_domain;
+                            npnodes.put(dnode, dnode);
+                        }
+                    }
+                    br.close();                        
+                    nodepool.createSubpool(nodepoolName, npnodes, nporder);
+                    
+                } catch (FileNotFoundException e) {
+                    throw new SchedulingException(null, "Cannot open NodePool file \"" + npfile + "\": file not found.");
+                } catch (IOException e) {
+                    throw new SchedulingException(null, "Cannot read NodePool file \"" + npfile + "\": I/O Error.");
+                }
+            }
+        }
+        
+        // read in the class definitions
+        String cn = props.getProperty("scheduling.class_set");
+        if ( cn == null ) {
+            throw new SchedulingException(null, "No class definitions found, scheduler cannot start.");
+        }
+        
+        String[] classNames = cn.split("\\s+");
+        logger.info(methodName, null, "Classes:");
+        logger.info(methodName, null, ResourceClass.getHeader());
+        logger.info(methodName, null, ResourceClass.getDashes());
+        for ( String n : classNames ) {
+        	n = n.trim();
+            ResourceClass rc = new ResourceClass(n); //, nodepool.getMachinesByName(), nodepool.getMachinesByIp());
+            rc.init(props);
+            resourceClasses.put(rc, rc);
+            resourceClassesByName.put(n, rc);
+            logger.info(methodName, null, rc.toString());
+        }
+    }
+
+
+
+    /**
+     * Called only from schedule, under the 'this' monitor.
+     *
+     * Takes the SchedulingUpdate from the IScheduler and dispatches orders to
+     * the world to make it happen.
+     *
+     * For jobs that lose resources, the job manager is asked to stop execution in specific shares.
+     * For jobs that gain resources, the job manager is asked to start execution in specific shares.
+     * Jobs that don't change are leftovers.  If they're not running at all, they're on the pending
+     * list; they might also be on the running list but have had no allocation changes in the current epoch.
+     */
+    private JobManagerUpdate dispatch(SchedulingUpdate upd, JobManagerUpdate jmu)
+    {
+        String methodName = "dispatch";
+        HashMap<IRmJob, IRmJob> jobs;
+
+        // Go through shrunken jobs - if they are shrunken to 0, move to dormant
+        jobs = upd.getShrunkenJobs();
+        for (IRmJob j : jobs.values()) {
+            
+            logger.info(methodName, j.getId(), ">>>>>>>>>> SHRINK");
+
+            HashMap<Share, Share> sharesE = j.getAssignedShares();
+            HashMap<Share, Share> sharesR = j.getPendingRemoves();
+            logger.info(methodName, j.getId(), "removing", sharesR.size(), "of existing", sharesE.size(), "shares.");
+
+            for ( Share s : sharesE.values() ) {
+                logger.debug(methodName, j.getId(), "    current", s.toString());
+            }
+
+            for ( Share s : sharesR.values() ) {
+                logger.debug(methodName, j.getId(), "    remove ", s.toString());
+            }
+            logger.info(methodName, j.getId(), ">>>>>>>>>>");
+
+            jmu.removeShares(j, sharesR);
+            // jobManager.stopJob(j, shares);                 // stops job on everything on the pendingRemoves list
+            // j.clearPendingRemoves();
+        }
+
+        // Go through expanded jobs - if they are dormant, remove from dormant
+        //                            then add to running.
+        // Tell the server it needs to start some machines for the job
+        jobs = upd.getExpandedJobs();
+        for (IRmJob j : jobs.values() ) {
+            HashMap<Share, Share> sharesE = j.getAssignedShares();
+            HashMap<Share, Share> sharesN = j.getPendingShares();        	
+
+            logger.info(methodName, j.getId(), "<<<<<<<<<<  EXPAND");
+            logger.info(methodName, j.getId(), "adding", sharesN.size(), "new shares to existing", sharesE.size(), "shares.");
+
+            for ( Share s : sharesE.values()) {
+                logger.debug(methodName, j.getId(), "    existing ", s.toString());
+            }
+
+            for ( Share s : sharesN.values()) {
+                logger.debug(methodName, j.getId(), "    expanding", s.toString());
+            }
+            logger.info(methodName, j.getId(), "<<<<<<<<<<");
+
+            sharesN = j.promoteShares();
+            if ( sharesN.size() == 0 ) {
+                // internal error - should not be marked expanded if no machines
+                throw new SchedulingException(j.getId(), "Trying to execute expanded job but no pending machines.");
+            }
+
+            for ( Share s : sharesN.values()) {                           // update machine books                
+                // Sanity checks on the bookkeeping
+                busyShares.put(s.getId(), s);                
+            }
+
+//            DuccId id = j.getId();                                  // pull from dormant, maybe
+//            if ( dormantJobs .containsKey(id) ) {
+//                dormantJobs .remove(id);
+//            }
+
+            //runningJobs.put(id, j);
+            jmu.addShares(j, sharesN);
+            // jobManager.executeJob(j, shares);                      // will update job's pending lists
+
+        }
+
+        jobs = upd.getStableJobs();                             // squirrel these away to try next time
+        for (IRmJob j: jobs.values()) {
+            if ( j.countNShares() < 0 ) {
+                throw new SchedulingException(j.getId(), "Share count went negative " + j.countNShares());
+            }
+            logger.info(methodName, j.getId(), ".......... STABLE with ", j.countNShares(), " shares.");
+        }
+
+        jobs = upd.getDormantJobs();                             // squirrel these away to try next time
+        for (IRmJob j: jobs.values()) {
+            logger.info(methodName, j.getId(), ".......... DORMANT");
+//            dormantJobs .put(j.getId(), j);
+        }
+
+        jobs = upd.getReservedJobs();
+        for (IRmJob j: jobs.values()) {
+            logger.info(methodName, j.getId(), "<<<<<<<<<<  RESERVE");
+
+            HashMap<Share, Share> sharesE = j.getAssignedShares();
+            HashMap<Share, Share> sharesN = j.getPendingShares();        	
+
+            if ( sharesE.size() == j.countInstances() ) {
+                logger.info(methodName, j.getId(), "reserve_stable", sharesE.size(), "machines");
+            } else  if ( sharesN.size() == j.countInstances() ) {           // reservation is complete but not yet confirmed?
+                logger.info(methodName, j.getId(), "reserve_adding", sharesN.size(), "machines");
+                for ( Share s : sharesN.values()) {
+                    logger.debug(methodName, j.getId(), "    reserve_expanding ", s.toString());
+                }
+                jmu.addShares(j, sharesN);                
+                j.promoteShares();
+            } else {
+                logger.info(methodName, j.getId(), "reserve_pending", j.countInstances(), "machines");
+            }
+            logger.info(methodName, j.getId(), "<<<<<<<<<<");
+        }
+
+        jmu.setAllJobs(allJobs);
+
+        jobs = upd.getRefusedJobs();
+        Iterator<IRmJob> iter = jobs.values().iterator();
+        while ( iter.hasNext() ) {
+            IRmJob j = iter.next();
+            logger.info(methodName, j.getId(), ".......... REFUSED");
+        }
+
+        return jmu;
+    }
+
+    /**
+     * We don't accept new work, or even Orchestrator state updates, until "ready".  We do
+     * want machines, but we must be sure the internal structures are protected first.
+     */
+    public boolean ready()
+    {
+    	return stability;
+    }
+
+    public void start()
+    {
+        stability = true;
+    }
+
+    protected void handleDeadNodes()
+    {
+    	String methodName = "handleDeadNodes";
+    	
+        if ( ! initialized ) {
+            return;
+        }
+
+        HashMap<Node, Node> nodeUpdates = new HashMap<Node, Node>();
+        synchronized(deadNodes) {
+            nodeUpdates.putAll(deadNodes);
+            deadNodes.clear();
+        }
+
+        synchronized(this) {
+
+            for ( Node n : nodeUpdates.values() ) {
+                Machine m = nodepool.getMachine(n);
+
+                if ( m == null ) {
+                    // must have been removed because of earlier missed hb
+                    continue;
+                }
+
+                logger.warn(methodName, null, "***Purging machine***", m.getId(), "due to missed heartbeats. Threshold:",  nodeStability);
+                NodePool np = m.getNodepool();
+                np.nodeLeaves(m);
+            }
+        }        
+    }
+
+    /**
+     * We first accept any changes and requests from the outside world and place them where they
+     * can be acted on in this epoch.
+     *
+     * We then pass all relevant requests and resources to the IScheduler.  This returns a
+     * SchedulingUpdate, which is passed to the dispatcher to be acted upon.
+     */
+    public JobManagerUpdate schedule()
+    {
+    	String methodName = "schedule";
+
+
+//         if ( startupCountdown++ < nodeStability ) {
+//             logger.info(methodName, null, "Startup countdown:", startupCountdown, "of", nodeStability);
+//             return null;
+//         }
+
+        if ( ! stability ) {
+            return null;
+        }
+
+        // tracking the OR hang problem - are topics being delivered?
+        logger.debug("nodeArrives", null, "Total arrivals:", total_arrivals);
+
+        handleDeadNodes();
+        nodepool.reset(NodePool.getMaxOrder());
+
+        // TODO: Can we combine these two into one?
+        SchedulingUpdate upd = new SchedulingUpdate();              // state from internal scheduler
+        JobManagerUpdate jmu = new JobManagerUpdate();              // state we forward to job manager
+
+        // int nchanges = 0;
+    	
+
+        ArrayList<IRmJob> jobsToRecover = new ArrayList<IRmJob>();
+        synchronized(recoveredJobs) {
+            jobsToRecover.addAll(recoveredJobs);
+            recoveredJobs.clear();
+            // nchanges += jobsToRecover.size();
+        }
+
+        ArrayList<IRmJob> newJobs = new ArrayList<IRmJob>();
+        // 
+        // If there are new jobs we need to init some things and start a scheduling cycle.
+        //
+        synchronized(incomingJobs) {            
+            newJobs.addAll(incomingJobs);
+            incomingJobs.clear();
+            // nchanges += newJobs.size();
+        }
+
+        //
+        // If some jobs passed initialization we need to signal a scheduling cycle to get
+        // them their fair share.
+        //
+//        synchronized(initializedJobs) {            
+//            if ( initializedJobs.size() > 0 ) {
+//                nchanges++;
+//            }
+//            initializedJobs.clear();
+//        }
+
+        //
+        // If some jobs completed we need to clear them out and signal a
+        // scheduling cycle to try to reuse their resources.
+        //
+        ArrayList<IRmJob> doneJobs = new ArrayList<IRmJob>();
+        synchronized(completedJobs) {
+            doneJobs.addAll(completedJobs);
+            completedJobs.clear();
+            //nchanges += doneJobs.size();
+        }
+
+        //
+        // If some shares were vacated we need to clear them out and run a scheduling cycle.
+        //
+        ArrayList<Pair<IRmJob, Share>> doneShares= new ArrayList<Pair<IRmJob, Share>>();
+        synchronized(vacatedShares) {
+            doneShares.addAll(vacatedShares.values());
+            vacatedShares.clear();
+            //nchanges += doneShares.size();
+
+            // we use the vacatedShares object to control share growth as well
+            //if ( growthOccurred ) nchanges++;
+            //growthOccurred = false;
+        }
+
+//         boolean must_run = false;
+//         synchronized(force_epoch) {
+//             must_run = force_epoch;
+//             force_epoch = false;
+//         }
+
+//         if ( (nchanges == 0) && !must_run ) { 
+//             jmu.setAllJobs(allJobs);
+//             return jmu;
+//         }
+// TODO if we remove this code above be sure to clear out all the force_epoch nonsense
+// TODO does this even use growthOccurred?
+
+        synchronized(this) {
+
+            // before looking at jobs, ensure we're updated after a crash
+            for ( IRmJob j : jobsToRecover ) {
+                processRecovery(j);
+            }
+
+            // process these next to free up resources for the scheduling cycle
+            for (Pair<IRmJob, Share> p : doneShares) {
+                processCompletion(p.first(), p.second());
+            }
+
+            for (IRmJob j : doneJobs) {
+                processCompletion(j);
+            }
+
+            // update user records, "check in" new jobs
+            if ( newJobs.size() > 0 ) {
+                logger.info(methodName, null, "Jobs arrive:");
+                logger.info(methodName, null, "submit", RmJob.getHeader());
+            }
+
+            Iterator<IRmJob> iter = newJobs.iterator();
+            while ( iter.hasNext() ) {
+                IRmJob j = iter.next();
+
+
+                if ( j.isRefused() ) {          // the JobManagerConverter has already refused it
+                    upd.refuse(j, j.getRefusalReason());
+                }
+
+                String user = j.getUserName();
+                User u = users.get(user);
+                if ( u == null ) {
+                    u = new User(user);
+                    users.put(user, u);
+                }
+                j.setUser(u);
+
+                // Calculate its share order
+                int share_order = calcShareOrder(j.getMemory());
+                j.setShareOrder(share_order);
+
+                // Assign it to its priority class
+                String clid = j.getClassName();
+                ResourceClass prclass = resourceClassesByName.get(clid);
+
+                u.addJob(j);
+                allJobs.put(j.getId(), j);
+                if ( prclass == null ) {                    
+                    upd.refuse(j, "Cannot find priority class " + clid + " for job");
+                    continue;
+                }
+
+                /**
+                 * We want to allow this - a normal job, submitted to a reservation class.
+                   if ( (prclass.getPolicy() == Policy.RESERVE ) && ( ! j.isReservation() ) ) {
+                   upd.refuse(j, "Reservation class " +
+                   prclass.getId() + " specified but work is not a reservation.");
+                   continue;
+                   }
+                */
+
+                if ( ((prclass.getPolicy() != Policy.RESERVE ) && (prclass.getPolicy() != Policy.FIXED_SHARE)) && ( j.isReservation() ) ) {
+                    upd.refuse(j, "Class " + prclass.getName() + " is policy " + 
+                               prclass.getPolicy() + " but the work is submitted as a reservation.");
+                    continue;
+                }
+
+                prclass.addJob(j);
+                j.setResourceClass(prclass);
+                logger.info(methodName, j.getId(), "submit", j.toString());
+            }
+
+            logger.info(methodName, null, "Scheduling " + newJobs.size(), " new jobs.  Existing jobs: " + allJobs.size());
+            scheduler.schedule(upd);
+
+            logger.info(methodName, null, "--------------- Scheduler returns ---------------");
+            logger.info(methodName, null, "\n", upd.toString());
+            logger.info(methodName, null, "------------------------------------------------");                
+            dispatch(upd, jmu);                 // my own job lists get updated by this
+
+            return jmu;
+        }
+    }
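+    // Sketch of one epoch as a caller might drive it (the driver variables are
+    // hypothetical; only the Scheduler calls are real):
+    //
+    //     Scheduler s = new Scheduler();
+    //     s.init();                              // load config, classes, nodepools
+    //     s.nodeArrives(node);                   // agents report in
+    //     s.start();                             // declare node state stable
+    //     s.signalNewWork(job);                  // orchestrator submits work
+    //     JobManagerUpdate jmu = s.schedule();   // one epoch; null until stable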
+
+    synchronized public void shutdown()
+    {
+        done = true;
+    }
+
+//     public void run()
+//     {
+//     	String methodName = "run";
+//         while ( ! done ) {
+//             try { sleep(epoch); } catch (InterruptedException e) { }
+
+//             logger.info(methodName, null, "========================== Epoch starts ===========================");
+//             try {
+//                 schedule();
+//             } catch ( SchedulingException e ) {
+//                 logger.info(methodName, e.jobid, e);
+//             }
+
+//             logger.info(methodName, null, "========================== Epoch ends   ===========================");
+//         }
+//     }
+
+    private int total_arrivals = 0;
+    public void nodeArrives(Node node)
+    {        
+    	// String methodName = "nodeArrives";
+        // The first block ensures the node is in the scheduler's records as soon as possible
+
+        total_arrivals++;       // report these in the main schedule loop
+        synchronized(this) {
+            // the amount of memory available for shares, adjusted with configured overhead
+
+            Machine m = nodepool.getMachine(node);
+            int share_order = 0;
+
+            if ( m == null ) {
+                allNodes.put(node, node);
+                long allocatable_mem =  node.getNodeMetrics().getNodeMemory().getMemTotal() - share_free_dram;
+                if ( dramOverride > 0 ) {
+                    allocatable_mem = dramOverride;
+                }
+                share_order = (int) (allocatable_mem / share_quantum);           // conservative - rounds down (this will always cast ok)                
+            } else {
+                share_order = m.getShareOrder();
+            }
+            
+            m = nodepool.nodeArrives(node, share_order);                         // announce to the nodepools
+            // m.heartbeat_down();
+            // logger.info(methodName, null, "Node arrives:", m.getId());                                                              // make SURE it's reset ok
+        }
+
+        // The second block registers it in the heartbeat map
+//        synchronized(incomingNodes) {
+//            incomingNodes.put(node, node);
+//        }
+    }
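+    // Worked example (numbers illustrative): a node reporting 64 GB with
+    // share_free_dram = 0 and a 16 GB quantum gets share_order 64/16 = 4,
+    // so four quantum-sized shares are schedulable on that machine.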
+
+    public void nodeDeath(Map<Node, Node> nodes)
+    {
+        synchronized(deadNodes) {
+            deadNodes.putAll(nodes);
+        }
+    }
+
+    /**
+     * Callback from job manager, need shares for a new fair-share job.
+     */
+    public void signalNewWork(IRmJob job)
+    {
+        // We'll synchronize only on the incoming job list 
+        synchronized(incomingJobs) {            
+            incomingJobs.add(job);
+        }
+    }
+
+//     public void signalForceEpoch()
+//     {
+//         synchronized( force_epoch ) {
+//             force_epoch = true;
+//         }
+//     }
+
+    public void signalInitialized(IRmJob job)
+    {
+        // We'll synchronize only on the initialized job list
+        synchronized(initializedJobs) {            
+            initializedJobs.add(job);
+        }
+    }
+
+    public void signalRecovery(IRmJob job)
+    {
+        synchronized(recoveredJobs) {
+            recoveredJobs.add(job);
+        }
+    }
+
+    public void jobCancelled(DuccId id)
+    {
+        // TODO Fill this in.
+    }
+
+    /**
+     * Callback from the job manager when a job completes.  We take its word for it - no sanity checks.
+     */
+    public void signalCompletion(DuccId id)
+    {
+        String methodName = "signalCompletion";
+        synchronized(completedJobs) {
+            try {
+                IRmJob job = allJobs.get(id);
+                logger.info(methodName, id, "Job completion signal.");
+                completedJobs.add(job);
+            } catch (Exception e) {
+                logger.warn(methodName, id, e);
+            }
+        }
+    }
+
+    /**
+     * Callback from job manager when a specific share exits but the job is still alive.
+     */
+    public void signalCompletion(IRmJob job, Share share)
+    {
+        String methodName = "signalCompletion";
+        synchronized(vacatedShares) {
+            logger.info(methodName, job.getId(), "Job vacate signal share: ", share.toString());
+            vacatedShares.put(share.getId(), new Pair<IRmJob, Share>(job, share));
+        }            
+    }
+
+    /**
+     * Callback from job manager when a specific share gets a process associated.
+     */
+//    public void signalGrowth(DuccId jobid, Share share)
+//    {
+//        String methodName = "signalGrowth";
+//        synchronized(vacatedShares) {
+//            logger.info(methodName, jobid, "Job growth signal share: ", share.toString());
+//            growthOccurred = true;
+//        }            
+//    }
+
+    /**
+     * Called in scheduling cycle, to actually complete the job - avoids deadlock
+     */
+    private synchronized void processCompletion(IRmJob job)
+    {
+        String methodName = "processCompletion";
+        logger.info(methodName, job.getId(), "Job completes.");
+
+        // -- clean up the running jobs list
+        IRmJob j = allJobs.remove(job.getId());
+        if ( j == null ) {
+            logger.info(methodName, job.getId(), "Job is not in run list!");  // can happen if job is refused very early
+            return;
+        }
+
+        j.markComplete();
+
+        // -- clean up user list
+        User user = users.get(j.getUserName());
+        if ( user.remove(job) == 0 ) {
+            users.remove(user.getName());
+        }
+
+        ResourceClass rc = job.getResourceClass();
+        if ( rc != null ) {
+            rc.removeJob(j);            // also clears it if it's a reservation
+        } else if ( !j.isRefused() ) {
+            throw new SchedInternalError(j.getId(), "Job exits from class " + job.getClassName() + " but we cannot find the priority class definition.");
+        }
+
+
+        // -- clean up machine lists
+        HashMap<Share, Share> shares= job.getAssignedShares();        
+        for (Share s: shares.values()) {
+            purgeShare(s, job);
+        }
+        job.removeAllShares();
+    }
+
+    /**
+     * Called from the scheduling cycle - a specific share has run out of work for the given job (but the
+     * job is not done yet).
+     */
+    private synchronized void processCompletion(IRmJob job, Share share)
+    {
+        String methodName = "processCompletion";
+        
+        logger.debug(methodName, job.getId(), "Job vacates share ", share.toString());
+        //share.removeJob();
+        job.removeShare(share);
+        purgeShare(share, job);
+    }
+
+    private synchronized void processRecovery(IRmJob j)
+    {
+    	String methodName = "processRecovery";
+
+        int share_order = calcShareOrder(j.getMemory());
+        j.setShareOrder(share_order);
+        j.setResourceClass(resourceClassesByName.get(j.getClassName()));
+        HashMap<Share, Share> shares = j.getRecoveredShares();
+        StringBuffer sharenames = new StringBuffer();
+        for ( Share s : shares.values() ) {
+            sharenames.append(s.toString());
+            sharenames.append(" ");
+
+            if ( !j.isReservation() ) {          // if it's a reservation, the share order is already set from
+                                                 // the machine.  nodepools can cause the actual order to differ
+                                                 // from the order derived from indicated memory.
+                s.setShareOrder(share_order);
+            }
+            Machine m = s.getMachine();
+            NodePool np = m.getNodepool();
+            np.connectShare(s, m, j, s.getShareOrder());
+
+            busyShares.put(s.getId(), s);
+        }
+        String username = j.getUserName();
+        User user = users.get(username);
+        if ( user == null ) {
+            user = new User(username);
+            users.put(username, user);
+            logger.info(methodName, j.getId(), "&&&&&&&&&&&&&&&& new user", user.toString(), "-------------------");
+        }
+        j.setUser(user);
+        user.addJob(j);
+
+    	j.promoteShares();                       // NOT expanded, just recovered, promote them right away
+        j.clearRecoveredShares();
+
+        String clid = j.getClassName();
+        ResourceClass prclass = resourceClassesByName.get(clid);
+        
+        allJobs.put(j.getId(), j);
+        prclass.addJob(j);
+        j.setResourceClass(prclass);
+        logger.info(methodName, j.getId(), "Recovered job:", j.toString());
+        logger.info(methodName, j.getId(), "Recovered shares:", sharenames.toString());
+    }
+
+    /**
+     * The share is gone, purge from our structures.
+     */
+    private void purgeShare(Share s, IRmJob j)
+    {
+        busyShares.remove(s.getId());         // so long, and thanks for all the fish
+        Machine m = s.getMachine();
+        m.removeShare(s);
+    }
+
+    private static DuccIdFactory idFactory = new DuccIdFactory();
+    public static DuccId newId()
+    {
+        return idFactory.next();
+    }
+
+    public void queryMachines()
+    {
+        nodepool.queryMachines();
+    }
+
+    class MachineByOrderSorter
+    	implements Comparator<Machine>
+    {	
+    	public int compare(Machine m1, Machine m2)
+        {
+            if (m1.getShareOrder() == m2.getShareOrder()) {
+                return (m1.getId().compareTo(m2.getId()));
+            }
+            return (int) (m1.getShareOrder() - m2.getShareOrder());
+        }
+    }
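+    // Typical use (list name illustrative): sort machines by ascending share
+    // capacity, ties broken by machine id:
+    //
+    //     Collections.sort(machineList, new MachineByOrderSorter());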
+
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingException.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingException.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingException.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingException.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+import org.apache.uima.ducc.common.utils.id.DuccId;
+
+@SuppressWarnings("serial")
+public class SchedulingException
+    extends RuntimeException
+{
+    DuccId jobid;
+    
+    public SchedulingException(DuccId jobid, String msg)
+    {
+        super(msg);
+        
+        this.jobid = jobid;
+    }
+
+    public SchedulingException(DuccId jobid, String msg, Throwable t)
+    {
+        super(msg, t);
+        
+        this.jobid = jobid;
+    }
+    
+    public DuccId getJobId()
+    {
+    	return jobid;
+    }
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingUpdate.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingUpdate.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingUpdate.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingUpdate.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+import java.util.HashMap;
+
+public class SchedulingUpdate
+{
+    private HashMap<IRmJob, IRmJob> shrunken;
+    private HashMap<IRmJob, IRmJob> expanded;
+    private HashMap<IRmJob, IRmJob> stable;
+    private HashMap<IRmJob, IRmJob> dormant;
+    private HashMap<IRmJob, IRmJob> reservations;
+    private HashMap<IRmJob, IRmJob> refusals;
+
+    public SchedulingUpdate()
+    {
+        shrunken     = new HashMap<IRmJob, IRmJob>();
+        expanded     = new HashMap<IRmJob, IRmJob>();
+        stable       = new HashMap<IRmJob, IRmJob>();
+        dormant      = new HashMap<IRmJob, IRmJob>();
+        reservations = new HashMap<IRmJob, IRmJob>();
+        refusals     = new HashMap<IRmJob, IRmJob>();
+    }
+
+//     public SchedulingUpdate(
+//                             ArrayList<Job> shrunken,
+//                             ArrayList<Job> expanded,
+//                             ArrayList<Job> leftovers)
+//     {
+//         this.shrunken = shrunken;
+//         this.expanded = expanded;
+//         this.leftovers = leftovers;
+//     }
+
+    void addShrunkenJob(IRmJob j)
+    {
+        shrunken.put(j, j);
+    }
+
+    void addExpandedJob(IRmJob j)
+    {
+        expanded.put(j, j);
+    }
+
+    void addDormantJob(IRmJob j)
+    {
+        dormant.put(j, j);
+    }
+
+    void addStableJob(IRmJob j)
+    {
+        stable.put(j, j);
+    }
+
+    void addReservation(IRmJob j)
+    {
+        reservations.put(j, j);
+    }
+
+    void refuse(IRmJob j, String reason)
+    {
+        j.refuse(reason);
+        refusals.put(j, j);
+    }
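+    // How a scheduler implementation might populate an update (jobs and the
+    // refusal message are illustrative):
+    //
+    //     SchedulingUpdate upd = new SchedulingUpdate();
+    //     upd.addExpandedJob(growingJob);          // gained shares this epoch
+    //     upd.addShrunkenJob(shrinkingJob);        // must give shares back
+    //     upd.refuse(badJob, "class not found");   // marks the job refused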
+
+    HashMap<IRmJob, IRmJob> getRefusedJobs() 
+    {
+        return refusals;
+    }
+
+    HashMap<IRmJob, IRmJob> getReservedJobs() 
+    {
+        return reservations;
+    }
+
+    HashMap<IRmJob, IRmJob> getShrunkenJobs() 
+    {
+        return shrunken;
+    }
+
+    HashMap<IRmJob, IRmJob> getExpandedJobs() 
+    {
+        return expanded;
+    }
+
+    HashMap<IRmJob, IRmJob> getStableJobs() 
+    {
+        return stable;
+    }
+
+    HashMap<IRmJob, IRmJob> getDormantJobs() 
+    {
+        return dormant;
+    }
+
+//     HashMap<IRmJob, IRmJob> getReservations() 
+//     {
+//         return reservations;
+//     }
+
+    public String toString()
+    {
+        StringBuffer sb = new StringBuffer();
+
+        sb.append("Expanded:\n");
+        if ( expanded.size() == 0 ) {
+            sb.append("   <none>\n");
+        } else {
+            sb.append("   ");
+            sb.append(RmJob.getHeader());
+            sb.append("\n");
+            for (IRmJob j : expanded.values()) {
+                sb.append("   ");
+                sb.append(j.toString());
+                sb.append("\n");
+            }
+        }
+
+        sb.append("\nShrunken:\n");
+        if ( shrunken.size() == 0 ) {
+            sb.append("   <none>\n");
+        } else {
+            sb.append("   ");
+            sb.append(RmJob.getHeader());
+            sb.append("\n");
+            for (IRmJob j : shrunken.values()) {
+                sb.append("   ");
+                sb.append(j.toString());
+                sb.append("\n");
+            }
+        }
+
+        sb.append("\nStable:\n");
+        if ( stable.size() == 0 ) {
+            sb.append("   <none>\n");
+        } else {
+            sb.append("   ");
+            sb.append(RmJob.getHeader());
+            sb.append("\n");
+            for (IRmJob j : stable.values()) {
+                sb.append("   ");
+                sb.append(j.toString());
+                sb.append("\n");
+            }
+        }
+
+        sb.append("\nDormant:\n");
+        if ( dormant.size() == 0 ) {
+            sb.append("   <none>\n");
+        } else {
+            sb.append("   ");
+            sb.append(RmJob.getHeader());
+            sb.append("\n");
+            for (IRmJob j : dormant.values()) {
+                sb.append("   ");
+                sb.append(j.toString());
+                sb.append("\n");
+            }
+        }
+
+        sb.append("\nReserved:\n");
+        if ( reservations.size() == 0 ) {
+            sb.append("   <none>\n");
+        } else {
+            sb.append("   ");
+            sb.append(RmJob.getHeader());
+            sb.append("\n");
+            for (IRmJob j : reservations.values()) {
+                sb.append("   ");
+                sb.append(j.toString());
+                sb.append("\n");
+            }
+        }
+
+        sb.append("\n");
+
+        return sb.toString();
+    }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingUpdate.java
------------------------------------------------------------------------------
    svn:eol-style = native
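
The class above is a simple collector: the scheduler implementation files each
job into exactly one bucket, and the dispatcher later reads the buckets back
out. A minimal sketch of that round trip, assuming some IRmJob instances and
dispatch helpers exist (growingJob, startProcessesFor(), etc. are hypothetical;
only the SchedulingUpdate methods come from the class above):

    SchedulingUpdate upd = new SchedulingUpdate();
    upd.addExpandedJob(growingJob);     // gained shares this epoch
    upd.addShrunkenJob(shrinkingJob);   // must give shares back
    upd.addStableJob(steadyJob);        // allocation unchanged
    upd.addDormantJob(waitingJob);      // wants shares, received none
    upd.refuse(badJob, "Request exceeds the capacity of the largest machine");

    for (IRmJob j : upd.getExpandedJobs().values()) {
        startProcessesFor(j);           // hypothetical dispatch helper
    }
    for (IRmJob j : upd.getShrunkenJobs().values()) {
        preemptProcessesFor(j);         // hypothetical dispatch helper
    }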

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ServerException.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ServerException.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ServerException.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ServerException.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+@SuppressWarnings("serial")
+public class ServerException
+    extends RuntimeException
+{
+    public ServerException()
+    {
+        super();
+    }
+
+    public ServerException(String msg)
+    {
+        super(msg);
+    }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ServerException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.common.ITimeWindow;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+
+/**
+ * This may more correctly be thought of as representing a Process.
+ *
+ * A share is ALWAYS associated with a Machine.
+ */
+public class Share
+	implements SchedConstants
+{
+    //private transient DuccLogger logger = DuccLogger.getLogger(Share.class, COMPONENT_NAME);
+    private Machine machine;               // machine associated with this share, assigned after "what-of"
+    private DuccId id;                     // unique *within this machine*         assigned after "what-of"
+    private IRmJob job;                    // job executing in this share, if any, assigned after "what-of"
+    private int share_order;               // may not be same as machine's order
+
+    private ITimeWindow init_time = null;  // how much time this process spends initializing
+    @SuppressWarnings("unused")
+    private ITimeWindow run_time = null;   // how much time this process spends running, after initialization
+
+
+    // private HashMap<Integer, Long> activeQuestions = new HashMap<Integer, Long>();
+
+    private boolean evicted = false;      // during preemption, remember this share has already been removed
+                                          // (multiple classes might try to evict, this prevents multiple eviction)
+    private boolean purged = false;       // share is forcibly removed because its machine died
+    private boolean fixed = false;        // if true, can't be preempted
+
+    @SuppressWarnings("unused")
+    private long resident_memory = 0;
+    @SuppressWarnings("unused")
+    private ProcessState state = ProcessState.Undefined;
+    @SuppressWarnings("unused")
+    private String pid = "<none>";
+
+
+    /**
+     * This constructor is used during recovery ONLY.
+     */
+    public Share(DuccId id, Machine machine, IRmJob job, int share_order)
+    {
+        this.id = id;
+        this.machine = machine;
+        this.job = job;
+        this.share_order = share_order;
+    }
+
+    /**
+     * Normal constructor.
+     */
+    public Share(Machine machine, IRmJob job, int share_order)
+    {
+        this.machine = machine;
+        this.id = Scheduler.newId();
+        this.job = job;
+        this.share_order = share_order;
+    }
+
+//     /**
+//      * Simulation only.
+//      */
+//     void questionStarted(WorkItem q)
+//     {
+//         if ( activeQuestions.containsKey(q.getId()) ) {
+//             throw new SchedulingException(job.getId(), "Share.questionStarted: work item " + q.getId() + " already running in share " + toString());
+//         } 
+        
+//         activeQuestions.put(q.getId(), System.currentTimeMillis());
+//     }
+
+//     /**
+//      * Simulation only.
+//      */
+//     void questionComplete(WorkItem q)
+//     {
+//     	String methodName = "questionComplete";
+//         if ( !activeQuestions.containsKey(q.getId()) ) {
+//             throw new SchedulingException(job.getId(), "Share.questionComplete: work item " + q.getId() + " not found in share " + toString());
+//         }
+
+//         logger.info(methodName, job.getId(), q.toString(), " completes in ", 
+//                    ""+(System.currentTimeMillis() - activeQuestions.get(q.getId())), " milliseconds in share ", toString());
+//         activeQuestions.remove(q.getId());
+//     }
+
+    public NodePool getNodepool()
+    {
+        return machine.getNodepool();
+    }
+
+    public int getNodepoolDepth()
+    {
+        return getNodepool().getDepth();
+    }
+
+    public String getNodepoolId()
+    {
+        return getNodepool().getId();
+    }
+
+    public IRmJob getJob()
+    {
+        return job;
+    }
+
+    public DuccId getId()
+    {
+        return id;
+    }
+
+    Machine getMachine()
+    {
+        return machine;
+    }
+
+    boolean isPending()
+    {
+        return job.isPendingShare(this);
+    }
+
+    /**
+     * Defrag - this (pending) share is given to a different job before the OR (Orchestrator) learns about it.
+     */
+    void reassignJob(IRmJob job)
+    {
+        this.job = job;
+    }
+
+    public NodeIdentity getNodeIdentity()
+    {
+        return machine.getNodeIdentity();
+    }
+
+    /**
+     * The order of the share itself.
+     */
+    public int getShareOrder()
+    {
+        return share_order;
+    }
+
+    /**
+     * Update the share order, used during recovery.
+     */
+    void setShareOrder(int o)
+    {
+        this.share_order = o;
+    }
+
+    /**
+     * The size of the machine the share resides on.
+     */
+    int getMachineOrder()
+    {
+        return machine.getShareOrder();
+    }
+
+    /**
+     * It's forceable if it's not a permanent share and it's not already evicted for some reason.
+     */
+    boolean isForceable()
+    {
+        if ( evicted ) return false;
+        if ( purged )  return false;
+        if ( fixed )   return false;
+        return true;
+    }
+
+    /**
+     * It's preemptable if:
+     *   - it's not already evicted, purged, or pinned (i.e. it is forceable)
+     *   - it belongs to a job with a "loser" count > 0: the job holds more shares than it was given
+     *   - it's a fair-share share
+     */
+    boolean isPreemptable()
+    {
+        return ( isForceable() && (( job.countNShares() - job.countNSharesGiven()) > 0 ));
+        // This last works because if the job has preempted shares they count against the countNShares count
+        // so we don't end up counting this job more than it deserves.
+    }
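+
+    // Worked example (hypothetical numbers): a job holding 10 N-shares
+    // (countNShares) that has been given only 7 (countNSharesGiven) yields
+    // 10 - 7 = 3 > 0, so its forceable shares are preemptable; after three
+    // preemptions the counts balance and no further shares are taken.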
+
+    public boolean update(DuccId jobid, long mem, ProcessState state, ITimeWindow init_time, ITimeWindow run_time, String pid)
+    {
+        if ( ! jobid.equals(job.getId()) ) return false;      // something has gone horribly wrong
+        
+        this.resident_memory = mem;
+        this.state = state;
+        this.pid = pid;
+        this.init_time = init_time;
+        this.run_time = run_time;
+        return true;
+    }
+
+//    /**
+//     * Calculate the "investment" this share has in the questions it's running.
+//     * Currently, this will be the longest time any of the current questions has been running.
+//     *
+//     * We have to separate this from "getting" so we can freeze the timestamp - in a sort the
+//     * getter could be called multiple times, and get inconsistent results.
+//     */
+//    synchronized void calculateInvestment()
+//    {    
+//    	investment = 0;
+//        long now = System.currentTimeMillis();
+//        for ( Long elapsed : activeQuestions.values() ) {
+//            investment = Math.max(investment, (now - elapsed));
+//        }
+//    }
+
+    /**
+     * Investment is a combination of initialization time and execution time.  We always
+     * want shares that aren't yet initialized to sort LOWER than shares that are initialized.
+     *
+     * For now we take only initialization time into account.
+     *
+     * NOTE: We no longer set investment directly.  Instead each state update refreshes the
+     *      init_time and run_time structures and we calculate investment from that.  This
+     *      affects the RM simulator, which will need updating if we decide to revive it.
+     */
+    long getInvestment()
+    {
+        if ( init_time == null ) return 0;
+        return init_time.getElapsedMillis();
+    }
+
+    /**
+     * Returns only initialization time.  Eventually getInvestment() may take other things into
+     * consideration, so we separate these two (even though currently they do the same thing).
+     */
+    long getInitializationTime()
+    {
+        if ( init_time == null ) return 0;
+        return init_time.getElapsedMillis();        
+    }
+
+    boolean isInitialized()
+    {
+        if ( init_time == null ) return false;
+
+        return (init_time.getEnd() != null);
+    }
+
+    public void setFixed()
+    {
+        fixed = true;
+    }
+
+    boolean isFixed()
+    {
+        return fixed;
+    }
+
+    void evict()
+    {
+        evicted = true;
+    }
+
+    boolean isEvicted()
+    {
+        return evicted || purged;
+    }
+
+    void purge()
+    {
+        purged = true;
+    }
+
+    public boolean isPurged()
+    {
+        return purged;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return id.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if ( o == null ) return false;
+        if ( this == o ) return true;
+        if ( this.getClass() != o.getClass() ) return false;
+
+        Share s = (Share) o;
+        return this.id.equals(s.getId());
+    }
+
+    public String toString()
+    {
+        return machine.getId() + "." + getId();
+    }
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java
------------------------------------------------------------------------------
    svn:eol-style = native
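
Because getInvestment() returns 0 until init_time has been reported, sorting
eviction candidates by investment naturally puts uninitialized processes
first, as the javadoc above intends. A sketch of such an ordering, in-package
with Share (candidateShares and the surrounding loop are hypothetical; only
getInvestment(), isPreemptable() and evict() come from the class above):

    // Requires java.util.{ArrayList, Collections, Comparator, List}.
    List<Share> candidates = new ArrayList<Share>(candidateShares);
    Collections.sort(candidates, new Comparator<Share>() {
        public int compare(Share a, Share b) {
            // Uninitialized shares report an investment of 0 and sort first.
            return Long.compare(a.getInvestment(), b.getInvestment());
        }
    });
    for ( Share s : candidates ) {
        if ( !s.isPreemptable() ) continue;  // pinned, already evicted, or job at its allotment
        s.evict();                           // mark it so no other class evicts it again
    }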

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+import java.util.Comparator;
+import java.util.HashMap;
+
+public class User
+    implements IEntity
+{
+    private String id;
+    private HashMap<IRmJob, IRmJob> jobs = new HashMap<IRmJob, IRmJob>();    // my jobs
+    private HashMap<ResourceClass, HashMap<IRmJob, IRmJob>> jobsByClass = new HashMap<ResourceClass, HashMap<IRmJob, IRmJob>>();
+
+    private HashMap<Integer, HashMap<IRmJob, IRmJob>> jobsByOrder = new HashMap<Integer, HashMap<IRmJob, IRmJob>>();
+    private int user_shares;       // number of shares to apportion among this user's jobs in the current epoch
+    private int pure_fair_share;   // uncapped, un-bonused counts
+    private int share_wealth;      // defrag: how many relevant Q shares do I really have?
+    private int[] given_by_order =  null;
+    private int[] wanted_by_order = null;
+
+    private static Comparator<IEntity> apportionmentSorter = new ApportionmentSorterCl();
+
+    public User(String name)
+    {
+        this.id = name;
+    }
+
+    public long getTimestamp()
+    {
+        return 0;
+    }
+
+    void addJob(IRmJob j)
+    {
+        jobs.put(j, j);
+        int order = j.getShareOrder();
+        
+        HashMap<IRmJob, IRmJob> ojobs = jobsByOrder.get(order);
+        if ( ojobs == null ) {
+            ojobs = new HashMap<IRmJob, IRmJob>();
+            jobsByOrder.put(order, ojobs);
+        }
+        ojobs.put(j, j);
+
+        ResourceClass cl = j.getResourceClass();
+        ojobs = jobsByClass.get(cl);
+        if ( ojobs == null ) {
+            ojobs = new HashMap<IRmJob, IRmJob>();
+            jobsByClass.put(cl, ojobs);
+        }
+        ojobs.put(j, j);
+    }
+
+    /**
+     * Remove a job from the list and return how many jobs remain.
+     */
+    int remove(IRmJob j)
+    {
+        if ( jobs.containsKey(j) ) {
+            jobs.remove(j);
+
+            int order = j.getShareOrder();
+            HashMap<IRmJob, IRmJob> ojobs = jobsByOrder.get(order);
+            ojobs.remove(j);
+            
+            ResourceClass cl = j.getResourceClass();
+            if ( jobsByClass.containsKey(cl) ) {                // if not it's likely an early refusal
+                ojobs = jobsByClass.get(cl);
+                ojobs.remove(j);
+            }       
+        } else {
+            throw new SchedulingException(j.getId(), "User " + id + " is asked to remove job " + j.getId() + " but the job is not assigned.");
+        }
+        return jobs.size();
+    }
+
+    /**
+     * Currently, all users are equal.
+     */
+    public int getShareWeight()
+    {
+        return 1;
+    }
+
+    /**
+     * Returns the total number of N-shares of the given order (i.e. processes of that size)
+     * wanted by this user's jobs in the given class.
+     */
+    private int countNSharesWanted(int order, ResourceClass rc)
+    {
+        int K = 0;
+        
+        // First sum the max shares all my jobs can actually use
+        HashMap<IRmJob, IRmJob> jobs = jobsByOrder.get(order);
+        if ( jobs == null ) {
+            return 0;
+        }
+
+        String rcname = rc.getName();
+        for ( IRmJob j : jobs.values() ) {
+            if ( j.getResourceClass().getName().equals(rcname) ) {
+                K += j.getJobCap();
+            }
+        }
+
+        return K;
+    }
+
+    public void initWantedByOrder(ResourceClass rc)
+    {
+    	wanted_by_order = NodePool.makeArray();
+        for ( int o = NodePool.getMaxOrder(); o > 0; o-- ) {
+            wanted_by_order[o] = countNSharesWanted(o, rc);
+            wanted_by_order[0] +=  wanted_by_order[o];
+        }
+    }
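+
+    // For example (hypothetical counts): with max order 3 and this user's jobs
+    // wanting 4 processes of order 1, 2 of order 2, and 1 of order 3, the
+    // array becomes [7, 4, 2, 1]; slot 0 accumulates the grand total.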
+
+    public void setPureFairShare(int pfs)
+    {
+        this.pure_fair_share = pfs;
+    }
+
+    public int getPureFairShare()
+    {
+        return pure_fair_share;
+    }
+
+    public int[] getWantedByOrder()
+    {
+        return wanted_by_order;
+    }
+
+    public void setGivenByOrder(int[] gbo)
+    {
+        this.given_by_order = gbo;
+    }
+
+    public int[] getGivenByOrder()
+    {
+        return given_by_order;
+    }
+
+    public void setShareWealth(int w)
+    {
+        this.share_wealth = w; // qshares
+    }
+
+    public int getShareWealth()
+    {
+        return share_wealth;  // qshares
+    }
+
+    public void subtractWealth(int w)
+    {
+        share_wealth -= w;
+    }
+
+    public int calculateCap(int order, int total)
+    {
+        return Integer.MAX_VALUE;  // no cap for users
+    }
+
+    HashMap<IRmJob, IRmJob> getJobs()
+    {
+        return jobs;
+    }
+
+    HashMap<IRmJob, IRmJob> getJobsOfOrder(int order)
+    {
+        return jobsByOrder.get(order);
+    }
+
+    HashMap<Integer, HashMap<IRmJob, IRmJob>> getJobsByOrder()
+    {
+        return jobsByOrder;
+    }
+
+    HashMap<String, Machine> getMachines()
+    {
+        // TODO: fill this in - walk the jobs and return the hash
+        System.out.println("Warning: getMachines() is not implemented and is returning null");
+        return null;
+    }
+
+    public int countJobs()
+    {
+        return jobs.size();
+    }
+
+    public int countJobs(int o)
+    {
+        if ( jobsByOrder.containsKey(o) ) {
+            return jobsByOrder.get(o).size();
+        }
+        return 0;
+    }
+
+    public void clearShares()
+    {
+        user_shares = 0;
+        //System.out.println("**** user " + getId() + "/" + uniqueId + " clearing shares");
+        //sharesByOrder.clear();
+    }
+
+    public void addQShares(int s)
+    {
+        user_shares += s;
+        //System.out.println("***** user " + getId() + "/" + uniqueId + " shares are " + s);
+    }
+
+    /**
+     * Try to find the smallest order of bonus share that this user can actually use.
+     */
+    public int canUseBonus(int bonus, int[] tmpSharesByOrder)
+    {
+        // NOTE: slot 0 of the by-order array is unused; cap the index at length-1
+        //       so we don't walk off the end when bonus >= the max order.
+        for ( int i = 1; i <= Math.min(bonus, tmpSharesByOrder.length - 1); i++ ) {
+            
+            if ( jobsByOrder.containsKey(i) && (tmpSharesByOrder[i] > 0) ) {
+                return i;
+            }
+        }
+        return 0;
+    }
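+
+    // Example (hypothetical state): with bonus = 3 and jobs pending only at
+    // orders 2 and 3, this returns 2 if tmpSharesByOrder[2] > 0, otherwise 3
+    // if tmpSharesByOrder[3] > 0, otherwise 0 (the bonus can't be used).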
+
+    public int countQShares(String x)
+    {
+        //System.out.println(x + " **** user " + getId() + "/" + uniqueId + " returning " + user_shares + " shares");
+        return this.user_shares;
+    }
+
+
+    int countCappedQShares(int physicalCap, int order)
+    {
+        int K = 0;
+        physicalCap = physicalCap * order;                         // to quantum shares
+        HashMap<IRmJob, IRmJob> jobs = jobsByOrder.get(order);
+
+        if ( jobs == null ) {
+        	return 0;
+        }
+        
+        for ( IRmJob j : jobs.values() ) {
+            K += (Math.min(j.getJobCap(), physicalCap));
+        }
+
+        return Math.min(K, physicalCap) * order;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return id.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if ( o == null ) return false;
+        if ( this == o ) return true;
+        if ( this.getClass() != o.getClass() ) return false;
+
+        User u = (User) o;
+        return this.id.equals(u.getName());
+    }
+
+    /**
+    public String getId()
+    {
+        return id;
+    }
+    */
+
+    public String getName()
+    {
+        return id;
+    }
+
+    public String toString()
+    {
+        return id;
+    }
+
+    public Comparator<IEntity> getApportionmentSorter()
+    {
+        return apportionmentSorter;
+    }
+
+    static private class ApportionmentSorterCl
+        implements Comparator<IEntity>
+    {
+        public int compare(IEntity e1, IEntity e2)
+        {
+            if ( e1 == e2 ) return 0;
+            return e1.getName().compareTo(e2.getName());
+        }
+    }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java
------------------------------------------------------------------------------
    svn:eol-style = native
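
A compressed view of how the by-order hooks above are meant to be driven in
one counting pass (the resource class, the job, and fairShareForUser are
hypothetical; only the User methods come from the class above):

    User u = new User("bob");
    u.addJob(job);                       // a job of some share order

    u.initWantedByOrder(resourceClass);  // fills wanted_by_order; slot 0 holds the total
    int[] wanted = u.getWantedByOrder();

    int[] given = NodePool.makeArray();  // same shape as wanted_by_order
    int o = job.getShareOrder();
    given[o] = Math.min(wanted[o], fairShareForUser);
    u.setGivenByOrder(given);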

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/WorkItem.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/WorkItem.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/WorkItem.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/WorkItem.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+public class WorkItem
+{
+    private int id;
+    //private int cputime;
+    private int walltime;
+
+    public WorkItem(int id, int walltime)
+    {
+        // not checking these because they come from generated code that we assume is correct
+        this.id = id;
+        this.walltime = walltime;
+    }
+
+    int getId()
+    {
+        return id;
+    }
+
+    String getStringId()
+    {
+        // convenience, for logging
+        return Integer.toString(id);
+    }
+
+    int getWalltime()
+    {
+    	return walltime;
+    }
+    
+    public String toString()
+    {
+        return "Qid " + id + " " + walltime;
+    }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/WorkItem.java
------------------------------------------------------------------------------
    svn:eol-style = native