You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:43:38 UTC
svn commit: r1427967 [1/5] - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src: main/ main/java/
main/java/org/ main/java/org/apache/ main/java/org/apache/uima/
main/java/org/apache/uima/ducc/ main/java/org/apache/uima/ducc/rm/
main/java/org/apache/uim...
Author: cwiklik
Date: Wed Jan 2 19:43:37 2013
New Revision: 1427967
URL: http://svn.apache.org/viewvc?rev=1427967&view=rev
Log:
UIMA-2491
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStatus.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManager.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/config/
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/config/ResourceManagerConfiguration.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/ResourceManagerEventListener.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IEntity.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IJobManager.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IScheduler.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/JobManagerUpdate.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/README (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedConstants.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedInternalError.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingException.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingUpdate.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ServerException.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/WorkItem.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/resources/
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/test/
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/test/java/
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/test/resources/
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,812 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.utils.DuccCollectionUtils;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapDifference;
+import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapValueDifference;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.rm.scheduler.IJobManager;
+import org.apache.uima.ducc.rm.scheduler.IRmJob;
+import org.apache.uima.ducc.rm.scheduler.ISchedulerMain;
+import org.apache.uima.ducc.rm.scheduler.JobManagerUpdate;
+import org.apache.uima.ducc.rm.scheduler.Machine;
+import org.apache.uima.ducc.rm.scheduler.ResourceClass;
+import org.apache.uima.ducc.rm.scheduler.RmJob;
+import org.apache.uima.ducc.rm.scheduler.SchedConstants;
+import org.apache.uima.ducc.rm.scheduler.SchedulingException;
+import org.apache.uima.ducc.rm.scheduler.Share;
+import org.apache.uima.ducc.transport.event.RmStateDuccEvent;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+import org.apache.uima.ducc.transport.event.common.IDuccPerWorkItemStatistics;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
+import org.apache.uima.ducc.transport.event.common.IDuccReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccReservationMap;
+import org.apache.uima.ducc.transport.event.common.IDuccSchedulingInfo;
+import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
+import org.apache.uima.ducc.transport.event.common.IDuccWork;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkExecutable;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation;
+import org.apache.uima.ducc.transport.event.common.ITimeWindow;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+import org.apache.uima.ducc.transport.event.rm.IResource;
+import org.apache.uima.ducc.transport.event.rm.IRmJobState;
+import org.apache.uima.ducc.transport.event.rm.Resource;
+import org.apache.uima.ducc.transport.event.rm.RmJobState;
+
+
+/**
+ * Convert the scheduler's structures into the events that get returned to the world.
+ */
+
+public class JobManagerConverter
+ implements IJobManager,
+ SchedConstants
+{
+ DuccLogger logger = DuccLogger.getLogger(JobManagerConverter.class, COMPONENT_NAME);
+ ISchedulerMain scheduler;
+
+ DuccWorkMap localMap = null;
+ JobManagerUpdate lastJobManagerUpdate = new JobManagerUpdate();
+
+ Map<IRmJob, IRmJob> refusedJobs = new HashMap<IRmJob, IRmJob>();
+
+ public JobManagerConverter(ISchedulerMain scheduler) // Binds this converter to the scheduler it reports work to.
+ {
+ this.scheduler = scheduler;
+ this.localMap = new DuccWorkMap(); // starts empty; eventArrives() diffs each incoming work map against it
+ DuccLogger.setUnthreaded(); // NOTE(review): presumably a logger-wide setting - confirm in DuccLogger
+ }
+
+ int toInt(String s, int deflt) // Parses s as a decimal int; returns deflt when s is null, unparsable, or parses to zero.
+ {
+ try {
+ int val = Integer.parseInt(s);
+ return ( val == 0 ) ? deflt : val; // a parsed 0 is treated the same as "not set"
+ } catch ( NumberFormatException e ) { // parseInt reports null and malformed input this way; JVM Errors are no longer swallowed
+ return deflt;
+ }
+ }
+
+ void refuse(IRmJob j, String reason) // Marks j as refused with the given reason and queues it for the next published state.
+ {
+ j.refuse(reason);
+ synchronized(refusedJobs) { // createState() copies and clears this map under the same lock
+ refusedJobs.put(j, j);
+ }
+ }
+
+// void formatSchedulingInfo(DuccId id, IDuccSchedulingInfo si, int remaining_work)
+// {
+// String methodName = "formatSchedulingInfo";
+// SynchronizedDescriptiveStatistics stats = si.getPerWorkItemProcessingTime();
+// double arith_mean = stats.getMean();
+// double geom_mean = stats.getGeometricMean();
+// double[] vals = stats.getSortedValues();
+//
+// logger.info(methodName, null, id, "STATS: arithmetic mean:", arith_mean);
+// logger.info(methodName, null, id, "STATS: geometric mean:", geom_mean);
+// logger.info(methodName, null, id, "STATS: remaining work:", remaining_work);
+// logger.info(methodName, null, id, "STATS: nvals :", vals.length);
+//
+// if ( vals.length > 0 ) {
+// StringBuffer buf = new StringBuffer();
+// int cnt = 0;
+//
+// for ( int i = 0; i < vals.length; i++ ) {
+// buf.append(Double.toString(vals[i]));
+// if ( (++cnt) % 10 == 0 ) {
+// buf.append("\n");
+// } else {
+// buf.append(" ");
+// }
+// }
+// logger.info(methodName, null, id, "STATS: vals:\n", buf.toString());
+// }
+//
+// }
+
+ /**
+ * Refresh the scheduler's record of an already-known job (work counts, max shares, mean work-item time, init state) from arriving job state.
+ */
+ void jobUpdate(IDuccWork job)
+ {
+ String methodName = "jobUpdate";
+ IDuccSchedulingInfo si = job.getSchedulingInfo();
+
+ DuccId jobid = job.getDuccId();
+ IRmJob j = scheduler.getJob(jobid);
+ if ( j == null ) {
+ // this can happen right when the job is submitted, if we haven't yet called
+ // the scheduler to deal with it. just ignore, but take note.
+ // logger.info(methodName, jobid, "**** Cannot find job to update! ****");
+ return;
+ } else {
+ int total_work = toInt(si.getWorkItemsTotal(), scheduler.getDefaultNTasks());
+ int completed_work = toInt(si.getWorkItemsCompleted(), 0) + toInt(si.getWorkItemsError(), 0); // errored items count as done
+
+ int max_shares = toInt(si.getSharesMax(), Integer.MAX_VALUE); // NOTE(review): not capped by the class maximum the way jobArrives() caps FAIR_SHARE jobs - confirm intended
+ int existing_max_shares = j.getMaxShares();
+
+ // we need at least 1 if the job isn't reported complete, in case the user forgot to set the
+ // work item count. the job will run, but slowly in that case.
+ int remaining_work = Math.max(total_work - completed_work, 1);
+
+ logger.info(methodName, job.getDuccId(),
+ String.format("total_work: %5d items completed: %5s items error %3s remaining work %5d",
+ total_work,
+ si.getWorkItemsCompleted(), // note this comes in as string (!) from OR
+ si.getWorkItemsError(), // also string
+ remaining_work
+ ));
+
+ if ( max_shares != existing_max_shares ) {
+ j.setMaxShares(max_shares);
+ logger.info(methodName, job.getDuccId(), "Max shares adjusted from", existing_max_shares, "to", max_shares, "(incoming)",
+ si.getSharesMax());
+ }
+
+ double arith_mean = Double.NaN; // stays NaN when the job carries no per-work-item statistics
+ IDuccPerWorkItemStatistics stats = si.getPerWorkItemStatistics();
+ if(stats != null) {
+ arith_mean = stats.getMean();
+ }
+ j.setNQuestions(total_work, remaining_work, arith_mean);
+
+ // formatSchedulingInfo(job.getDuccId(), si, remaining_work);
+
+ if ( job instanceof IDuccWorkJob ) {
+ if ( j.setInitWait( ((IDuccWorkJob) job).isRunnable()) ) { // only signal when setInitWait reports true
+ logger.debug(methodName, jobid, "Set Initialized.");
+ scheduler.signalInitialized(j);
+ }
+ } else {
+ j.setInitWait(true); // pop is always ready to go
+ }
+ }
+ }
+
+ /**
+ * Hand an executable job to the scheduler: as recovered work if it already has processes and is not completed, otherwise as new work.
+ * NOTE: If this returns false, it must also refuse().
+ */
+ private boolean receiveExecutable(IRmJob j, IDuccWork job)
+ {
+ String methodName = "receiveExecutable";
+ IDuccWorkExecutable de = (IDuccWorkExecutable) job;
+ IDuccProcessMap pm = de.getProcessMap();
+
+ if ( (pm.size() > 0) && !job.isCompleted() ) { // need to recover, apparently RM crashed. hmph.
+ for ( IDuccProcess proc : pm.values() ) { // build up Shares from the incoming state
+
+ ProcessState state = proc.getProcessState();
+ String pid = proc.getPID();
+ NodeIdentity ni = proc.getNodeIdentity();
+
+ if ( proc.isComplete() ) {
+ logger.debug(methodName, j.getId(), "Skipping process", pid, "on", ni.getName(), "beacause state is", state);
+ continue;
+ }
+
+ Machine m = scheduler.getMachine(ni);
+ if ( m == null ) { // not known, huh? maybe next epoch it will have checked in
+ refuse(j, "Cannot restore job because node " + ni.getName() + " is unknown."); // NOTE(review): shares recovered in earlier iterations stay attached to j
+ return false; // so we don't add it to global tables
+ } else {
+ DuccId id = proc.getDuccId();
+ Share s = new Share(id, m, j, m.getShareOrder()); // guess share order; scheduler will reset when it recovers job
+ long mem = proc.getResidentMemory();
+
+ logger.info(methodName, j.getId(), "Assigning share in state", state, "pid", pid, "for recovery", s.toString());
+ j.recoverShare(s);
+ s.update(j.getId(), mem, state, proc.getTimeWindowInit(), proc.getTimeWindowRun(), pid);
+ }
+ }
+ logger.info(methodName, j.getId(), "Scheduling for recovery.");
+ scheduler.signalRecovery(j);
+ } else {
+ logger.info(methodName, j.getId(), "Scheduling as new.");
+ scheduler.signalNewWork(j);
+ }
+ return true;
+ }
+
+ /**
+ * Mark j as a reservation and hand it to the scheduler: as recovered work if it already has reservations and is not completed, otherwise as new work.
+ * NOTE: If this returns false, it must also refuse().
+ */
+ private boolean receiveReservation(IRmJob j, IDuccWork job)
+ {
+ String methodName = "receiveReservation";
+ j.setReservation();
+
+ IDuccWorkReservation dr = (IDuccWorkReservation) job;
+ IDuccReservationMap rm = dr.getReservationMap();
+ if ( (rm.size() > 0) && !job.isCompleted() ) { // need to recover, apparently RM crashed. hmph.
+ for ( IDuccReservation res : rm.values() ) { // build up Shares from the incoming state
+ NodeIdentity ni = res.getNodeIdentity();
+ Machine m = scheduler.getMachine(ni);
+ if ( m == null ) { // not known, huh? maybe next epoch it will have checked in
+ refuse(j, "Cannot restore reservation because node " + ni.getName() + " is unknown.");
+ return false; // so we don't add it to global tables
+ } else {
+ DuccId id = res.getDuccId();
+ Share s = new Share(id, m, j, m.getShareOrder());
+ s.setFixed(); // done only for reservations; receiveExecutable() does not call this
+ j.recoverShare(s);
+ logger.debug(methodName, j.getId(), "Assigning share for recovery", s.toString());
+ }
+ }
+ logger.info(methodName, j.getId(), "Scheduling for recovery.");
+ scheduler.signalRecovery(j);
+ } else {
+ logger.info(methodName, j.getId(), "Scheduling as new.");
+ scheduler.signalNewWork(j);
+ }
+ return true;
+ }
+
+ /**
+ * Convert a JobManager Job into a ResourceManager RmJob and pass it to the scheduler.
+ * We assume this job is NOT in our lists.
+ *
+ * NOTE IMPORTANT NOTE
+ *
+ * Until Lou's job contains all required scheduling fields I do a conversion that enhances
+ * what I receive with defaults so the scheduler can actually schedule the job.
+ *
+ * NOTE IMPORTANT NOTE
+ * @param job the incoming work to convert
+ * @return true if the job was handed to the scheduler; false if it was refused
+ */
+ boolean jobArrives(IDuccWork job)
+ {
+ String methodName = "jobArrives";
+ logger.debug(methodName, job.getDuccId(), "Job arives");
+ logger.debug(methodName, job.getDuccId(), "Job is of type", job.getDuccType());
+
+ // Properties props = new Properties();
+
+ // Set<String> keys = props.stringPropertyNames();
+ // for ( String k : keys ) {
+ // logger.debug("methodName", job.getDuccId(), "Next property [", k, ", ", props.getProperty(k), "]");
+ // }
+
+ // Properties rmProps = new DuccProperties();
+ // for ( int i = 0; i < requiredProperties.length; i++ ) {
+ // String v = props.getProperty(requiredProperties[i]);
+ // if ( v == null ) {
+ // v = defaultValues[i];
+ // }
+ // rmProps.setProperty(rmProperties[i], v);
+ // }
+ // IRmJob j = new RmJob(job.getDuccId(), rmProps);
+
+ // Convert Lou's structure into mine.
+ IRmJob j = new RmJob(job.getDuccId());
+
+ IDuccSchedulingInfo si = job.getSchedulingInfo();
+ IDuccStandardInfo sti = job.getStandardInfo();
+
+ String name = sti.getDescription();
+ String user_name = sti.getUser();
+ j.setUserName(user_name);
+ j.setJobName(name);
+
+ int min_shares = toInt(si.getSharesMin(), 0);
+ int threads = toInt(si.getThreadsPerShare(), scheduler.getDefaultNThreads());
+ int user_priority = toInt(si.getSchedulingPriority(), 100);
+
+ int total_work = toInt(si.getWorkItemsTotal(), scheduler.getDefaultNTasks());
+ int completed_work = toInt(si.getWorkItemsCompleted(), 0); // NOTE(review): unlike jobUpdate(), error items are not counted as completed here - confirm
+ int remaining_work = Math.max(total_work - completed_work, 1); // never let this go 0 or negative - both cases
+ // are (probably user) errors.
+
+ logger.info(methodName, job.getDuccId(), "total_work", total_work, "completed_work", completed_work,"remaining_work", remaining_work);
+
+ int memory = toInt(si.getShareMemorySize(), scheduler.getDefaultMemory());
+ String className = si.getSchedulingClass();
+ if ( className == null ) {
+ className = scheduler.getDefaultClassName();
+ }
+
+ j.setMinShares(min_shares);
+ j.setThreads(threads);
+ j.setUserPriority(user_priority);
+ j.setNQuestions(total_work, remaining_work, 0.0);
+ j.setClassName(className);
+
+ switch (si.getShareMemoryUnits()) {
+ case GB:
+ break;
+ default:
+ logger.warn(methodName, job.getDuccId(), "Memory units other than GB are not currently supported. Job returned."); // NOTE(review): despite the message, the job is not refused; processing continues below
+ break;
+ }
+ j.setMemory(memory);
+ j.init();
+
+ j.setTimestamp(Long.parseLong(sti.getDateOfSubmission())); // a malformed date throws NumberFormatException out of this method without refuse()
+ // logger.info(methodName, j.getId(), "SUBMISSION DATE:", subd, (new Date(subd)).toString());
+
+ if ( job instanceof IDuccWorkJob ) {
+ j.setInitWait( ((IDuccWorkJob) job).isRunnable());
+ } else {
+ j.setInitWait(true); // pop is always ready to go
+ }
+
+ j.setDuccType(job.getDuccType()); // ugly and artificial but ... not going to rant here
+ // it's needed so messages can be made legible
+
+ //
+ // Now: must either create a new job, or recover one that we didn't know about, on the assumption that we
+ // have just crashed and are recovering.
+ //
+ // Be SURE that if status is turned false for any reason, or if you exit early with false, that you
+ // refuse() the job.
+ //
+ boolean status = true;
+
+ int max_processes = 0;
+ int max_machines = 0;
+ ResourceClass rescl = scheduler.getResourceClass(className);
+
+ if ( rescl == null ) {
+ // ph darn, we can't continue past this point
+ refuse(j, "Cannot find priority class " + className + " for job");
+ return false;
+ }
+
+// if ( logger.isDebug() ) {
+// logger.debug(methodName, j.getId(),"sharesMax", si.getSharesMax());
+// logger.debug(methodName, j.getId(),"getInstancesCount", si.getInstancesCount());
+// logger.debug(methodName, j.getId(), "rescl.getMaxProcesses", rescl.getMaxProcesses());
+// logger.debug(methodName, j.getId(), "rescl.getMaxMachines", rescl.getMaxMachines());
+// }
+
+ switch ( job.getDuccType() ) {
+ case Service:
+ case Pop:
+ case Job:
+ // instance and share count are a function of the class
+ switch ( rescl.getPolicy() ) {
+ case FAIR_SHARE:
+ max_processes = toInt(si.getSharesMax(), DEFAULT_PROCESSES);
+ max_processes = Math.min(rescl.getMaxProcesses(), max_processes); // requested maximum is capped by the class maximum
+ j.setMaxShares(max_processes);
+ j.setNInstances(-1);
+ break;
+
+ case FIXED_SHARE:
+ max_processes = toInt(si.getSharesMax(), DEFAULT_INSTANCES);
+ j.setMaxShares(max_processes);
+ j.setNInstances(max_processes);
+ break;
+
+ case RESERVE:
+ max_machines = toInt(si.getInstancesCount(), DEFAULT_INSTANCES);
+ j.setMaxShares(max_machines);
+ j.setNInstances(max_machines);
+ break;
+ }
+
+ status = receiveExecutable(j, job);
+ logger.debug(methodName, j.getId(), "Serivce, Pop, or Job arrives, accepted:", status);
+ break;
+ case Reservation:
+ switch ( rescl.getPolicy() ) { // NOTE(review): no FAIR_SHARE case, so max_machines stays 0 for such a class - confirm intended
+ case FIXED_SHARE:
+ max_machines = toInt(si.getInstancesCount(), DEFAULT_INSTANCES);
+ break;
+ case RESERVE:
+ max_machines = toInt(si.getInstancesCount(), DEFAULT_INSTANCES);
+ break;
+ }
+
+ j.setMaxShares(-1);
+ j.setNInstances(max_machines);
+
+ status = receiveReservation(j, job);
+ logger.debug(methodName, j.getId(), "Reservation arrives, accepted:", status);
+ break;
+ default:
+ refuse(j, "Unknown job type: " + job.getDuccType());
+ status = false;
+ break;
+ }
+
+// logger.debug(methodName, j.getId(), "Max_processes:", max_processes);
+// logger.debug(methodName, j.getId(), "Max_machines:", max_machines);
+
+ return status;
+ }
+
+ /**
+ * Our records indicate that we know about this job but JM doesn't so we purge
+ * it from the scheduler and from our local work map.
+ * @param id the id of the work to remove
+ */
+ void jobRemoved(DuccId id)
+ {
+ String methodName = "jobRemoved";
+ logger.debug(methodName, id, "Signalling removal");
+ scheduler.signalCompletion(id);
+ localMap.removeDuccWork(id);
+ logger.debug(methodName, id, "Remove signalled");
+ }
+
+ public void reconcileProcesses(DuccId jobid, IDuccWork l, IDuccWork r)
+ {
+ String methodName = "reconcileProcess";
+ IDuccProcessMap lpm = ((IDuccWorkJob )l).getProcessMap();
+ IDuccProcessMap rpm = ((IDuccWorkJob)r).getProcessMap();
+
+ @SuppressWarnings("unchecked")
+ DuccMapDifference<DuccId, IDuccProcess> diffmap = DuccCollectionUtils.difference(lpm, rpm);
+
+ // new stuff in in the left side of the map
+ Map<DuccId, IDuccProcess> lproc = diffmap.getLeft();
+
+ for ( IDuccProcess p : lproc.values() ) {
+ // look up share, update resident memory, process state, investment (eventually), maybe pid?
+ // simply update the share with the information. we pass in the jobid as a sanity check so
+ // we can crash or at least complain loudly on mismatch.
+
+ Share s = scheduler.getShare(p.getDuccId());
+ long mem = p.getResidentMemory();
+ ProcessState state = p.getProcessState();
+ String pid = p.getPID();
+ if ( ! s.update(jobid, mem, state, p.getTimeWindowInit(), p.getTimeWindowRun(), pid) ) {
+ // TODO: probably change to just a warning and cancel the job - for now I want an attention-getter
+ throw new SchedulingException(jobid, "Process assignemnt arrives for share " + s.toString() +
+ " but jobid " + jobid + " does not match share " + s.getJob().getId());
+ }
+ //scheduler.signalGrowth(jobid, s);
+ // sadly, the pid is almost always null here
+ //logger.info(methodName, jobid,
+ // "New process arrives for share", s.toString(), "PID", pid);
+ }
+
+ // gone stuff in in the right side of the map
+ Map<DuccId, IDuccProcess> rproc = diffmap.getRight();
+ for ( IDuccProcess p : rproc .values()) {
+ // these processes are done. look up the job and tell it process complete.
+ Share s = scheduler.getShare(p.getDuccId());
+ IRmJob j = scheduler.getJob(jobid);
+ if ( j == null ) {
+ throw new SchedulingException(jobid, "Process completion arrives for share " + s.toString() +
+ " but job " + jobid + "cannot be found.");
+ }
+ scheduler.signalCompletion(j, s);
+ logger.info(methodName, jobid,
+ String.format("Process %5s", p.getPID()),
+ "Completes in share", s.toString());
+ }
+
+ for( DuccMapValueDifference<IDuccProcess> pd: diffmap ) {
+ IDuccProcess pl = pd.getLeft();
+ IDuccProcess pr = pd.getRight();
+
+ Share sl = scheduler.getShare(pl.getDuccId());
+ Share sr = scheduler.getShare(pr.getDuccId());
+
+ String shareL = ( sl == null ) ? "<none>" : sl.toString();
+ String shareR = ( sr == null ) ? "<none>" : sr.toString();
+
+ ITimeWindow initL = pl.getTimeWindowInit();
+ ITimeWindow initR = pr.getTimeWindowInit();
+ long init_timeL = (initL == null) ? 0 : initL.getElapsedMillis();
+ long init_timeR = (initR == null) ? 0 : initR.getElapsedMillis();
+
+ /** extreme debugging only*/
+ if ( logger.isTrace() ) {
+ logger.trace(methodName, jobid,
+ "\n\tReconciling. incoming.(pid, mem, state, share, initTime)",
+ pl.getPID(),
+ pl.getResidentMemory(),
+ pl.getProcessState(),
+ shareL,
+ init_timeL,
+ "\n\tReconciling. existing.(pid, mem, state, share, initTime)",
+ pr.getPID(),
+ pr.getResidentMemory(),
+ pr.getProcessState(),
+ shareR,
+ init_timeR
+ );
+ } else {
+ if ( (pr.getPID() == null) && (pl.getPID() != null) ) {
+ logger.info(methodName, jobid,
+ String.format("Process %5s", pl.getPID()),
+ "PID assignement for share", shareL);
+ }
+ if ( pl.getProcessState() != pr.getProcessState() ) {
+ logger.info(methodName, jobid,
+ String.format("Process %5s", pl.getPID()),
+ "State update:", pr.getProcessState(), "-->", pl.getProcessState());
+ }
+ }
+
+ long mem = pl.getResidentMemory();
+ ProcessState state = pl.getProcessState();
+ String pid = pl.getPID();
+ Share s = scheduler.getShare(pl.getDuccId());
+ if ( pl.isActive() ) {
+
+ if ( s == null ) {
+ // this can happen if a node dies and the share is purged so it's ok.
+ logger.warn(methodName, jobid, "Update for share from process", pl.getPID(), pl.getDuccId(), "but cannot find share.");
+ continue;
+ }
+
+// if ( s.isPurged() ) {
+// IRmJob j = scheduler.getJob(jobid);
+// scheduler.signalCompletion(j, s);
+// logger.info(methodName, jobid, "Process", pl.getPID(), "marked complete because it is purged. State:", state);
+// }
+
+ if ( ! s.update(jobid, mem, state, pl.getTimeWindowInit(), pl.getTimeWindowRun(), pid) ) {
+ // TODO: probably change to just a warning and cancel the job - for now I want an attention-getter
+ throw new SchedulingException(jobid, "Process update arrives for share " + s.toString() +
+ " but jobid " + jobid + " does not match job in share " + s.getJob().getId());
+ }
+ // logger.debug(methodName, jobid, "Process update to process ", pid, "mem", mem, "state", state, "is assigned for share", s.toString());
+
+ } else if ( pl.isComplete() ) {
+ if ( s != null ) { // in some final states the share is already gone, not an error (e.g. Stopped)
+ IRmJob j = scheduler.getJob(jobid);
+ scheduler.signalCompletion(j, s);
+ logger.debug(methodName, jobid, "Process", pl.getPID(), " completed due to state", state);
+ }
+ } else {
+ logger.debug(methodName, jobid, "Process", pl.getPID(), "ignoring update because of state", state);
+ }
+
+ }
+
+ }
+
+ public void eventArrives(DuccWorkMap jobMap) // Reconciles an incoming work map against localMap: new work first, then removed work, then work present in both.
+ {
+ String methodName = "eventArrives";
+
+ logger.debug(methodName, null, "Got Orchestrator");
+
+ if ( jobMap.size() == 0 ) {
+ logger.debug(methodName, null, "No state from Orchestrator");
+ return;
+ }
+
+ if ( !scheduler.ready() ) {
+ logger.info(methodName, null, "Orchestrator event is discarded because scheduler is not yet ready.");
+ return;
+ }
+
+ @SuppressWarnings("unchecked")
+ DuccMapDifference<DuccId, IDuccWork> diffmap = DuccCollectionUtils.difference(jobMap, localMap); // as used below: left = new to us, right = no longer reported
+
+ for ( IDuccWork w : jobMap.values() ) {
+ //IDuccWork j = (IDuccWork) w;
+ logger.debug(methodName, w.getDuccId(), "Arrives in JmStateEvent state =", w.getStateObject());
+ }
+
+ //
+ // First handle new stuff
+ //
+ Map<DuccId, IDuccWork> jobs = diffmap.getLeft();
+ for ( IDuccWork w : jobs.values() ) {
+
+ if ( w.isSchedulable() ) {
+ logger.debug(methodName, w.getDuccId(), "Incoming, state = ", w.getStateObject());
+ try {
+ if ( jobArrives(w) ) { // if not ... something is fubar and we have to ignore it for now
+ localMap.addDuccWork(w);
+ }
+ } catch ( Exception e ) { // work that failed to arrive is not added to localMap
+ logger.error(methodName, w.getDuccId(), "Can't receive job because of exception", e);
+ }
+ } else {
+ logger.debug(methodName, w.getDuccId(), "Received non-schedulable job, state = ", w.getStateObject());
+ }
+ }
+
+ jobs = diffmap.getRight();
+ for ( IDuccWork w :jobs.values() ) {
+ logger.debug(methodName, w.getDuccId(), "Gone");
+ jobRemoved(w.getDuccId());
+ }
+
+ // Todo : manage stuff that is diffs, not new, not deletions
+ for( DuccMapValueDifference<IDuccWork> jd: diffmap ) {
+ IDuccWork r = jd.getRight();
+ IDuccWork l = jd.getLeft();
+ logger.debug(methodName, l.getDuccId(), "Reconciling, incoming state = ", l.getStateObject(), " my state = ", r.getStateObject());
+
+ if ( ! l.isSchedulable() ) {
+ logger.debug(methodName, l.getDuccId(), "Removing unschedulable job state = ", r.getStateObject());
+ jobRemoved(r.getDuccId());
+ } else {
+
+ localMap.addDuccWork(l); // still schedulable, and we already know about it, just sync the state
+
+ //
+ // Terrible ugliness here because the states should be so hard to get a handle on.
+ //
+ // Nonetheless, if this code hits, it's because the same job keeps coming back in
+ // WaitingForResources so it's not in the left-side map. We need to insure the
+ // scheduler runs an epoch next opportunity it has.
+ //
+ // NO LONGER NEEDED because scheduler always runs without checking if there's work to do.
+ //
+ // Object s = l.getStateObject();
+ // if ( s instanceof JobState ) {
+ // if ( ((JobState) s) == JobState.WaitingForResources ) {
+ // logger.debug(methodName, l.getDuccId(), "Job: force epoch");
+ // scheduler.signalForceEpoch();
+ // continue;
+ // }
+ // }
+
+ // if ( s instanceof ReservationState ) {
+ // if ( ((ReservationState) s) == ReservationState.WaitingForResources ) {
+ // logger.debug(methodName, l.getDuccId(), "Reservation: force epoch");
+ // scheduler.signalForceEpoch();
+ // continue;
+ // }
+ // }
+
+
+ switch ( l.getDuccType() ) {
+ case Job:
+ jobUpdate(l);
+ reconcileProcesses(l.getDuccId(), l, r); // r is the copy that was in localMap before the addDuccWork(l) above
+ break;
+ case Service:
+ case Pop:
+ case Reservation:
+ // for the moment, these guyes have nothing to reconcile.
+ break;
+ case Undefined:
+ throw new SchedulingException(l.getDuccId(), "Work arrives as type Undefined - should have been filtered out by now.");
+ }
+ }
+
+ }
+
+ logger.debug(methodName, null, "Done with JmStateDuccEvent with some jobs processed");
+
+ }
+
+ /**
+ * If no state has changed, we just resend the last one.
+ */
+ Map<DuccId, IRmJobState> previousJobState = new HashMap<DuccId, IRmJobState>();
+
+ /**
+ * Build the RmStateDuccEvent for the caller to publish. A null JobManagerUpdate means no changes, so the previously built state is resent.
+ */
+ public RmStateDuccEvent createState(JobManagerUpdate jmu)
+ {
+ String methodName = "createState";
+ //ArrayList<IRmJobState> rmJobState = null;
+ Map<DuccId, IRmJobState> rmJobState = null;
+
+
+ if ( jmu == null ) { // no changes
+ logger.debug(methodName, null, "Publishing old RM state");
+ rmJobState = previousJobState;
+ } else {
+ // TODO: create a new rmJobState; remember to set previousJobState
+ logger.debug(methodName, null, "Publishing new RM state");
+
+ rmJobState = new HashMap<DuccId, IRmJobState>();
+
+ // Must handle all jobs that ar refused here in JMC because nobody else knows about them
+ Map<IRmJob, IRmJob> refused = new HashMap<IRmJob, IRmJob>();
+ synchronized(refusedJobs) { // take a private copy and empty the shared map while holding its lock
+ refused.putAll(refusedJobs);
+ refusedJobs.clear();
+ }
+
+ for ( IRmJob j : refused.values() ) {
+ RmJobState rjs = new RmJobState(j.getId(), j.getRefusalReason());
+ rjs.setDuccType(j.getDuccType());
+ rmJobState.put(j.getId(), rjs);
+ }
+
+ // Now handle the jobs that made it into the scheduler proper
+ Map<DuccId, IRmJob> jobs = jmu.getAllJobs();
+ Map<DuccId, HashMap<Share, Share>> shrunken = jmu.getShrunkenShares();
+ Map<DuccId, HashMap<Share, Share>> expanded = jmu.getExpandedShares();
+// for ( DuccId id : expanded.keySet() ) {
+// logger// .info(methodName, id, "Fetched these expanded shares:", expanded.get(id));
+// }
+
+ /**
+ * Convert RM internal state into the simplified externally published state.
+ */
+ for (IRmJob j : jobs.values()) {
+
+ if ( j.isRefused() ) { // refused inside the scheduler: publish the reason and drop the job locally
+ RmJobState rjs = new RmJobState(j.getId(), j.getRefusalReason());
+ rjs.setDuccType(j.getDuccType());
+ rmJobState.put(j.getId(), rjs);
+ jobRemoved(j.getId());
+ logger.warn(methodName, j.getId(), "Refusal: ", j.getRefusalReason());
+ continue;
+ }
+
+ Map<DuccId, IResource> all_shares = new HashMap<DuccId, IResource>();
+ Map<DuccId, IResource> shrunken_shares = new HashMap<DuccId, IResource>();
+ Map<DuccId, IResource> expanded_shares = new HashMap<DuccId, IResource>();
+ Map<Share, Share> shares;
+
+ shares = j.getAssignedShares();
+ if ( shares != null ) {
+ for ( Share s : shares.values() ) {
+ Resource r = new Resource(s.getId(), s.getNodeIdentity(), s.isPurged(), s.getShareOrder());
+ all_shares.put(s.getId(), r);
+ //logger.debug(methodName, j.getId(), "Assigned:", s.toString());
+ }
+ }
+
+ shares = shrunken.get(j.getId());
+ if ( shares != null ) {
+ for ( Share s : shares.values() ) {
+ Resource r = new Resource(s.getId(), s.getNodeIdentity(), s.isPurged(), s.getShareOrder());
+ shrunken_shares.put(s.getId(), r);
+ //logger.debug(methodName, j.getId(), "Shrunken:", s.toString());
+ }
+ }
+
+ shares = expanded.get(j.getId());
+ if ( shares != null ) {
+ for ( Share s : shares.values() ) {
+ Resource r = new Resource(s.getId(), s.getNodeIdentity(), s.isPurged(), s.getShareOrder());
+ expanded_shares.put(s.getId(), r);
+ //logger.debug(methodName, j.getId(), "Expanded:", s.toString());
+ }
+ }
+
+ RmJobState rjs = new RmJobState(j.getId(), all_shares, shrunken_shares, expanded_shares);
+ rjs.setDuccType(j.getDuccType());
+ rmJobState.put(j.getId(), rjs);
+ }
+
+ previousJobState = rmJobState; // remembered so a later null update can resend it
+ }
+
+ RmStateDuccEvent response = new RmStateDuccEvent(rmJobState);
+ try {
+ logger.debug(methodName, null, response.toString() );
+ } catch (Exception e) { // a failure while formatting the debug output must not prevent publication
+ logger.error(methodName, null, e);
+ }
+
+ return response;
+
+ }
+
+}
+
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm;
+
+import java.util.Map;
+
+import org.apache.uima.ducc.common.ANodeStability;
+import org.apache.uima.ducc.common.Node;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.rm.scheduler.ISchedulerMain;
+import org.apache.uima.ducc.rm.scheduler.SchedConstants;
+
+
+/**
+ * Resource Manager specialization of {@link ANodeStability}: node arrivals and
+ * node deaths are forwarded to the scheduler, and missed heartbeats are logged.
+ */
+public class NodeStability
+    extends ANodeStability
+    implements SchedConstants
+{
+    ISchedulerMain scheduler;
+    ResourceManagerComponent rm;
+    DuccLogger logger = DuccLogger.getLogger(NodeStability.class, COMPONENT_NAME);
+
+    /**
+     * @param rm                  owning component; also supplies the scheduler
+     * @param nodeStabilityLimit  handed unchanged to the superclass constructor
+     * @param agentMetricsRate    handed unchanged to the superclass constructor
+     */
+    public NodeStability(ResourceManagerComponent rm, int nodeStabilityLimit, int agentMetricsRate)
+    {
+        super(nodeStabilityLimit, agentMetricsRate);
+        this.rm = rm;
+        this.scheduler = rm.getScheduler();
+    }
+
+    /**
+     * Passes the dead nodes on to the scheduler, then writes one debug line per node.
+     *
+     * @param nodes the nodes being reported dead; only the keys are logged
+     */
+    public void nodeDeath(Map<Node, Node> nodes)
+    {
+        final String where = "nodeDeath";
+
+        // The scheduler hears about the deaths before anything is logged.
+        scheduler.nodeDeath(nodes);
+        for ( Node dead : nodes.keySet() ) {
+            logger.debug(where, null, "*** ! Notification of node death:", dead.getNodeIdentity().getName());
+        }
+    }
+
+    /**
+     * Logs a warning for a node that missed a heartbeat; nothing else is done here.
+     *
+     * @param n the node that missed its heartbeat
+     * @param c the count reported alongside the miss
+     */
+    public void missedNode(Node n, int c)
+    {
+        final String where = "missedNode";
+        logger.warn(where, null, "*** Missed heartbeat ***", n.getNodeIdentity().getName(), "count[", c, "]");
+    }
+
+    /**
+     * Handles a node update. While the Resource Manager reports that its scheduler
+     * is not ready the update is dropped with a warning; otherwise it goes to the
+     * scheduler first and then to the superclass.
+     *
+     * @param n the node that reported in
+     */
+    public void nodeArrives(Node n)
+    {
+        final String where = "nodeArrives";
+
+        if ( ! rm.isSchedulerReady() ) {
+            logger.warn(where, null, "Ignoring node update, scheduler is still booting.");
+            return;
+        }
+
+        // logger.info(where, null, n.getNodeIdentity().getName());
+        scheduler.nodeArrives(n);          // tell RM
+        super.nodeArrives(n);              // tell heartbeat monitor
+    }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStatus.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStatus.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStatus.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStatus.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+
+/**
+ * Records which nodes have been varied offline. The record is a Java properties
+ * file, {@code $DUCC_HOME/resources/nodes.offline}, whose keys are node names and
+ * whose values are the time at which the node was varied offline.
+ *
+ * The file is re-read before every query and every update, so the file is the
+ * source of truth and the in-memory copy is replaced on each successful read.
+ * This class does no locking of its own.
+ */
+public class NodeStatus {
+
+    private static String dir_home = System.getenv("DUCC_HOME");
+    private static String dir_resources = "resources";
+    private static String ducc_properties_filename = dir_home+File.separator+dir_resources+File.separator+"nodes.offline";
+
+    // Layout of the timestamp stored as the value of an offline node.
+    private static final String DATE_PATTERN = "EEE MMM dd HH:mm:ss zzz yyyy";
+
+    private static NodeStatus instance = new NodeStatus();
+
+    /**
+     * @return the shared instance
+     */
+    public static NodeStatus getInstance() {
+        return instance;
+    }
+
+    private Properties properties = new Properties();
+
+    /**
+     * Creates an instance and performs an initial read of the offline-nodes file.
+     */
+    public NodeStatus() {
+        load();
+    }
+
+    /**
+     * @return {@code nodeName} with surrounding whitespace removed, or null if it is null
+     */
+    private String normalize(String nodeName) {
+        String retVal = nodeName;
+        if(nodeName != null) {
+            retVal = nodeName.trim();
+        }
+        return retVal;
+    }
+
+    /**
+     * Replaces the in-memory state with the current content of the file.
+     *
+     * Properties.load() only adds or overwrites keys, so the file is read into a
+     * fresh Properties object; reading into the existing one would keep entries
+     * that have since been removed from the file. A missing file means that no
+     * node is offline. If the file exists but cannot be read, the problem is
+     * reported and the previous state is kept.
+     */
+    private void load() {
+        File file = new File(ducc_properties_filename);
+        Properties fresh = new Properties();
+        if(file.isFile()) {
+            FileInputStream fis = null;
+            try {
+                fis = new FileInputStream(file);
+                fresh.load(fis);
+            }
+            catch(Throwable t) {
+                handle(t);
+                return;     // keep the last state that was read successfully
+            }
+            finally {
+                if(fis != null) {
+                    try {
+                        fis.close();
+                    }
+                    catch(Throwable t) {
+                        handle(t);
+                    }
+                }
+            }
+        }
+        properties = fresh;
+    }
+
+    /**
+     * Writes the in-memory state to the file.
+     *
+     * @return true if the file was written and closed without error
+     */
+    private boolean store() {
+        boolean stored = false;
+        FileOutputStream fos = null;
+        try {
+            fos = new FileOutputStream(new File(ducc_properties_filename));
+            properties.store(fos, "");
+            stored = true;
+        }
+        catch(Throwable t) {
+            handle(t);
+        }
+        finally {
+            if(fos != null) {
+                try {
+                    fos.close();
+                }
+                catch(Throwable t) {
+                    stored = false;
+                    handle(t);
+                }
+            }
+        }
+        return stored;
+    }
+
+    /**
+     * @param nodeName node to look up; surrounding whitespace is ignored
+     * @return true if the file currently lists the node; false otherwise, including for null
+     */
+    public boolean isOffline(String nodeName) {
+        boolean retVal = false;
+        String name = normalize(nodeName);
+        if(name != null) {
+            load();
+            retVal = properties.containsKey(name);
+        }
+        return retVal;
+    }
+
+    /**
+     * Marks a node offline, stamping the entry with the current time.
+     *
+     * @param nodeName node to mark; surrounding whitespace is ignored
+     * @return true if the updated file was written; false for null or if the write failed
+     */
+    public boolean varyOffline(String nodeName) {
+        boolean retVal = false;
+        String name = normalize(nodeName);
+        if(name != null) {
+            load();
+            // SimpleDateFormat is not thread-safe, so one is created per call instead of being shared.
+            properties.setProperty(name, new SimpleDateFormat(DATE_PATTERN).format(new Date()));
+            retVal = store();
+        }
+        return retVal;
+    }
+
+
+    /**
+     * Removes a node's offline mark. Removing a node that is not marked is not an error.
+     *
+     * @param nodeName node to unmark; surrounding whitespace is ignored
+     * @return true if the updated file was written; false for null or if the write failed
+     */
+    public boolean varyOnline(String nodeName) {
+        boolean retVal = false;
+        String name = normalize(nodeName);
+        if(name != null) {
+            load();
+            properties.remove(name);
+            retVal = store();
+        }
+        return retVal;
+    }
+
+    // No logger is available in this class; failures are reported on stderr.
+    private void handle(Throwable t) {
+        t.printStackTrace();
+    }
+
+    //
+    // Manual test driver
+    //
+
+    private static void _query(String nodeName) {
+        NodeStatus nodeStatus = NodeStatus.getInstance();
+        String state = "offline";
+        if(!nodeStatus.isOffline(nodeName)) {
+            state = "!offline";
+        }
+        System.out.println(nodeName+"="+state);
+    }
+
+    private static void _offline(String nodeName) {
+        NodeStatus nodeStatus = NodeStatus.getInstance();
+        nodeStatus.varyOffline(nodeName);
+    }
+
+    private static void _online(String nodeName) {
+        NodeStatus nodeStatus = NodeStatus.getInstance();
+        nodeStatus.varyOnline(nodeName);
+    }
+
+    /**
+     * Exercises vary-offline, vary-online and query against a few sample node names.
+     */
+    public static void main(String[] args) {
+        _offline("bluej9999");
+        _online("bluej8888");
+        _offline("bluej7777");
+        _online("bluej7777");
+        _query("bluej7777");
+        _query("bluej8888");
+        _query("bluej9999");
+        _query("bluej0000");
+    }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStatus.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManager.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManager.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManager.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm;
+
+import org.apache.uima.ducc.transport.event.RmStateDuccEvent;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+
+/**
+ * Operations of the Resource Manager that are invoked by the code connecting it
+ * to the messaging transport.
+ */
+public interface ResourceManager
+{
+ /**
+ * Produces the Resource Manager state event.
+ * NOTE(review): ResourceManagerComponent implements this by running a scheduling
+ * pass and returns null when there is nothing to report or on error - confirm
+ * that callers tolerate null.
+ *
+ * @return the state event
+ * @throws Exception declared for implementations; the circumstances are not specified here
+ */
+ public RmStateDuccEvent getState() throws Exception;
+
+ // public void onNodeMetricsUpdate(Node node);
+ // public void nodeArrives(Node node);
+
+ /**
+ * Hands over the work map carried by an Orchestrator state event.
+ *
+ * @param map the work map taken from the received event
+ */
+ public void onOrchestratorStateUpdate(DuccWorkMap map);
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.camel.CamelContext;
+import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties;
+import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties.DaemonName;
+import org.apache.uima.ducc.common.component.AbstractDuccComponent;
+import org.apache.uima.ducc.common.main.DuccService;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
+import org.apache.uima.ducc.rm.scheduler.ISchedulerMain;
+import org.apache.uima.ducc.rm.scheduler.JobManagerUpdate;
+import org.apache.uima.ducc.rm.scheduler.SchedConstants;
+import org.apache.uima.ducc.rm.scheduler.Scheduler;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.RmStateDuccEvent;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+
+
+/**
+ * The Resource Manager daemon component. It owns the scheduler, feeds it the
+ * Orchestrator state that arrives from the transport, and runs its own loop that
+ * periodically invokes the scheduler and publishes the resulting state.
+ */
+public class ResourceManagerComponent
+    extends AbstractDuccComponent
+    implements ResourceManager,
+               SchedConstants,
+               Runnable
+{
+    private static DuccLogger logger = DuccLogger.getLogger(ResourceManagerComponent.class, COMPONENT_NAME);
+
+    int nodeStability;                 // number of heartbeats from agent metrics we are allowed to miss before purging node
+    int initStability;                 // number of heartbeats from agent metrics we must wait for during init before starting
+    int nodeMetricsUpdateRate;         // delay, in milliseconds, between ticks of the stability timer
+
+    // Set at the end of start() and polled through isSchedulerReady() by the code
+    // that delivers node updates. Volatile so that a reader on another thread is
+    // guaranteed to see the update.
+    volatile boolean schedulerReady = false;
+
+    ISchedulerMain scheduler;
+    JobManagerConverter converter;
+
+    // These guys are used to manage my own epoch
+    int schedulingRatio = 6;
+    int schedulingEpoch = 60000;       // sleep, in milliseconds, between scheduling passes
+    DuccEventDispatcher eventDispatcher;
+    String stateEndpoint;
+
+    /**
+     * @param context the Camel context handed to the superclass
+     */
+    public ResourceManagerComponent(CamelContext context) {
+        super("ResourceManager", context);
+        this.scheduler = new Scheduler();
+    }
+
+    public ISchedulerMain getScheduler()
+    {
+        return this.scheduler;
+    }
+
+    /**
+     * @return true once start() has finished its setup
+     */
+    public boolean isSchedulerReady()
+    {
+        return schedulerReady;
+    }
+
+    /**
+     * Boots the component: reads the tuning properties, initializes the scheduler,
+     * starts the initial-stability countdown and launches the scheduling loop on a
+     * daemon thread. The ready flag is raised last.
+     */
+    public void start(DuccService service, String[] args)
+        throws Exception
+    {
+        super.start(service, args);
+        DuccDaemonRuntimeProperties.getInstance().boot(DaemonName.ResourceManager,getProcessJmxUrl());
+
+        converter = new JobManagerConverter(scheduler);
+
+        initStability         = SystemPropertyResolver.getIntProperty("ducc.rm.init.stability", DEFAULT_INIT_STABILITY_COUNT);
+        nodeStability         = SystemPropertyResolver.getIntProperty("ducc.rm.node.stability", DEFAULT_STABILITY_COUNT);
+        nodeMetricsUpdateRate = SystemPropertyResolver.getIntProperty("ducc.agent.node.metrics.publish.rate", DEFAULT_NODE_METRICS_RATE);
+        schedulingRatio       = SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.ratio", DEFAULT_SCHEDULING_RATIO);
+        schedulingEpoch       = SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.rate", DEFAULT_SCHEDULING_RATE);
+        scheduler.init();
+
+        startStabilityTimer();
+
+        // Start the main processing loop
+        Thread rmThread = new Thread(this);
+        rmThread.setDaemon(true);
+        rmThread.start();
+
+        schedulerReady = true;
+    }
+
+    /**
+     * Runs one scheduling pass on the caller's thread and converts the result.
+     * Failures in either step are logged, not propagated.
+     *
+     * @return the state event, or null if the scheduler produced no update or a step failed
+     */
+    public RmStateDuccEvent getState() throws Exception
+    {
+        String methodName = "getState";
+        JobManagerUpdate jobManagerUpdate = null;
+
+        try {
+            logger.info(methodName, null, "-------------------- Entering scheduling loop --------------------");
+            jobManagerUpdate = scheduler.schedule();
+            logger.info(methodName, null, "-------------------- Scheduling loop returns  --------------------");
+        } catch (Exception e1) {
+            logger.error(methodName, null, "Error running scheduler:", e1);
+        }
+
+        try {
+            if ( jobManagerUpdate != null ) {
+                return converter.createState(jobManagerUpdate);
+            }
+        } catch ( Exception e ) {
+            logger.error(methodName, null, "Error converting state for Orchestrator", e);
+        }
+        return null;
+    }
+
+    /**
+     * Supplies the dispatcher and endpoint used by the scheduling loop to publish state.
+     */
+    public void setTransportConfiguration(DuccEventDispatcher eventDispatcher, String endpoint)
+    {
+        this.eventDispatcher = eventDispatcher;
+        this.stateEndpoint = endpoint;
+    }
+
+    /**
+     * Body of the scheduling thread. runScheduler() loops forever itself; the outer
+     * loop re-enters it should it ever exit abnormally.
+     */
+    public void run()
+    {
+        while ( true ) {
+            runScheduler();
+        }
+    }
+
+    long epoch_counter = 0;
+
+    /**
+     * The scheduling loop: sleep for one epoch, run the scheduler and, if it
+     * produced an update, convert it and dispatch it to the state endpoint.
+     * Anything thrown during a pass is logged and the loop carries on.
+     */
+    public void runScheduler()
+    //public void runScheduler()
+    {
+        String methodName = "runScheduler";
+        JobManagerUpdate jobManagerUpdate;
+
+        while ( true ) {
+
+            try {
+                Thread.sleep(schedulingEpoch);               // and linger a while
+                //wait();
+            } catch (InterruptedException e) {
+                // Deliberately not re-interrupted: an interrupt is treated as a request to schedule now.
+                logger.info(methodName, null, "Scheduling wait interrupted, executing out-of-band epoch.");
+            }
+
+            try {
+                // logger.info(methodName, null, "Publishing RM state to", stateEndpoint);
+                logger.info(methodName, null, "--------", ++epoch_counter, "------- Entering scheduling loop --------------------");
+
+                jobManagerUpdate = scheduler.schedule();
+                if ( jobManagerUpdate != null ) {             // returns null while waiting for node stability
+                    RmStateDuccEvent ev = converter.createState(jobManagerUpdate);
+                    eventDispatcher.dispatch(stateEndpoint, ev, "");  // tell the world what is scheduled (note empty string)
+                }
+
+                logger.info(methodName, null, "--------", epoch_counter, "------- Scheduling loop returns  --------------------");
+            } catch (Throwable e1) {
+                logger.fatal(methodName, null, e1);
+            }
+
+        }
+    }
+
+//     public void nodeArrives(Node n)
+//     {
+//         String methodName = "nodeArrives";
+//         try {
+//             if ( ! schedulerReady ) {
+//                 logger.warn(methodName, null, "Ignoring node update, scheduler is still booting.");
+//                 return;
+//             }
+
+//             scheduler.nodeArrives(n);
+//         } catch ( Exception e ) {
+//             logger.error(methodName, null, "Exception processing Agent event for node", n, ":\n", e);
+//         }
+//     }
+
+    int stabilityCount = 0;
+    Timer stabilityTimer = new Timer();
+
+    /**
+     * Schedules the first tick of the initial-stability countdown.
+     */
+    protected void startStabilityTimer()
+    {
+        String methodName = "startStabilityTimer";
+        logger.info(methodName, null, "Starting stability timer[", nodeMetricsUpdateRate, "] init stability[", initStability, "]");
+        stabilityTimer.schedule(new StabilityTask(), nodeMetricsUpdateRate);
+    }
+
+    /**
+     * One tick of the initial-stability countdown. Each tick re-arms the timer
+     * until initStability ticks have elapsed; the final tick starts the scheduler.
+     */
+    private class StabilityTask
+        extends TimerTask
+    {
+        public void run()
+        {
+            if ( ++stabilityCount < initStability ) {
+                logger.info("NodeStability", null, "NodeStability wait:  Countdown", stabilityCount, ":", initStability);
+                stabilityTimer.schedule(new StabilityTask(), nodeMetricsUpdateRate);
+            } else {
+                // Dropping the reference alone leaves the timer's non-daemon thread
+                // alive for an unspecified time; cancel() ends it once this task returns.
+                stabilityTimer.cancel();
+                stabilityTimer = null;              // done with it, discard it
+                scheduler.start();
+                logger.info("NodeStability", null, "Initial node stability reached: scheduler started.");
+            }
+        }
+    }
+
+    /**
+     * Passes an Orchestrator work map to the converter. Anything thrown while
+     * processing it is logged and not propagated to the caller.
+     *
+     * @param map the work map received from the Orchestrator
+     */
+    public void onOrchestratorStateUpdate(DuccWorkMap map)
+    {
+        String methodName = "onOrchestratorStateUpdate";
+        try {
+            logger.info(methodName, null, "-------> OR state arrives");
+            converter.eventArrives(map);
+            //if ( ((epoch_counter++) % schedulingRatio) == 0 ) {
+            //    notify();
+            //}
+        } catch ( Throwable e ) {
+            logger.error(methodName, null, "Exception processing Orchestrator event:", e);
+        }
+    }
+
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/config/ResourceManagerConfiguration.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/config/ResourceManagerConfiguration.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/config/ResourceManagerConfiguration.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/config/ResourceManagerConfiguration.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.config;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
+import org.apache.uima.ducc.rm.NodeStability;
+import org.apache.uima.ducc.rm.ResourceManager;
+import org.apache.uima.ducc.rm.ResourceManagerComponent;
+import org.apache.uima.ducc.rm.event.ResourceManagerEventListener;
+import org.apache.uima.ducc.rm.scheduler.SchedConstants;
+import org.apache.uima.ducc.transport.DuccTransportConfiguration;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+
+@Configuration
+ @Import({DuccTransportConfiguration.class,CommonConfiguration.class})
+
+ public class ResourceManagerConfiguration
+ implements SchedConstants
+ {
+ @Autowired CommonConfiguration common;
+ @Autowired DuccTransportConfiguration resourceManagerTransport;
+
+ DuccLogger logger = DuccLogger.getLogger(this.getClass(), COMPONENT_NAME);
+
+ public ResourceManagerEventListener resourceManagerDelegateListener(ResourceManagerComponent rm) {
+ ResourceManagerEventListener jmel = new ResourceManagerEventListener(rm);
+ int nodeStability = SystemPropertyResolver.getIntProperty("ducc.rm.node.stability", DEFAULT_STABILITY_COUNT);
+ int agentMetricsRate = SystemPropertyResolver.getIntProperty("ducc.agent.node.metrics.publish.rate", DEFAULT_NODE_METRICS_RATE);
+ NodeStability ns = new NodeStability(rm, nodeStability, agentMetricsRate);
+ jmel.setEndpoint(common.rmStateUpdateEndpoint);
+ jmel.setNodeStability(ns);
+ ns.start();
+ return jmel;
+ }
+
+ public RouteBuilder routeBuilderForEndpoint(final String endpoint, final ResourceManagerEventListener delegate) {
+ return new RouteBuilder() {
+ public void configure() {
+ from(endpoint)
+ .bean(delegate);
+ }
+ };
+ }
+
+
+ // test and debug only - user routeBuilderForEndpoint normally
+ public RouteBuilder routeBuilderForJmEndpoint(final String endpoint, final ResourceManagerEventListener delegate) {
+ System.out.println("Starting JM endpoint " + endpoint + " ???????????????????????");
+ return new RouteBuilder() {
+ public void configure() {
+ from(endpoint)
+ .threads(10)
+ .bean(delegate);
+ }
+ };
+ }
+
+ @Bean
+ public ResourceManagerComponent resourceManager()
+ throws Throwable
+ {
+ String methodName = "resourceManager";
+ ResourceManagerComponent rm = null;
+ try {
+ rm = new ResourceManagerComponent(common.camelContext());
+
+ // rm.init();
+
+ rm.setTransportConfiguration(resourceManagerTransport.duccEventDispatcher(common.rmStateUpdateEndpoint, rm.getContext()),
+ common.rmStateUpdateEndpoint);
+
+ // Instantiate Resource Manager delegate listener. This listener will receive
+ // incoming messages.
+ ResourceManagerEventListener delegateListener = this.resourceManagerDelegateListener(rm);
+
+ // Inject a dispatcher into the listener in case it needs to send
+ // a message to another component.
+ delegateListener.setDuccEventDispatcher(resourceManagerTransport.duccEventDispatcher(common.orchestratorStateUpdateEndpoint,rm.getContext()));
+
+ // Inject Camel Router that will generate state updates at regular intervals
+ // jrc rm.getContext().addRoutes(this.routeBuilderForRMStateUpdate(rm, common.rmStateUpdateEndpoint, Integer.parseInt(common.rmStatePublishRate)));
+
+ // Inject Camel Router that will handle Job Manager state update messages
+ // rm.getContext().addRoutes(this.routeBuilderForEndpoint(common.orchestratorStateUpdateEndpoint, delegateListener));
+ rm.getContext().addRoutes(this.routeBuilderForEndpoint(common.orchestratorAbbreviatedStateUpdateEndpoint, delegateListener));
+
+ // Inject Camel Router that will handle Agent Node inventory update messages
+ // rm.getContext().addRoutes(this.routeBuilderForEndpoint(common.nodeInventoryEndpoint,delegateListener));
+
+ // Inject Camel Router that will handle Node Metrics messages
+ rm.getContext().addRoutes(this.routeBuilderForEndpoint(common.nodeMetricsEndpoint, delegateListener));
+
+ return rm;
+ } catch ( Throwable t ) {
+ logger.fatal(methodName, null, t);
+ throw new IllegalStateException("Can't start RM: " + t.getMessage());
+ }
+
+ }
+
+ public class ResourceManagerStateUpdateProcessor implements Processor {
+ private ResourceManager resourceManager;
+ public ResourceManagerStateUpdateProcessor(ResourceManager resourceManager) {
+ this.resourceManager = resourceManager;
+ }
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setBody(resourceManager.getState()); //new RmStateDuccEvent());
+ }
+
+ }
+
+ public class NodeInventoryProcessor implements Processor {
+
+ public void process(Exchange exchange) throws Exception {
+ // System.out.println("... transport received Event. Body Type:"+exchange.getIn().getBody().getClass().getName());
+ //Destination replyTo = exchange.getIn().getHeader("JMSReplyTo", Destination.class);
+ // System.out.println("... transport - value of replyTo:" + replyTo);
+ }
+
+ }
+
+ public class NodeMetricsProcessor implements Processor {
+
+ public void process(Exchange exchange) throws Exception {
+ // System.out.println("... transport received Event. Body Type:"+exchange.getIn().getBody().getClass().getName());
+ //Destination replyTo = exchange.getIn().getHeader("JMSReplyTo", Destination.class);
+ // System.out.println("... transport - value of replyTo:" + replyTo);
+ }
+
+ }
+
+ }
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/config/ResourceManagerConfiguration.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/ResourceManagerEventListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/ResourceManagerEventListener.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/ResourceManagerEventListener.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/ResourceManagerEventListener.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.event;
+
+import org.apache.camel.Body;
+import org.apache.uima.ducc.common.ANodeStability;
+import org.apache.uima.ducc.rm.ResourceManager;
+import org.apache.uima.ducc.rm.scheduler.SchedConstants;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.NodeInventoryUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.NodeMetricsUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.OrchestratorAbbreviatedStateDuccEvent;
+import org.apache.uima.ducc.transport.event.OrchestratorStateDuccEvent;
+import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener;
+
+
+/**
+ * Bean that receives DUCC events from the transport and passes them on: node
+ * metrics go to the node stability monitor, Orchestrator state goes to the
+ * Resource Manager.
+ */
+public class ResourceManagerEventListener
+    implements DuccEventDelegateListener,
+               SchedConstants
+{
+    //private static DuccLogger logger = DuccLogger.getLogger(ResourceManagerEventListener.class, COMPONENT_NAME);
+
+    private final ResourceManager rm;
+    private ANodeStability nodeStability;
+    private DuccEventDispatcher eventDispatcher;
+    private String targetEndpoint;
+
+    /**
+     * @param rm the Resource Manager that receives Orchestrator state
+     */
+    public ResourceManagerEventListener(ResourceManager rm)
+    {
+        this.rm = rm;
+    }
+
+    // ---------------------------------------------------------------- wiring
+
+    public DuccEventDispatcher getEventDispatcher() {
+        return this.eventDispatcher;
+    }
+
+    public void setEventDispatcher(DuccEventDispatcher eventDispatcher) {
+        this.eventDispatcher = eventDispatcher;
+    }
+
+    /** Same effect as {@link #setEventDispatcher(DuccEventDispatcher)}. */
+    public void setDuccEventDispatcher( DuccEventDispatcher eventDispatcher )
+    {
+        this.eventDispatcher = eventDispatcher;
+    }
+
+    public String getTargetEndpoint() {
+        return this.targetEndpoint;
+    }
+
+    public void setTargetEndpoint(String targetEndpoint) {
+        this.targetEndpoint = targetEndpoint;
+    }
+
+    /** Same effect as {@link #setTargetEndpoint(String)}. */
+    public void setEndpoint( String endpoint )
+    {
+        this.targetEndpoint = endpoint;
+    }
+
+    /**
+     * @param ns the monitor that is told about every node metrics event
+     */
+    public void setNodeStability(ANodeStability ns)
+    {
+        this.nodeStability = ns;
+    }
+
+    // ---------------------------------------------------------------- event handlers
+
+    /**
+     * Receives {@code NodeMetricsUpdateDuccEvent} events from transport and
+     * reports the event's node to the node stability monitor.
+     *
+     * @param duccEvent the received event
+     * @throws Exception
+     */
+    public void onNodeMetricsEvent(@Body NodeMetricsUpdateDuccEvent duccEvent) throws Exception
+    {
+        //rm.nodeArrives(duccEvent.getNode());
+        nodeStability.nodeArrives(duccEvent.getNode());
+    }
+
+    /**
+     * Receives {@code NodeInventoryUpdateDuccEvent} events from transport.
+     * The event is ignored.
+     *
+     * @param duccEvent the received event
+     * @throws Exception
+     */
+    public void onNodeInventoryUpdateEvent(@Body NodeInventoryUpdateDuccEvent duccEvent) throws Exception {
+    }
+
+    /**
+     * Receives {@code OrchestratorStateDuccEvent} events from transport and
+     * gives the event's work map to the Resource Manager.
+     *
+     * @param duccEvent the received event
+     * @throws Exception
+     */
+    public void onOrchestratorStateUpdateEvent(@Body OrchestratorStateDuccEvent duccEvent) throws Exception
+    {
+        //String methodName = "onOrchestratorStateUpdateEvent";
+        //logger.info(methodName, null, "Event arrives");
+        rm.onOrchestratorStateUpdate(duccEvent.getWorkMap());
+    }
+
+    /**
+     * Receives {@code OrchestratorAbbreviatedStateDuccEvent} events from transport
+     * and gives the event's work map to the Resource Manager.
+     *
+     * @param duccEvent the received event
+     * @throws Exception
+     */
+    public void onOrchestratedAbbreviatedStateUpdateEvent(@Body OrchestratorAbbreviatedStateDuccEvent duccEvent) throws Exception
+    {
+        //String methodName = "onOrchestratorAbbreviatedStateUpdateEvent";
+        //logger.info(methodName, null, "Event arrives");
+        rm.onOrchestratorStateUpdate(duccEvent.getWorkMap());
+    }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/ResourceManagerEventListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IEntity.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IEntity.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IEntity.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IEntity.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+import java.util.Comparator;
+
+
+/**
+ * A SchedulingEntity is one of a collection of similar objects that compete for the same set
+ * of resources. For instance:
+ * <p>
+ * The collection of ResourceClasses is a set of SchedulingEntities competing for all the resources
+ * that are still left after scheduling higher priority work.
+ * <p>
+ * A collection of Users is a set of SchedulingEntities competing for all the resources assigned
+ * to a specific ResourceClass.
+ * <p>
+ * A collection of Jobs is a set of SchedulingEntities competing for all the resources assigned to
+ * a specific user.
+ * <p>
+ * Note that this interface does NOT fully define any concrete entity, so we neither have nor want a
+ * full bean-like get/set interface.
+ */
+interface IEntity
+{
+ int getShareWeight(); // the fair-share weight of this entity within its collection
+ // no setter is required; the weight must be supplied some other way
+
+ String getName(); // the name / id of the entity (for messages)
+ // no setter is required; the name must be set some other way
+
+ void initWantedByOrder(ResourceClass rc); // NOTE(review): presumably prepares the wanted-by-order counts for rc - confirm in implementers
+ int[] getWantedByOrder(); // the number of resources of each order wanted by this entity
+ // no setter is required; often an entity will produce this by
+ // calculation anyway
+
+ int[] getGivenByOrder(); // the number of resources actually allocated after scheduling for this entity.
+ void setGivenByOrder(int[] gbo); // the scheduler uses this to set the allocation after each
+ // scheduling round
+
+ int calculateCap(int order, int basis); // The entity must work out any caps that may restrict the counts
+
+ long getTimestamp(); // for tiebreaks
+
+ Comparator<IEntity> getApportionmentSorter(); // comparator over entities; the ordering is defined by implementers (not visible here)
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IEntity.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IJobManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IJobManager.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IJobManager.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IJobManager.java Wed Jan 2 19:43:37 2013
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+import org.apache.uima.ducc.transport.event.rm.IRmStateEvent;
+
+/**
+ * Interface to the job manager; declares one method that maps a {@code JobManagerUpdate} to an {@code IRmStateEvent}.
+ */
+
+public interface IJobManager
+{
+ IRmStateEvent createState(JobManagerUpdate jmu); // returns the state event for the given update; how it is built is up to the implementer
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IJobManager.java
------------------------------------------------------------------------------
svn:eol-style = native