Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:43:38 UTC

svn commit: r1427967 [4/5] - in /uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src: main/ main/java/ main/java/org/ main/java/org/apache/ main/java/org/apache/uima/ main/java/org/apache/uima/ducc/ main/java/org/apache/uima/ducc/rm/ main/java/org/apache/uim...

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/README
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/README?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/README (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/README Wed Jan  2 19:43:37 2013
@@ -0,0 +1,57 @@
+Some of the files here are used to implement the DUCC model for developing and
+debugging the scheduler.
+
+------------------------------------------------------------------------
+These are the scheduler codes:
+
+   IScheduler - interface defining the code that makes the tough decisions. The intent
+                is to be able to implement different schedulers that simply plug in to
+                IScheduleMain (see the sketch after this list).
+
+   IScheduleMain - interface defining the scheduling "daemon" that interfaces
+                   between the world and IScheduler.
+
+   Scheduler - implementation of IScheduleMain
+
+   SimpleFairShare - implementation of IScheduler providing a simple, single-priority
+                     fair-share scheduler.
+
+   Share - represents one share.  The IScheduleMain will look at the resources
+           and create Shares from them.
+
+   Job - represents one job.
+
+   Machine - represents one machine.
+
+   WorkItem - represents one work item. (Probably not needed for pure scheduling
+              purposes, but essential for modeling.)
+
+   SchedulingException - catch-all for anything that goes wrong in the scheduling code.
+                         It is a RuntimeException (no crash).
+
+   User - represents one user
+
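+   A rough sketch of the intended plug-in relationship (the method names and
+   signatures below are invented for illustration only; the real interfaces
+   define their own contracts):
+
+       interface IScheduler {                                  // hypothetical shape only
+           void schedule(Job[] jobs, Machine[] machines);      // make the tough decisions
+       }
+
+       class Scheduler /* implements IScheduleMain */ {
+           private IScheduler policy = new SimpleFairShare();  // any IScheduler plugs in here
+
+           void runEpoch(Job[] jobs, Machine[] machines) {
+               policy.schedule(jobs, machines);                // delegate the decisions
+           }
+       }
+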
+------------------------------------------------------------------------
+These are purely for simulation.  I may refactor JobManager through an interface so I can
+    swap between the modeled JobManager and the "real" one transparently.  There's not much
+    code, but there's a bit of tricky threading and timer management here.
+
+    JobManager - scheduler interacts with this for ALL job actions, and only JobManager gets
+                 to interact with scheduler.  Simulates / models the real Job Manager.
+
+    JobHandler - Models the UIMA/AS client for a job - feeds questions to the Agents and
+                 fields returned state messages.  Interacts with JobManager and Agent.
+
+    Agent      - Models the node agent - dispatches questions.
+
+    ExecutionTask - a simple TimerTask that simulates execution of a task by just
+                    sleeping for an appropriate amount of time (sketched below).
+
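+    A sketch of the ExecutionTask idea - simulate a work item by sleeping (the class
+    name and the usage line are invented for illustration):
+
+        import java.util.Timer;
+        import java.util.TimerTask;
+
+        class SimulatedWork extends TimerTask {        // stands in for ExecutionTask
+            private final long workMillis;             // how long the work item should appear to run
+            SimulatedWork(long workMillis) { this.workMillis = workMillis; }
+
+            public void run() {
+                try {
+                    Thread.sleep(workMillis);          // "execute" by just sleeping
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        // usage: new Timer().schedule(new SimulatedWork(5000), 0);
+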
+------------------------------------------------------------------------
+Other utility codes:
+   Main - what you'd expect from main - starts things running and waits for it all to die.
+          It also supports a trivial command line through stdin that simulates the
+          DUCC / Blade CLI.
+
+   Logger - simple logger that sort of looks like Log4J until we decide on something else.
+    

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/README
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,827 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+
+import org.apache.uima.ducc.common.utils.DuccProperties;
+import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
+
+
+/**
+ * This represents a priority class.
+ */
+public class ResourceClass
+    implements SchedConstants,
+               IEntity
+{
+    //private DuccLogger logger = DuccLogger.getLogger(this.getClass(), COMPONENT_NAME);
+
+    private String id;
+    private Policy policy;
+    private int priority;           // orders evaluation of the class
+
+
+    private int share_weight;       // for fair-share, the share weight to use
+    private int min_shares;         // fixed-share: min shares to hand out
+    private int max_processes;      // fixed-share: max shares to hand out regardless of
+                                    // what is requested or what fair-share turns out to be
+
+    private int max_machines;       // reservation: max machines that can be reserved by a single user - global across
+                                    // all this user's requests.
+
+    // for reservation, this caps machines. 
+    // for shares, this caps shares
+    private int absolute_cap;       // max shares or machines this class can hand out
+    private double percent_cap;     // max shares or machines this class can hand out as a percentage of all shares
+    private int true_cap;           // set during scheduling, based on actual current resource availability
+    private int pure_fair_share;    // the unmodified fair share, not counting caps, and not adding in bonuses
+
+    private HashMap<IRmJob, IRmJob> allJobs = new HashMap<IRmJob, IRmJob>();
+    private HashMap<Integer, HashMap<IRmJob, IRmJob>> jobsByOrder = new HashMap<Integer, HashMap<IRmJob, IRmJob>>();
+    private HashMap<User, HashMap<IRmJob, IRmJob>> jobsByUser = new HashMap<User, HashMap<IRmJob, IRmJob>>();
+    private int max_job_order = 0;  // largest order of any job still alive in this rc (not necessarily globally though)
+
+    // private HashMap<Integer, Integer> nSharesByOrder = new HashMap<Integer, Integer>();         // order, N shares of that order
+    private boolean subpool_counted = false;
+
+    // the physical presence of nodes in the pool is somewhat dynamic - we'll store names only, and generate
+    // a map of machines on demand by the scheduler from currently present machines
+    private String nodepoolName = null;
+
+//     ArrayList<String> nodepool = new ArrayList<String>();                               // nodepool names only
+//     HashMap<String, Machine> machinesByName = new HashMap<String, Machine>();
+//     HashMap<String, Machine> machinesByIp = new HashMap<String, Machine>();
+
+    // Whether to enforce memory constraints for matching reservations
+    private boolean enforce_memory = true;
+
+    // int class_shares;       // number of shares to apportion to jobs in this class in current epoch
+
+    private boolean expand_by_doubling = true;
+    private int initialization_cap = 2;
+    private int prediction_fudge = 10000;
+    private boolean use_prediction = true;
+
+    private int[] given_by_order  = null;
+    private int[] wanted_by_order = null;               // volatile - changes during countClassesByOrder
+
+    private static Comparator<IEntity> apportionmentSorter = new ApportionmentSorterCl();
+
+    public ResourceClass(String id) //, HashMap<String, Machine> machinesByName, HashMap<String, Machine> machinesByIp)
+    {
+        this.id = id;
+        this.policy = Policy.FAIR_SHARE;
+        this.share_weight = 100;
+        this.priority = 10;
+        this.max_processes = Integer.MAX_VALUE;
+        this.min_shares = 0;
+
+        this.max_machines = Integer.MAX_VALUE;
+
+        this.absolute_cap = Integer.MAX_VALUE;
+        this.percent_cap  = 1.0;
+
+        this.expand_by_doubling  = SystemPropertyResolver.getBooleanProperty("ducc.rm.expand.by.doubling", true);
+        this.initialization_cap  = SystemPropertyResolver.getIntProperty("ducc.rm.initialization.cap", 2);
+
+        this.use_prediction     = SystemPropertyResolver.getBooleanProperty("ducc.rm.prediction", true);
+        this.prediction_fudge   = SystemPropertyResolver.getIntProperty("ducc.rm.prediction.fudge", 10000);   // extra fudge factor to add to the
+                                                                                           // projection, in milliseconds.
+
+        //this.machinesByName = machinesByName;
+        //this.machinesByIp = machinesByIp;
+    }
+
+    // TODO: sanity check 
+    //   - emit warnings if shares are specified in reservations
+    //                   if machines are specified for fair or fixed-share
+    //                   etc.
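+    //
+    // The properties read below follow the pattern "scheduling.class.<id>.<key>".
+    // A purely illustrative example (the class name "background" and all values
+    // are invented):
+    //
+    //     scheduling.class.background.policy         = FAIR_SHARE    (or FIXED_SHARE, RESERVE)
+    //     scheduling.class.background.share_weight   = 100
+    //     scheduling.class.background.priority       = 10
+    //     scheduling.class.background.max_processes  = 0             (0 or less means no max)
+    //     scheduling.class.background.cap            = 50%           (absolute count or a percentage)
+    //     scheduling.class.background.nodepool       = <optional nodepool name>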
+    void init(DuccProperties props)
+    {
+    	//String methodName = "init";
+    	String k = "scheduling.class." + id + ".";
+        String s;
+        s = props.getProperty(k + "policy");
+        
+        if ( s == null ) {
+        	throw new SchedulingException(null, "Configuration problem: no policy for class " + id + ".");
+        }
+        policy = Policy.valueOf(s);
+
+        share_weight     = props.getIntProperty(k + "share_weight"   , DEFAULT_SHARE_WEIGHT);
+        priority  = props.getIntProperty(k + "priority", DEFAULT_PRIORITY);
+        min_shares = props.getIntProperty(k + "min_shares", 0);       // default no min
+
+        switch ( policy ) {
+            case FAIR_SHARE:
+                max_processes = props.getIntProperty(k + "max_processes", DEFAULT_MAX_PROCESSES);       // default no max
+                max_machines = 0;
+                break;
+
+            case FIXED_SHARE:
+                max_processes = props.getIntProperty(k + "max_processes", DEFAULT_MAX_PROCESSES);       // default no max
+                max_machines = 0;
+                break;
+
+            case RESERVE:
+                max_processes = 0;
+                max_machines = props.getIntProperty(k + "max_machines", DEFAULT_MAX_INSTANCES);       // default max 1
+                break;
+
+        }
+        if ( max_processes <= 0 ) max_processes = Integer.MAX_VALUE;
+        if ( max_machines <= 0 )  max_machines  = Integer.MAX_VALUE;
+
+        enforce_memory = props.getBooleanProperty(k + "enforce.memory", true);
+        
+        initialization_cap = props.getIntProperty(k + "initialization.cap", initialization_cap);
+        expand_by_doubling = props.getBooleanProperty(k + "expand.by.doubling", expand_by_doubling);
+        use_prediction     = props.getBooleanProperty(k + "prediction", use_prediction);
+        prediction_fudge   = props.getIntProperty(k + "prediction.fudge", prediction_fudge);
+
+        s = props.getStringProperty(k + "cap", ""+Integer.MAX_VALUE);                // default no cap
+        if ( s.endsWith("%") ) {
+            int t = Integer.parseInt(s.substring(0, s.length()-1));
+            percent_cap = (t * 1.0 ) / 100.0;
+        } else {
+            absolute_cap = Integer.parseInt(s);
+            if (absolute_cap == 0) absolute_cap = Integer.MAX_VALUE;
+        }
+
+        nodepoolName = props.getProperty(k + "nodepool");                               // optional nodepool
+        if (nodepoolName == null) {
+            nodepoolName = NodePool.globalName;
+        }
+    }
+
+    public long getTimestamp()
+    {
+        return 0;
+    }
+
+    String getNodepoolName()
+    {
+        return nodepoolName;
+    }
+
+    public void setPureFairShare(int pfs)
+    {
+        this.pure_fair_share = pfs;
+    }
+
+    public int getPureFairShare()
+    {
+        return pure_fair_share;
+    }
+
+    public boolean isExpandByDoubling()
+    {
+        return expand_by_doubling;
+    }
+
+    public void setExpandByDoubling(boolean ebd)
+    {
+        this.expand_by_doubling = ebd;
+    }
+
+    public int getInitializationCap()
+    {
+        return initialization_cap;
+    }
+
+    public void setInitializationCap(int c)
+    {
+        this.initialization_cap = c;
+    }
+
+    public boolean isUsePrediction()
+    {
+        return use_prediction;
+    }
+
+    public int getPredictionFudge()
+    {
+        return prediction_fudge;
+    }
+
+    public boolean enforceMemory()
+    {
+        return enforce_memory;
+    }
+
+    public Policy getPolicy()
+    {
+        return policy;
+    }
+
+    public void setTrueCap(int cap)
+    {
+        this.true_cap = cap;
+    }
+
+    public int getTrueCap()
+    {
+        return true_cap;
+    }
+
+    public double getPercentCap() {
+        return percent_cap;
+    }
+
+    public int getAbsoluteCap() {
+        return absolute_cap;
+    }
+        
+    public int getMaxProcesses() {
+        return max_processes;
+    }
+
+    public int getMinShares() {
+        return min_shares;
+    }
+
+    public int getMaxMachines() {
+        return max_machines;
+    }
+    
+    void setPolicy(Policy p)
+    {
+        this.policy = p;
+    }
+
+    /**
+    public String getId()
+    {
+        return id;
+    }
+*/
+ 
+    public String getName()
+    {
+        return id;
+    }
+
+    public int getShareWeight()
+    {
+        return share_weight;
+    }
+
+    /**
+     * Return my share weight, if I have any jobs of the given order or less.  If not,
+     * return 0.
+     */
+    public int getEffectiveWeight(int order)
+    {
+        for ( int o = order; o > 0; o-- ) {
+            if ( jobsByOrder.containsKey(o) && ( jobsByOrder.get(o).size() > 0) ) {
+                return share_weight;
+            }
+        }
+        return 0;
+    }
+
+//     /** @deprecated */
+//     public void setClassShares(int s)
+//     {
+//         this.class_shares = Math.min(s, class_shares);
+//     }
+
+    /**
+     * Add 's' ** quantum ** shares of the indicated order.
+     * Return the actual number of shares, which might have been capped.
+     * @deprecated  
+
+    public int setClassSharesByOrder(int order, int s)
+    {
+        int val = 0;
+        s /= order;                                              // convert to nShares
+        if ( nSharesByOrder.containsKey(order ) ) {
+            val = Math.min(s, nSharesByOrder.get(order));        // not first time, must use min
+        } else {
+            val = s;                                             // first time, just accept it
+        }
+        nSharesByOrder.put(order, val);
+        return val * order;
+    }
+     */
+
+    /**
+     * Add one ** virtual ** share of the given order.
+
+    public void addClassShare(int order)
+    {
+        nSharesByOrder.put(order, nSharesByOrder.get(order) + 1);
+    }
+     */
+    /**
+    public int canUseShares(NodePool np, int[] tmpSharesByOrder)
+    {
+        if ( !np.getId().equals(nodepoolName) ) return 0;             // can't use any from somebody else's nodepool
+
+        for ( int o = max_job_order; o > 0; o-- ) {
+
+            if ( !nSharesByOrder.containsKey(o) ) continue;
+
+            int given = nSharesByOrder.get(o);
+            int can_use = 0;
+
+            if ( tmpSharesByOrder[o] > 0 ) {                          // do we have any this size?
+                HashMap<IRmJob, IRmJob> jbo = jobsByOrder.get(o);     // yeah, see if any job wants it
+                if ( jbo != null ) {
+                    for (IRmJob j : jbo.values()) {                        
+                        can_use += j.getJobCap();
+                    }
+                }
+            }
+            if ( (can_use - given) > 0 ) {
+                return o;
+            }
+        }
+        return 0;
+    }
+    */
+
+    void updateNodepool(NodePool np)
+    {
+        //String methodName = "updateNodepool";
+
+//         for ( int k : nSharesByOrder.keySet() ) {
+//             np.countOutNSharesByOrder(k, nSharesByOrder.get(k));
+//         }
+
+        if ( given_by_order == null ) return;       // nothing given, nothing to adjust
+
+        for ( int o = NodePool.getMaxOrder(); o > 0; o-- ) {
+            np.countOutNSharesByOrder(o, given_by_order[o]);
+        }
+    }
+    
+    public int getPriority()
+    {
+    	return priority;
+    }
+    
+    public void clearShares()
+    {
+        //class_shares = 0;
+        given_by_order = null;
+        subpool_counted = false;
+    }
+    
+    public void markSubpoolCounted()
+    {
+        subpool_counted = true;
+    }
+
+    void addJob(IRmJob j)
+    {
+        allJobs.put(j, j);
+
+        int order = j.getShareOrder();
+        HashMap<IRmJob, IRmJob> jbo = jobsByOrder.get(order);
+        if ( jbo == null ) {
+            jbo = new HashMap<IRmJob, IRmJob>();
+            jobsByOrder.put(order, jbo);
+            max_job_order = Math.max(max_job_order, order);
+        }
+        jbo.put(j, j);
+
+        User u = j.getUser();
+        jbo = jobsByUser.get(u);
+        if ( jbo == null ) {
+            jbo = new HashMap<IRmJob, IRmJob>();
+            jobsByUser.put(u, jbo);
+        }
+        jbo.put(j, j);
+
+    }
+
+    void removeJob(IRmJob j)
+    {
+        if ( ! allJobs.containsKey(j) ) {
+            throw new SchedulingException(j.getId(), "Priority class " + getName() + " cannot find job to remove.");
+        }
+
+        allJobs.remove(j);
+
+        int order = j.getShareOrder();
+        HashMap<IRmJob, IRmJob> jbo = jobsByOrder.get(order);
+        jbo.remove(j);
+        if ( jbo.size() == 0 ) {
+            jobsByOrder.remove(order);
+
+            for ( int o = order - 1; o > 0; o-- ) {
+                if ( jobsByOrder.containsKey(o) ) {
+                    max_job_order = o;
+                    break;
+                }
+            }
+        }
+
+//         if ( jbo.size() == 0 ) {
+//             jobsByOrder.remove(order);
+//         }
+
+        User u = j.getUser();
+        jbo = jobsByUser.get(u);
+        jbo.remove(j);
+        if ( jbo.size() == 0 ) {
+            jobsByUser.remove(u);
+        }
+    }
+
+    int countJobs()
+    {
+        return allJobs.size();
+    }
+
+    /**
+    int countJobs(int order)
+    {
+        if ( jobsByOrder.containsKey(order) ) {
+            return jobsByOrder.get(order).size();
+        } else {
+            return 0;
+        }
+    }
+*/
+    /**
+    HashMap<IRmJob, IRmJob> getJobsOfOrder(int order)
+    {
+        return jobsByOrder.get(order);
+    }
+*/
+
+  /**
+    int sumAllWeights()
+    {
+        return allJobs.size() * share_weight;
+    }
+*/
+    /**
+     * The total weights of all jobs of order 'order' or less.
+     *
+    int sumAllWeights(int order)
+    {
+        int sum = 0;
+        for ( int o = order; o > 0; o-- ) {
+            if ( jobsByOrder.containsKey(o) ){ 
+                sum++;
+            }
+        }
+        return sum * share_weight;
+    }
+*/
+    /**
+     * Returns the total N-shares wanted at this order, i.e. processes of size 'order'.
+     */
+    private int countNSharesWanted(int order)
+    {
+        int K = 0;
+        
+        // First sum the max shares all my jobs can actually use
+        HashMap<IRmJob, IRmJob> jobs = jobsByOrder.get(order);
+        if ( jobs == null ) {
+            return 0;
+        }
+
+        for ( IRmJob j : jobs.values() ) {
+            K += j.getJobCap();
+        }
+
+        return K;
+    }
+
+    public void initWantedByOrder(ResourceClass unused)
+    {
+        int ord = NodePool.getMaxOrder();
+        wanted_by_order = NodePool.makeArray();
+        for ( int o = ord; o > 0; o-- ) {
+            wanted_by_order[o] = countNSharesWanted(o);
+            wanted_by_order[0] += wanted_by_order[o];
+        }
+    }
+
+    public int[] getWantedByOrder()
+    {
+        return wanted_by_order;
+    }
+
+    public int[] getGivenByOrder()
+    {
+    	return given_by_order;
+    }
+
+    public void setGivenByOrder(int[] gbo)
+    {
+        if ( given_by_order == null ) {      // Can have multiple passes, don't reset on subsequent ones.
+            this.given_by_order = gbo;       // Look carefully at calculateCap() below for details.
+        }
+    }
+
+    public int calculateCap(int order, int basis)
+    {
+        int perccap = Integer.MAX_VALUE;    // the cap, calculated from percent
+        int absolute = getAbsoluteCap();
+        double percent = getPercentCap();
+
+        if ( percent < 1.0 ) {
+            double b = basis;
+            b = b * percent;
+            perccap = (int) Math.round(b);
+        } else {
+            perccap = basis;
+        }
+
+        int cap =  Math.min(absolute, perccap) / order;   // cap on total shares available
+
+        //
+        // If this RC is defined over a nodepool that isn't the global nodepool then its share
+        // gets calculated multiple times.  The first time when it is encountered during the
+        // depth-first traversal, to work out the fair-share for all classes defined over the
+        // nodepool.  Subpool resources are also available to parent pools however, and must
+        // be reevaluated to ensure the resources are fairly allocated over the larger pool
+        // of work.
+        //
+        // So at this point there might already be shares assigned.  If so, we need to
+        // recap on whatever is already given to avoid over-allocating "outside" of the
+        // assigned shares.
+        //
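+        // A worked example with illustrative numbers: absolute_cap = 20, percent_cap = 0.5,
+        // basis = 30, order = 2  ->  perccap = 15, cap = min(20, 15) / 2 = 7 N-shares,
+        // before any subpool recap below.
+        //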
+        if ( (given_by_order != null) && subpool_counted )  {
+            cap = Math.min(cap, given_by_order[order]);
+        } // else - never been counted or assigned at this order, no subpool cap
+        
+        return cap;
+    }
+
+
+
+    /**
+     * Get capped number of quantum shares this resource class can support.
+     *
+    int countCappedQShares(int[] tmpSharesByOrder)
+    {
+        int K = 0;
+        for ( IRmJob j : allJobs.values() ) {
+            int order = j.getShareOrder();
+            K += (Math.min(j.getJobCap(), tmpSharesByOrder[order] * order));            
+        }
+        return K;
+    }
+    */
+    /**
+     * Sum all the capped shares of the jobs.  Then cap on the physical cap and
+     * again on the (possible) nodepool cap.
+     *
+    int countCappedNShares(int physicalCap, int order)
+    {
+        int K = 0;
+        
+        // First sum the max shares all my jobs can actually use
+        HashMap<IRmJob, IRmJob> jobs = jobsByOrder.get(order);
+        for ( IRmJob j : jobs.values() ) {
+            K += j.getJobCap();
+        }
+
+        // cap by what is physically there
+        K = Math.min(K, physicalCap);
+        return K;
+    }
+*/
+
+    /**
+     * Sum all the capped shares of the jobs.  Then cap on the physical cap and
+     * again on the (possible) nodepool cap.
+
+     TODO: this seems all wrong.
+    int getSubpoolCap(int order)
+    {
+        if ( subpool_counted ) {                   // share are dirty, cap is valid
+            if( given_by_order[order] == 0 ) {
+                //if ( !nSharesByOrder.containsKey(order) ) {
+                return Integer.MAX_VALUE;
+            } 
+            return given_by_order[order];
+        } else {
+            return Integer.MAX_VALUE;              // shares are clean, no cap
+        }
+    }
+    */
+
+//    int countCappedShares()
+//    {
+//        int count = 0;
+//        for ( IRmJob j : allJobs.values() ) {
+//            count += j.getJobCap();
+//        }
+//        return count;
+//    }
+
+    /**
+    int countSharesByOrder(int order)
+    {
+        if ( nSharesByOrder.containsKey(order) ) {
+            return nSharesByOrder.get(order);
+        }
+        return 0;
+    }
+*/
+    /**
+    int[] getSharesByOrder(int asize)
+    {
+        int[] answer = new int[asize];
+        for ( int i = 0; i < asize; i++ ) {
+            if ( nSharesByOrder.containsKey(i) ) {
+                answer[i] = nSharesByOrder.get(i);
+            } else {
+                answer[i] = 0;
+            }
+        }
+        return answer;
+    }
+*/
+    public boolean hasSharesGiven() 
+    {
+        return ( (given_by_order != null) && (given_by_order[0] > 0) );
+    }
+
+    private int countActiveShares()
+    {
+        int sum = 0;
+        for ( IRmJob j : allJobs.values() ) {
+            sum += (j.countNShares() * j.getShareOrder());          // in quantum shares
+        }
+        return sum;
+    }
+
+    /**
+     * Get the highest order of any job in this class.
+     
+    protected int getMaxOrder()
+    {
+        int max = 0;
+        for ( IRmJob j : allJobs.values() ) {
+            max = Math.max(max, j.getShareOrder());
+        }
+        return max;
+    }
+    */
+
+    HashMap<IRmJob, IRmJob> getAllJobs()
+    {
+        return allJobs;
+    }
+
+    HashMap<Integer, HashMap<IRmJob, IRmJob>> getAllJobsByOrder()
+    {
+        return jobsByOrder;
+    }
+
+    HashMap<User, HashMap<IRmJob, IRmJob>> getAllJobsByUser()
+    {
+        return jobsByUser;
+    }
+
+    ArrayList<IRmJob> getAllJobsSorted(Comparator<IRmJob> sorter)
+    {
+        ArrayList<IRmJob> answer = new ArrayList<IRmJob>();
+        answer.addAll(allJobs.values());
+        Collections.sort(answer, sorter);
+        return answer;
+    }
+
+    int getMaxJobOrder()
+    {
+        return max_job_order;
+    }
+
+    int makeReadable(int i)
+    {
+        return (i == Integer.MAX_VALUE ? -1 : i);
+    }
+
+    /**
+     * Stringify the share tables for the logs
+     */
+    
+    /**
+    public String tablesToString()
+    {
+        ArrayList<Integer> keys = new ArrayList<Integer>();
+        keys.addAll(nSharesByOrder.keySet());
+        Collections.sort(keys);
+        int max = 0;
+        for ( int k : keys ) {
+            max = Math.max(max, k);
+        }
+        String[] values = new String[max+1];
+        values[0] = id;
+        for ( int i = 1; i < max+1; i++) {
+            if ( !nSharesByOrder.containsKey(i) ) {
+                values[i] = "0";
+            } else {
+                values[i] = Integer.toString(nSharesByOrder.get(i));
+            }
+        }
+        StringBuffer sb = new StringBuffer();
+        sb.append("%23s:");
+        for ( int i = 0; i < max; i++ ) {
+            sb.append("%s ");
+        }
+        
+        return String.format(sb.toString(), (Object[]) values);
+    }
+    */
+
+    // note we assume Nodepool is the last token so we don't set a len for it!
+    private static String formatString = "%12s %11s %4s %5s %5s %5s %6s %6s %7s %6s %6s %7s %5s %7s %s";
+    public static String getDashes()
+    {
+        return String.format(formatString, "------------", "-----------",  "----", "-----", "-----", "-----", "------", "------", "-------", "------", "------", "-------", "-----", "-------", "--------");
+    }
+
+    public static String getHeader()
+    {
+        return String.format(formatString, "Class Name", "Policy", "Prio", "Wgt", "MinSh", "MaxSh", "AbsCap", "PctCap", "InitCap", "Dbling", "Prdct", "PFudge", "Shr", "Enforce", "Nodepool");
+    }
+
+    @Override
+    public int hashCode()
+    { 
+        return id.hashCode();
+    }
+
+    public String toString() {
+        return String.format("%12s %11s %4d %5d %5d %5d %6d %6d %7d %6s %6s %7d %5d %7s %s", 
+                             id,
+                             policy.toString(),
+                             priority, 
+                             share_weight, 
+                             makeReadable(min_shares), 
+                             makeReadable(max_processes), 
+                             makeReadable(absolute_cap), 
+                             (int) (percent_cap *100), 
+                             initialization_cap,
+                             expand_by_doubling,
+                             use_prediction,
+                             prediction_fudge,
+                             countActiveShares(),
+                             enforce_memory,
+                             nodepoolName
+            );
+    }
+
+    public String toStringWithHeader()
+    {
+        StringBuffer buf = new StringBuffer();
+        
+
+        buf.append(getHeader());
+        buf.append("\n");
+        buf.append(toString());
+        return buf.toString();
+    }
+
+    public Comparator<IEntity> getApportionmentSorter()
+    {
+        return apportionmentSorter;
+    }
+
+    static private class ApportionmentSorterCl
+        implements Comparator<IEntity>
+    {
+        public int compare(IEntity e1, IEntity e2)
+        {
+        	// we want a consistent sort, that favors higher share weights
+            if ( e1 == e2 ) return 0;
+            int w1 = e1.getShareWeight();
+            int w2 = e2.getShareWeight();
+            if ( w1 == w2 ) {
+                return e1.getName().compareTo(e2.getName());
+            }
+            return (int) (w2 - w1);
+        }
+    }
+
+}
+            

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,1334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+/**
+ * This class represents a job inside the scheduler.
+ */
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
+
+
+public class RmJob
+	implements SchedConstants, 
+               IRmJob
+{
+    DuccLogger     logger = DuccLogger.getLogger(RmJob.class, COMPONENT_NAME);
+	static final int DEFAULT_NTHREADS = 4;
+	
+    protected DuccId   id;                            // sched-assigned id (maybe delegate to job manager eventually)
+    protected DuccType ducc_type;                     // for messages so we can tell what kind of job
+    protected String name;                            // user's name for job
+    protected String resource_class_name;             // Name of the res class, from incoming job parms
+    protected ResourceClass resource_class;           // The actual class, assigned as job is received in scheduler.
+    protected int    user_priority;                   // user "priority", really apportionment 
+
+    protected int n_machines;                         // RESERVE:     minimum machines to allocate
+    protected int min_shares;                         // FIXED_SHARE: minimum N shares to allocate
+    protected int max_shares;                         // FAIR_SHARE:  maximum N shares to allocate
+    protected boolean is_reservation = false;
+
+    protected int threads;                            // threads per process
+
+    protected int memory;                             // estimated memory usage
+    protected int nquestions;                         // number of work-items in total
+    protected int nquestions_remaining;               // number of uncompleted work items
+    protected double time_per_item = 0.0;             // from OR - mean time per work item
+
+    protected int share_order = 0;                    // How many shares per process this job requires (calculated on submission)
+
+    protected int share_cap = Integer.MAX_VALUE;      // initially; scheduler policy will reset as the job ages
+    protected int pure_fair_share = 0;                // pure uncapped un-bonused share for this job
+
+    protected long submit_time;                       // when job is submitted ( sched or job-manager sets this )
+
+    protected String username;
+    protected User user;                              // user id, enforced by submit and job manager. we just believe it in sched.
+
+    //
+    // We keep track of three things related to machines:
+    // 1.  All the machines the job is running on.
+    // 2.  The machines the job will be expanded to run on but which aren't yet dispatched
+    // 3.  The machines to be removed from the job, but which the job is still running on
+    //
+    protected HashMap<Share, Share> assignedShares;      // job is dispatched to these
+    protected HashMap<Share, Share> pendingShares ;      // job is scheduled for these but not yet confirmed
+    protected HashMap<Share, Share> pendingRemoves;      // job is scheduled to remove these but not confirmed
+    protected HashMap<Share, Share> recoveredShares;     // recovery after bounce, need to reconnect these
+
+    // track shares by machine, and machines, to help when we have to give stuff away
+    Map<Machine, Map<Share, Share>> sharesByMachine = new HashMap<Machine, Map<Share, Share>>();
+    Map<Machine, Machine> machineList = new HashMap<Machine, Machine>();
+
+    // protected int shares_given;                       // during scheduling, how many N-shares we get
+    int[] given_by_order;                                // during scheduling, how many N-shares we get
+    int[] wanted_by_order;                               // during scheduling, how many N-shares we want - volatile, changes during countJobsByOrder
+    protected boolean init_wait;                         // If True, we're waiting for orchestrator to tell us that init is successful.
+                                                         // Until then, we give out only a very small share.
+    protected boolean completed = false;                 // RmJob can linger a bit after completion for the
+                                                         // defrag code - must mark it complete
+
+    // For predicting end of job based on current rate of completion
+    protected int orchestrator_epoch;
+    protected int rm_rate;
+    protected int ducc_epoch;                           
+
+    protected Properties jobprops;                       // input the job is constructed from; currently condensed from Blade logs (simulation only)
+
+    protected String refusalReason = null;               // if refused, this is why, for the message
+
+    private static Comparator<IEntity> apportionmentSorter = new ApportionmentSorterCl();
+
+    protected RmJob()
+    {
+    }
+
+    public RmJob(DuccId id)
+    {
+        this.id = id;
+
+        orchestrator_epoch = SystemPropertyResolver.getIntProperty("ducc.orchestrator.state.publish.rate", 10000);
+        rm_rate            = SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.ratio", 4);
+        ducc_epoch         = orchestrator_epoch * rm_rate;
+    }
+    
+    // public RmJob(DuccId id, Properties properties)
+    // {
+    // 	this.jobprops = properties;
+    //     this.id = id;
+    // }
+    
+    /**
+     * read the props file and set initial values for internal fields
+     */
+    public void init()
+    {
+        assignedShares  = new HashMap<Share, Share>();
+        pendingShares   = new HashMap<Share, Share>();
+        pendingRemoves  = new HashMap<Share, Share>();
+        recoveredShares = new HashMap<Share, Share>();
+
+        if ( max_shares == 0 ) max_shares = Integer.MAX_VALUE;
+    }
+
+    public DuccId getId()
+    {
+        return id;
+    }
+
+    public void setId(DuccId id)
+    {
+        this.id = id;
+    }
+
+    public long getFriendlyId()
+    {
+    	return id.getFriendly();
+    }
+
+    public String getName()
+    {
+    	return id.toString();
+    }
+    
+    public void setJobName(String name)
+    {
+        this.name = name;
+    }
+    
+    public void setReservation()
+    {
+        this.is_reservation = true;
+    }
+
+    public boolean isReservation()
+    {
+        return is_reservation;
+    }
+
+    public void markComplete()
+    {
+        completed = true;
+    }
+
+    public boolean isCompleted()
+    {
+        return completed;
+    }
+
+    /**
+     *    The matrix:
+     *    IW = am I in initialization wait
+     *    RU = am I runnable
+     *    Resched = do we need to reschedule
+     *    Reset   = set IW to this
+     *    x       = don't care
+     *
+     *    IW   RU    Resched    Reset
+     *     T    T       T         F
+     *     T    F       F         T
+     *     F    T       F         F
+     *     F    F       x         x
+     *
+     *     So resched = IW & RU
+     *        IW      = !RU
+     *
+     *    We return resched so the caller knows to tickle the scheduler.
+     */
+    public boolean setInitWait(boolean is_running)
+    {
+        boolean resched = init_wait & is_running;
+        init_wait = !is_running;
+        return resched;
+    }
+
+    /**
+     * Save ref to the class we are in, and init class-based structures.
+     */
+    public void setResourceClass(ResourceClass cl)
+    {
+        this.resource_class = cl;
+    }
+
+    /**
+     * Number of questions submitted, all must be answered.   This is used by job manager
+     * to know when they've all been dealt with.
+     */
+    public int nQuestions()
+    {
+    	return nquestions;
+    }
+
+    public void setNQuestions(int allq, int remainingq, double time_per_item)
+    {
+        this.nquestions = allq;
+        this.nquestions_remaining = remainingq;
+        this.time_per_item = time_per_item;
+    }
+
+    /**
+     * Number of questions still to be answered.  Used by scheduler to determine current
+     * machine requirement.
+     */
+    public int nQuestionsRemaining()
+    {
+    	return nquestions_remaining;
+    }
+
+
+    public Map<Machine, Map<Share, Share>> getSharesByMachine()
+    {
+        return sharesByMachine;
+    }
+
+    public Map<Machine, Machine> getMachines()
+    {
+        return machineList;
+    }
+
+    /**
+     * There are a fair number of piddling little methods to manage shares.  This high granularity is
+     * needed in order to manage bits and pieces of the bookkeeping from different threads and queues
+     * without blocking.
+     *
+     * TODO: maybe we can consolidate some of this after it's all worked out.
+     */
+
+    /**
+     * Before each scheduling epoch, clear the counting from the last time.
+     */
+    public void clearShares()
+    {
+        // this.shares_given = 0;
+        given_by_order = null;
+    }
+
+    /**
+    public void addQShares(int s)
+    {
+        this.shares_given += ( s / share_order ) ;    // convert to N-shares == processes
+    }
+    */
+
+    public void setPureFairShare(int pfs)
+    {
+        this.pure_fair_share = pfs;
+    }
+
+    public int getPureFairShare()
+    {
+        return pure_fair_share;
+    }
+
+    public int[] getGivenByOrder()
+    {
+        return given_by_order;
+    }
+
+    public void setGivenByOrder(int[] gbo)
+    {
+        this.given_by_order = gbo;
+    }
+
+    public int getShareWeight()
+    {
+        return 1;                         // all jobs are the same currently
+    }
+
+    public void initWantedByOrder(ResourceClass unused)
+    {
+        wanted_by_order              = NodePool.makeArray();
+        wanted_by_order[share_order] = getJobCap();
+        wanted_by_order[0]           = wanted_by_order[share_order];
+    }
+
+    public int[] getWantedByOrder()
+    {
+        return wanted_by_order;
+    }
+
+    public int calculateCap(int order, int total)
+    {
+        return Integer.MAX_VALUE;  // no cap for jobs
+    }
+
+
+    public int countNSharesGiven()
+    {
+        if ( given_by_order == null) { return 0; }
+        return given_by_order[share_order];
+    }
+
+    public int countQSharesGiven()
+    {
+        return countNSharesGiven() * share_order;
+    }
+
+    /**
+     * Number of N-shares I'm losing.
+     */
+    public int countNSharesLost()
+    {
+        return countNShares() - countNSharesGiven();
+    }
+
+//     /**
+//      * Can I use more N-shares; how many?
+//      */
+//     public int canUseBonus(int bonus, int[] nSharesByOrder)
+//     {
+//         int cap = getJobCap();
+//         int can_use = Math.max(0, cap - shares_given);         // what can we actually use?
+
+//         if ( can_use > nSharesByOrder[share_order] ) {         // can't use more than physicalliy exist
+//             return 0;
+//         }
+
+//         for (int i = share_order; i <= Math.min(bonus, (nSharesByOrder.length - 1)); i++ ) {
+//             if ( (nSharesByOrder[i] > 0) &&  (i <= can_use) ) {
+//                 return i ;
+//             }
+//         }
+//        return 0;
+//    }
+
+    /**
+     * Can I use more N-shares?
+
+    public int canUseBonus(int bonus, int[] nSharesByOrder)
+    {
+        int cap = getJobCap();
+        int can_use = Math.max(0, cap - countNSharesGiven());         // what can we actually use?
+        
+        //         if ( can_use > nSharesByOrder[share_order] ) {         // can't use more than physicalliy exist
+        //             return 0;
+        //         }
+        if ( can_use == 0 ) {
+            return 0;
+        }
+        return ( nSharesByOrder[share_order] > 0 ) ? share_order : 0;
+    }
+    */
+
+    /**
+     * Officially allocated shares assigned to this job which are known to be in use.
+     */
+    public HashMap<Share, Share> getAssignedShares()
+    {
+        return assignedShares;
+    }
+
+    /**
+     * Shares recovered from the OR during job recovery.
+     */
+    public HashMap<Share, Share> getRecoveredShares()
+    {
+        return recoveredShares;
+    }
+
+    /**
+     * Newly allocated shares that have not been dispatched.  They're unavailable for scheduling but
+     * job manager doesn't know about them yet.  When we tell job manager we'll "promote" them to
+     * the assignedShares list.
+     */
+    public HashMap<Share, Share> getPendingShares()
+    {
+        return pendingShares;
+    }
+
+    /**
+     * We're dispatching, move machines to active list, and clear pending list.
+     * Tell caller which machines are affected so it can dispatch them.
+     */
+    public HashMap<Share, Share> promoteShares()
+    {
+        HashMap<Share, Share> answer = new HashMap<Share, Share>();        
+        for ( Share s : pendingShares.values() ) {
+            assignedShares.put(s, s);
+            Machine m = s.getMachine();
+            machineList.put(m, m);
+            Map<Share, Share> machine_shares = sharesByMachine.get(m);
+            if ( machine_shares == null ) {
+                machine_shares = new HashMap<Share, Share>();
+                sharesByMachine.put(m, machine_shares);
+            }
+            machine_shares.put(s, s);
+
+            answer.put(s, s);
+        }
+        pendingShares.clear();
+        return answer;
+    }
+
+    /**
+     * This share is being donated to someone more needy than I - see defrag code in NodepoolScheduler
+     */
+     public void cancelPending(Share s)
+     {
+         pendingShares.remove(s);
+     }
+
+//    public void assignReservation(Machine m)
+//    {
+//        reservations.add(m);
+//    }
+//
+//    public ArrayList<Machine> getReservations(Machine m)
+//    {
+//        return reservations;
+//    }
+
+//    public int countReservations()
+//    {
+//        return reservations.size();
+//    }
+//
+//    public boolean reservationComplete()
+//    {
+//        return reservations.size() == machines;
+//    }
+
+    /**
+     * Scheduler found us a new toy, make it pending until it's given to job manager.
+     */
+    public void assignShare(Share s)
+    {
+        pendingShares.put(s, s);
+    }
+
+    /**
+     * Job recovery: OR reports this share as one it already knew about.
+     */
+    public void recoverShare(Share s)
+    {
+        if ( (! assignedShares.containsKey(s)) && (!pendingShares.containsKey(s)) ) {
+            recoveredShares.put(s, s);
+        }
+    }
+
+    public boolean isPendingShare(Share s )
+    {
+        return pendingShares.containsKey(s);
+    }
+
+    /**
+     * Do I have un-dispatched shares?
+     */
+    public boolean isExpanded()
+    {
+        return pendingShares.size() > 0;
+    }
+
+    public boolean isShrunken()
+    {
+        return pendingRemoves.size() > 0;
+    }
+
+    public boolean isStable()
+    {
+        return (
+                ( assignedShares.size() > 0 ) &&
+                ( pendingShares.size() == 0 ) &&
+                ( pendingRemoves.size() == 0 )
+                );
+    }
+
+    public boolean isDormant()
+    {
+        return (
+                ( assignedShares.size() == 0 ) &&
+                ( pendingShares.size() == 0 ) &&
+                ( pendingRemoves.size() == 0 )
+                );
+    }
+
+    public void removeAllShares()
+    {
+    	String methodName = "removeAllShares";
+        if ( logger.isDebug() ) {
+            for ( Map<Share, Share> m : sharesByMachine.values() ) {
+                for ( Share s : m.values() ) {
+                    logger.debug(methodName, getId(), "Clear share", s);
+                }
+            }
+        }
+    	assignedShares.clear();
+    	pendingShares.clear();
+    	pendingRemoves.clear();
+        machineList.clear();
+        sharesByMachine.clear();
+    }
+
+    /**
+     * I've shrunk or this share has nothing left to do.  Remove this specific share.
+     */
+    public void removeShare(Share share)
+    {
+        String methodName = "removeShare";
+
+        if ( assignedShares.containsKey(share) ) {
+            int prev = assignedShares.size();
+            assignedShares.remove(share);
+            pendingRemoves.remove(share);
+
+            Machine m = share.getMachine();
+            Map<Share, Share> machineShares = sharesByMachine.get(m);
+            machineShares.remove(share);
+            if ( machineShares.size() == 0 ) {
+                sharesByMachine.remove(m);
+                machineList.remove(m);
+            }
+
+            logger.debug(methodName, getId(), "Job removes ", share.toString(), " reduces from ", prev, " to ", assignedShares.size() + ".");
+        } else {
+            logger.warn(methodName, getId(), "****** Job cannot find share " + share.toString() + " to remove. ******");
+        }
+    }
+
+    /**
+     * Used by Fixed Share and by Reservations
+     */
+    public void shrinkByOne(Share share)
+    {
+        String methodName = "shrinkByOne";
+        if ( assignedShares.containsKey(share) ) {
+            logger.debug(methodName, getId(), "Job schedules " + share.toString() + " for removal.");
+            pendingRemoves.put(share, share);
+            share.evict();
+        } else {
+            logger.warn(methodName, getId(), "****** Job cannot find share " + share.toString() + " to schedule for removal.******");
+        }
+
+    }
+
+    /**
+     * Shrink by 'shares' from machines of largest order starting from 'order' and decreasing.
+     * Investment is not used; this is a shrink-from-largest-machine to MINIMIZE FRAGMENTATION.
+     *
+     * This implementation is the simplest: we just vacate.  This is called in the sequence of highest
+     * order wanted, so we're leaving the largest holes in the largest machines first.
+     *
+     * @param shares - number of N-shares that are wanted
+     * @param order  - N - try to free up space for shares of this size.
+     * @param force - When evicting for non-preemptables, we may need to free the requested
+     *                shares even if it puts us over our "fair" count.  If this happens
+     *                we'll end up "sliding" onto other machines (eventually).
+     * @param nodepool - only interested in shares from this nodepool.
+     *
+     * So when this is called, somebody needs (shares*order) shares, given out in chunks of
+     * (order).
+     * 
+     * @return the number of Q-shares actually given up
+     */
+    public int shrinkByOrderByMachine(int shares, int order, boolean force, NodePool nodepool)
+    {
+    	String methodName = "shrinkByOrderByMachine";
+
+        if ( shares <= 0 ) {
+            throw new SchedulingException(getId(), "Trying to shrink by " + shares + " shares.");
+        }
+
+        // These are the machines where I have stuff running.  
+        ArrayList<Machine> machinesSorted = new ArrayList<Machine>();
+        for ( Machine m : machineList.values() ) {
+            if ( (m.getNodepool() == nodepool) && ( m.getShareOrder() >= order) ) {
+                machinesSorted.add(m);
+            }
+        }
+        Collections.sort(machinesSorted, new MachineByOrderSorter());
+
+        int given = 0;        
+        int shares_to_lose = 0;
+
+        //
+        // How much to lose?  If we're not forcing, then only shares that are evicted because of
+        // the 'howMuch' counts.   If forcing then everything until we meet the goal or we run
+        // out of stuff to give.
+        //
+        if ( force ) {
+            shares_to_lose = countNShares();
+        } else {
+            shares_to_lose = Math.max(0, countNShares() - countNSharesGiven());
+        }
+        if ( shares_to_lose == 0 ) {
+            return 0;
+        }
+
+        for ( Machine m : machinesSorted ) {
+
+            logger.debug(methodName, getId(), "Inspecting machine", m.getId());
+            ArrayList<Share> slist = new ArrayList<Share>();
+
+            for ( Share s : sharesByMachine.get(m).values() ) {            // get the still-eligible shares 
+                if ( ! s.isEvicted() ) {
+                    slist.add(s);
+                }
+            }
+            if ( slist.size() == 0 ) {
+                continue;
+            }
+
+            int to_give = m.countFreedUpShares();
+            logger.debug(methodName, getId(), "A given:", given, "to_give:", to_give, "order", order, "shares", shares, "shares_to_lose", shares_to_lose);
+
+            Iterator<Share> iter = slist.iterator();
+            while ( iter.hasNext() &&  ( (given + (to_give/order)) < shares ) && (shares_to_lose > 0) ) {
+                Share s = iter.next();
+                logger.info(methodName, getId(), "Removing share", s.toString());
+                pendingRemoves.put(s, s);
+                s.evict();
+                to_give += share_order;
+                shares_to_lose--;
+            } 
+
+
+            given += (to_give / order);
+            if ( given >= shares ) {
+                break;
+            }
+        }        
+        return given;
+    }
+
+    /**
+     * Shrink by 'shares' from machines of largest order starting from 'order' and decreasing.
+     * Shares are chosen by investment; this is a shrink to MINIMIZE LOST WORK at the
+     * possible expense of fragmentation.
+     *
+     * @param shares - number of N-shares that are wanted
+     * @param order  - N - try to free up space for shares of this size.
+     * @param force - When evicting for non-preemptables, we may need to free the requested
+     *                shares even if it puts us over our "fair" count.  If this happens
+     *                we'll end up "sliding" onto other machines (eventually).
+     * @param nodepool - only interested in shares from this nodepool.
+     *
+     * So when this is called, somebody needs (shares*order) shares, given out in chunks of
+     * (order).
+     * 
+     * @return the number of Q-shares actually given up
+     */
+    public int shrinkByInvestment(int shares, int order, boolean force, NodePool nodepool)
+    {
+    	String methodName = "shrinkByInvestment";
+
+        if ( shares <= 0 ) {
+            throw new SchedulingException(getId(), "Trying to shrink by " + shares + " shares.");
+        }
+
+        logger.debug(methodName, getId(), "Enter: shares", shares, "order", order, "force", force, "nodepool", nodepool.getId(),
+                    "nAssignedShares", assignedShares.size(), "nPendingShares", pendingShares.size());
+
+        ArrayList<Share> sharesSorted = new ArrayList<Share>();
+
+        // must pick up only shares in the given nodepool
+        for ( Share s : assignedShares.values() ) {
+            if ( s.getNodepoolId().equals(nodepool.getId()) && ( !s.isEvicted() ) ) {
+                sharesSorted.add(s);
+            } else {
+                if ( logger.isTrace () ) {
+                    logger.trace(methodName, getId(), "Skipping", s.getId(), "s.nodepool", s.getNodepoolId(), "incoming.nodepool", nodepool.getId(), "evicted", s.isEvicted());
+                }
+            }
+        }
+
+        if ( sharesSorted.size() == 0 ) {
+            return 0;
+        }
+
+
+        if ( logger.isTrace() ) {
+            logger.trace(methodName, getId(), "Shares Before Sort - id, isInitialized, investment:");
+            for ( Share s : sharesSorted ) {
+                logger.trace(methodName, getId(), s.getId(), s.isInitialized(), s.getInvestment());
+            }
+        }
+
+        Collections.sort(sharesSorted, new ShareByInvestmentSorter());
+
+        if ( logger.isTrace() ) {
+            logger.trace(methodName, getId(), "Shares After Sort - id, isInitialized, investment:");
+            for ( Share s : sharesSorted ) {
+                logger.trace(methodName, getId(), s.getId(), s.isInitialized(), s.getInvestment());
+            }
+        }
+
+
+        //
+        // How much to lose?  If we're not forcing, then only shares that are evicted because of
+        // the 'howMuch' counts.   If forcing then everything until we meet the goal or we run
+        // out of stuff to give.
+        //
+        int shares_given = 0;                  // number of shares of requested order given - NOT necessarily number of my own processes
+        int processes_to_lose = 0;             // number of processes I'm able to lose
+        int processes_given = 0;
+
+        if ( force ) {
+            processes_to_lose = countNShares();
+        } else {
+            processes_to_lose = Math.max(0, countNShares() - countNSharesGiven());
+        }
+        processes_to_lose = Math.min(processes_to_lose, sharesSorted.size());
+
+        if ( processes_to_lose == 0 ) {
+            return 0;
+        }
+
+        while ( (shares_given < shares) && (processes_given < processes_to_lose) ) {
+
+            int currently_given = 0;
+
+            if ( logger.isTrace() ) {
+                logger.trace(methodName, getId(), "In loop: Shares given", shares_given, "shares wanted", shares, 
+                             "processes_to_lose", processes_to_lose, "processes_given", processes_given);
+            }
+
+            Share s = sharesSorted.get(0);
+            Machine m = s.getMachine();
+            int to_give = m.countFreedUpShares();
+            logger.debug(methodName, getId(), "Inspecting share", s.getId());
+            ArrayList<Share> slist = new ArrayList<Share>();
+
+            Iterator<Share> iter = sharesSorted.iterator();
+            while ( (to_give < order) && iter.hasNext() ) {               // if we need more shares from this machine to be useful ...
+                // Search the share list for enough additional shares on this machine to make up
+                // exactly one chunk of the requested size.
+                Share ss = iter.next();
+                if ( ss.getMachine() == s.getMachine() ) {
+                    slist.add(ss);
+                    to_give += ss.getShareOrder();
+                }
+            }
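+            // Illustrative (hypothetical) example: if order=4 and the machine already has 2
+            // freed-up Q-shares, gathering two more of this job's 1-Q-share processes from the
+            // same machine brings to_give to 4, enough to carve out one share of the requested
+            // order below.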
+            
+            if ( to_give >= order ) {                 // did we find enough on the machine to make it useful to evict?
+                //slist.add(s);                         // didn't put on the list earlier, in case we couldn't use it
+                for ( Share ss : slist ) {
+                    logger.info(methodName, getId(), "Removing share", ss.toString());
+                    pendingRemoves.put(ss, ss);
+                    ss.evict();
+
+                    sharesSorted.remove(ss);
+                    processes_given++;
+                    currently_given++;
+                    if ( processes_given >= processes_to_lose ) break; // if this is too few to be useful, defrag will fix it (mostly)
+                }
+                shares_given += (to_give / order);
+            }
+            
+            //
+            // If we gave nothing away we didn't change any of the structures and would
+            // never exit, so break out of the loop right now.  We rarely if ever enter
+            // this branch, but it prevents an infinite loop in various corner cases.
+            //
+            if ( currently_given == 0 ) {
+                logger.debug(methodName, getId(), "Gave no shares, breaking loop");
+                break;
+            }
+        }
+        
+    	return shares_given;
+    }
+
+    /**
+     * I'm shrinking by this many.  Put into pendingRemoves but don't take them off the assigned list
+     * until we know job manager has been told.
+     *
+     * TODO: this will likely change when the Scheduler starts deciding - either we'll put decision logic
+     * into Job proper, or defer it to the IScheduler implementation (probably the latter).
+     *
+     * @deprecated Keeping until I can nuke or update the simple fair share code.
+     */
+    public void shrinkTo(int k)
+    {	
+//         int count = assignedShares.size() - k;
+//         for ( int i = 0; i < count; i++ ) {
+//             pendingRemoves.add(assignedShares.get(i));
+//         }
+    }
+
+    /**
+     * Waiting for somebody to deal with my shrinkage?
+     */
+    public boolean isShrunk()
+    {
+        return pendingRemoves.size() > 0;
+    }
+
+    /**
+     * Return the reclaimed shares.
+     */
+    public HashMap<Share, Share> getPendingRemoves()
+    {
+    	return pendingRemoves;
+    }
+    
+    /**
+     * And finally, dump the pending shrinkage.
+     */
+    public void clearPendingRemoves()
+    {
+        pendingRemoves.clear();
+    }
+
+    /**
+     * Recovery complete, clear the share map
+     */
+    public void clearRecoveredShares()
+    {
+        recoveredShares.clear();
+    }
+
+//     public Machine removeLastMachine()
+//     {
+//         return assignedMachines.remove(assignedMachines.size() - 1);
+//     }
+
+    /**
+     * Find the number of nShares (virtual shares) this job has assigned already.
+     *
+     * If things are slow in the cluster the pending removes might be
+     * non-zero.  This is an extreme corner case; it's best to be safe.
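+     *
+     * For example (hypothetical numbers): 5 assigned + 2 pending - 1 pending remove = 6 N-shares.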
+     */
+    public int countNShares()
+    {
+        return assignedShares.size() + pendingShares.size() - pendingRemoves.size();
+    }
+
+    public void refuse(String refusal)
+    {
+        String methodName = "refusal";
+        logger.warn(methodName, id, refusal);
+        this.refusalReason = refusal;
+    }
+
+    public boolean isRefused()
+    {
+    	return (refusalReason != null);
+    }
+    
+    public String getRefusalReason()
+    {
+    	return refusalReason;
+    }
+    
+    public void setShareOrder(int s)
+    {
+        this.share_order = s;
+    }
+
+    public int getShareOrder()
+    {
+        return share_order;
+    }
+
+    /**
+     * During the scheduling algorithm we want to track some things by userid.  The "share cap" is used
+     * to keep track of the maximum number of shares I can actually use or want during scheduling; its
+     * value is only meaningful while a scheduling pass is in progress.
+     */
+    public void setShareCap(int cap)
+    {
+        this.share_cap = cap;
+    }
+
+    public int getShareCap()
+    {
+        return share_cap;
+    }
+
+    /**
+     * We try to project the maximum number of shares that this job can use, based on the current rate
+     * of completion of work items, and the known initialization time.  
+     *
+     * Many jobs have very long initialization times and will complete in their current allocation before
+     * new processes can be started and initialized.  We want to avoid growing the job (and causing
+     * evictions elsewhere) in that case.
+     *
+     * How to use this ... 
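+     *
+     * An illustrative calculation (all numbers hypothetical): suppose avg_init=300, ducc_epoch=60
+     * and prediction fudge=0, so target=360; with 2 processes of 4 threads each and 10 time units
+     * per work item the completion rate is 8/10 = 0.8 items per unit, so projected = 360 * 0.8 =
+     * 288 items finished by the target time.  If 48 questions remain, future = max(48-288, 0) = 0
+     * and the projected cap is 0 processes; if 488 remain, future = 200 and the cap is
+     * ceil(200/4) = 50 processes.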
+     */
+    int getProjectedCap()
+    {
+    	String methodName = "getProjectedCap";
+        if ( init_wait ) {                      // no cap if not initialized, because we don't know.  other caps will dominate.
+            return Integer.MAX_VALUE;
+        }
+
+        if ( Double.isNaN(time_per_item) ) {           // NaN never compares equal with ==, must use isNaN()
+            return Integer.MAX_VALUE;
+        }
+
+        // Get average init time
+        int count = 0;
+        long total_init = 0;
+        for ( Share s : assignedShares.values() ) {
+            long t = s.getInitializationTime();
+            if ( t > 0 ) {
+                count++;
+                total_init += t;
+            }
+        }
+        long avg_init = 0;
+        if ( total_init > 0 ) {
+            avg_init = total_init / count;  // average over shares that reported an init time
+        } else {
+            logger.warn(methodName, getId(), username, "Initialization time is 0, project cap and investment will be inaccurate.");
+        }
+
+        // The point in the future at which we estimate the amount of remaining work.
+        long target = avg_init + ducc_epoch + resource_class.getPredictionFudge();
+
+        int nprocesses = countNShares();
+        double rate = ((double) (nprocesses * threads)) / time_per_item;                   // number of work items per second
+                                                                                           // with currently assigned resources
+        long projected = Math.round(target * rate);                                        // expected WI we can do after a new
+                                                                                           // process gets started
+
+        long future = Math.max(nquestions_remaining - projected, 0);                        // work expected to remain at the target time
+
+        logger.info(methodName, getId(), username, "O", getShareOrder(),"T", target, "NTh", (nprocesses * threads), "TI", avg_init, "TR", time_per_item,
+                    "R", rate, "QR", nquestions_remaining, "P", projected, "F", future, 
+                     "return", (future / threads));
+
+        int answer = (int) future / threads;
+        if ( (future % threads ) > 0 ) answer++;
+
+        return answer;                                                                     // number of processes we expect to need
+                                                                                           // in the future
+        
+    }
+
+    /**
+     * This returns the largest number of processes that can actually be used: the smaller of the
+     * share cap and ceil(nQuestionsRemaining / nThreads), in N-shares, possibly reduced further by
+     * the prediction, initialization, and doubling caps.
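+     *
+     * For example (hypothetical numbers): with 100 questions remaining and 4 threads per process
+     * the base need is ceil(100/4) = 25 processes; if max_shares is 10 the base cap is
+     * min(10, 25) = 10, before the prediction, initialization and doubling caps are applied.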
+     */
+    public int getJobCap()
+    {    	
+		String methodName = "getJobCap";
+
+        if ( isRefused() ) {
+            return 0;
+        }
+
+        int c = nquestions_remaining / threads;
+
+        if ( ( nquestions_remaining % threads ) > 0 ) {
+            c++;
+        }
+
+        c = Math.max(c, assignedShares.size());        // if job is ending we could be fragmented and have to be
+                                                       // careful not to underestimate, or we end up possibly
+                                                       // evicting something that should be left alone.
+
+        // 
+        // If waiting for initialization, we have to cap as well on the maximum number of shares
+        // we give out, in case the job can't start, to avoid unnecessary preemption.
+        //
+        // Must convert to N-shares, because that is the number of actual processes, which is the
+        // unit that the initialization cap is specified in.
+        //
+        
+        int base_cap = Math.min(getMaxShares(), c);
+        int projected_cap = getProjectedCap();       
+
+        int potential_cap = base_cap;
+        int actual_cap = 0;
+
+        if ( resource_class.isUsePrediction() ) {
+            if (projected_cap < base_cap ) {                     // If we project less need, revise the estimate down
+                potential_cap = Math.max(projected_cap, assignedShares.size());
+            } 
+        }
+            
+        if ( init_wait && ( resource_class.getInitializationCap() > 0) ) {
+            actual_cap = Math.min(potential_cap, (resource_class.getInitializationCap()));
+        } else {
+
+            if ( init_wait ) {                                 // ugly, but true, if not using initialization caps
+                actual_cap =  potential_cap;
+            } else  if ( resource_class.isExpandByDoubling() ) {
+                if ( (assignedShares.size() == 0) ) {
+                    actual_cap = Math.max(1, resource_class.getInitializationCap());   // if we shrink to 0, need to restart from the init cap
+                } else {
+                    actual_cap = Math.min(potential_cap, assignedShares.size() * 2);
+                }
+            } else {
+                actual_cap = potential_cap;
+            }
+        }
+
+        logger.debug(methodName, getId(), username, "O", getShareOrder(), "Base cap:", base_cap, "Expected future cap:", projected_cap, "potential cap", potential_cap, "actual cap", actual_cap);
+        return actual_cap;
+    }
+
+    public int getMaxShares()
+    {
+        // if unset (negative), our max is the number already assigned
+        if ( max_shares < 0 ) {
+            return countNShares();
+        } else {
+            return max_shares;
+        }
+    }
+
+    public int getMinShares()
+    {
+        return min_shares;
+    }
+
+    public void setMaxShares(int s)
+    {
+        this.max_shares = s;
+    }
+
+    public void setMinShares(int s)
+    {
+        this.min_shares = s;
+    }
+
+    public boolean isRunning()
+    {
+        return countNShares() > 0;
+    }
+
+    public String getUserName()
+    {
+        return username;
+    }
+
+    public void setUserName(String n)
+    {
+        this.username = n;
+    }
+
+    public User getUser()
+    {
+        return user;
+    }
+
+    public void setUser(User u)
+    {
+        this.user = u;
+    }
+
+    public long getTimestamp()
+    {
+    	return submit_time;
+    }
+    
+    public void setTimestamp(long t)
+    {
+        this.submit_time = t;
+    }
+
+    public int getUserPriority() {
+        return user_priority;
+    }
+
+    public void setUserPriority(int p) {
+        this.user_priority = p;
+    }
+
+    public String getClassName() {
+        return resource_class_name;
+    }
+
+    public void setClassName(String class_name) {
+        this.resource_class_name = class_name;
+    }
+
+    public int getSchedulingPriority() {
+        return resource_class.getPriority();
+    }
+
+    public Policy getSchedulingPolicy() {
+        return resource_class.getPolicy();
+    }
+
+    public ResourceClass getResourceClass() {
+    	return resource_class;
+    }
+    
+    public int countInstances() {
+        return n_machines;
+    }
+
+    public void setNInstances(int m)
+    {
+        this.n_machines = m;
+    }
+
+    public int getMaxMachines() {
+        return min_shares;
+    }
+
+    public int nThreads() {
+        return threads;
+    }
+    public void setThreads(int th)
+    {
+    	this.threads = th;
+    }
+
+    public int getMemory() {
+        return memory;
+    }
+
+    public void setMemory(int memory) {
+        this.memory = memory;
+    }
+
+    public void setDuccType(DuccType type)
+    {
+        this.ducc_type = type;
+    }
+
+    public DuccType getDuccType()
+    {
+        return this.ducc_type;
+    }
+
+    /**
+     * Is at least one of my current shares initialized?
+     */
+    public boolean isInitialized()
+    {
+        for (Share s : assignedShares.values()) {
+            if ( s.isInitialized() ) return true;
+        }
+        return false;
+    }
+
+    /**
+     * Logging and debugging - nice to know what my assigned shares are.
+     */
+    public String printShares()
+    {
+        StringBuilder buf = new StringBuilder("AssignedShares: ");   // local only, no synchronization needed
+        if ( assignedShares.size() == 0 ) {
+            buf.append("<none>");
+        } else {
+            for ( Share s : assignedShares.values()) {
+                buf.append(s.getId());
+                buf.append(" ");
+            }
+        }
+
+        buf.append("\nPendingShares: ");
+        if ( pendingShares.size() == 0 ) {
+            buf.append("<none>");
+        } else {
+            for ( Share s : pendingShares.values()) {
+                buf.append(s.getId());
+                buf.append(" ");
+            }
+        }
+
+        buf.append("\nPendingRemoves: ");
+        if ( pendingRemoves.size() == 0 ) {
+            buf.append("<none>");
+        } else {
+            for ( Share s : pendingRemoves.values()) {
+                buf.append(s.getId());
+                buf.append(" ");
+            }
+        }
+
+        return buf.toString();
+    }
+
+    String getShortType()
+    {
+        String st = "?";
+        switch ( ducc_type ) {
+        case Reservation:
+            st = "R";
+            break;
+        case Job:
+            st = "J";
+            break;
+        case Service:
+            st = "S";
+            break;
+        }
+        return st;
+    }
+
+    public static String getHeader()
+    {
+        return String.format("%6s %30s %10s %10s %6s %5s %13s %8s %6s %9s %11s %8s", 
+                             "ID", "JobName", "User", "Class", 
+                             "Shares", "Order", "QuantumShares", 
+                             "NThreads", "Memory",
+                             "Questions", "Q Remaining", "InitWait");
+    }
+
+    public String toString()
+    {
+        int shares = assignedShares.size() + pendingShares.size();        
+
+        if ( isReservation() ) {
+            return String.format("%1s%5s %30.30s %10s %10s %6d",
+                                 getShortType(), id.toString(), name, username, getClassName(), shares);
+                                 
+        } else {
+            return String.format("%1s%5s %30.30s %10s %10s %6d %5d %13d %8d %6d %9d %11d %8s", 
+                                 getShortType(),
+                                 id.toString(), name, username, getClassName(), 
+                                 shares, share_order, (shares * share_order),
+                                 threads, memory,
+                                 nQuestions(), nQuestionsRemaining(),
+                                 init_wait);
+        }
+    }
+
+    public String toStringWithHeader()
+    {
+        StringBuilder buf = new StringBuilder(getHeader());
+        buf.append("\n");
+        buf.append(toString());
+        return buf.toString();
+    }
+
+    //
+    // Order machines by DECREASING order
+    //
+    class MachineByOrderSorter
+    	implements Comparator<Machine>
+    {	
+    	public int compare(Machine m1, Machine m2)
+        {
+            return (int) (m2.getShareOrder() - m1.getShareOrder());
+        }
+    }
+
+    //
+    // Order shares by INCREASING investment
+    //
+    class ShareByInvestmentSorter
+    	implements Comparator<Share>
+    {	
+    	public int compare(Share s1, Share s2)
+        {
+            //
+            // First divide them into two pools: 
+            // not-initialized shares always sort LESS than initialized shares
+            if ( ! s1.isInitialized() ) {
+                if ( s2.isInitialized() ) return -1;
+                // both not initialized. sort on less time spent initializing so far (fall through)
+            } else {
+                if ( ! s2.isInitialized() ) return 1;                
+                // both initialized.  Again sort on less time spent in init so far. (fall through)
+            }
+
+            long diff = s1.getInvestment() - s2.getInvestment();
+            return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);       // avoid casting the difference to int; it can overflow
+        }
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return id.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) 
+    {
+
+        if (this == obj)                  return true;
+
+        if (obj == null)                  return false;
+
+
+        if (getClass() != obj.getClass()) return false;
+
+        IRmJob other = (IRmJob) obj;
+
+    
+        // can't get null id
+        if ( !id.equals(other.getId()) )  return false;
+
+        // can't get null shares; the normal compare finishes it off
+                                          return assignedShares.equals(other.getAssignedShares());
+    }
+
+    public Comparator<IEntity> getApportionmentSorter()
+    {
+        return apportionmentSorter;
+    }
+
+    static private class ApportionmentSorterCl
+        implements Comparator<IEntity>
+    {
+        public int compare(IEntity e1, IEntity e2)
+        {
+            if ( e1 == e2 ) return 0;
+            long diff = e1.getTimestamp() - e2.getTimestamp();
+            return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);       // don't cast the long difference to int; it can overflow
+        }
+    }
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedConstants.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedConstants.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedConstants.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedConstants.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+public interface SchedConstants
+{
+    public enum Policy {
+        FAIR_SHARE,
+        FIXED_SHARE,
+        RESERVE,
+    };
+
+    public enum EvictionPolicy {
+        SHRINK_BY_MACHINE,         // shrink by largest machine first, to minimize fragmentation
+        SHRINK_BY_INVESTMENT,      // shrink by lowest investment first, to minimize waste
+    };
+
+    public static final String COMPONENT_NAME            = "RM";
+    public static final int DEFAULT_STABILITY_COUNT      = 5;
+    public static final int DEFAULT_INIT_STABILITY_COUNT = 3;
+    public static final int DEFAULT_SCHEDULING_RATIO     = 4;
+    public static final int DEFAULT_SCHEDULING_RATE      = 60000;
+    public static final int DEFAULT_NODE_METRICS_RATE    = 60000;
+
+    public static final int DEFAULT_PROCESSES            = 10;     // for jobs, number of processes if not specified
+    public static final int DEFAULT_INSTANCES            = 1;     // for reservations, number of instances if not specified
+
+    public static final int DEFAULT_MAX_PROCESSES        = Integer.MAX_VALUE; // for jobs, the class cap, if not configured
+    public static final int DEFAULT_MAX_INSTANCES        = Integer.MAX_VALUE; // for reservations, class cap, if not configured
+
+    public static final int DEFAULT_SHARE_WEIGHT         = 100;
+    public static final int DEFAULT_PRIORITY             = 10;
+
+}
+            

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedConstants.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedInternalError.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedInternalError.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedInternalError.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedInternalError.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+import org.apache.uima.ducc.common.utils.id.DuccId;
+
+@SuppressWarnings("serial")
+public class SchedInternalError
+    extends RuntimeException
+{
+    DuccId jobid;
+    
+    public SchedInternalError(DuccId jobid, String msg)
+    {
+        super(msg);
+        
+        this.jobid = jobid;
+    }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedInternalError.java
------------------------------------------------------------------------------
    svn:eol-style = native