You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:43:38 UTC
svn commit: r1427967 [4/5] - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src: main/ main/java/
main/java/org/ main/java/org/apache/ main/java/org/apache/uima/
main/java/org/apache/uima/ducc/ main/java/org/apache/uima/ducc/rm/
main/java/org/apache/uim...
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/README
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/README?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/README (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/README Wed Jan 2 19:43:37 2013
@@ -0,0 +1,57 @@
+Some of the files here are used to implement the DUCC model for developing and
+debugging the scheduler.
+
+------------------------------------------------------------------------
+These are the scheduler codes:
+
+ IScheduler - interface defining the code that makes the tough decisions. The intent
+ is to be able to implement different schedulers that just plug in to
+ IScheduleMain.
+
+ IScheduleMain - interface defining the scheduling "daemon" that interfaces
+ between the world and IScheduler.
+
+ Scheduler - implementation of IScheduleMain
+
+ SimpleFairShare - implementation of IScheduler providing a simple fair-share
+ with single prioiry scheduler.
+
+ Share - repressents one share. The IScheduleMain will look at the resources
+ and create Shares from them.
+
+ Job - represents one job.
+
+ Machine - represents one machine.
+
+ WorkItem - represents one work item. (Probably not needed for pure scheduling
+ purposes, essential for modeling).
+
+ SchedulingException - grab-all for anything that goes wrong in the scheduling code.
+ Presents a RuntimeException (no crash)
+
+ User - represents one user
+
+------------------------------------------------------------------------
+These are purely for simulation. I may refactor JobManager through an interface so I can
+ have the modeled JobManager and the "real" one, transparently. There's not much code but
+ there's a bit of tricky threading and timer management here.
+
+ JobManager - scheduler interacts with this for ALL job actions, and only JobManager gets
+ to interact with scheduler. Simulates / models the real Job Manager.
+
+ JobHandler - Models the UIMA/AS client for a job - feeds questions to the Agents and
+ fields returned state messages. Interacts with JobManager and Agent.
+
+ Agent - Models the node agent - dispatches questions.
+
+ ExecutionTask - this is a simple TimerTask that simulates execution of a task by just
+ sleeping for an appropriate amount of time.
+
+------------------------------------------------------------------------
+Other utility codes:
+ Main - what you'd expect from main - starts things running and waits for it to die.
+ It also supports a trivial command line through stdin that simulates the
+ DUCC / Blade CLI.
+
+ Logger - simple logger that sort of looks like Log4J until we decide on something else.
+
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/README
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,827 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+
+import org.apache.uima.ducc.common.utils.DuccProperties;
+import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
+
+
+/**
+ * This represents a priority class.
+ */
+public class ResourceClass
+ implements SchedConstants,
+ IEntity
+{
+ //private DuccLogger logger = DuccLogger.getLogger(this.getClass(), COMPONENT_NAME);
+
+ private String id;
+ private Policy policy;
+ private int priority; // orders evaluation of the class
+
+
+ private int share_weight; // for fair-share, the share weight to use
+ private int min_shares; // fixed-shre: min shares to hand out
+ private int max_processes; // fixed-share: max shares to hand out regardless of
+ // what is requested or what fair-share turns out to be
+
+ private int max_machines; // reservation: max machines that can be reserved by a single user - global across
+ // all this user's requests.
+
+ // for reservation, this caps machines.
+ // for shares, this caps shares
+ private int absolute_cap; // max shares or machines this class can hand out
+ private double percent_cap; // max shares or machines this class can hand out as a percentage of all shares
+ private int true_cap; // set during scheduling, based on actual current resource availability
+ private int pure_fair_share; // the unmodified fair share, not counting caps, and not adding in bonuses
+
+ private HashMap<IRmJob, IRmJob> allJobs = new HashMap<IRmJob, IRmJob>();
+ private HashMap<Integer, HashMap<IRmJob, IRmJob>> jobsByOrder = new HashMap<Integer, HashMap<IRmJob, IRmJob>>();
+ private HashMap<User, HashMap<IRmJob, IRmJob>> jobsByUser = new HashMap<User, HashMap<IRmJob, IRmJob>>();
+ private int max_job_order = 0; // largest order of any job still alive in this rc (not necessarily globally though)
+
+ // private HashMap<Integer, Integer> nSharesByOrder = new HashMap<Integer, Integer>(); // order, N shares of that order
+ private boolean subpool_counted = false;
+
+ // the physical presence of nodes in the pool is somewhat dynamic - we'll store names only, and generate
+ // a map of machines on demand by the schedler from currently present machnes
+ private String nodepoolName = null;
+
+// ArrayList<String> nodepool = new ArrayList<String>(); // nodepool names only
+// HashMap<String, Machine> machinesByName = new HashMap<String, Machine>();
+// HashMap<String, Machine> machinesByIp = new HashMap<String, Machine>();
+
+ // Whether to enforce memory constraints for matching reservations
+ private boolean enforce_memory = true;
+
+ // int class_shares; // number of shares to apportion to jobs in this class in current epoch
+
+ private boolean expand_by_doubling = true;
+ private int initialization_cap = 2;
+ private int prediction_fudge = 10000;
+ private boolean use_prediction = true;
+
+ private int[] given_by_order = null;
+ private int[] wanted_by_order = null; // volatile - changes during countClassesByOrder
+
+ private static Comparator<IEntity> apportionmentSorter = new ApportionmentSorterCl();
+
+ public ResourceClass(String id) //, HashMap<String, Machine> machinesByName, HashMap<String, Machine> machinesByIp)
+ {
+ this.id = id;
+ this.policy = Policy.FAIR_SHARE;
+ this.share_weight = 100;
+ this.priority = 10;
+ this.max_processes = Integer.MAX_VALUE;
+ this.min_shares = 0;
+
+ this.max_machines = Integer.MAX_VALUE;
+
+ this.absolute_cap = Integer.MAX_VALUE;
+ this.percent_cap = 1.0;
+
+ this.expand_by_doubling = SystemPropertyResolver.getBooleanProperty("ducc.rm.expand.by.doubling", true);
+ this.initialization_cap = SystemPropertyResolver.getIntProperty("ducc.rm.initialization.cap", 2);
+
+ this.use_prediction = SystemPropertyResolver.getBooleanProperty("ducc.rm.prediction", true);
+ this.prediction_fudge = SystemPropertyResolver.getIntProperty("ducc.rm.prediction.fudge", 10000); // extra fudge factor to add in to the
+ // projection, in miliseconds.
+
+ //this.machinesByName = machinesByName;
+ //this.machinesByIp = machinesByIp;
+ }
+
+ // TODO: sanity check
+ // - emit warnings if shares are specified in reservations
+ // if machines are sprcified for fair or fixed-share
+ // etc.
+ void init(DuccProperties props)
+ {
+ //String methodName = "init";
+ String k = "scheduling.class." + id + ".";
+ String s;
+ s = props.getProperty(k + "policy");
+
+ if ( s == null ) {
+ throw new SchedulingException(null, "Configuration problem: no policy for class " + id + ".");
+ }
+ policy = Policy.valueOf(s);
+
+ share_weight = props.getIntProperty(k + "share_weight" , DEFAULT_SHARE_WEIGHT);
+ priority = props.getIntProperty(k + "priority", DEFAULT_PRIORITY);
+ min_shares = props.getIntProperty(k + "min_shares", 0); // default no min
+
+ switch ( policy ) {
+ case FAIR_SHARE:
+ max_processes = props.getIntProperty(k + "max_processes", DEFAULT_MAX_PROCESSES); // default no max
+ max_machines = 0;
+ break;
+
+ case FIXED_SHARE:
+ max_processes = props.getIntProperty(k + "max_processes", DEFAULT_MAX_PROCESSES); // default no max
+ max_machines = 0;
+ break;
+
+ case RESERVE:
+ max_processes = 0;
+ max_machines = props.getIntProperty(k + "max_machines", DEFAULT_MAX_INSTANCES); // default max 1
+ break;
+
+ }
+ if ( max_processes <= 0 ) max_processes = Integer.MAX_VALUE;
+ if ( max_machines <= 0 ) max_machines = Integer.MAX_VALUE;
+
+ enforce_memory = props.getBooleanProperty(k + "enforce.memory", true);
+
+ initialization_cap = props.getIntProperty(k + "initialization.cap", initialization_cap);
+ expand_by_doubling = props.getBooleanProperty(k + "expand.by.doubling", expand_by_doubling);
+ use_prediction = props.getBooleanProperty(k + "prediction", use_prediction);
+ prediction_fudge = props.getIntProperty(k + "prediction.fudge", prediction_fudge);
+
+ s = props.getStringProperty(k + "cap", ""+Integer.MAX_VALUE); // default no cap
+ if ( s.endsWith("%") ) {
+ int t = Integer.parseInt(s.substring(0, s.length()-1));
+ percent_cap = (t * 1.0 ) / 100.0;
+ } else {
+ absolute_cap = Integer.parseInt(s);
+ if (absolute_cap == 0) absolute_cap = Integer.MAX_VALUE;
+ }
+
+ nodepoolName = props.getProperty(k + "nodepool"); // optional nodepool
+ if (nodepoolName == null) {
+ nodepoolName = NodePool.globalName;
+ }
+ }
+
+ public long getTimestamp()
+ {
+ return 0;
+ }
+
+ String getNodepoolName()
+ {
+ return nodepoolName;
+ }
+
+ public void setPureFairShare(int pfs)
+ {
+ this.pure_fair_share = pfs;
+ }
+
+ public int getPureFairShare()
+ {
+ return pure_fair_share;
+ }
+
+ public boolean isExpandByDoubling()
+ {
+ return expand_by_doubling;
+ }
+
+ public void setExpandByDoubling(boolean ebd)
+ {
+ this.expand_by_doubling = ebd;
+ }
+
+ public int getInitializationCap()
+ {
+ return initialization_cap;
+ }
+
+ public void setInitializationCap(int c)
+ {
+ this.initialization_cap = c;
+ }
+
+ public boolean isUsePrediction()
+ {
+ return use_prediction;
+ }
+
+ public int getPredictionFudge()
+ {
+ return prediction_fudge;
+ }
+
+ public boolean enforceMemory()
+ {
+ return enforce_memory;
+ }
+
+ public Policy getPolicy()
+ {
+ return policy;
+ }
+
+ public void setTrueCap(int cap)
+ {
+ this.true_cap = cap;
+ }
+
+ public int getTrueCap()
+ {
+ return true_cap;
+ }
+
+ public double getPercentCap() {
+ return percent_cap;
+ }
+
+ public int getAbsoluteCap() {
+ return absolute_cap;
+ }
+
+ public int getMaxProcesses() {
+ return max_processes;
+ }
+
+ public int getMinShares() {
+ return min_shares;
+ }
+
+ public int getMaxMachines() {
+ return max_machines;
+ }
+
+ void setPolicy(Policy p)
+ {
+ this.policy = p;
+ }
+
+ /**
+ public String getId()
+ {
+ return id;
+ }
+*/
+
+ public String getName()
+ {
+ return id;
+ }
+
+ public int getShareWeight()
+ {
+ return share_weight;
+ }
+
+ /**
+ * Return my share weight, if I have any jobs of the given order or less. If not,
+ * return 0;
+ */
+ public int getEffectiveWeight(int order)
+ {
+ for ( int o = order; o > 0; o-- ) {
+ if ( jobsByOrder.containsKey(o) && ( jobsByOrder.get(o).size() > 0) ) {
+ return share_weight;
+ }
+ }
+ return 0;
+ }
+
+// /** @deprecated */
+// public void setClassShares(int s)
+// {
+// this.class_shares = Math.min(s, class_shares);
+// }
+
+ /**
+ * Add 's' ** quantum ** shares of the indicated order.
+ * Return the actual number of shares, which might have been capped.
+ * @deprecated
+
+ public int setClassSharesByOrder(int order, int s)
+ {
+ int val = 0;
+ s /= order; // convert to nShares
+ if ( nSharesByOrder.containsKey(order ) ) {
+ val = Math.min(s, nSharesByOrder.get(order)); // not first time, must use min
+ } else {
+ val = s; // first time, just accept it
+ }
+ nSharesByOrder.put(order, val);
+ return val * order;
+ }
+ */
+
+ /**
+ * Add one ** virtual ** share of the given order.
+
+ public void addClassShare(int order)
+ {
+ nSharesByOrder.put(order, nSharesByOrder.get(order) + 1);
+ }
+ */
+ /**
+ public int canUseShares(NodePool np, int[] tmpSharesByOrder)
+ {
+ if ( !np.getId().equals(nodepoolName) ) return 0; // can't use any from somebody else's nodepool
+
+ for ( int o = max_job_order; o > 0; o-- ) {
+
+ if ( !nSharesByOrder.containsKey(o) ) continue;
+
+ int given = nSharesByOrder.get(o);
+ int can_use = 0;
+
+ if ( tmpSharesByOrder[o] > 0 ) { // do we have any this size?
+ HashMap<IRmJob, IRmJob> jbo = jobsByOrder.get(o); // yeah, see if any job wants it
+ if ( jbo != null ) {
+ for (IRmJob j : jbo.values()) {
+ can_use += j.getJobCap();
+ }
+ }
+ }
+ if ( (can_use - given) > 0 ) {
+ return o;
+ }
+ }
+ return 0;
+ }
+ */
+
+ void updateNodepool(NodePool np)
+ {
+ //String methodName = "updateNodepool";
+
+// for ( int k : nSharesByOrder.keySet() ) {
+// np.countOutNSharesByOrder(k, nSharesByOrder.get(k));
+// }
+
+ if ( given_by_order == null ) return; // nothing given, nothing to adjust
+
+ for ( int o = NodePool.getMaxOrder(); o > 0; o-- ) {
+ np.countOutNSharesByOrder(o, given_by_order[o]);
+ }
+ }
+
+ public int getPriority()
+ {
+ return priority;
+ }
+
+ public void clearShares()
+ {
+ //class_shares = 0;
+ given_by_order = null;
+ subpool_counted = false;
+ }
+
+ public void markSubpoolCounted()
+ {
+ subpool_counted = true;
+ }
+
+ void addJob(IRmJob j)
+ {
+ allJobs.put(j, j);
+
+ int order = j.getShareOrder();
+ HashMap<IRmJob, IRmJob> jbo = jobsByOrder.get(order);
+ if ( jbo == null ) {
+ jbo = new HashMap<IRmJob, IRmJob>();
+ jobsByOrder.put(order, jbo);
+ max_job_order = Math.max(max_job_order, order);
+ }
+ jbo.put(j, j);
+
+ User u = j.getUser();
+ jbo = jobsByUser.get(u);
+ if ( jbo == null ) {
+ jbo = new HashMap<IRmJob, IRmJob>();
+ jobsByUser.put(u, jbo);
+ }
+ jbo.put(j, j);
+
+ }
+
+ void removeJob(IRmJob j)
+ {
+ if ( ! allJobs.containsKey(j) ) {
+ throw new SchedulingException(j.getId(), "Priority class " + getName() + " cannot find job to remove.");
+ }
+
+ allJobs.remove(j);
+
+ int order = j.getShareOrder();
+ HashMap<IRmJob, IRmJob> jbo = jobsByOrder.get(order);
+ jbo.remove(j);
+ if ( jbo.size() == 0 ) {
+ jobsByOrder.remove(order);
+
+ for ( int o = order - 1; o > 0; o-- ) {
+ if ( jobsByOrder.containsKey(o) ) {
+ max_job_order = o;
+ break;
+ }
+ }
+ }
+
+// if ( jbo.size() == 0 ) {
+// jobsByOrder.remove(order);
+// }
+
+ User u = j.getUser();
+ jbo = jobsByUser.get(u);
+ jbo.remove(j);
+ if ( jbo.size() == 0 ) {
+ jobsByUser.remove(u);
+ }
+ }
+
+ int countJobs()
+ {
+ return allJobs.size();
+ }
+
+ /**
+ int countJobs(int order)
+ {
+ if ( jobsByOrder.containsKey(order) ) {
+ return jobsByOrder.get(order).size();
+ } else {
+ return 0;
+ }
+ }
+*/
+ /**
+ HashMap<IRmJob, IRmJob> getJobsOfOrder(int order)
+ {
+ return jobsByOrder.get(order);
+ }
+*/
+
+ /**
+ int sumAllWeights()
+ {
+ return allJobs.size() * share_weight;
+ }
+*/
+ /**
+ * The total weights of all jobs of order 'order' or less.
+ *
+ int sumAllWeights(int order)
+ {
+ int sum = 0;
+ for ( int o = order; o > 0; o-- ) {
+ if ( jobsByOrder.containsKey(o) ){
+ sum++;
+ }
+ }
+ return sum * share_weight;
+ }
+*/
+ /**
+ * Returns total N-shares wanted by order. Processes of size order.
+ */
+ private int countNSharesWanted(int order)
+ {
+ int K = 0;
+
+ // First sum the max shares all my jobs can actually use
+ HashMap<IRmJob, IRmJob> jobs = jobsByOrder.get(order);
+ if ( jobs == null ) {
+ return 0;
+ }
+
+ for ( IRmJob j : jobs.values() ) {
+ K += j.getJobCap();
+ }
+
+ return K;
+ }
+
+ public void initWantedByOrder(ResourceClass unused)
+ {
+ int ord = NodePool.getMaxOrder();
+ wanted_by_order = NodePool.makeArray();
+ for ( int o = ord; o > 0; o-- ) {
+ wanted_by_order[o] = countNSharesWanted(o);
+ wanted_by_order[0] += wanted_by_order[o];
+ }
+ }
+
+ public int[] getWantedByOrder()
+ {
+ return wanted_by_order;
+ }
+
+ public int[] getGivenByOrder()
+ {
+ return given_by_order;
+ }
+
+ public void setGivenByOrder(int[] gbo)
+ {
+ if ( given_by_order == null ) { // Can have multiple passes, don't reset on subsequent ones.
+ this.given_by_order = gbo; // Look carefuly at calculateCap() below for details.
+ }
+ }
+
+ public int calculateCap(int order, int basis)
+ {
+ int perccap = Integer.MAX_VALUE; // the cap, calculated from percent
+ int absolute = getAbsoluteCap();
+ double percent = getPercentCap();
+
+ if ( percent < 1.0 ) {
+ double b = basis;
+ b = b * percent;
+ perccap = (int) Math.round(b);
+ } else {
+ perccap = basis;
+ }
+
+ int cap = Math.min(absolute, perccap) / order; // cap on total shares available
+
+ //
+ // If this RC is defined over a nodepool that isn't the global nodepool then its share
+ // gets calculated multiple times. The first time when it is encountered during the
+ // depth-first traversal, to work out the fair-share for all classes defined over the
+ // nodepool. Subpool resources are also available to parent pools however, and must
+ // be reevaluated to insure the resources are fairly allocated over the larger pool
+ // of work.
+ //
+ // So at this point there might already be shares assigned. If so, we need to
+ // recap on whatever is already given to avoid over-allocating "outside" of the
+ // assigned shares.
+ //
+ if ( (given_by_order != null) && subpool_counted ) {
+ cap = Math.min(cap, given_by_order[order]);
+ } // else - never been counted or assigned at this order, no subpool cap
+
+ return cap;
+ }
+
+
+
+ /**
+ * Get capped number of quantum shares this resource class can support.
+ *
+ int countCappedQShares(int[] tmpSharesByOrder)
+ {
+ int K = 0;
+ for ( IRmJob j : allJobs.values() ) {
+ int order = j.getShareOrder();
+ K += (Math.min(j.getJobCap(), tmpSharesByOrder[order] * order));
+ }
+ return K;
+ }
+ */
+ /**
+ * Sum all the capped shares of the jobs. Then cap on the physical cap and
+ * again on the (possible) nodepool cap.
+ *
+ int countCappedNShares(int physicalCap, int order)
+ {
+ int K = 0;
+
+ // First sum the max shares all my jobs can actually use
+ HashMap<IRmJob, IRmJob> jobs = jobsByOrder.get(order);
+ for ( IRmJob j : jobs.values() ) {
+ K += j.getJobCap();
+ }
+
+ // cap by what is physically there
+ K = Math.min(K, physicalCap);
+ return K;
+ }
+*/
+
+ /**
+ * Sum all the capped shares of the jobs. Then cap on the physical cap and
+ * again on the (possible) nodepool cap.
+
+ TODO: this seems all wrong.
+ int getSubpoolCap(int order)
+ {
+ if ( subpool_counted ) { // share are dirty, cap is valid
+ if( given_by_order[order] == 0 ) {
+ //if ( !nSharesByOrder.containsKey(order) ) {
+ return Integer.MAX_VALUE;
+ }
+ return given_by_order[order];
+ } else {
+ return Integer.MAX_VALUE; // shares are clean, no cap
+ }
+ }
+ */
+
+// int countCappedShares()
+// {
+// int count = 0;
+// for ( IRmJob j : allJobs.values() ) {
+// count += j.getJobCap();
+// }
+// return count;
+// }
+
+ /**
+ int countSharesByOrder(int order)
+ {
+ if ( nSharesByOrder.containsKey(order) ) {
+ return nSharesByOrder.get(order);
+ }
+ return 0;
+ }
+*/
+ /**
+ int[] getSharesByOrder(int asize)
+ {
+ int[] answer = new int[asize];
+ for ( int i = 0; i < asize; i++ ) {
+ if ( nSharesByOrder.containsKey(i) ) {
+ answer[i] = nSharesByOrder.get(i);
+ } else {
+ answer[i] = 0;
+ }
+ }
+ return answer;
+ }
+*/
+ public boolean hasSharesGiven()
+ {
+ return ( (given_by_order != null) && (given_by_order[0] > 0) );
+ }
+
+ private int countActiveShares()
+ {
+ int sum = 0;
+ for ( IRmJob j : allJobs.values() ) {
+ sum += (j.countNShares() * j.getShareOrder()); // in quantum shares
+ }
+ return sum;
+ }
+
+ /**
+ * Get the highest order of any job in this class.
+
+ protected int getMaxOrder()
+ {
+ int max = 0;
+ for ( IRmJob j : allJobs.values() ) {
+ max = Math.max(max, j.getShareOrder());
+ }
+ return max;
+ }
+ */
+
+ HashMap<IRmJob, IRmJob> getAllJobs()
+ {
+ return allJobs;
+ }
+
+ HashMap<Integer, HashMap<IRmJob, IRmJob>> getAllJobsByOrder()
+ {
+ return jobsByOrder;
+ }
+
+ HashMap<User, HashMap<IRmJob, IRmJob>> getAllJobsByUser()
+ {
+ return jobsByUser;
+ }
+
+ ArrayList<IRmJob> getAllJobsSorted(Comparator<IRmJob> sorter)
+ {
+ ArrayList<IRmJob> answer = new ArrayList<IRmJob>();
+ answer.addAll(allJobs.values());
+ Collections.sort(answer, sorter);
+ return answer;
+ }
+
+ int getMaxJobOrder()
+ {
+ return max_job_order;
+ }
+
+ int makeReadable(int i)
+ {
+ return (i == Integer.MAX_VALUE ? -1 : i);
+ }
+
+ /**
+ * Stringify the share tables for the logs
+ */
+
+ /**
+ public String tablesToString()
+ {
+ ArrayList<Integer> keys = new ArrayList<Integer>();
+ keys.addAll(nSharesByOrder.keySet());
+ Collections.sort(keys);
+ int max = 0;
+ for ( int k : keys ) {
+ max = Math.max(max, k);
+ }
+ String[] values = new String[max+1];
+ values[0] = id;
+ for ( int i = 1; i < max+1; i++) {
+ if ( !nSharesByOrder.containsKey(i) ) {
+ values[i] = "0";
+ } else {
+ values[i] = Integer.toString(nSharesByOrder.get(i));
+ }
+ }
+ StringBuffer sb = new StringBuffer();
+ sb.append("%23s:");
+ for ( int i = 0; i < max; i++ ) {
+ sb.append("%s ");
+ }
+
+ return String.format(sb.toString(), (Object[]) values);
+ }
+ */
+
+ // note we assume Nodepool is the last token so we don't set a len for it!
+ private static String formatString = "%12s %11s %4s %5s %5s %5s %6s %6s %7s %6s %6s %7s %5s %7s %s";
+ public static String getDashes()
+ {
+ return String.format(formatString, "------------", "-----------", "----", "-----", "-----", "-----", "------", "------", "-------", "------", "------", "-------", "-----", "-------", "--------");
+ }
+
+ public static String getHeader()
+ {
+ return String.format(formatString, "Class Name", "Policy", "Prio", "Wgt", "MinSh", "MaxSh", "AbsCap", "PctCap", "InitCap", "Dbling", "Prdct", "PFudge", "Shr", "Enforce", "Nodepool");
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return id.hashCode();
+ }
+
+ public String toString() {
+ return String.format("%12s %11s %4d %5d %5d %5d %6d %6d %7d %6s %6s %7d %5d %7s %s",
+ id,
+ policy.toString(),
+ priority,
+ share_weight,
+ makeReadable(min_shares),
+ makeReadable(max_processes),
+ makeReadable(absolute_cap),
+ (int) (percent_cap *100),
+ initialization_cap,
+ expand_by_doubling,
+ use_prediction,
+ prediction_fudge,
+ countActiveShares(),
+ enforce_memory,
+ nodepoolName
+ );
+ }
+
+ public String toStringWithHeader()
+ {
+ StringBuffer buf = new StringBuffer();
+
+
+ buf.append(getHeader());
+ buf.append("\n");
+ buf.append(toString());
+ return buf.toString();
+ }
+
+ public Comparator<IEntity> getApportionmentSorter()
+ {
+ return apportionmentSorter;
+ }
+
+ static private class ApportionmentSorterCl
+ implements Comparator<IEntity>
+ {
+ public int compare(IEntity e1, IEntity e2)
+ {
+ // we want a consistent sort, that favors higher share weights
+ if ( e1 == e2 ) return 0;
+ int w1 = e1.getShareWeight();
+ int w2 = e2.getShareWeight();
+ if ( w1 == w2 ) {
+ return e1.getName().compareTo(e2.getName());
+ }
+ return (int) (w2 - w1);
+ }
+ }
+
+}
+
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,1334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+/**
+ * This class represents a job inside the scheduler.
+ */
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
+
+
+public class RmJob
+ implements SchedConstants,
+ IRmJob
+{
+ DuccLogger logger = DuccLogger.getLogger(RmJob.class, COMPONENT_NAME);
+ static final int DEFAULT_NTHREADS = 4;
+
+ protected DuccId id; // sched-assigned id (maybe delegate to job manager eventually)
+ protected DuccType ducc_type; // for messages so we can tell what kind of job
+ protected String name; // user's name for job
+ protected String resource_class_name; // Name of the res class, from incoming job parms
+ protected ResourceClass resource_class; // The actual class, assigned as job is received in scheduler.
+ protected int user_priority; // user "priority", really apportionment
+
+ protected int n_machines; // RESERVE: minimum machines to allocate
+ protected int min_shares; // FIXED_SHARE: minimum N shares to allocate
+ protected int max_shares; // FAIR_SHARE: maximum N shares to allocate
+ protected boolean is_reservation = false;
+
+ protected int threads; // threads per process
+
+ protected int memory; // estimated memory usage
+ protected int nquestions; // number of work-items in total
+ protected int nquestions_remaining; // number of uncompleted work items
+ protected double time_per_item = 0.0; // from OR - mean time per work item
+
+ protected int share_order = 0; // How many shares per process this job requires (calculated on submission)
+
+ protected int share_cap = Integer.MAX_VALUE; // initially; scheduler policy will reset as the job ages
+ protected int pure_fair_share = 0; // pure uncapped un-bonused share for this job
+
+ protected long submit_time; // when job is submitted ( sched or job-manager sets this )
+
+ protected String username;
+ protected User user; // user id, enforced by submit and job manager. we just believe it in sched.
+
+ //
+ // We keep track of three things related to machines:
+ // 1. All the machines the job is running on.
+ // 2. The machines the job will be exanded to run on but which aren't yet dispatched
+ // 3. The machines to be removed from the job, but which the job is still running on
+ //
+ protected HashMap<Share, Share> assignedShares; // job is dispatched to these
+ protected HashMap<Share, Share> pendingShares ; // job is scheduled for these but not yet confirmed
+ protected HashMap<Share, Share> pendingRemoves; // job is scheduled to remove these but not confirmed
+ protected HashMap<Share, Share> recoveredShares; // recovery after bounce, need to reconnect these
+
+ // track shares by machine, and machines, to help when we have to give stuff away
+ Map<Machine, Map<Share, Share>> sharesByMachine = new HashMap<Machine, Map<Share, Share>>();
+ Map<Machine, Machine> machineList = new HashMap<Machine, Machine>();
+
+ // protected int shares_given; // during scheduling, how many N-shares we get
+ int[] given_by_order; // during scheduling, how many N-shares we get
+ int[] wanted_by_order; // during scheduling, how many N-shares we we want - volatile, changes during countJobsByOrder
+ protected boolean init_wait; // If True, we're waiting for orchestrator to tell us that init is successful.
+ // Until then, we give out only a very small share.
+ protected boolean completed = false; // RmJob can linger a bit after completion for the
+ // defrag code - must mark it complete
+
+ // For predicting end of job based on current rate of completion
+ protected int orchestrator_epoch;
+ protected int rm_rate;
+ protected int ducc_epoch;
+
+ protected Properties jobprops; // input that job is constructed from. currently is condensed from Blade logs (simulation only)
+
+ protected String refusalReason = null; // if refused, this is why, for the message
+
+ private static Comparator<IEntity> apportionmentSorter = new ApportionmentSorterCl();
+
+ protected RmJob()
+ {
+ }
+
+ public RmJob(DuccId id)
+ {
+ this.id = id;
+
+ orchestrator_epoch = SystemPropertyResolver.getIntProperty("ducc.orchestrator.state.publish.rate", 10000);
+ rm_rate = SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.ratio", 4);
+ ducc_epoch = orchestrator_epoch * rm_rate;
+ }
+
+ // public RmJob(DuccId id, Properties properties)
+ // {
+ // this.jobprops = properties;
+ // this.id = id;
+ // }
+
+ /**
+ * read the props file and set inital values for internal fields
+ */
+ public void init()
+ {
+ assignedShares = new HashMap<Share, Share>();
+ pendingShares = new HashMap<Share, Share>();
+ pendingRemoves = new HashMap<Share, Share>();
+ recoveredShares = new HashMap<Share, Share>();
+
+ if ( max_shares == 0 ) max_shares = Integer.MAX_VALUE;
+ }
+
+ public DuccId getId()
+ {
+ return id;
+ }
+
+ public void setId(DuccId id)
+ {
+ this.id = id;
+ }
+
+ public long getFriendlyId()
+ {
+ return id.getFriendly();
+ }
+
+ public String getName()
+ {
+ return id.toString();
+ }
+
+ public void setJobName(String name)
+ {
+ this.name = name;
+ }
+
+ public void setReservation()
+ {
+ this.is_reservation = true;
+ }
+
+ public boolean isReservation()
+ {
+ return is_reservation;
+ }
+
+ public void markComplete()
+ {
+ completed = true;
+ }
+
+ public boolean isCompleted()
+ {
+ return completed;
+ }
+
+ /**
+ * The matrix:
+ * IW = am I in initializaton wait
+ * RU = am I runnable
+ * Resched = do we need to reschedule
+ * Reset = set IW to this
+ * x = don't care
+ *
+ * IW RU Resched Reset
+ * T T T F
+ * T F F T
+ * F T F F
+ * F F x x
+ *
+ * So resched = IR & RU
+ * IW = !RU
+ *
+ * We return resched so caller knows the tickle the scheduler
+ */
+ public boolean setInitWait(boolean is_running)
+ {
+ boolean resched = init_wait & is_running;
+ init_wait = !is_running;
+ return resched;
+ }
+
+ /**
+ * Save ref to the class we are in, and init class-based structures.
+ */
+ public void setResourceClass(ResourceClass cl)
+ {
+ this.resource_class = cl;
+ }
+
+ /**
+ * Number of questions submitted, all must be answered. This is used by job manager
+ * to know when they've all been dealt with.
+ */
+ public int nQuestions()
+ {
+ return nquestions;
+ }
+
+ public void setNQuestions(int allq, int remainingq, double time_per_item)
+ {
+ this.nquestions = allq;
+ this.nquestions_remaining = remainingq;
+ this.time_per_item = time_per_item;
+ }
+
+ /**
+ * Number of questions still to be answered. Used by scheduler to determing current
+ * machine requirement.
+ */
+ public int nQuestionsRemaining()
+ {
+ return nquestions_remaining;
+ }
+
+
+ public Map<Machine, Map<Share, Share>> getSharesByMachine()
+ {
+ return sharesByMachine;
+ }
+
+ public Map<Machine, Machine> getMachines()
+ {
+ return machineList;
+ }
+
+ /**
+ * There are a fair number of piddling little methods to manage shares. This high granularity is
+ * needed in order to manage bits and pieces of the bookkeeping from different threads and queues
+ * without blocking.
+ *
+ * TODO: maybe we can consolidate some of this after it's all worked out.
+ */
+
+ /**
+ * Before each scheduling epoch, clear the counting from the last time.
+ */
+ public void clearShares()
+ {
+ // this.shares_given = 0;
+ given_by_order = null;
+ }
+
+ /**
+ public void addQShares(int s)
+ {
+ this.shares_given += ( s / share_order ) ; // convert to N-shares == processes
+ }
+ */
+
+ public void setPureFairShare(int pfs)
+ {
+ this.pure_fair_share = pfs;
+ }
+
+ public int getPureFairShare()
+ {
+ return pure_fair_share;
+ }
+
+ public int[] getGivenByOrder()
+ {
+ return given_by_order;
+ }
+
+ public void setGivenByOrder(int[] gbo)
+ {
+ this.given_by_order = gbo;
+ }
+
+ public int getShareWeight()
+ {
+ return 1; // all jobs are the same currently
+ }
+
+ public void initWantedByOrder(ResourceClass unused)
+ {
+ wanted_by_order = NodePool.makeArray();
+ wanted_by_order[share_order] = getJobCap();
+ wanted_by_order[0] = wanted_by_order[share_order];
+ }
+
+ public int[] getWantedByOrder()
+ {
+ return wanted_by_order;
+ }
+
+ public int calculateCap(int order, int total)
+ {
+ return Integer.MAX_VALUE; // no cap for jobs
+ }
+
+
+ public int countNSharesGiven()
+ {
+ if ( given_by_order == null) { return 0; }
+ return given_by_order[share_order];
+ }
+
+ public int countQSharesGiven()
+ {
+ return countNSharesGiven() * share_order;
+ }
+
+ /**
+ * Number of N-shares I'm losing.
+ */
+ public int countNSharesLost()
+ {
+ return countNShares() - countNSharesGiven();
+ }
+
+// /**
+// * Can I use more N-shares; how many?
+// */
+// public int canUseBonus(int bonus, int[] nSharesByOrder)
+// {
+// int cap = getJobCap();
+// int can_use = Math.max(0, cap - shares_given); // what can we actually use?
+
+// if ( can_use > nSharesByOrder[share_order] ) { // can't use more than physicalliy exist
+// return 0;
+// }
+
+// for (int i = share_order; i <= Math.min(bonus, (nSharesByOrder.length - 1)); i++ ) {
+// if ( (nSharesByOrder[i] > 0) && (i <= can_use) ) {
+// return i ;
+// }
+// }
+// return 0;
+// }
+
+ /**
+ * Can I use more N-shares?
+
+ public int canUseBonus(int bonus, int[] nSharesByOrder)
+ {
+ int cap = getJobCap();
+ int can_use = Math.max(0, cap - countNSharesGiven()); // what can we actually use?
+
+ // if ( can_use > nSharesByOrder[share_order] ) { // can't use more than physicalliy exist
+ // return 0;
+ // }
+ if ( can_use == 0 ) {
+ return 0;
+ }
+ return ( nSharesByOrder[share_order] > 0 ) ? share_order : 0;
+ }
+ */
+
+ /**
+ * Officially allocated shares assigned to this job which are known to be in use.
+ */
+ public HashMap<Share, Share> getAssignedShares()
+ {
+ return assignedShares;
+ }
+
+ /**
+ * Shares recovered from the OR during job recovery.
+ */
+ public HashMap<Share, Share> getRecoveredShares()
+ {
+ return recoveredShares;
+ }
+
+ /**
+ * Newly allocated shares that have not been dispatched. They're unavailable for scheduling but
+ * job manager doesn't know about them yet. When we tell job manager we'll "promote" them to
+ * the assignedShares list.
+ */
+ public HashMap<Share, Share> getPendingShares()
+ {
+ return pendingShares;
+ }
+
+ /**
+ * We're dispatching, move machines to active list, and clear pending list.
+ * Tell caller which machines are affected so it can dispatch them.
+ */
+ public HashMap<Share, Share> promoteShares()
+ {
+ HashMap<Share, Share> answer = new HashMap<Share, Share>();
+ for ( Share s : pendingShares.values() ) {
+ assignedShares.put(s, s);
+ Machine m = s.getMachine();
+ machineList.put(m, m);
+ Map<Share, Share> machine_shares = sharesByMachine.get(m);
+ if ( machine_shares == null ) {
+ machine_shares = new HashMap<Share, Share>();
+ sharesByMachine.put(m, machine_shares);
+ }
+ machine_shares.put(s, s);
+
+ answer.put(s, s);
+ }
+ pendingShares.clear();
+ return answer;
+ }
+
+ /**
+ * This share is being donated to someone more needy than I - see defrag code in NodepoolScheduler
+ */
+ public void cancelPending(Share s)
+ {
+ pendingShares.remove(s);
+ }
+
+// public void assignReservation(Machine m)
+// {
+// reservations.add(m);
+// }
+//
+// public ArrayList<Machine> getReservations(Machine m)
+// {
+// return reservations;
+// }
+
+// public int countReservations()
+// {
+// return reservations.size();
+// }
+//
+// public boolean reservationComplete()
+// {
+// return reservations.size() == machines;
+// }
+
+ /**
+ * Scheduler found us a new toy, make it pending until it's given to job manager.
+ */
+ public void assignShare(Share s)
+ {
+ pendingShares.put(s, s);
+ }
+
+ /**
+ * Job recovery: OR reports this share as one it already knew about.
+ */
+ public void recoverShare(Share s)
+ {
+ if ( (! assignedShares.containsKey(s)) && (!pendingShares.containsKey(s)) ) {
+ recoveredShares.put(s, s);
+ }
+ }
+
+ public boolean isPendingShare(Share s )
+ {
+ return pendingShares.containsKey(s);
+ }
+
+ /**
+ * Do I have un-dispatched shares?
+ */
+ public boolean isExpanded()
+ {
+ return pendingShares.size() > 0;
+ }
+
+ public boolean isShrunken()
+ {
+ return pendingRemoves.size() > 0;
+ }
+
+ public boolean isStable()
+ {
+ return (
+ ( assignedShares.size() > 0 ) &&
+ ( pendingShares.size() == 0 ) &&
+ ( pendingRemoves.size() == 0 )
+ );
+ }
+
+ public boolean isDormant()
+ {
+ return (
+ ( assignedShares.size() == 0 ) &&
+ ( pendingShares.size() == 0 ) &&
+ ( pendingRemoves.size() == 0 )
+ );
+ }
+
+ public void removeAllShares()
+ {
+ String methodName = "removeAllShares";
+ if ( logger.isDebug() ) {
+ for ( Map<Share, Share> m : sharesByMachine.values() ) {
+ for ( Share s : m.values() ) {
+ logger.debug(methodName, getId(), "Clear share", s);
+ }
+ }
+ }
+ assignedShares.clear();
+ pendingShares.clear();
+ pendingRemoves.clear();
+ machineList.clear();
+ sharesByMachine.clear();
+ }
+
+ /**
+ * I've shrunk or this share has nothing left to do. Remove this specific share.
+ */
+ public void removeShare(Share share)
+ {
+ String methodName = "removeShare";
+
+ if ( assignedShares.containsKey(share) ) {
+ int prev = assignedShares.size();
+ assignedShares.remove(share);
+ pendingRemoves.remove(share);
+
+ Machine m = share.getMachine();
+ Map<Share, Share> machineShares = sharesByMachine.get(m);
+ machineShares.remove(share);
+ if ( machineShares.size() == 0 ) {
+ sharesByMachine.remove(m);
+ machineList.remove(m);
+ }
+
+ logger.debug(methodName, getId(), "Job removes ", share.toString(), " reduces from ", prev, " to ", assignedShares.size() + ".");
+ } else {
+ logger.warn(methodName, getId(), "****** Job cannot find share " + share.toString() + " to remove. ******");
+ }
+ }
+
+ /**
+ * Used by Fixed Share and by Reservations
+ */
+ public void shrinkByOne(Share share)
+ {
+ String methodName = "shrinkByOne";
+ if ( assignedShares.containsKey(share) ) {
+ logger.debug(methodName, getId(), "Job schedules " + share.toString() + " for removal.");
+ pendingRemoves.put(share, share);
+ share.evict();
+ } else {
+ logger.warn(methodName, getId(), "****** Job cannot find share " + share.toString() + " to schedule for removal.******");
+ }
+
+ }
+
+ /**
+ * Shrink by 'shares' from machines of largest order starting from 'order' and decreasing.
+ * Investment is not used, this is to shrink-from-largest-machine to MINIMIZE FRAGMENTATIOPN.
+ *
+ * This implementation is simplest, we just vacate. This is called in the sequence of highest
+ * order wanted so we're leaving the largest holes in the largest machines first.
+ *
+ * @param shares - number of N-shares that are wanted
+ * @param order - N - try to free up space for shares of this size.
+ * @param force - When evicting for non-preemptables, we may need to free the requested
+ * shares even if it puts us over our "fair" count. If this happens
+ * we'll end up "sliding" onto other machines (eventually).
+ * @param nodepool - only interested in shares from this nodepool.
+ *
+ * So when this is called, somebody needs (shares*order) shares, given out in chunks of
+ * (order).
+ *
+ * @returns number of Q-shares actually given up
+ */
+ public int shrinkByOrderByMachine(int shares, int order, boolean force, NodePool nodepool)
+ {
+ String methodName = "shrinkByOrderByMachine";
+
+ if ( shares <= 0 ) {
+ throw new SchedulingException(getId(), "Trying to shrink by " + shares + " shares.");
+ }
+
+ // These are the machines where I have stuff running.
+ ArrayList<Machine> machinesSorted = new ArrayList<Machine>();
+ for ( Machine m : machineList.values() ) {
+ if ( (m.getNodepool() == nodepool) && ( m.getShareOrder() >= order) ) {
+ machinesSorted.add(m);
+ }
+ }
+ Collections.sort(machinesSorted, new MachineByOrderSorter());
+
+ int given = 0;
+ int shares_to_lose = 0;
+
+ //
+ // How much to lose? If we're not forcing, then only shares that are evicted because of
+ // the 'howMuch' counts. If forcing then everything until we meet the goal or we run
+ // out of stuff to give.
+ //
+ if ( force ) {
+ shares_to_lose = countNShares();
+ } else {
+ shares_to_lose = Math.max(0, countNShares() - countNSharesGiven());
+ }
+ if ( shares_to_lose == 0 ) {
+ return 0;
+ }
+
+ for ( Machine m : machinesSorted ) {
+
+ logger.debug(methodName, getId(), "Inspecting machine", m.getId());
+ ArrayList<Share> slist = new ArrayList<Share>();
+
+ for ( Share s : sharesByMachine.get(m).values() ) { // get the still-eligible shares
+ if ( ! s.isEvicted() ) {
+ slist.add(s);
+ }
+ }
+ if ( slist.size() == 0 ) {
+ continue;
+ }
+
+ int to_give = m.countFreedUpShares();
+ logger.debug(methodName, getId(), "A given:", given, "to_give:", to_give, "order", order, "shares", shares, "shares_to_lose", shares_to_lose);
+
+ Iterator<Share> iter = slist.iterator();
+ while ( iter.hasNext() && ( (given + (to_give/order)) < shares ) && (shares_to_lose > 0) ) {
+ Share s = iter.next();
+ logger.info(methodName, getId(), "Removing share", s.toString());
+ pendingRemoves.put(s, s);
+ s.evict();
+ to_give += share_order;
+ shares_to_lose--;
+ }
+
+
+ given += (to_give / order);
+ if ( given >= shares ) {
+ break;
+ }
+ }
+ return given;
+ }
+
+ /**
+ * Shrink by 'shares' from machines of largest order starting from 'order' and decreasing.
+ * Investment is not used, this is a shrink-from-largest-machine to MINIMIZE LOST WORK at the
+ * possibl eexpense of fragmentation.
+ *
+ * @param shares - number of N-shares that are wanted
+ * @param order - N - try to free up space for shares of this size.
+ * @param force - When evicting for non-preemptables, we may need to free the requested
+ * shares even if it puts us over our "fair" count. If this happens
+ * we'll end up "sliding" onto other machines (eventually).
+ * @param nodepool - only interested in shares from this nodepool.
+ *
+ * So when this is called, somebody needs (shares*order) shares, given out in chunks of
+ * (order).
+ *
+ * @returns number of Q-shares actually given up
+ */
+ public int shrinkByInvestment(int shares, int order, boolean force, NodePool nodepool)
+ {
+ String methodName = "shrinkByInvestment";
+
+ if ( shares <= 0 ) {
+ throw new SchedulingException(getId(), "Trying to shrink by " + shares + " shares.");
+ }
+
+ logger.debug(methodName, getId(), "Enter: shares", shares, "order", order, "force", force, "nodepool", nodepool.getId(),
+ "nAssignedShares", assignedShares.size(), "nPendingShares", pendingShares.size());
+
+ ArrayList<Share> sharesSorted = new ArrayList<Share>();
+
+ // must pick up only shares in the given nodepool
+ for ( Share s : assignedShares.values() ) {
+ if ( s.getNodepoolId().equals(nodepool.getId()) && ( !s.isEvicted() ) ) {
+ sharesSorted.add(s);
+ } else {
+ if ( logger.isTrace () ) {
+ logger.trace(methodName, getId(), "Skipping", s.getId(), "s.nodepool", s.getNodepoolId(), "incoming.nodepool", nodepool.getId(), "evicted", s.isEvicted());
+ }
+ }
+ }
+
+ if ( sharesSorted.size() == 0 ) {
+ return 0;
+ }
+
+
+ if ( logger.isTrace() ) {
+ logger.trace(methodName, getId(), "Shares Before Sort - id, isInitialized, investment:");
+ for ( Share s : sharesSorted ) {
+ logger.trace(methodName, getId(), s.getId(), s.isInitialized(), s.getInvestment());
+ }
+ }
+
+ Collections.sort(sharesSorted, new ShareByInvestmentSorter());
+
+ if ( logger.isTrace() ) {
+ logger.trace(methodName, getId(), "Shares After Sort - id, isInitialized, investment:");
+ for ( Share s : sharesSorted ) {
+ logger.trace(methodName, getId(), s.getId(), s.isInitialized(), s.getInvestment());
+ }
+ }
+
+
+ //
+ // How much to lose? If we're not forcing, then only shares that are evicted because of
+ // the 'howMuch' counts. If forcing then everything until we meet the goal or we run
+ // out of stuff to give.
+ //
+ int shares_given = 0; // number of shares of requested order given - NOT necessarily number of my own processes
+ int processes_to_lose = 0; // number of processes I'm able to lose
+ int processes_given = 0;
+
+ if ( force ) {
+ processes_to_lose = countNShares();
+ } else {
+ processes_to_lose = Math.max(0, countNShares() - countNSharesGiven());
+ }
+ processes_to_lose = Math.min(processes_to_lose, sharesSorted.size());
+
+ if ( processes_to_lose == 0 ) {
+ return 0;
+ }
+
+ while ( (shares_given < shares) && (processes_given < processes_to_lose) ) {
+
+ int currently_given = 0;
+
+ if ( logger.isTrace() ) {
+ logger.trace(methodName, getId(), "In loop: Shares given", shares_given, "shares wanted", shares,
+ "processes_to_lose", processes_to_lose, "processes_given", processes_given);
+ }
+
+ Share s = sharesSorted.get(0);
+ Machine m = s.getMachine();
+ int to_give = m.countFreedUpShares();
+ logger.debug(methodName, getId(), "Inspecting share", s.getId());
+ ArrayList<Share> slist = new ArrayList<Share>();
+
+ Iterator<Share> iter = sharesSorted.iterator();
+ while ( (to_give < order) && iter.hasNext() ) { // if we need more shares from this machine to be useful ...
+ // Here we search the share list for enough more shares on the machine to make up enough shares
+ // to satisy exactly one of the requested sizes.
+ Share ss = iter.next();
+ if ( ss.getMachine() == s.getMachine() ) {
+ slist.add(ss);
+ to_give += ss.getShareOrder();
+ }
+ }
+
+ if ( to_give >= order ) { // did we find enough on the machine to make it useful to evict?
+ //slist.add(s); // didn't put on the list earlier, in case we couldn't use it
+ for ( Share ss : slist ) {
+ logger.info(methodName, getId(), "Removing share", ss.toString());
+ pendingRemoves.put(ss, ss);
+ ss.evict();
+
+ sharesSorted.remove(ss);
+ processes_given++;
+ currently_given++;
+ if ( processes_given >= processes_to_lose ) break; // if this is too few to be useful, defrag will fix it (mostly)
+ }
+ shares_given += (to_give / order);
+ }
+
+ //
+ // If we gave nothing away we didn't change any of the structures and we'll
+ // never exit. So exit stage left asap right now.
+ // We rarely if ever will enter this but it prevents an infinite loop in
+ // varioius corner cases.
+ //
+ if ( currently_given == 0 ) {
+ logger.debug(methodName, getId(), "Gave no shares, breaking loop");
+ break;
+ }
+ }
+
+ return shares_given;
+ }
+
+ /**
+ * I'm shrinking by this many. Put into pendingRemoves but don't take them off the assigned list
+ * until we know job manager has been told.
+ *
+ * TODO: this will likely change when the Scheduler starts deciding - either we'll put decision logic
+ * into Job proper, or defer it to the IScheduler implementation (probably the latter).
+ *
+ * @Deprecated. Keeping until I can nuke or update the simple fair share code.
+ */
+ public void shrinkTo(int k)
+ {
+// int count = assignedShares.size() - k;
+// for ( int i = 0; i < count; i++ ) {
+// pendingRemoves.add(assignedShares.get(i));
+// }
+ }
+
+ /**
+ * Waiting for somebody to deal with my shrinkage?
+ */
+ public boolean isShrunk()
+ {
+ return pendingRemoves.size() > 0;
+ }
+
+ /**
+ * Return the reclaimed shares.
+ */
+ public HashMap<Share, Share> getPendingRemoves()
+ {
+ return pendingRemoves;
+ }
+
+ /**
+ * And finally, dump the pending shrinkage.
+ */
+ public void clearPendingRemoves()
+ {
+ pendingRemoves.clear();
+ }
+
+ /**
+ * Recovery complete, clear the share map
+ */
+ public void clearRecoveredShares()
+ {
+ recoveredShares.clear();
+ }
+
+// public Machine removeLastMachine()
+// {
+// return assignedMachines.remove(assignedMachines.size() - 1);
+// }
+
+ /**
+ * Find number of nShares (virtual shares) this machine has assigned already.
+ *
+ * If things are slow in the cluster the pending removes might be
+ * non-zero. This is an extreme corner case it's best to be safe.
+ */
+ public int countNShares()
+ {
+ return assignedShares.size() + pendingShares.size() - pendingRemoves.size();
+ }
+
+ public void refuse(String refusal)
+ {
+ String methodName = "refusal";
+ logger.warn(methodName, id, refusal);
+ this.refusalReason = refusal;
+ }
+
+ public boolean isRefused()
+ {
+ return (refusalReason != null);
+ }
+
+ public String getRefusalReason()
+ {
+ return refusalReason;
+ }
+
+ public void setShareOrder(int s)
+ {
+ this.share_order = s;
+ }
+
+ public int getShareOrder()
+ {
+ return share_order;
+ }
+
+ /**
+ * During the scheduling algorithm we want to track some things by userid. The "share cap" stuff is used
+ * to keep track of max shares that I can actually use or want during scheduling but is generally just
+ * nonsense.
+ */
+ public void setShareCap(int cap)
+ {
+ this.share_cap = cap;
+ }
+
+ public int getShareCap()
+ {
+ return share_cap;
+ }
+
+ /**
+ * We try to project the maximum number of shares that this job can use, based on the current rate
+ * of completion of work items, and the known initialization time.
+ *
+ * Many jobs have very long initialization times, and will complete in their current allocation before
+ * new processes can get started and initialized. We want to avoid growing (evictions) in that case.
+ *
+ * How to use this ...
+ */
+ int getProjectedCap()
+ {
+ String methodName = "getProjectedCap";
+ if ( init_wait ) { // no cap if not initialized, because we don't know. other caps will dominate.
+ return Integer.MAX_VALUE;
+ }
+
+ if ( time_per_item == Double.NaN ) {
+ return Integer.MAX_VALUE;
+ }
+
+ // Get average init time
+ int count = 0;
+ long total_init = 0;
+ for ( Share s : assignedShares.values() ) {
+ long t = s.getInitializationTime();
+ if ( t > 0 ) {
+ count++;
+ total_init += t;
+ }
+ }
+ long avg_init = 0;
+ if ( total_init > 0 ) {
+ avg_init = total_init / count; // (to seconds)
+ } else {
+ logger.warn(methodName, getId(), username, "Initialization time is 0, project cap and investment will be inaccurate.");
+ }
+
+ // When in the future we want to estimate the amount of remaining work.
+ long target = avg_init + ducc_epoch + resource_class.getPredictionFudge();
+
+ int nprocesses = countNShares();
+ double rate = ((double) (nprocesses * threads)) / time_per_item; // number of work items per second
+ // with currently assigned resources
+ long projected = Math.round(target * rate); // expected WI we can do after a new
+ // process gets started
+
+ long future = Math.max(nquestions_remaining - projected, 0); // work still to do after doubling
+
+ logger.info(methodName, getId(), username, "O", getShareOrder(),"T", target, "NTh", (nprocesses * threads), "TI", avg_init, "TR", time_per_item,
+ "R", rate, "QR", nquestions_remaining, "P", projected, "F", future,
+ "return", (future / threads));
+
+ int answer = (int) future / threads;
+ if ( (future % threads ) > 0 ) answer++;
+
+ return answer; // number of processes we expect to need
+ // in the future
+
+ }
+
+ /**
+ * This returns the largest number that can actually be used, which will be either the
+ * share cap itself, or nProcess / nThreads, in N shares.
+ */
+ public int getJobCap()
+ {
+ String methodName = "getJobCap";
+
+ if ( isRefused() ) {
+ return 0;
+ }
+
+ int c = nquestions_remaining / threads;
+
+ if ( ( nquestions_remaining % threads ) > 0 ) {
+ c++;
+ }
+
+ c = Math.max(c, assignedShares.size()); // if job is ending we could be fragmented and have to be
+ // careful not to underestimate, or we end up possibly
+ // evicting something that should be left alone.
+
+ //
+ // If waiting for initialization, we have to cap as well on the maximum number of shares
+ // we give out, in case the job can't start, to avoid unnecessary preemption.
+ //
+ // Must convert to N-shares, because that is the number of actual processes, which is the
+ // unit that the initialization cap is specified in.
+ //
+
+ int base_cap = Math.min(getMaxShares(), c);
+ int projected_cap = getProjectedCap();
+
+ int potential_cap = base_cap;
+ int actual_cap = 0;
+
+ if ( resource_class.isUsePrediction() ) {
+ if (projected_cap < base_cap ) { // If we project less need, revise the estimate down
+ potential_cap = Math.max(projected_cap, assignedShares.size());
+ }
+ }
+
+ if ( init_wait && ( resource_class.getInitializationCap() > 0) ) {
+ actual_cap = Math.min(potential_cap, (resource_class.getInitializationCap()));
+ } else {
+
+ if ( init_wait ) { // ugly, but true, if not using initialization caps
+ actual_cap = potential_cap;
+ } else if ( resource_class.isExpandByDoubling() ) {
+ if ( (assignedShares.size() == 0) ) {
+ actual_cap = Math.max(1, resource_class.getInitializationCap()); // if we shrink to 0, need to restart from the init cap
+ } else {
+ actual_cap = Math.min(potential_cap, assignedShares.size() * 2);
+ }
+ } else {
+ actual_cap = potential_cap;
+ }
+ }
+
+ logger.debug(methodName, getId(), username, "O", getShareOrder(), "Base cap:", base_cap, "Expected future cap:", projected_cap, "potential cap", potential_cap, "actual cap", actual_cap);
+ return actual_cap;
+ }
+
+ public int getMaxShares()
+ {
+ // if set to -1, our max is the number already assigned
+ if ( max_shares < 0 ) {
+ return countNShares();
+ } else {
+ return max_shares;
+ }
+ }
+
+ public int getMinShares()
+ {
+ return min_shares;
+ }
+
+ public void setMaxShares(int s)
+ {
+ this.max_shares = s;
+ }
+
+ public void setMinShares(int s)
+ {
+ this.min_shares = s;
+ }
+
+ public boolean isRunning()
+ {
+ return countNShares() > 0 ? true : false;
+ }
+
+ public String getUserName()
+ {
+ return username;
+ }
+
+ public void setUserName(String n)
+ {
+ this.username = n;
+ }
+
+ public User getUser()
+ {
+ return user;
+ }
+
+ public void setUser(User u)
+ {
+ this.user = u;
+ }
+
+ public long getTimestamp()
+ {
+ return submit_time;
+ }
+
+ public void setTimestamp(long t)
+ {
+ this.submit_time = t;
+ }
+
+ public int getUserPriority() {
+ return user_priority;
+ }
+
+ public void setUserPriority(int p) {
+ this.user_priority = p;
+ }
+
+ public String getClassName() {
+ return resource_class_name;
+ }
+
+ public void setClassName(String class_name) {
+ this.resource_class_name = class_name;
+ }
+
+ public int getSchedulingPriority() {
+ return resource_class.getPriority();
+ }
+
+ public Policy getSchedulingPolicy() {
+ return resource_class.getPolicy();
+ }
+
+ public ResourceClass getResourceClass() {
+ return resource_class;
+ }
+
+ public int countInstances() {
+ return n_machines;
+ }
+
+ public void setNInstances(int m)
+ {
+ this.n_machines = m;
+ }
+
+ public int getMaxMachines() {
+ return min_shares;
+ }
+
+ public int nThreads() {
+ return threads;
+ }
+ public void setThreads(int th)
+ {
+ this.threads = th;
+ }
+
+ public int getMemory() {
+ return memory;
+ }
+
+ public void setMemory(int memory) {
+ this.memory = memory;
+ }
+
+ public void setDuccType(DuccType type)
+ {
+ this.ducc_type = type;
+ }
+
+ public DuccType getDuccType()
+ {
+ return this.ducc_type;
+ }
+
+ /**
+ * Is at least one of my current shares initialized?
+ */
+ public boolean isInitialized()
+ {
+ for (Share s : assignedShares.values()) {
+ if ( s.isInitialized() ) return true;
+ }
+ return false;
+ }
+
+ /**
+ * Logging and debugging - nice to know what my assigned shares are.
+ */
+ public String printShares()
+ {
+ StringBuffer buf = new StringBuffer("AssignedShares: ");
+ if ( assignedShares.size() == 0 ) {
+ buf.append("<none>");
+ } else {
+ for ( Share s : assignedShares.values()) {
+ buf.append(s.getId());
+ buf.append(" ");
+ }
+ }
+
+ buf.append("\nPendingShares: ");
+ if ( pendingShares.size() == 0 ) {
+ buf.append("<none>");
+ } else {
+ for ( Share s : pendingShares.values()) {
+ buf.append(s.getId());
+ buf.append(" ");
+ }
+ }
+
+ buf.append("\nPendingRemoves: ");
+ if ( pendingRemoves.size() == 0 ) {
+ buf.append("<none>");
+ } else {
+ for ( Share s : pendingRemoves.values()) {
+ buf.append(s.getId());
+ buf.append(" ");
+ }
+ }
+
+ return buf.toString();
+ }
+
+ String getShortType()
+ {
+ String st = "?";
+ switch ( ducc_type ) {
+ case Reservation:
+ st = "R";
+ break;
+ case Job:
+ st = "J";
+ break;
+ case Service:
+ st = "S";
+ break;
+ }
+ return st;
+ }
+
+ public static String getHeader()
+ {
+ return String.format("%6s %30s %10s %10s %6s %5s %13s %8s %6s %9s %11s %8s",
+ "ID", "JobName", "User", "Class",
+ "Shares", "Order", "QuantumShares",
+ "NThreads", "Memory",
+ "Questions", "Q Remaining", "InitWait");
+ }
+
+ public String toString()
+ {
+ int shares = assignedShares.size() + pendingShares.size();
+
+ if ( isReservation() ) {
+ return String.format("%1s%5s %30.30s %10s %10s %6d",
+ getShortType(), id.toString(), name, username, getClassName(), shares);
+
+ } else {
+ return String.format("%1s%5s %30.30s %10s %10s %6d %5d %13d %8d %6d %9d %11d %8s",
+ getShortType(),
+ id.toString(), name, username, getClassName(),
+ shares, share_order, (shares * share_order),
+ threads, memory,
+ nQuestions(), nQuestionsRemaining(),
+ init_wait);
+ }
+ }
+
+ public String toStringWithHeader()
+ {
+ StringBuilder buf = new StringBuilder(getHeader());
+ buf.append("\n");
+ buf.append(toString());
+ return buf.toString();
+ }
+
+ //
+ // Order machines by DECREASING order
+ //
+ class MachineByOrderSorter
+ implements Comparator<Machine>
+ {
+ public int compare(Machine m1, Machine m2)
+ {
+ return (int) (m2.getShareOrder() - m1.getShareOrder());
+ }
+ }
+
+ //
+ // Order shares by INCREASING investment
+ //
+ class ShareByInvestmentSorter
+ implements Comparator<Share>
+ {
+ public int compare(Share s1, Share s2)
+ {
+ //
+ // First divide them into two pools:
+ // not-initialized shares always sort LESS than initialized shares
+ if ( ! s1.isInitialized() ) {
+ if ( s2.isInitialized() ) return -1;
+ // both not initialized. sort on less time spent initializing so far (fall through)
+ } else {
+ if ( ! s2.isInitialized() ) return 1;
+ // bot initialized. Again sort on less time spent ever in init. (fall through)
+ }
+
+ return ( (int) (s1.getInvestment() - s2.getInvestment()) );
+ }
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return id.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+
+ if (this == obj) return true;
+
+ if (obj == null) return false;
+
+
+ if (getClass() != obj.getClass()) return false;
+
+ IRmJob other = (IRmJob) obj;
+
+
+ // can't get null id
+ if ( !id.equals(other.getId()) ) return false;
+
+ //can't get null shares.. normal compare should finish it off.
+ return assignedShares.equals(other.getAssignedShares());
+ }
+
+ public Comparator<IEntity> getApportionmentSorter()
+ {
+ return apportionmentSorter;
+ }
+
+ static private class ApportionmentSorterCl
+ implements Comparator<IEntity>
+ {
+ public int compare(IEntity e1, IEntity e2)
+ {
+ if ( e1 == e2 ) return 0;
+ return (int) (e1.getTimestamp() - e2.getTimestamp());
+ }
+ }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedConstants.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedConstants.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedConstants.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedConstants.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+public interface SchedConstants
+{
+ public enum Policy {
+ FAIR_SHARE,
+ FIXED_SHARE,
+ RESERVE,
+ };
+
+ public enum EvictionPolicy {
+ SHRINK_BY_MACHINE, // shrink by largest machine first, to minimize fragmentation
+ SHRINK_BY_INVESTMENT, // shrink by lowest investment first, to minimize waste
+ };
+
+ public static final String COMPONENT_NAME = "RM";
+ public static final int DEFAULT_STABILITY_COUNT = 5;
+ public static final int DEFAULT_INIT_STABILITY_COUNT = 3;
+ public static final int DEFAULT_SCHEDULING_RATIO = 4;
+ public static final int DEFAULT_SCHEDULING_RATE = 60000;
+ public static final int DEFAULT_NODE_METRICS_RATE = 60000;
+
+ public static final int DEFAULT_PROCESSES = 10; // for jobs, number of processes if not specified
+ public static final int DEFAULT_INSTANCES = 1; // for reservations, number of instances if not specified
+
+ public static final int DEFAULT_MAX_PROCESSES = Integer.MAX_VALUE; // for jobs, the class cap, if not configured
+ public static final int DEFAULT_MAX_INSTANCES = Integer.MAX_VALUE; // for reservations, class cap, if not configured
+
+ public static final int DEFAULT_SHARE_WEIGHT = 100;
+ public static final int DEFAULT_PRIORITY = 10;
+
+}
+
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedConstants.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedInternalError.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedInternalError.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedInternalError.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedInternalError.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+import org.apache.uima.ducc.common.utils.id.DuccId;
+
+@SuppressWarnings("serial")
+public class SchedInternalError
+ extends RuntimeException
+{
+ DuccId jobid;
+
+ public SchedInternalError(DuccId jobid, String msg)
+ {
+ super(msg);
+
+ this.jobid = jobid;
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedInternalError.java
------------------------------------------------------------------------------
svn:eol-style = native