You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:43:38 UTC
svn commit: r1427967 [5/5] - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src: main/ main/java/
main/java/org/ main/java/org/apache/ main/java/org/apache/uima/
main/java/org/apache/uima/ducc/ main/java/org/apache/uima/ducc/rm/
main/java/org/apache/uim...
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,1139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.uima.ducc.common.IIdentity;
+import org.apache.uima.ducc.common.Node;
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.Pair;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccProperties;
+import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
+import org.apache.uima.ducc.common.utils.Version;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.common.utils.id.DuccIdFactory;
+
+
+/**
+ * This process orchestrates scheduling.
+ * - Receives requests from clients ( job manager, service manager, etc ) for resources
+ * - Forwards requests and current state to pluggable scheduling implementation
+ * - Receives a schedule, updates state, sends responses to requestors
+ * - Maintains state as needed (work item life cycle etc)
+ */
+public class Scheduler
+// extends Thread
+ implements ISchedulerMain,
+ SchedConstants
+{
+ IJobManager jobManager;
+ DuccLogger logger = DuccLogger.getLogger(Scheduler.class, COMPONENT_NAME);
+
+ boolean done = false;
+ // Boolean force_epoch = false;
+ String ducc_home;
+ // Integer epoch = 5; // scheduling epoch, seconds
+
+ NodePool nodepool;
+
+ //
+ // Fair-share and fixed-share use shares only, not machines
+ //
+ HashMap<DuccId, Share> busyShares = new HashMap<DuccId, Share>(); // Running "fair" share jobs
+
+ // incoming reports of machines that are now free
+ HashMap<DuccId, Pair<IRmJob, Share>> vacatedShares= new HashMap<DuccId, Pair<IRmJob, Share>>();
+ // boolean growthOccurred = false; // don't care which grew, just that something grew
+
+ ArrayList<IRmJob> incomingJobs = new ArrayList<IRmJob>(); // coming in from the external world but not added to our queues yet
+ ArrayList<IRmJob> recoveredJobs = new ArrayList<IRmJob>(); // coming in from the external world but we don't know about them (hopefully
+ // because we crashed and not for more nefarious reasons)
+ ArrayList<IRmJob> completedJobs = new ArrayList<IRmJob>(); // signaled complete from outside but not yet dealt with
+ ArrayList<IRmJob> initializedJobs = new ArrayList<IRmJob>(); // Init is complete so we can begin full (un)fair share allocation
+
+ //HashMap<Node, Node> incomingNodes = new HashMap<Node, Node>(); // node updates
+ HashMap<Node, Node> deadNodes = new HashMap<Node, Node>(); // missed too many heartbeats
+ HashMap<Node, Node> allNodes = new HashMap<Node, Node>(); // the guys we know
+
+ HashMap<String, User> users = new HashMap<String, User>(); // Active users - has a job in the system
+ //HashMap<DuccId, IRmJob> runningJobs = new HashMap<DuccId, IRmJob>();
+
+ HashMap<DuccId, IRmJob> allJobs = new HashMap<DuccId, IRmJob>();
+
+ HashMap<ResourceClass, ResourceClass> resourceClasses = new HashMap<ResourceClass, ResourceClass>();
+ HashMap<String, ResourceClass> resourceClassesByName = new HashMap<String, ResourceClass>();
+
+ String defaultClassName = null;
+ int defaultNThreads = 1;
+ int defaultNTasks = 10;
+ int defaultMemory = 16;
+
+ // these two are initialized in constructor
+ String schedImplName;
+ IScheduler scheduler;
+
+ long share_quantum = 16; // smallest share size; the default is 16 GB, and init() converts the configured GB value to KB
+ long share_free_dram = 0; // minimum memory left over after shares are allocated; default 0 GB, converted to KB by init()
+ long dramOverride = 0; // if > 0, use this instead of amount reported by agents (modeling and testing); GB, converted to KB by init()
+
+ EvictionPolicy evictionPolicy = EvictionPolicy.SHRINK_BY_MACHINE;
+
+// int nodeMetricsUpdateRate = 30000;
+// int startupCountdown = 0; // update each epoch. only schedule when it's > nodeStability
+ int nodeStability = 3;
+ boolean stability = false;
+
+ // static boolean expandByDoubling = true;
+ // static int initializationCap = 2; // Max allocation until we know initialization works in
+ // units of *processes*, not shares (i.e.N-shares).
+
+ //
+ // Version
+ // 0 - major version
+ // 6 - minor version
+ // 3 - ptf - forced eviction under fragmentation.
+ // 4 - defrag code complete
+ // beta - not yet "real"!
+ //
+ final static int rmversion_major = 0;
+ final static int rmversion_minor = 6;
+ final static int rmversion_ptf = 4;
+ final static String rmversion_string = "beta";
+
+ boolean initialized = false; // we refuse nodeupdates until this is true
+ public Scheduler()
+ { // nothing to construct here; configuration is read and collaborators are built by init()
+ }
+
+ /**
+  * Reads the resource manager configuration from system properties, instantiates the
+  * pluggable scheduling implementation, loads the class definitions, hands the shared
+  * state to the scheduling implementation and logs the effective settings.
+  * The {@code initialized} flag is raised only after all of that has succeeded.
+  *
+  * @throws Exception if the scheduling implementation cannot be loaded or instantiated,
+  *                   or if reading the class definitions fails
+  */
+ public void init()
+     throws Exception
+ {
+     final String methodName = "init";
+     final long kbPerGb = 1024L * 1024L; // configuration is given in GB, internal bookkeeping is in KB
+
+     DuccLogger.setUnthreaded();
+
+     String policyName = SystemPropertyResolver.getStringProperty("ducc.rm.eviction.policy", "SHRINK_BY_MACHINE");
+     evictionPolicy = EvictionPolicy.valueOf(policyName);
+
+     nodepool = new NodePool(null, evictionPolicy, 0); // the global nodepool
+     share_quantum = SystemPropertyResolver.getLongProperty("ducc.rm.share.quantum", share_quantum) * kbPerGb;
+     share_free_dram = SystemPropertyResolver.getLongProperty("ducc.rm.reserved.dram", share_free_dram) * kbPerGb;
+     ducc_home = SystemPropertyResolver.getStringProperty("DUCC_HOME");
+
+     // defaults applied to jobs that do not specify their own values
+     defaultNTasks = SystemPropertyResolver.getIntProperty("ducc.rm.default.tasks", 10);
+     defaultNThreads = SystemPropertyResolver.getIntProperty("ducc.rm.default.threads", 1);
+     defaultMemory = SystemPropertyResolver.getIntProperty("ducc.rm.default.memory", 16); // in GB
+
+     // number of node metrics updates to wait for before scheduling; 0 means don't wait at all
+     nodeStability = SystemPropertyResolver.getIntProperty("ducc.rm.node.stability", 3);
+
+     dramOverride = SystemPropertyResolver.getLongProperty("ducc.rm.override.dram", 0);
+     if ( dramOverride > 0 ) {
+         dramOverride *= kbPerGb;
+     }
+
+     try {
+         schedImplName = SystemPropertyResolver.getStringProperty("ducc.rm.scheduler", "org.apache.uima.ducc.rm.rm.ClassBasedScheduler");
+         @SuppressWarnings("unchecked")
+         Class<IScheduler> implClass = (Class<IScheduler>) Class.forName(schedImplName);
+         scheduler = (IScheduler) implClass.newInstance();
+     } catch (ClassNotFoundException e) {
+         throw new SchedulingException(null, "Cannot find class " + schedImplName);
+     } catch (InstantiationException e) {
+         throw new SchedulingException(null, "Cannot instantiate class " + schedImplName);
+     } catch (IllegalAccessException e) {
+         throw new SchedulingException(null, "Cannot instantiate class " + schedImplName + ": can't access constructor.");
+     }
+
+     String class_definitions = SystemPropertyResolver.getStringProperty("ducc.rm.class.definitions", "scheduler.classes");
+     try {
+         initClasses(class_definitions);
+     } catch ( Exception e ) {
+         // make sure the failure reaches the RM log before it propagates to the caller
+         logger.error(methodName, null, e);
+         throw e;
+     }
+
+     // the scheduling implementation works directly on our state, so hand it over once
+     scheduler.setEvictionPolicy(evictionPolicy);
+     scheduler.setClasses(resourceClasses);
+     scheduler.setNodePool(nodepool);
+
+     String rmVersion = "" + rmversion_major + "." + rmversion_minor + "." + rmversion_ptf + "-" + rmversion_string;
+     String jvm = System.getProperty("java.vendor") + " "+ System.getProperty("java.version");
+
+     logger.info(methodName, null, "Scheduler running with share quantum : ", (share_quantum / kbPerGb), " GB");
+     logger.info(methodName, null, " reserved DRAM : ", (share_free_dram / kbPerGb), " GB");
+     logger.info(methodName, null, " DRAM override : ", (dramOverride / kbPerGb), " GB");
+     logger.info(methodName, null, " scheduler : ", schedImplName);
+     logger.info(methodName, null, " default threads : ", defaultNThreads);
+     logger.info(methodName, null, " default tasks : ", defaultNTasks);
+     logger.info(methodName, null, " default memory : ", defaultMemory);
+     logger.info(methodName, null, " class definition file : ", class_definitions);
+     logger.info(methodName, null, " RM:OR scheduling ratio : ",
+                 SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.ratio", DEFAULT_SCHEDULING_RATIO) + ":1");
+     logger.info(methodName, null, " eviction policy : ", evictionPolicy);
+     logger.info(methodName, null, " use prediction : ", SystemPropertyResolver.getBooleanProperty("ducc.rm.prediction", true));
+     logger.info(methodName, null, " prediction fudge factor : ", SystemPropertyResolver.getIntProperty("ducc.rm.prediction.fudge", 10000));
+     logger.info(methodName, null, " node stability : ", nodeStability);
+     logger.info(methodName, null, " metrics update rate : ",
+                 SystemPropertyResolver.getIntProperty("ducc.agent.node.metrics.publish.rate", DEFAULT_NODE_METRICS_RATE));
+     logger.info(methodName, null, " initialization cap : ", SystemPropertyResolver.getIntProperty("ducc.rm.initialization.cap"));
+     logger.info(methodName, null, " expand by doubling : ", SystemPropertyResolver.getBooleanProperty("ducc.rm.expand.by.doubling", true));
+     logger.info(methodName, null, " fragmentation threshold : ", SystemPropertyResolver.getIntProperty("ducc.rm.fragmentation.threshold", 2));
+     logger.info(methodName, null, " do defragmentation : ", SystemPropertyResolver.getBooleanProperty("ducc.rm.defragmentation", true));
+     logger.info(methodName, null, " DUCC home : ", System.getProperty("DUCC_HOME"));
+     logger.info(methodName, null, " ActiveMQ URL : ", SystemPropertyResolver.getStringProperty("ducc.broker.url"));
+     logger.info(methodName, null, " JVM : ", jvm);
+     logger.info(methodName, null, " JAVA_HOME : ", System.getProperty("java.home"));
+     logger.info(methodName, null, " JVM Path : ", System.getProperty("ducc.jvm"));
+     logger.info(methodName, null, " JMX URL : ", System.getProperty("ducc.jmx.url"));
+     logger.info(methodName, null, " OS Architecture : ", System.getProperty("os.arch"));
+     logger.info(methodName, null, " DUCC Version : ", Version.version());
+     logger.info(methodName, null, " RM Version : ", rmVersion);
+
+     initialized = true;
+ }
+
+ /**
+  * Looks up a machine in the global nodepool by node identity.
+  *
+  * @param ni identity of the node of interest
+  * @return whatever {@code nodepool.getMachine(ni)} yields for that identity
+  */
+ public Machine getMachine(NodeIdentity ni)
+ {
+     Machine machine = this.nodepool.getMachine(ni);
+     return machine;
+ }
+
+ /**
+  * Installs the job manager this scheduler keeps a reference to.
+  *
+  * @param jobmanager the job manager to remember
+  */
+ public void setJobManager(IJobManager jobmanager)
+ {
+     // field and parameter differ in case, so no qualification is needed
+     jobManager = jobmanager;
+ }
+
+ /**
+  * @return the value of "scheduling.default.name" read by initClasses, or null if it has not been read
+  */
+ public String getDefaultClassName()
+ {
+     return this.defaultClassName;
+ }
+
+ /**
+  * @return the thread count applied to jobs that do not specify one
+  */
+ public int getDefaultNThreads()
+ {
+     return this.defaultNThreads;
+ }
+
+ /**
+  * @return the task count applied to jobs that do not specify one
+  */
+ public int getDefaultNTasks()
+ {
+     return this.defaultNTasks;
+ }
+
+ /**
+  * @return the memory size, in GB, applied to jobs that do not specify one
+  */
+ public int getDefaultMemory()
+ {
+     return this.defaultMemory;
+ }
+
+ /**
+  * Finds a resource class by its name.
+  *
+  * @param name class name as used in the class definition file
+  * @return the matching class, or null if no class of that name is registered
+  */
+ public ResourceClass getResourceClass(String name)
+ {
+     ResourceClass found = this.resourceClassesByName.get(name);
+     return found;
+ }
+
+ /**
+  * Finds a job by its id.
+  *
+  * @param id id of the job
+  * @return the job registered under that id, or null if there is none
+  */
+ public IRmJob getJob(DuccId id)
+ {
+     IRmJob found = this.allJobs.get(id);
+     return found;
+ }
+
+ /**
+  * Finds a busy share by its id.
+  *
+  * @param id id of the share
+  * @return the share recorded as busy under that id, or null if there is none
+  */
+ public Share getShare(DuccId id)
+ {
+     Share found = this.busyShares.get(id);
+     return found;
+ }
+
+// public static int getInitializationCap()
+// {
+// return initializationCap;
+// }
+//
+// public static boolean isExpandByDoubling()
+// {
+// return expandByDoubling;
+// }
+
+ /**
+  * Converts a memory request into a share order: the number of share quanta needed to
+  * hold the request, with any partial quantum rounded up.
+  *
+  * @param mem requested memory in GB (as in a job spec)
+  * @return number of quanta of size {@code share_quantum} that cover {@code mem}
+  */
+ int calcShareOrder(long mem)
+ {
+     long memKb = mem * 1024 * 1024; // GB -> KB, the unit share_quantum is kept in once init() has run
+
+     int wholeQuanta = (int) (memKb / share_quantum);
+     boolean hasRemainder = (memKb % share_quantum) > 0;
+
+     // liberal calculation: a partially used quantum still costs a full share
+     return hasRemainder ? wholeQuanta + 1 : wholeQuanta;
+ }
+
+ /**
+  * Uses this host's NodeIdentity to infer the local domain name.
+  *
+  * Iterates through the names the identity reports. The first name that contains a '.'
+  * (not in the first position) decides the result: everything after that first '.' is
+  * taken to be the domain, and the search stops there.
+  *
+  * @return the inferred domain name, or null if no name is qualified or the node
+  *         identity cannot be created
+  */
+ private String getDomainName()
+ {
+     String methodName = "getDomainName";
+     String domain = null; // stays null on a poor configuration; nodepool matching may then miss some names
+
+     try {
+         for ( IIdentity identity : new NodeIdentity().getNodeIdentities() ) {
+             String hostName = identity.getName();
+             int dot = hostName.indexOf('.');
+             if ( dot > 0 ) {
+                 domain = hostName.substring(dot + 1);
+                 break;
+             }
+         }
+     } catch (Exception e) {
+         logger.warn(methodName, null, "Cannot create my own node identity:", e);
+     }
+
+     return domain;
+ }
+
+ /**
+  * Reads a nodepool definition file from {@code DUCC_HOME/resources} and returns the node
+  * names it lists.
+  *
+  * File format: one node name per line; everything from a '#' to the end of the line is a
+  * comment; blank lines are skipped; a line of the form {@code import <file>} pulls in
+  * another nodepool file (resolved the same way) recursively.
+  *
+  * For every node both the fully qualified and the short name are registered, to tolerate
+  * sloppy configuration: a qualified name also yields its part before the first '.', and an
+  * unqualified name also yields {@code name.domain} when the local domain can be inferred.
+  *
+  * @param npfile file name relative to {@code DUCC_HOME/resources}
+  * @return map whose keys (and identical values) are the accepted node names
+  * @throws SchedulingException if the file cannot be opened or read, or if an
+  *                             {@code import} line names no file
+  */
+ Map<String, String> readNodepoolFile(String npfile)
+ {
+     String my_domain = getDomainName();
+     String ducc_home = System.getProperty("DUCC_HOME");
+     npfile = ducc_home + "/resources/" + npfile;
+
+     Map<String, String> response = new HashMap<String, String>();
+
+     BufferedReader br = null;
+     try {
+         br = new BufferedReader(new FileReader(npfile));
+         String node;
+         while ( (node = br.readLine()) != null ) {
+             int ndx = node.indexOf("#");
+             if ( ndx >= 0 ) {
+                 node = node.substring(0, ndx);
+             }
+             node = node.trim();
+             if ( node.equals("") ) {
+                 continue;
+             }
+
+             // Split on runs of whitespace so "import   file" works, and require the first
+             // token to be exactly "import" so a node called e.g. "important1" is not
+             // mistaken for a directive.
+             String[] tmp = node.split("\\s+");
+             if ( tmp[0].equals("import") ) {
+                 if ( tmp.length < 2 ) {
+                     throw new SchedulingException(null, "Missing file name after \"import\" in NodePool file \"" + npfile + "\".");
+                 }
+                 response.putAll(readNodepoolFile(tmp[1]));
+                 continue;
+             }
+             response.put(node, node);
+
+             // include fully and non-fully qualified names to allow sloppiness of config
+             ndx = node.indexOf(".");
+             String dnode;
+             if ( ndx >= 0 ) {
+                 dnode = node.substring(0, ndx);
+                 response.put(dnode, dnode);
+             } else if ( my_domain != null ) {
+                 dnode = node + "." + my_domain;
+                 response.put(dnode, dnode);
+             }
+         }
+     } catch (FileNotFoundException e) {
+         throw new SchedulingException(null, "Cannot open NodePool file \"" + npfile + "\": file not found.");
+     } catch (IOException e) {
+         throw new SchedulingException(null, "Cannot read NodePool file \"" + npfile + "\": I/O Error.");
+     } finally {
+         // release the file handle on every path, including read errors and failed imports
+         if ( br != null ) {
+             try {
+                 br.close();
+             } catch (IOException ignored) {
+                 // the content has already been consumed; a failure to close a reader loses nothing
+             }
+         }
+     }
+
+     return response;
+ }
+
+ /**
+  * Loads the scheduling class definition file from {@code ducc_home/resources}, creates one
+  * sub-nodepool for every name listed in "scheduling.nodepool", and registers one
+  * ResourceClass for every name listed in "scheduling.class_set".
+  *
+  * @param filename name of the class definition file, relative to {@code ducc_home/resources}
+  * @throws SchedulingException if a listed nodepool has no node file configured, or if no
+  *                             class definitions are present
+  * @throws Exception whatever loading the properties or initializing a class raises
+  */
+ void initClasses(String filename)
+     throws Exception
+ {
+     String methodName = "initClasses";
+     DuccProperties props = new DuccProperties();
+     props.load(ducc_home + "/resources/" + filename);
+
+     defaultClassName = props.getProperty("scheduling.default.name");
+
+     // read in nodepools
+     String npn = props.getProperty("scheduling.nodepool");
+     if ( npn != null ) {
+         // any run of whitespace separates names; a single-blank split would produce
+         // empty or tab-glued pool names
+         for ( String nodepoolName : npn.trim().split("\\s+") ) {
+             if ( nodepoolName.length() == 0 ) {
+                 continue; // property present but blank: no nodepools configured
+             }
+
+             int nporder = props.getIntProperty("scheduling.nodepool." + nodepoolName + ".order", 100);
+             String npfile = props.getProperty("scheduling.nodepool." + nodepoolName);
+             if ( npfile == null ) {
+                 // report the configuration error by name instead of failing with a NullPointerException
+                 throw new SchedulingException(null, "Nodepool \"" + nodepoolName + "\" is listed in scheduling.nodepool but scheduling.nodepool."
+                                               + nodepoolName + " (its node file) is not defined.");
+             }
+
+             Map<String,String> npnodes = readNodepoolFile(npfile.trim());
+             nodepool.createSubpool(nodepoolName, npnodes, nporder);
+         }
+     }
+
+     // read in the class definitions
+     String cn = props.getProperty("scheduling.class_set");
+     if ( cn == null || cn.trim().length() == 0 ) {
+         throw new SchedulingException(null, "No class definitions found, scheduler cannot start.");
+     }
+
+     String[] classNames = cn.trim().split("\\s+");
+     logger.info(methodName, null, "Classes:");
+     logger.info(methodName, null, ResourceClass.getHeader());
+     logger.info(methodName, null, ResourceClass.getDashes());
+     for ( String n : classNames ) {
+         ResourceClass rc = new ResourceClass(n);
+         rc.init(props);
+         resourceClasses.put(rc, rc);
+         resourceClassesByName.put(n, rc);
+         logger.info(methodName, null, rc.toString());
+     }
+ }
+
+ /**
+  * Variant of initClasses that parses the nodepool files inline instead of going through
+  * readNodepoolFile; it therefore has no "import" support.
+  * NOTE(review): presumably the superseded implementation, kept for reference - confirm
+  * whether anything still calls it.
+  *
+  * Nodepool file format: one node name per line, '#' starts a comment that runs to the end
+  * of the line, blank lines are skipped. Both the qualified and the short form of every
+  * node name are registered.
+  *
+  * @param filename name of the class definition file, relative to {@code ducc_home/resources}
+  * @throws SchedulingException if a nodepool file cannot be opened or read, or if no class
+  *                             definitions are present
+  * @throws Exception whatever loading the properties or initializing a class raises
+  */
+ void initClassesX(String filename)
+     throws Exception
+ {
+     String methodName = "initClasses";
+     DuccProperties props = new DuccProperties();
+     props.load(ducc_home + "/resources/" + filename);
+
+     defaultClassName = props.getProperty("scheduling.default.name");
+     String my_domain = getDomainName();
+
+     // read in nodepools
+     String npn = props.getProperty("scheduling.nodepool");
+     if ( npn != null ) {
+         String[] npnames = npn.split(" ");
+         for ( String nodepoolName : npnames ) {
+             int nporder = props.getIntProperty("scheduling.nodepool." + nodepoolName + ".order", 100);
+             String npfile = props.getProperty("scheduling.nodepool." + nodepoolName).trim();
+             BufferedReader br = null;
+             try {
+                 String ducc_home = System.getProperty("DUCC_HOME");
+                 npfile = ducc_home + "/resources/" + npfile;
+                 br = new BufferedReader(new FileReader(npfile));
+                 String node = "";
+                 HashMap<String, String> npnodes = new HashMap<String, String>();
+                 while ( (node = br.readLine()) != null ) {
+                     // ">= 0" so that a line which starts with '#' is a comment too, as in
+                     // readNodepoolFile; "> 0" registered such a line as a node name
+                     int ndx = node.indexOf("#");
+                     if ( ndx >= 0 ) {
+                         node = node.substring(0, ndx);
+                     }
+                     node = node.trim();
+                     if ( node.equals("") ) {
+                         continue;
+                     }
+
+                     npnodes.put(node, node);
+
+                     // include fully and non-fully qualified names to allow sloppiness of config
+                     ndx = node.indexOf(".");
+                     String dnode;
+                     if ( ndx >= 0 ) {
+                         dnode = node.substring(0, ndx);
+                         npnodes.put(dnode, dnode);
+                     } else if ( my_domain != null ) {
+                         dnode = node + "." + my_domain;
+                         npnodes.put(dnode, dnode);
+                     }
+                 }
+                 nodepool.createSubpool(nodepoolName, npnodes, nporder);
+
+             } catch (FileNotFoundException e) {
+                 throw new SchedulingException(null, "Cannot open NodePool file \"" + npfile + "\": file not found.");
+             } catch (IOException e) {
+                 throw new SchedulingException(null, "Cannot read NodePool file \"" + npfile + "\": I/O Error.");
+             } finally {
+                 // release the file handle on every path, not only after a clean read
+                 if ( br != null ) {
+                     try {
+                         br.close();
+                     } catch (IOException ignored) {
+                         // the content has already been consumed; nothing is lost
+                     }
+                 }
+             }
+         }
+     }
+
+     // read in the class definitions
+     String cn = props.getProperty("scheduling.class_set");
+     if ( cn == null ) {
+         throw new SchedulingException(null, "No class definitions found, scheduler cannot start.");
+     }
+
+     String[] classNames = cn.split("\\s+");
+     logger.info(methodName, null, "Classes:");
+     logger.info(methodName, null, ResourceClass.getHeader());
+     logger.info(methodName, null, ResourceClass.getDashes());
+     for ( String n : classNames ) {
+         n = n.trim();
+         ResourceClass rc = new ResourceClass(n);
+         rc.init(props);
+         resourceClasses.put(rc, rc);
+         resourceClassesByName.put(n, rc);
+         logger.info(methodName, null, rc.toString());
+     }
+ }
+
+
+
+ /**
+ * Called only from schedule, under the 'this' monitor.
+ *
+ * Takes the SchedulingUpdate produced by the IScheduler and turns it into orders for
+ * the job manager, recorded in the JobManagerUpdate.
+ *
+ * For jobs that lose resources, job manager is asked to stop execution in specific shares.
+ * For jobs that gain resources, job manager is asked to start execution in specific shares.
+ * Jobs that don't change are leftovers. If they're not running at all, they're in the pending
+ * list; they might also be in the running list but had no allocation changes in the current epoch.
+ *
+ * @param upd the scheduling decisions of the current epoch (shrunken, expanded, stable,
+ *            dormant, reserved and refused jobs)
+ * @param jmu the update being assembled for the job manager; share removals, share
+ *            additions and the full job map are recorded in it
+ * @return the same jmu instance that was passed in
+ * @throws SchedulingException if an expanded job has no shares to promote, or a stable
+ *         job reports a negative share count
+ */
+ private JobManagerUpdate dispatch(SchedulingUpdate upd, JobManagerUpdate jmu)
+ {
+ String methodName = "dispatch";
+ HashMap<IRmJob, IRmJob> jobs;
+
+ // Go through shrunken jobs: log what is being taken away and record the pending removes
+ jobs = upd.getShrunkenJobs();
+ for (IRmJob j : jobs.values()) {
+
+ logger.info(methodName, j.getId(), ">>>>>>>>>> SHRINK");
+
+ HashMap<Share, Share> sharesE = j.getAssignedShares();
+ HashMap<Share, Share> sharesR = j.getPendingRemoves();
+ logger.info(methodName, j.getId(), "removing", sharesR.size(), "of existing", sharesE.size(), "shares.");
+
+ for ( Share s : sharesE.values() ) {
+ logger.debug(methodName, j.getId(), " current", s.toString());
+ }
+
+ for ( Share s : sharesR.values() ) {
+ logger.debug(methodName, j.getId(), " remove ", s.toString());
+ }
+ logger.info(methodName, j.getId(), ">>>>>>>>>>");
+
+ jmu.removeShares(j, sharesR);
+ // jobManager.stopJob(j, shares); // stops job on everything on the pendingRemoves list
+ // j.clearPendingRemoves();
+ }
+
+ // Go through expanded jobs: log the growth, promote the pending shares,
+ // record them as busy, and tell the job manager about the new shares
+ // so it can start work in them
+ jobs = upd.getExpandedJobs();
+ for (IRmJob j : jobs.values() ) {
+ HashMap<Share, Share> sharesE = j.getAssignedShares();
+ HashMap<Share, Share> sharesN = j.getPendingShares();
+
+ logger.info(methodName, j.getId(), "<<<<<<<<<< EXPAND");
+ logger.info(methodName, j.getId(), "adding", sharesN.size(), "new shares to existing", sharesE.size(), "shares.");
+
+ for ( Share s : sharesE.values()) {
+ logger.debug(methodName, j.getId(), " existing ", s.toString());
+ }
+
+ for ( Share s : sharesN.values()) {
+ logger.debug(methodName, j.getId(), " expanding", s.toString());
+ }
+ logger.info(methodName, j.getId(), "<<<<<<<<<<");
+
+ sharesN = j.promoteShares(); // from here on sharesN is what promoteShares() returned, not the pending map logged above
+ if ( sharesN.size() == 0 ) {
+ // internal error - should not be marked expanded if no machines
+ throw new SchedulingException(j.getId(), "Trying to execute expanded job but no pending machines.");
+ }
+
+ for ( Share s : sharesN.values()) { // update machine books
+ // record every promoted share as busy, keyed by its share id
+ busyShares.put(s.getId(), s);
+ }
+
+// DuccId id = j.getId(); // pull from dormant, maybe
+// if ( dormantJobs .containsKey(id) ) {
+// dormantJobs .remove(id);
+// }
+
+ //runningJobs.put(id, j);
+ jmu.addShares(j, sharesN);
+ // jobManager.executeJob(j, shares); // will update job's pending lists
+
+ }
+
+ jobs = upd.getStableJobs(); // nothing to dispatch for stable jobs: check the share count and log it
+ for (IRmJob j: jobs.values()) {
+ if ( j.countNShares() < 0 ) {
+ throw new SchedulingException(j.getId(), "Share count went negative " + j.countNShares());
+ }
+ logger.info(methodName, j.getId(), ".......... STABLE with ", j.countNShares(), " shares.");
+ }
+
+ jobs = upd.getDormantJobs(); // dormant jobs are only logged here; nothing is stored for them
+ for (IRmJob j: jobs.values()) {
+ logger.info(methodName, j.getId(), ".......... DORMANT");
+// dormantJobs .put(j.getId(), j);
+ }
+
+ jobs = upd.getReservedJobs();
+ for (IRmJob j: jobs.values()) {
+ logger.info(methodName, j.getId(), "<<<<<<<<<< RESERVE");
+
+ HashMap<Share, Share> sharesE = j.getAssignedShares();
+ HashMap<Share, Share> sharesN = j.getPendingShares();
+
+ if ( sharesE.size() == j.countInstances() ) { // everything requested is already assigned
+ logger.info(methodName, j.getId(), "reserve_stable", sharesE.size(), "machines");
+ } else if ( sharesN.size() == j.countInstances() ) { // reservation is complete but not yet confirmed?
+ logger.info(methodName, j.getId(), "reserve_adding", sharesN.size(), "machines");
+ for ( Share s : sharesN.values()) {
+ logger.debug(methodName, j.getId(), " reserve_expanding ", s.toString());
+ }
+ jmu.addShares(j, sharesN); // NOTE(review): unlike the EXPAND path, the pending map is handed over before promoteShares() runs and busyShares is not updated - confirm intended
+ j.promoteShares();
+ } else { // neither assigned nor pending shares match the requested instance count yet
+ logger.info(methodName, j.getId(), "reserve_pending", j.countInstances(), "machines");
+ }
+ logger.info(methodName, j.getId(), "<<<<<<<<<<");
+ }
+
+ jmu.setAllJobs(allJobs);
+
+ jobs = upd.getRefusedJobs(); // refused jobs are only logged here
+ Iterator<IRmJob> iter = jobs.values().iterator();
+ while ( iter.hasNext() ) {
+ IRmJob j = iter.next();
+ logger.info(methodName, j.getId(), ".......... REFUSED");
+ }
+
+ return jmu;
+ }
+
+ /**
+  * Tells whether the scheduler accepts new work and Orchestrator state updates. It does
+  * not until it has been started: machines are wanted earlier, but the internal
+  * structures must stay protected until then.
+  *
+  * @return true once {@link #start()} has been called
+  */
+ public boolean ready()
+ {
+     return this.stability;
+ }
+
+ /**
+  * Marks the scheduler as stable; from now on ready() reports true and schedule()
+  * no longer returns early.
+  */
+ public void start()
+ {
+     this.stability = true;
+ }
+
+ /**
+  * Removes from their nodepools all machines whose nodes were reported through nodeDeath
+  * since the previous call. Does nothing until init() has completed.
+  *
+  * The pending reports are copied and cleared under the {@code deadNodes} lock; the
+  * nodepools are then updated under the {@code this} monitor.
+  */
+ protected void handleDeadNodes()
+ {
+     String methodName = "handleDeadNodes";
+
+     if ( initialized ) {
+         HashMap<Node, Node> reported = new HashMap<Node, Node>();
+         synchronized(deadNodes) {
+             reported.putAll(deadNodes);
+             deadNodes.clear();
+         }
+
+         synchronized(this) {
+             for ( Node dead : reported.values() ) {
+                 Machine m = nodepool.getMachine(dead);
+
+                 // a null machine means it was already removed by an earlier missed heartbeat
+                 if ( m != null ) {
+                     logger.warn(methodName, null, "***Purging machine***", m.getId(), "due to missed heartbeats. THreshold:", nodeStability);
+                     m.getNodepool().nodeLeaves(m);
+                 }
+             }
+         }
+     }
+ }
+
+ /**
+ * We first accept any changes and requests from the outside world and place them where they
+ * can be acted on in this epoch.
+ *
+ * We then pass all relevant requests and resources to the IScheduler. This returns a
+ * SchedulingUpdate which is passed to the dispatcher to be acted upon.
+ *
+ * The queues filled by the signal* callbacks (recovered, incoming and completed jobs,
+ * vacated shares) are each drained into a local list under that queue's own lock; the
+ * actual processing then happens under the 'this' monitor.
+ *
+ * @return null as long as start() has not been called; otherwise the update for the
+ *         job manager produced by this scheduling cycle
+ */
+ public JobManagerUpdate schedule()
+ {
+ String methodName = "schedule";
+
+
+// if ( startupCountdown++ < nodeStability ) {
+// logger.info(methodName, null, "Startup countdown:", startupCountdown, "of", nodeStability);
+// return null;
+// }
+
+ if ( ! stability ) {
+ return null;
+ }
+
+ // tracking the OR hang problem - are topics being delivered?
+ // (logged under the "nodeArrives" label, where the counter is incremented)
+ logger.debug("nodeArrives", null, "Total arrivals:", total_arrivals);
+
+ handleDeadNodes(); // NOTE(review): this call and the reset below run before the synchronized(this) block further down
+ nodepool.reset(NodePool.getMaxOrder());
+
+ // TODO: Can we combine these two into one?
+ SchedulingUpdate upd = new SchedulingUpdate(); // state from internal scheduler
+ JobManagerUpdate jmu = new JobManagerUpdate(); // state we forward to job manager
+
+ // int nchanges = 0;
+
+
+ ArrayList<IRmJob> jobsToRecover = new ArrayList<IRmJob>();
+ synchronized(recoveredJobs) {
+ jobsToRecover.addAll(recoveredJobs);
+ recoveredJobs.clear();
+ // nchanges += jobsToRecover.size();
+ }
+
+ ArrayList<IRmJob> newJobs = new ArrayList<IRmJob>();
+ //
+ // If there are new jobs we need to init some things and start a scheduling cycle.
+ //
+ synchronized(incomingJobs) {
+ newJobs.addAll(incomingJobs);
+ incomingJobs.clear();
+ // nchanges += newJobs.size();
+ }
+
+ //
+ // If some jobs passed initialization we need to signal a scheduling cycle to get
+ // them their fair share
+ //
+// synchronized(initializedJobs) {
+// if ( initializedJobs.size() > 0 ) {
+// nchanges++;
+// }
+// initializedJobs.clear();
+// }
+
+ //
+ // If some jobs completed we need to process cleaning them out and signal a
+ // scheduling cycle to try to reuse their resources.
+ //
+ ArrayList<IRmJob> doneJobs = new ArrayList<IRmJob>();
+ synchronized(completedJobs) {
+ doneJobs.addAll(completedJobs);
+ completedJobs.clear();
+ //nchanges += doneJobs.size();
+ }
+
+ //
+ // If some shares were vacated we need to clear them out and run a scheduling cycle.
+ //
+ ArrayList<Pair<IRmJob, Share>> doneShares= new ArrayList<Pair<IRmJob, Share>>();
+ synchronized(vacatedShares) {
+ doneShares.addAll(vacatedShares.values());
+ vacatedShares.clear();
+ //nchanges += doneShares.size();
+
+ // we use the vacatedShares object to control share growth as well
+ //if ( growthOccurred ) nchanges++;
+ //growthOccurred = false;
+ }
+
+// boolean must_run = false;
+// synchronized(force_epoch) {
+// must_run = force_epoch;
+// force_epoch = false;
+// }
+
+// if ( (nchanges == 0) && !must_run ) {
+// jmu.setAllJobs(allJobs);
+// return jmu;
+// }
+// TODO if we remove this code above be sure to clear out all the force_epoch nonsense
+// TODO does this even use growthOccurred?
+
+ synchronized(this) {
+
+ // before looking at jobs, ensure we're updated after a crash
+ for ( IRmJob j : jobsToRecover ) {
+ processRecovery(j);
+ }
+
+ // process these next to free up resources for the scheduling cycle
+ for (Pair<IRmJob, Share> p : doneShares) {
+ processCompletion(p.first(), p.second());
+ }
+
+ for (IRmJob j : doneJobs) {
+ processCompletion(j);
+ }
+
+ // update user records, "check in" new jobs
+ if ( newJobs.size() > 0 ) {
+ logger.info(methodName, null, "Jobs arrive:");
+ logger.info(methodName, null, "submit", RmJob.getHeader());
+ }
+
+ Iterator<IRmJob> iter = newJobs.iterator();
+ while ( iter.hasNext() ) {
+ IRmJob j = iter.next();
+
+
+ if ( j.isRefused() ) { // the JobManagerConverter has already refused it
+ upd.refuse(j, j.getRefusalReason()); // NOTE(review): no 'continue' follows, so a refused job still goes through the bookkeeping below - confirm this is intended
+ }
+
+ String user = j.getUserName();
+ User u = users.get(user);
+ if ( u == null ) { // first job seen for this user: create the user record
+ u = new User(user);
+ users.put(user, u);
+ }
+ j.setUser(u);
+
+ // Calculate its share order
+ int share_order = calcShareOrder(j.getMemory());
+ j.setShareOrder(share_order);
+
+ // Assign it to its priority class
+ String clid = j.getClassName();
+ ResourceClass prclass = resourceClassesByName.get(clid);
+
+ u.addJob(j); // the job is registered with its user and in allJobs even if it is refused just below
+ allJobs.put(j.getId(), j);
+ if ( prclass == null ) {
+ upd.refuse(j, "Cannot find priority class " + clid + " for job");
+ continue;
+ }
+
+ /**
+ * We want to allow this - a normal job, submitted to a reservation class.
+ if ( (prclass.getPolicy() == Policy.RESERVE ) && ( ! j.isReservation() ) ) {
+ upd.refuse(j, "Reservaction class " +
+ prclass.getId() + " specified but work is not a reservation.");
+ continue;
+ }
+ */
+
+ // a reservation is only accepted by classes whose policy is RESERVE or FIXED_SHARE
+ if ( ((prclass.getPolicy() != Policy.RESERVE ) && (prclass.getPolicy() != Policy.FIXED_SHARE)) && ( j.isReservation() ) ) {
+ upd.refuse(j, "Class " + prclass.getName() + " is policy " +
+ prclass.getPolicy() + " but the work is submitted as a reservation.");
+ continue;
+ }
+
+ prclass.addJob(j);
+ j.setResourceClass(prclass);
+ logger.info(methodName, j.getId(), "submit", j.toString());
+ }
+
+ logger.info(methodName, null, "Scheduling " + newJobs.size(), " new jobs. Existing jobs: " + allJobs.size());
+ scheduler.schedule(upd);
+
+ logger.info(methodName, null, "--------------- Scheduler returns ---------------");
+ logger.info(methodName, null, "\n", upd.toString());
+ logger.info(methodName, null, "------------------------------------------------");
+ dispatch(upd, jmu); // my own job lists get updated by this
+
+ return jmu;
+ }
+ }
+
+ /**
+  * Raises the {@code done} flag, under the 'this' monitor.
+  */
+ public synchronized void shutdown()
+ {
+     this.done = true;
+ }
+
+// public void run()
+// {
+// String methodName = "run";
+// while ( ! done ) {
+// try { sleep(epoch); } catch (InterruptedException e) { }
+
+// logger.info(methodName, null, "========================== Epoch starts ===========================");
+// try {
+// schedule();
+// } catch ( SchedulingException e ) {
+// logger.info(methodName, e.jobid, e);
+// }
+
+// logger.info(methodName, null, "========================== Epoch ends ===========================");
+// }
+// }
+
+ private int total_arrivals = 0;
+ /**
+  * Registers a node report with the nodepools so the node is in the scheduler's records
+  * as soon as possible.
+  *
+  * For a node that is not known yet, the share order is derived from its reported total
+  * memory minus the reserved DRAM (or from the DRAM override, when one is configured),
+  * divided by the share quantum and rounded down. A known machine keeps its share order.
+  *
+  * @param node the node that reported in
+  */
+ public void nodeArrives(Node node)
+ {
+     total_arrivals++; // reported by the main schedule loop
+
+     synchronized(this) {
+         Machine known = nodepool.getMachine(node);
+         int share_order;
+
+         if ( known != null ) {
+             share_order = known.getShareOrder();
+         } else {
+             allNodes.put(node, node);
+
+             // memory available for shares, adjusted by the configured overhead
+             long reported_mem = node.getNodeMetrics().getNodeMemory().getMemTotal() - share_free_dram;
+             long allocatable_mem = (dramOverride > 0) ? dramOverride : reported_mem;
+
+             // conservative - rounds down
+             share_order = (int) (allocatable_mem / share_quantum);
+         }
+
+         nodepool.nodeArrives(node, share_order); // announce to the nodepools
+     }
+ }
+
+ /**
+  * Merges the given nodes into {@code deadNodes}. Only the {@code deadNodes} map is
+  * locked while the merge happens.
+  *
+  * @param nodes the nodes reported dead
+  */
+ public void nodeDeath(Map<Node, Node> nodes)
+ {
+     synchronized ( deadNodes ) {
+         deadNodes.putAll(nodes);
+     }
+ }
+
+ /**
+  * Callback from the job manager: a new fair-share job needs shares.
+  * The job is appended to {@code incomingJobs}; only that list is locked.
+  *
+  * @param job the newly submitted job
+  */
+ public void signalNewWork(IRmJob job)
+ {
+     synchronized ( incomingJobs ) {
+         incomingJobs.add(job);
+     }
+ }
+
+// public void signalForceEpoch()
+// {
+// synchronized( force_epoch ) {
+// force_epoch = true;
+// }
+// }
+
+ /**
+  * Appends the job to {@code initializedJobs}; only that list is locked.
+  *
+  * @param job the job being signalled as initialized
+  */
+ public void signalInitialized(IRmJob job)
+ {
+     synchronized ( initializedJobs ) {
+         initializedJobs.add(job);
+     }
+ }
+
+ /**
+  * Appends the job to {@code recoveredJobs}; only that list is locked.
+  *
+  * @param job the job to recover
+  */
+ public void signalRecovery(IRmJob job)
+ {
+     synchronized ( recoveredJobs ) {
+         recoveredJobs.add(job);
+     }
+ }
+
+ /**
+  * Placeholder for job cancellation; currently does nothing.
+  *
+  * @param id id of the cancelled job (unused)
+  */
+ public void jobCancelled(DuccId id)
+ {
+     // TODO Fill this in.
+ }
+
+ /**
+  * Callback from job manager when a job completes. The job is looked up in {@code allJobs}
+  * and queued on {@code completedJobs}.
+  *
+  * An id that is not in {@code allJobs} is logged and dropped rather than queued: a null
+  * entry would fail later, when the completion is processed and the job's id is read.
+  *
+  * @param id id of the job reported complete
+  */
+ public void signalCompletion(DuccId id)
+ {
+     String methodName = "signalCompletion";
+     synchronized(completedJobs) {
+         try {
+             IRmJob job = allJobs.get(id);
+             if ( job == null ) {
+                 logger.info(methodName, id, "Job completion signal for a job that is not in the run list, ignored.");
+                 return;
+             }
+             logger.info(methodName, id, "Job completion signal.");
+             completedJobs.add(job);
+         } catch (Exception e) {
+             // best effort: a failed signal is logged, never propagated to the job manager
+             logger.warn(methodName, id, e);
+         }
+     }
+ }
+
+ /**
+  * Callback from job manager when a specific share exits but the job is still alive.
+  * The (job, share) pair is stored in {@code vacatedShares} under the share's id.
+  *
+  * @param job   the job that owns the share
+  * @param share the share being vacated
+  */
+ public void signalCompletion(IRmJob job, Share share)
+ {
+     final String methodName = "signalCompletion";
+     synchronized ( vacatedShares ) {
+         logger.info(methodName, job.getId(), "Job vacate signal share: ", share.toString());
+         Pair<IRmJob, Share> vacated = new Pair<IRmJob, Share>(job, share);
+         vacatedShares.put(share.getId(), vacated);
+     }
+ }
+
+ /**
+ * Callback from job manager when a specific share gets a process associated.
+ */
+// public void signalGrowth(DuccId jobid, Share share)
+// {
+// String methodName = "signalGrowth";
+// synchronized(vacatedShares) {
+// logger.info(methodName, jobid, "Job growth signal share: ", share.toString());
+// growthOccurred = true;
+// }
+// }
+
+ /**
+ * Called in scheduling cycle, to actually complete the job - avoids deadlock.
+ *
+ * Removes the job from allJobs, marks it complete, detaches it from its user and its
+ * resource class, and purges every share still assigned to it. If the job is not in
+ * allJobs the method logs that fact and returns without doing anything else.
+ *
+ * @param job the job whose completion is being processed
+ * @throws SchedInternalError if the job has no resource class and is not marked refused
+ */
+ private synchronized void processCompletion(IRmJob job)
+ {
+ String methodName = "processCompletion";
+ logger.info(methodName, job.getId(), "Job completes.");
+
+ // -- clean up the running jobs list
+ IRmJob j = allJobs.remove(job.getId());
+ if ( j == null ) {
+ logger.info(methodName, job.getId(), "Job is not in run list!"); // can happen if job is refused very early
+ return;
+ }
+
+ j.markComplete();
+
+ // -- clean up user list
+ // NOTE(review): the result of users.get() is dereferenced without a null check - confirm
+ // that every job in allJobs always has a registered user.
+ User user = users.get(j.getUserName());
+ // User.remove() returns the number of jobs the user still has; forget users with none left
+ if ( user.remove(job) == 0 ) {
+ users.remove(user.getName());
+ }
+
+ // -- clean up the resource class; a missing class is tolerated only for refused jobs
+ ResourceClass rc = job.getResourceClass();
+ if ( rc != null ) {
+ rc.removeJob(j); // also clears it if it's a reservation
+ } else if ( !j.isRefused() ) {
+ throw new SchedInternalError(j.getId(), "Job exits from class " + job.getClassName() + " but we cannot find the priority class definition.");
+ }
+
+
+ // -- clean up machine lists
+ HashMap<Share, Share> shares= job.getAssignedShares();
+ for (Share s: shares.values()) {
+ purgeShare(s, job);
+ }
+ // the job's own share list is cleared only after every share has been purged
+ job.removeAllShares();
+ }
+
+ /**
+ * Called from scheduling cycle - a specific share has run out of work for the give job (but the
+ * job is not done yet).
+ */
+ private synchronized void processCompletion(IRmJob job, Share share)
+ {
+ String methodName = "processCompletion";
+
+ logger.debug(methodName, job.getId(), "Job vacates share ", share.toString());
+ //share.removeJob();
+ job.removeShare(share);
+ purgeShare(share, job);
+ }
+
+ /**
+ * Re-registers a recovered job: sets its share order and resource class, reconnects
+ * each recovered share to its machine's nodepool, attaches the job to its user
+ * (creating the user record if needed), and adds the job to allJobs and its class.
+ *
+ * @param j the job being recovered
+ */
+ private synchronized void processRecovery(IRmJob j)
+ {
+ String methodName = "processRecovery";
+
+ int share_order = calcShareOrder(j.getMemory());
+ j.setShareOrder(share_order);
+ // the resource class must be set before user.addJob(j) below, which reads it from the job
+ j.setResourceClass(resourceClassesByName.get(j.getClassName()));
+ HashMap<Share, Share> shares = j.getRecoveredShares();
+ StringBuffer sharenames = new StringBuffer(); // collected only for the log line at the end
+ for ( Share s : shares.values() ) {
+ sharenames.append(s.toString());
+ sharenames.append(" ");
+
+ if ( !j.isReservation() ) { // if it's a reservation, the share order is already set from
+ // the machine. nodepools can cause the actual order to differ
+ // from the order derived from indicated memory.
+ s.setShareOrder(share_order);
+ }
+ Machine m = s.getMachine();
+ NodePool np = m.getNodepool();
+ np.connectShare(s, m, j, s.getShareOrder());
+
+ busyShares.put(s.getId(), s);
+ }
+ // find or create the user record for this job
+ String username = j.getUserName();
+ User user = users.get(username);
+ if ( user == null ) {
+ user = new User(username);
+ users.put(username, user);
+ logger.info(methodName, j.getId(), "&&&&&&&&&&&&&&&& new user", user.toString(), "-------------------");
+ }
+ j.setUser(user);
+ user.addJob(j);
+
+ j.promoteShares(); // NOT expanded, just recovered, promote them right away
+ j.clearRecoveredShares();
+
+ // NOTE(review): same lookup as at the top of the method; prclass is dereferenced without
+ // a null check - confirm the class name of a recovered job is always configured.
+ String clid = j.getClassName();
+ ResourceClass prclass = resourceClassesByName.get(clid);
+
+ allJobs.put(j.getId(), j);
+ prclass.addJob(j);
+ j.setResourceClass(prclass);
+ logger.info(methodName, j.getId(), "Recovered job:", j.toString());
+ logger.info(methodName, j.getId(), "Recovered shares:", sharenames.toString());
+ }
+
+ /**
+  * The share is gone: drop it from {@code busyShares} and from its machine.
+  *
+  * @param s the share to purge
+  * @param j the job the share belonged to (not used here)
+  */
+ private void purgeShare(Share s, IRmJob j)
+ {
+     busyShares.remove(s.getId()); // so long, and thanks for all the fish
+     s.getMachine().removeShare(s);
+ }
+
+ // Single factory behind every id handed out by newId().
+ private static DuccIdFactory idFactory = new DuccIdFactory();
+
+ /**
+  * @return the next id produced by the shared {@code DuccIdFactory}
+  */
+ public static DuccId newId()
+ {
+     DuccId next = idFactory.next();
+     return next;
+ }
+
+ /**
+  * Forwards the query to the nodepool.
+  */
+ public void queryMachines()
+ {
+     nodepool.queryMachines();
+ }
+
+ class MachineByOrderSorter
+ implements Comparator<Machine>
+ {
+ public int compare(Machine m1, Machine m2)
+ {
+ if (m1.getShareOrder() == m2.getShareOrder()) {
+ return (m1.getId().compareTo(m2.getId()));
+ }
+ return (int) (m1.getShareOrder() - m2.getShareOrder());
+ }
+ }
+
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingException.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingException.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingException.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingException.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+import org.apache.uima.ducc.common.utils.id.DuccId;
+
+/**
+ * Unchecked exception that carries the id of the job it relates to.
+ */
+@SuppressWarnings("serial")
+public class SchedulingException extends RuntimeException
+{
+    // id of the job this exception relates to; package-visible
+    DuccId jobid;
+
+    /**
+     * @param jobid id of the job concerned
+     * @param msg   description of the problem
+     */
+    public SchedulingException(DuccId jobid, String msg)
+    {
+        super(msg);
+        this.jobid = jobid;
+    }
+
+    /**
+     * @param jobid id of the job concerned
+     * @param msg   description of the problem
+     * @param t     underlying cause
+     */
+    public SchedulingException(DuccId jobid, String msg, Throwable t)
+    {
+        super(msg, t);
+        this.jobid = jobid;
+    }
+
+    /**
+     * @return the job id given at construction
+     */
+    public DuccId getJobId()
+    {
+        return this.jobid;
+    }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingUpdate.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingUpdate.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingUpdate.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingUpdate.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+import java.util.HashMap;
+
+/**
+ * Collects the outcome of one scheduling pass: jobs are filed under shrunken, expanded,
+ * stable, dormant, reservations or refusals. Each category is a map keyed by the job itself.
+ */
+public class SchedulingUpdate
+{
+    private final HashMap<IRmJob, IRmJob> shrunken;
+    private final HashMap<IRmJob, IRmJob> expanded;
+    private final HashMap<IRmJob, IRmJob> stable;
+    private final HashMap<IRmJob, IRmJob> dormant;
+    private final HashMap<IRmJob, IRmJob> reservations;
+    private final HashMap<IRmJob, IRmJob> refusals;
+
+    /** Creates an update with every category empty. */
+    public SchedulingUpdate()
+    {
+        shrunken = new HashMap<IRmJob, IRmJob>();
+        expanded = new HashMap<IRmJob, IRmJob>();
+        stable = new HashMap<IRmJob, IRmJob>();
+        dormant = new HashMap<IRmJob, IRmJob>();
+        reservations = new HashMap<IRmJob, IRmJob>();
+        refusals = new HashMap<IRmJob, IRmJob>();
+    }
+
+// public SchedulingUpdate(
+// ArrayList<Job> shrunken,
+// ArrayList<Job> expanded,
+// ArrayList<Job> leftovers)
+// {
+// this.shrunken = shrunken;
+// this.expanded = expanded;
+// this.leftovers = leftovers;
+// }
+
+    void addShrunkenJob(IRmJob j)
+    {
+        shrunken.put(j, j);
+    }
+
+    void addExpandedJob(IRmJob j)
+    {
+        expanded.put(j, j);
+    }
+
+    void addDormantJob(IRmJob j)
+    {
+        dormant.put(j, j);
+    }
+
+    void addStableJob(IRmJob j)
+    {
+        stable.put(j, j);
+    }
+
+    void addReservation(IRmJob j)
+    {
+        reservations.put(j, j);
+    }
+
+    /**
+     * Marks the job refused with the given reason and files it under refusals.
+     */
+    void refuse(IRmJob j, String reason)
+    {
+        j.refuse(reason);
+        refusals.put(j, j);
+    }
+
+    HashMap<IRmJob, IRmJob> getRefusedJobs()
+    {
+        return refusals;
+    }
+
+    HashMap<IRmJob, IRmJob> getReservedJobs()
+    {
+        return reservations;
+    }
+
+    HashMap<IRmJob, IRmJob> getShrunkenJobs()
+    {
+        return shrunken;
+    }
+
+    HashMap<IRmJob, IRmJob> getExpandedJobs()
+    {
+        return expanded;
+    }
+
+    HashMap<IRmJob, IRmJob> getStableJobs()
+    {
+        return stable;
+    }
+
+    HashMap<IRmJob, IRmJob> getDormantJobs()
+    {
+        return dormant;
+    }
+
+// HashMap<IRmJob, IRmJob> getReservations()
+// {
+// return reservations;
+// }
+
+    /**
+     * Appends one report section: the title, then either the "none" line or the job header
+     * followed by one line per job.
+     *
+     * @param sb       destination buffer
+     * @param title    section heading, including its newlines
+     * @param noneLine text emitted when the section has no jobs
+     * @param jobs     the jobs of this section
+     */
+    private static void appendSection(StringBuilder sb, String title, String noneLine, HashMap<IRmJob, IRmJob> jobs)
+    {
+        sb.append(title);
+        if ( jobs.size() == 0 ) {
+            sb.append(noneLine);
+            return;
+        }
+
+        sb.append(" ");
+        sb.append(RmJob.getHeader());
+        sb.append("\n");
+        for ( IRmJob j : jobs.values() ) {
+            sb.append(" ");
+            sb.append(j.toString());
+            sb.append("\n");
+        }
+    }
+
+    /**
+     * Renders the expanded, shrunken, stable, dormant and reserved jobs as a multi-line
+     * report. Refused jobs are not part of the report.
+     */
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+
+        appendSection(sb, "Expanded:\n", "<none>\n", expanded);
+        appendSection(sb, "\nShrunken:\n", " <none>\n", shrunken);
+        appendSection(sb, "\nStable:\n", " <none>\n", stable);
+        appendSection(sb, "\nDormant:\n", " <none>\n", dormant);
+        appendSection(sb, "\nReserved:\n", " <none>\n", reservations);
+
+        sb.append("\n");
+
+        return sb.toString();
+    }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingUpdate.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ServerException.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ServerException.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ServerException.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ServerException.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+/**
+ * Unchecked exception with an optional message.
+ */
+@SuppressWarnings("serial")
+public class ServerException extends RuntimeException
+{
+    /** Creates the exception without a message. */
+    public ServerException()
+    {
+        super();
+    }
+
+    /**
+     * @param msg description of the problem
+     */
+    public ServerException(String msg)
+    {
+        super(msg);
+    }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ServerException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.common.ITimeWindow;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+
+/**
+ * This may more correctly be thought of as representing a Process.
+ *
+ * A share is ALWAYS associated with a Machine.
+ */
+public class Share
+    implements SchedConstants
+{
+    //private transient DuccLogger logger = DuccLogger.getLogger(Share.class, COMPONENT_NAME);
+    private final Machine machine;        // machine associated with this share, fixed at construction
+    private final DuccId id;              // unique *within this machine*, fixed at construction
+    private IRmJob job;                   // job executing in this share, if any; may be reassigned
+    private int share_order;              // may not be same as machine's order
+
+    private ITimeWindow init_time = null; // initialization time window from the latest update()
+    @SuppressWarnings("unused")
+    private ITimeWindow run_time = null;  // run time window from the latest update(); stored, not read here
+
+
+    // private HashMap<Integer, Long> activeQuestions = new HashMap<Integer, Long>();
+
+    private boolean evicted = false;      // during preemption, remember this share has already been removed
+                                          // (multiple classes might try to evict, this prevents multiple eviction)
+    private boolean purged = false;       // share is forcibly removed because its machine died
+    private boolean fixed = false;        // if true, can't be preempted
+
+    @SuppressWarnings("unused")
+    private long resident_memory = 0;
+    @SuppressWarnings("unused")
+    private ProcessState state = ProcessState.Undefined;
+    @SuppressWarnings("unused")
+    private String pid = "<none>";
+
+
+    /**
+     * This constructor is used during recovery ONLY.
+     */
+    public Share(DuccId id, Machine machine, IRmJob job, int share_order)
+    {
+        this.id = id;
+        this.machine = machine;
+        this.job = job;
+        this.share_order = share_order;
+    }
+
+    /**
+     * Normal constructor. The share's id is taken from {@code Scheduler.newId()}.
+     */
+    public Share(Machine machine, IRmJob job, int share_order)
+    {
+        this.machine = machine;
+        this.id = Scheduler.newId();
+        this.job = job;
+        this.share_order = share_order;
+    }
+
+// /**
+// * Simulation only.
+// */
+// void questionStarted(WorkItem q)
+// {
+// if ( activeQuestions.containsKey(q.getId()) ) {
+// throw new SchedulingException(job.getId(), "Share.questionStarted: work item " + q.getId() + " already running in share " + toString());
+// }
+
+// activeQuestions.put(q.getId(), System.currentTimeMillis());
+// }
+
+// /**
+// * Simulation only.
+// */
+// void questionComplete(WorkItem q)
+// {
+// String methodName = "questionComplete";
+// if ( !activeQuestions.containsKey(q.getId()) ) {
+// throw new SchedulingException(job.getId(), "Share.questionComplete: work item " + q.getId() + " not found in share " + toString());
+// }
+
+// logger.info(methodName, job.getId(), q.toString(), " completes in ",
+// ""+(System.currentTimeMillis() - activeQuestions.get(q.getId())), " milliseconds in share ", toString());
+// activeQuestions.remove(q.getId());
+// }
+
+    /** @return the nodepool of the machine this share lives on */
+    public NodePool getNodepool()
+    {
+        return machine.getNodepool();
+    }
+
+    public int getNodepoolDepth()
+    {
+        return getNodepool().getDepth();
+    }
+
+    public String getNodepoolId()
+    {
+        return getNodepool().getId();
+    }
+
+    public IRmJob getJob()
+    {
+        return job;
+    }
+
+    public DuccId getId()
+    {
+        return id;
+    }
+
+    Machine getMachine()
+    {
+        return machine;
+    }
+
+    /** @return whatever the owning job reports for this share via {@code isPendingShare} */
+    boolean isPending()
+    {
+        return job.isPendingShare(this);
+    }
+
+    /**
+     * Defrag - this (pending) share is given to a different job before OR learns about it.
+     */
+    void reassignJob(IRmJob job)
+    {
+        this.job = job;
+    }
+
+    public NodeIdentity getNodeIdentity()
+    {
+        return machine.getNodeIdentity();
+    }
+
+    /**
+     * The order of the share itself.
+     */
+    public int getShareOrder()
+    {
+        return share_order;
+    }
+
+    /**
+     * Update the share order, used during recovery.
+     */
+    void setShareOrder(int o)
+    {
+        this.share_order = o;
+    }
+
+    /**
+     * The size of the machine the share resides on.
+     */
+    int getMachineOrder()
+    {
+        return machine.getShareOrder();
+    }
+
+    /**
+     * It's forceable if it's not a permanent share and it's not already evicted or purged.
+     */
+    boolean isForceable()
+    {
+        if ( evicted ) return false;
+        if ( purged ) return false;
+        if ( fixed ) return false;
+        return true;
+    }
+
+    /**
+     * It's preemptable if:
+     * - it is forceable (not evicted, not purged, not fixed)
+     * - it belongs to a job that holds more shares than it was given (a "loser" count > 0)
+     */
+    boolean isPreemptable()
+    {
+        return ( isForceable() && (( job.countNShares() - job.countNSharesGiven()) > 0 ));
+        // This last works because if the job has preempted shares they count against the countNShares count
+        // so we don't end up counting this job more than it deserves.
+    }
+
+    /**
+     * Refreshes the process data of this share.
+     *
+     * @return false, changing nothing, when {@code jobid} is not the id of the job
+     *         currently assigned to this share; true once the values have been stored
+     */
+    public boolean update(DuccId jobid, long mem, ProcessState state, ITimeWindow init_time, ITimeWindow run_time, String pid)
+    {
+        if ( ! jobid.equals(job.getId()) ) return false; // something has gone horribly wrong
+
+        this.resident_memory = mem;
+        this.state = state;
+        this.pid = pid;
+        this.init_time = init_time;
+        this.run_time = run_time;
+        return true;
+    }
+
+// /**
+// * Calculate the "investment" this share has in the questions it's running.
+// * Currently, this will be the longest time any of the current questions has been running.
+// *
+// * We have to separete this from "getting" so we can freeze the timestamp - in a sort the
+// * getter could be called multiple times, and get inconsistent results.
+// */
+// synchronized void calculateInvestment()
+// {
+// investment = 0;
+// long now = System.currentTimeMillis();
+// for ( Long elapsed : activeQuestions.values() ) {
+// investment = Math.max(investment, (now - elapsed));
+// }
+// }
+
+    /**
+     * Investment is a combination of initialization time and execution time. We always
+     * want shares that aren't yet initialized to sort LOWER than shares that are initialized.
+     *
+     * For now we only take into account initialization time; a share that has not yet
+     * received an init_time reports 0.
+     *
+     * NOTE: We no longer set investment directly (calculateInvestment() is commented out).
+     * Instead each state update refreshes the init_time and run_time structures and we
+     * calculate investment from that. This affects the RM simulator, which will need
+     * updating if we decide to revive it.
+     */
+    long getInvestment()
+    {
+        if ( init_time == null ) return 0;
+        return init_time.getElapsedMillis();
+    }
+
+    /**
+     * Returns only initialization time. Eventually getInvestment() may take other things into
+     * consideration so we separate these two (even though currently they do the same thing.)
+     */
+    long getInitializationTime()
+    {
+        if ( init_time == null ) return 0;
+        return init_time.getElapsedMillis();
+    }
+
+    /**
+     * @return true once an init_time has been received and its end is set
+     */
+    boolean isInitialized()
+    {
+        if ( init_time == null ) return false;
+
+        return (init_time.getEnd() != null);
+    }
+
+    public void setFixed()
+    {
+        fixed = true;
+    }
+
+    boolean isFixed()
+    {
+        return fixed;
+    }
+
+    void evict()
+    {
+        evicted = true;
+    }
+
+    /** A purged share also counts as evicted. */
+    boolean isEvicted()
+    {
+        return evicted || purged;
+    }
+
+    void purge()
+    {
+        purged = true;
+    }
+
+    public boolean isPurged()
+    {
+        return purged;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return id.hashCode();
+    }
+
+    /** Two shares are equal when they are of the same class and their ids are equal. */
+    @Override
+    public boolean equals(Object o)
+    {
+        if ( o == null ) return false;
+        if ( this == o ) return true;
+        if ( this.getClass() != o.getClass() ) return false;
+
+        Share s = (Share) o;
+        //return (id.equals(m.getId()));
+        return this.id.equals(s.getId());
+    }
+
+    /** @return the machine id and the share id, joined by a dot */
+    public String toString()
+    {
+        return machine.getId() + "." + getId();
+    }
+
+}
+
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+import java.util.Comparator;
+import java.util.HashMap;
+
+/**
+ * A user known to the scheduler, together with the jobs assigned to that user. Jobs are
+ * indexed three ways: all jobs, jobs by share order, and jobs by resource class.
+ */
+public class User
+    implements IEntity
+{
+    private String id;
+    private HashMap<IRmJob, IRmJob> jobs = new HashMap<IRmJob, IRmJob>();    // my jobs
+    private HashMap<ResourceClass, HashMap<IRmJob, IRmJob>> jobsByClass = new HashMap<ResourceClass, HashMap<IRmJob, IRmJob>>();
+
+    private HashMap<Integer, HashMap<IRmJob, IRmJob>> jobsByOrder = new HashMap<Integer, HashMap<IRmJob, IRmJob>>();
+    private int user_shares;       // number of shares to apportion to jobs in this user in current epoch
+    private int pure_fair_share;   // uncapped un-bonused counts
+    private int share_wealth;      // defrag, how many relevant Q shares do i really have?
+    private int[] given_by_order = null;
+    private int[] wanted_by_order = null;
+
+    private static Comparator<IEntity> apportionmentSorter = new ApportionmentSorterCl();
+
+    public User(String name)
+    {
+        this.id = name;
+    }
+
+    public long getTimestamp()
+    {
+        return 0;
+    }
+
+    /**
+     * Registers a job under this user, indexing it by its current share order and by its
+     * current resource class.
+     */
+    void addJob(IRmJob j)
+    {
+        jobs.put(j, j);
+        int order = j.getShareOrder();
+
+        HashMap<IRmJob, IRmJob> ojobs = jobsByOrder.get(order);
+        if ( ojobs == null ) {
+            ojobs = new HashMap<IRmJob, IRmJob>();
+            jobsByOrder.put(order, ojobs);
+        }
+        ojobs.put(j, j);
+
+        ResourceClass cl = j.getResourceClass();
+        ojobs = jobsByClass.get(cl);
+        if ( ojobs == null ) {
+            ojobs = new HashMap<IRmJob, IRmJob>();
+            jobsByClass.put(cl, ojobs);
+        }
+        ojobs.put(j, j);
+    }
+
+    /**
+     * Remove a job from the list and return how many jobs remain.
+     *
+     * @throws SchedulingException if the job is not assigned to this user
+     */
+    int remove(IRmJob j)
+    {
+        if ( jobs.containsKey(j) ) {
+            jobs.remove(j);
+
+            int order = j.getShareOrder();
+            HashMap<IRmJob, IRmJob> ojobs = jobsByOrder.get(order);
+            ojobs.remove(j);
+
+            ResourceClass cl = j.getResourceClass();
+            if ( jobsByClass.containsKey(cl) ) {                // if not it's likely an early refusal
+                ojobs = jobsByClass.get(cl);
+                ojobs.remove(j);
+            }
+        } else {
+            throw new SchedulingException(j.getId(), "User " + id + " is asked to remove job " + j.getId() + " but the job is not assigned.");
+        }
+        return jobs.size();
+    }
+
+    /**
+     * Currently, all users are equal.
+     */
+    public int getShareWeight()
+    {
+        return 1;
+    }
+
+    /**
+     * Returns total N-shares wanted by order for a given class. Processes of size order.
+     */
+    private int countNSharesWanted(int order, ResourceClass rc)
+    {
+        int K = 0;
+
+        // First sum the max shares all my jobs can actually use
+        HashMap<IRmJob, IRmJob> jobs = jobsByOrder.get(order);
+        if ( jobs == null ) {
+            return 0;
+        }
+
+        String rcname = rc.getName();
+        for ( IRmJob j : jobs.values() ) {
+            if ( j.getResourceClass().getName().equals(rcname) ) {
+                K += j.getJobCap();
+            }
+        }
+
+        return K;
+    }
+
+    /**
+     * Rebuilds wanted_by_order for the given class: slot o holds the count for order o,
+     * slot 0 holds the sum over all orders.
+     */
+    public void initWantedByOrder(ResourceClass rc)
+    {
+        wanted_by_order = NodePool.makeArray();
+        for ( int o = NodePool.getMaxOrder(); o > 0; o-- ) {
+            wanted_by_order[o] = countNSharesWanted(o, rc);
+            wanted_by_order[0] += wanted_by_order[o];
+        }
+    }
+
+    public void setPureFairShare(int pfs)
+    {
+        this.pure_fair_share = pfs;
+    }
+
+    public int getPureFairShare()
+    {
+        return pure_fair_share;
+    }
+
+    public int[] getWantedByOrder()
+    {
+        return wanted_by_order;
+    }
+
+    public void setGivenByOrder(int[] gbo)
+    {
+        this.given_by_order = gbo;
+    }
+
+    public int[] getGivenByOrder()
+    {
+        return given_by_order;
+    }
+
+    public void setShareWealth(int w)
+    {
+        this.share_wealth = w; // qshares
+    }
+
+    public int getShareWealth()
+    {
+        return share_wealth; // qshares
+    }
+
+    public void subtractWealth(int w)
+    {
+        share_wealth -= w;
+    }
+
+    public int calculateCap(int order, int total)
+    {
+        return Integer.MAX_VALUE; // no cap for users
+    }
+
+    HashMap<IRmJob, IRmJob> getJobs()
+    {
+        return jobs;
+    }
+
+    /** @return the jobs of the given order, or null when the user has none of that order */
+    HashMap<IRmJob, IRmJob> getJobsOfOrder(int order)
+    {
+        return jobsByOrder.get(order);
+    }
+
+    HashMap<Integer, HashMap<IRmJob, IRmJob>> getJobsByOrder()
+    {
+        return jobsByOrder;
+    }
+
+    /** Not implemented: prints a warning and returns null. */
+    HashMap<String, Machine> getMachines()
+    {
+        // TODO: fill this in - walk the jobs and return the hash
+        System.out.println("Warning: getMachines() is not implemented and is returning null");
+        return null;
+    }
+
+    public int countJobs()
+    {
+        return jobs.size();
+    }
+
+    public int countJobs(int o)
+    {
+        if ( jobsByOrder.containsKey(o) ) {
+            return jobsByOrder.get(o).size();
+        }
+        return 0;
+    }
+
+    public void clearShares()
+    {
+        user_shares = 0;
+        //System.out.println("**** user " + getId() + "/" + uniqueId + " clearing shares");
+        //sharesByOrder.clear();
+    }
+
+    public void addQShares(int s)
+    {
+        user_shares += s;
+        //System.out.println("***** user " + getId() + "/" + uniqueId + " shares are " + s);
+    }
+
+    /**
+     * Try to find the smallest bonus shares we can use.
+     *
+     * @param bonus            largest order to consider
+     * @param tmpSharesByOrder share counts indexed by order
+     * @return the smallest order in 1..bonus for which this user has jobs and
+     *         tmpSharesByOrder is positive, or 0 if there is none
+     */
+    public int canUseBonus(int bonus, int[] tmpSharesByOrder)
+    {
+        // The highest valid index is length - 1; bounding by length itself could index past the array.
+        int limit = Math.min(bonus, tmpSharesByOrder.length - 1);
+        for ( int i = 1; i <= limit; i++ ) {
+
+            if ( jobsByOrder.containsKey(i) && (tmpSharesByOrder[i] > 0) ) {
+                return i;
+            }
+        }
+        return 0;
+    }
+
+    public int countQShares(String x)
+    {
+        //System.out.println(x + " **** user " + getId() + "/" + uniqueId + " returning " + user_shares + " shares");
+        return this.user_shares;
+    }
+
+
+    // NOTE(review): physicalCap is multiplied by order on entry and the result is multiplied
+    // by order again on return - confirm the intended units.
+    int countCappedQShares(int physicalCap, int order)
+    {
+        int K = 0;
+        physicalCap = physicalCap * order; // to quantum shares
+        HashMap<IRmJob, IRmJob> jobs = jobsByOrder.get(order);
+
+        if ( jobs == null ) {
+            return 0;
+        }
+
+        for ( IRmJob j : jobs.values() ) {
+            K += (Math.min(j.getJobCap(), physicalCap));
+        }
+
+        return Math.min(K, physicalCap) * order;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return id.hashCode();
+    }
+
+    /** Two users are equal when they are of the same class and their names are equal. */
+    @Override
+    public boolean equals(Object o)
+    {
+        if ( o == null ) return false;
+        if ( this == o ) return true;
+        if ( this.getClass() != o.getClass() ) return false;
+
+        User u = (User) o;
+        return this.id.equals(u.getName());
+    }
+
+    /**
+    public String getId()
+    {
+        return id;
+    }
+    */
+
+    public String getName()
+    {
+        return id;
+    }
+
+    public String toString()
+    {
+        return id;
+    }
+
+    public Comparator<IEntity> getApportionmentSorter()
+    {
+        return apportionmentSorter;
+    }
+
+    /** Orders entities by name; the same instance always compares equal to itself. */
+    static private class ApportionmentSorterCl
+        implements Comparator<IEntity>
+    {
+        public int compare(IEntity e1, IEntity e2)
+        {
+            if ( e1 == e2 ) return 0;
+            return e1.getName().compareTo(e2.getName());
+        }
+    }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/WorkItem.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/WorkItem.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/WorkItem.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/WorkItem.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+/**
+ * A work item identified by an integer id and carrying a walltime value.
+ */
+public class WorkItem
+{
+    private int id;
+    //private int cputime;
+    private int walltime;
+
+    /**
+     * The arguments are stored unchecked: they come from generated code that we assume
+     * is correct.
+     */
+    public WorkItem(int id, int walltime)
+    {
+        this.walltime = walltime;
+        this.id = id;
+    }
+
+    int getId()
+    {
+        return this.id;
+    }
+
+    /** Convenience for logging: the id as a decimal string. */
+    String getStringId()
+    {
+        return String.valueOf(this.id);
+    }
+
+    int getWalltime()
+    {
+        return this.walltime;
+    }
+
+    public String toString()
+    {
+        StringBuilder text = new StringBuilder("Qid ");
+        text.append(id).append(" ").append(walltime);
+        return text.toString();
+    }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/WorkItem.java
------------------------------------------------------------------------------
svn:eol-style = native