You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:43:38 UTC

svn commit: r1427967 [1/5] - in /uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src: main/ main/java/ main/java/org/ main/java/org/apache/ main/java/org/apache/uima/ main/java/org/apache/uima/ducc/ main/java/org/apache/uima/ducc/rm/ main/java/org/apache/uim...

Author: cwiklik
Date: Wed Jan  2 19:43:37 2013
New Revision: 1427967

URL: http://svn.apache.org/viewvc?rev=1427967&view=rev
Log:
UIMA-2491

Added:
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStatus.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManager.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/config/
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/config/ResourceManagerConfiguration.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/ResourceManagerEventListener.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IEntity.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IJobManager.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IScheduler.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/JobManagerUpdate.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/README   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedConstants.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedInternalError.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingException.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedulingUpdate.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ServerException.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/User.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/WorkItem.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/resources/
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/test/
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/test/java/
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/test/resources/

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,812 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.utils.DuccCollectionUtils;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapDifference;
+import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapValueDifference;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.rm.scheduler.IJobManager;
+import org.apache.uima.ducc.rm.scheduler.IRmJob;
+import org.apache.uima.ducc.rm.scheduler.ISchedulerMain;
+import org.apache.uima.ducc.rm.scheduler.JobManagerUpdate;
+import org.apache.uima.ducc.rm.scheduler.Machine;
+import org.apache.uima.ducc.rm.scheduler.ResourceClass;
+import org.apache.uima.ducc.rm.scheduler.RmJob;
+import org.apache.uima.ducc.rm.scheduler.SchedConstants;
+import org.apache.uima.ducc.rm.scheduler.SchedulingException;
+import org.apache.uima.ducc.rm.scheduler.Share;
+import org.apache.uima.ducc.transport.event.RmStateDuccEvent;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+import org.apache.uima.ducc.transport.event.common.IDuccPerWorkItemStatistics;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
+import org.apache.uima.ducc.transport.event.common.IDuccReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccReservationMap;
+import org.apache.uima.ducc.transport.event.common.IDuccSchedulingInfo;
+import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
+import org.apache.uima.ducc.transport.event.common.IDuccWork;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkExecutable;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation;
+import org.apache.uima.ducc.transport.event.common.ITimeWindow;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+import org.apache.uima.ducc.transport.event.rm.IResource;
+import org.apache.uima.ducc.transport.event.rm.IRmJobState;
+import org.apache.uima.ducc.transport.event.rm.Resource;
+import org.apache.uima.ducc.transport.event.rm.RmJobState;
+
+
+/**
+ * Convert the scheduler's structures into the events that get returned to the world.
+ */
+
+public class JobManagerConverter
+    implements IJobManager,
+    	SchedConstants
+{
+    DuccLogger logger = DuccLogger.getLogger(JobManagerConverter.class, COMPONENT_NAME);
+    ISchedulerMain scheduler;
+    
+    DuccWorkMap localMap = null;
+    JobManagerUpdate lastJobManagerUpdate = new JobManagerUpdate();
+
+    Map<IRmJob, IRmJob> refusedJobs = new HashMap<IRmJob, IRmJob>();
+
+    public JobManagerConverter(ISchedulerMain scheduler)
+    {
+        this.scheduler = scheduler;
+        this.localMap = new DuccWorkMap();
+        DuccLogger.setUnthreaded();
+    }
+
+    int toInt(String s, int deflt)
+    {
+        try {
+            int val = Integer.parseInt(s);
+            return ( val == 0 ) ? deflt : val;
+        } catch ( Throwable t ) {
+            return deflt;
+        }
+    }
+  
+    void refuse(IRmJob j, String reason)
+    {
+        j.refuse(reason);
+        synchronized(refusedJobs) {
+            refusedJobs.put(j, j);
+        }
+    }
+
+//    void formatSchedulingInfo(DuccId id, IDuccSchedulingInfo si, int remaining_work)
+//    {
+//    	String methodName = "formatSchedulingInfo";
+//        SynchronizedDescriptiveStatistics stats = si.getPerWorkItemProcessingTime();        
+//        double arith_mean = stats.getMean();
+//        double geom_mean = stats.getGeometricMean();
+//        double[] vals = stats.getSortedValues();
+//        
+//        logger.info(methodName, null, id, "STATS: arithmetic mean:", arith_mean);
+//        logger.info(methodName, null, id, "STATS: geometric  mean:", geom_mean);
+//        logger.info(methodName, null, id, "STATS: remaining  work:", remaining_work);
+//        logger.info(methodName, null, id, "STATS: nvals          :", vals.length);
+//        
+//        if ( vals.length > 0 ) {
+//            StringBuffer buf = new StringBuffer();
+//            int cnt = 0;
+//         
+//            for ( int i = 0; i < vals.length; i++ ) {
+//                buf.append(Double.toString(vals[i]));
+//                if ( (++cnt) % 10 == 0 ) {
+//                    buf.append("\n");
+//                } else {
+//                    buf.append(" ");
+//                }
+//            }
+//            logger.info(methodName, null, id, "STATS: vals:\n", buf.toString());
+//        }
+//
+//    }
+
+    /**
+     * Update scheduler internal job structure with updated data from arriving job state.
+     */
+    void jobUpdate(IDuccWork job)
+    {
+    	String methodName = "jobUpate";
+        IDuccSchedulingInfo si = job.getSchedulingInfo();
+
+        DuccId jobid = job.getDuccId();
+        IRmJob j = scheduler.getJob(jobid);
+        if ( j == null ) {
+            // this can happen right when the job is submitted, if we haven't yet called
+            // the scheduler to deal with it.  just ignore, but take note.
+            // logger.info(methodName, jobid, "**** Cannot find job to update! ****");
+            return;
+        } else {            
+            int total_work     = toInt(si.getWorkItemsTotal(), scheduler.getDefaultNTasks());
+            int completed_work = toInt(si.getWorkItemsCompleted(), 0)  + toInt(si.getWorkItemsError(), 0);
+
+            int max_shares     = toInt(si.getSharesMax(), Integer.MAX_VALUE);
+            int existing_max_shares = j.getMaxShares();
+
+            // we need at least 1 if the job isn't reported complete, in case the user forgot to set the
+            // work item count.  the job will run, but slowly in that case.
+            int remaining_work = Math.max(total_work - completed_work, 1);
+
+            logger.info(methodName, job.getDuccId(), 
+                        String.format("total_work: %5d items completed: %5s items error %3s remaining work %5d",
+                                      total_work,  
+                                      si.getWorkItemsCompleted(),    // note this comes in as string (!) from OR
+                                      si.getWorkItemsError(),        // also string
+                                      remaining_work
+                                      ));
+
+            if ( max_shares != existing_max_shares ) {
+                j.setMaxShares(max_shares);
+                logger.info(methodName, job.getDuccId(), "Max shares adjusted from", existing_max_shares, "to", max_shares, "(incoming)",
+                            si.getSharesMax());
+            } 
+                
+            double arith_mean = Double.NaN;
+            IDuccPerWorkItemStatistics stats = si.getPerWorkItemStatistics();        
+            if(stats != null) {
+            	arith_mean = stats.getMean();
+            }
+            j.setNQuestions(total_work, remaining_work, arith_mean);
+
+            // formatSchedulingInfo(job.getDuccId(), si, remaining_work);
+
+            if ( job instanceof IDuccWorkJob ) {
+                if ( j.setInitWait( ((IDuccWorkJob) job).isRunnable()) ) {
+                    logger.debug(methodName, jobid, "Set Initialized.");
+                    scheduler.signalInitialized(j);
+                }
+            } else {
+                j.setInitWait(true);                           // pop is always ready to go
+            }            
+        }
+    }
+
+    /**
+     * NOTE: If this returns false, it maust also refuse().
+     */
+    private boolean receiveExecutable(IRmJob j, IDuccWork job)
+    {
+    	String methodName = "receiveExecutable";
+        IDuccWorkExecutable de = (IDuccWorkExecutable) job;
+        IDuccProcessMap     pm = de.getProcessMap();
+
+        if ( (pm.size() > 0) && !job.isCompleted() ) {          // need to recover, apparently RM crashed. hmph.
+            for ( IDuccProcess proc : pm.values() ) {          // build up Shares from the incoming state
+
+                ProcessState state = proc.getProcessState();                
+                String pid = proc.getPID();                        
+                NodeIdentity ni = proc.getNodeIdentity();
+
+                if ( proc.isComplete() ) {
+                    logger.debug(methodName, j.getId(), "Skipping process", pid, "on", ni.getName(), "beacause state is", state);
+                    continue;
+                 }
+
+                Machine m = scheduler.getMachine(ni);
+                if ( m == null ) {                             // not known, huh? maybe next epoch it will have checked in
+                    refuse(j, "Cannot restore job because node " + ni.getName()  + " is unknown.");
+                    return false;                              // so we don't add it to global tables
+                } else {
+                    DuccId id = proc.getDuccId();
+                    Share   s = new Share(id, m, j, m.getShareOrder());        // guess share order; scheduler will reset when it recovers job
+                    long mem = proc.getResidentMemory();
+
+                    logger.info(methodName, j.getId(), "Assigning share in state", state, "pid", pid, "for recovery", s.toString());
+                    j.recoverShare(s);
+                    s.update(j.getId(), mem, state, proc.getTimeWindowInit(), proc.getTimeWindowRun(), pid);                    
+                }
+            }
+            logger.info(methodName, j.getId(), "Scheduling for recovery.");
+            scheduler.signalRecovery(j);
+        } else {
+            logger.info(methodName, j.getId(), "Scheduling as new.");
+            scheduler.signalNewWork(j);
+        }
+        return true;
+    }
+
+    /**
+     * NOTE: If this returns false, it maust also refuse().
+     */
+    private boolean receiveReservation(IRmJob j, IDuccWork job)
+    {
+    	String methodName = "receiveReservation";
+        j.setReservation();
+
+        IDuccWorkReservation dr = (IDuccWorkReservation) job;
+        IDuccReservationMap rm = dr.getReservationMap();
+        if ( (rm.size() > 0) && !job.isCompleted() ) {          // need to recover, apparently RM crashed. hmph.
+            for ( IDuccReservation res : rm.values() ) {       // build up Shares from the incoming state
+                NodeIdentity ni = res.getNodeIdentity();
+                Machine m = scheduler.getMachine(ni);
+                if ( m == null ) {                             // not known, huh? maybe next epoch it will have checked in
+                    refuse(j, "Cannot restore reservation because node " + ni.getName() + " is unknown.");
+                    return false;                              // so we don't add it to global tables
+                } else {
+                    DuccId id = res.getDuccId();
+                    Share   s = new Share(id, m, j, m.getShareOrder());
+                    s.setFixed();
+                    j.recoverShare(s);
+                    logger.debug(methodName, j.getId(), "Assigning share for recovery", s.toString());
+                }
+            }
+            logger.info(methodName, j.getId(), "Scheduling for recovery.");
+            scheduler.signalRecovery(j);
+        } else {
+            logger.info(methodName, j.getId(), "Scheduling as new.");
+            scheduler.signalNewWork(j);
+        }
+        return true;
+    }
+
+    /**
+     * Convert a JobManager Job into a ResourceManager RmJob.  We assume this job is NOT in
+     * our lists.
+     *
+     * NOTE IMPORTANT NOTE
+     *
+     *    Until Lou's job contains all required scheduling fields I do a conversion that enhances
+     *    what I receive with defaults so the scheduler can actually schedule the job.
+     *
+     * NOTE IMPORTANT NOTE
+     *
+     * @param job
+     */
+    boolean jobArrives(IDuccWork job)
+    {
+    	String methodName = "jobArrives";
+        logger.debug(methodName, job.getDuccId(), "Job arives");
+        logger.debug(methodName, job.getDuccId(), "Job is of type", job.getDuccType());
+
+        // Properties props = new Properties();
+        
+        // Set<String> keys = props.stringPropertyNames();
+        // for ( String k : keys ) {
+        //     logger.debug("methodName", job.getDuccId(), "Next property [", k, ", ", props.getProperty(k), "]");
+        // }
+        
+        // Properties rmProps = new DuccProperties();
+        // for ( int i = 0; i < requiredProperties.length; i++ ) {
+        //     String v = props.getProperty(requiredProperties[i]);
+        //     if ( v == null ) {
+        //         v = defaultValues[i];
+        //     }
+        //     rmProps.setProperty(rmProperties[i], v);
+        // }
+        // IRmJob j = new RmJob(job.getDuccId(), rmProps);
+
+        // Convert Lou's structure into mine.
+        IRmJob j = new RmJob(job.getDuccId());
+        
+        IDuccSchedulingInfo si = job.getSchedulingInfo();
+        IDuccStandardInfo   sti = job.getStandardInfo();
+        
+        String name       = sti.getDescription();
+        String user_name  = sti.getUser();
+        j.setUserName(user_name);
+        j.setJobName(name);
+
+        int min_shares    = toInt(si.getSharesMin(), 0);
+        int threads       = toInt(si.getThreadsPerShare(), scheduler.getDefaultNThreads());
+        int user_priority = toInt(si.getSchedulingPriority(), 100);
+
+        int total_work    =  toInt(si.getWorkItemsTotal(), scheduler.getDefaultNTasks());
+        int completed_work = toInt(si.getWorkItemsCompleted(), 0);
+        int remaining_work = Math.max(total_work - completed_work, 1);  // never let this go 0 or negative - both cases
+                                                                        // are (probably user) errors.
+
+        logger.info(methodName, job.getDuccId(), "total_work", total_work, "completed_work", completed_work,"remaining_work", remaining_work);
+
+        int memory        = toInt(si.getShareMemorySize(), scheduler.getDefaultMemory());
+        String className  = si.getSchedulingClass();
+        if ( className == null ) {
+            className = scheduler.getDefaultClassName();
+        }
+
+        j.setMinShares(min_shares);
+        j.setThreads(threads);
+        j.setUserPriority(user_priority);
+        j.setNQuestions(total_work, remaining_work, 0.0);
+        j.setClassName(className);
+
+        switch (si.getShareMemoryUnits()) {
+            case GB:
+                break;
+            default:
+                logger.warn(methodName, job.getDuccId(), "Memory units other than GB are not currently supported.  Job returned.");
+                break;
+        }
+        j.setMemory(memory);
+        j.init();
+
+        j.setTimestamp(Long.parseLong(sti.getDateOfSubmission()));
+        // logger.info(methodName, j.getId(), "SUBMISSION DATE:", subd, (new Date(subd)).toString());
+
+        if ( job instanceof IDuccWorkJob ) {
+            j.setInitWait( ((IDuccWorkJob) job).isRunnable());
+        } else {
+            j.setInitWait(true);                          // pop is always ready to go
+        }
+
+        j.setDuccType(job.getDuccType());                 // ugly and artificial but ... not going to rant here
+                                                          // it's needed so messages can be made legible
+
+        //
+        // Now: must either create a new job, or recover one that we didn't know about, on the assumption that we
+        // have just crashed and are recovering.
+        //
+        // Be SURE that if status is turned false for any reason, or if you exit early with false, that you
+        // refuse() the job.
+        //
+        boolean status = true;        
+        
+        int max_processes = 0;
+       	int max_machines = 0;	
+        ResourceClass rescl = scheduler.getResourceClass(className);
+
+        if ( rescl == null ) {
+            // ph darn, we can't continue past this point
+            refuse(j, "Cannot find priority class " + className + " for job");
+            return false;
+        }
+
+//         if ( logger.isDebug() ) {
+//             logger.debug(methodName, j.getId(),"sharesMax", si.getSharesMax());
+//                        logger.debug(methodName, j.getId(),"getInstancesCount", si.getInstancesCount());
+//                        logger.debug(methodName, j.getId(), "rescl.getMaxProcesses", rescl.getMaxProcesses());
+//                        logger.debug(methodName, j.getId(), "rescl.getMaxMachines", rescl.getMaxMachines());
+//         }
+
+        switch ( job.getDuccType() ) {
+          case Service:
+          case Pop:
+          case Job:              
+              // instance and share count are a function of the class
+              switch ( rescl.getPolicy() ) {
+                  case FAIR_SHARE:
+                      max_processes    = toInt(si.getSharesMax(), DEFAULT_PROCESSES);
+                      max_processes    = Math.min(rescl.getMaxProcesses(), max_processes);
+                      j.setMaxShares(max_processes);
+                      j.setNInstances(-1);
+                      break;
+                      
+                  case FIXED_SHARE:
+                      max_processes   = toInt(si.getSharesMax(), DEFAULT_INSTANCES);
+                      j.setMaxShares(max_processes);
+                      j.setNInstances(max_processes);
+                      break;
+                      
+                  case RESERVE:
+                      max_machines   = toInt(si.getInstancesCount(), DEFAULT_INSTANCES);
+                      j.setMaxShares(max_machines);
+                      j.setNInstances(max_machines);
+                      break;
+              }
+              
+              status = receiveExecutable(j, job);
+              logger.debug(methodName, j.getId(), "Serivce, Pop, or Job arrives, accepted:", status);
+              break;
+          case Reservation:
+              switch ( rescl.getPolicy() ) {
+                  case FIXED_SHARE:
+                      max_machines   = toInt(si.getInstancesCount(), DEFAULT_INSTANCES);
+                      break;
+                  case RESERVE:
+                      max_machines   = toInt(si.getInstancesCount(), DEFAULT_INSTANCES);
+                      break;
+              }
+                            
+              j.setMaxShares(-1);
+              j.setNInstances(max_machines);
+
+              status = receiveReservation(j, job);
+              logger.debug(methodName, j.getId(), "Reservation arrives, accepted:", status);
+              break;
+          default:
+              refuse(j, "Unknown job type: " + job.getDuccType());
+              status = false;
+              break;
+        }
+        
+//         logger.debug(methodName, j.getId(), "Max_processes:", max_processes);
+//         logger.debug(methodName, j.getId(), "Max_machines:", max_machines);
+
+        return status;
+    }
+
+    /**
+     * Our records indicate that we know about this job but JM doesn't so we purge
+     * it from the scheduler
+     * @param job
+     */
+    void jobRemoved(DuccId id)
+    {
+    	String methodName = "jobRemoved";
+        logger.debug(methodName, id, "Signalling removal");
+        scheduler.signalCompletion(id);
+        localMap.removeDuccWork(id);
+        logger.debug(methodName, id, "Remove signalled");
+    }
+
+    public void reconcileProcesses(DuccId jobid, IDuccWork l, IDuccWork r)
+    {
+    	String methodName = "reconcileProcess";
+        IDuccProcessMap lpm = ((IDuccWorkJob )l).getProcessMap();
+        IDuccProcessMap rpm = ((IDuccWorkJob)r).getProcessMap();
+
+        @SuppressWarnings("unchecked")
+        DuccMapDifference<DuccId, IDuccProcess> diffmap = DuccCollectionUtils.difference(lpm, rpm);
+
+        // new stuff in in the left side of the map
+        Map<DuccId, IDuccProcess> lproc = diffmap.getLeft();
+        
+        for ( IDuccProcess p : lproc.values() ) {
+            // look up share, update resident memory, process state, investment (eventually), maybe pid?
+            // simply update the share with the information.  we pass in the jobid as a sanity check so
+            // we can crash or at least complain loudly on mismatch.
+
+            Share s = scheduler.getShare(p.getDuccId());
+            long mem = p.getResidentMemory();
+            ProcessState state = p.getProcessState();
+            String pid = p.getPID();
+            if ( ! s.update(jobid, mem, state, p.getTimeWindowInit(), p.getTimeWindowRun(), pid) ) {
+                // TODO: probably change to just a warning and cancel the job - for now I want an attention-getter
+                throw new SchedulingException(jobid, "Process assignemnt arrives for share " + s.toString() +
+                                              " but jobid " + jobid + " does not match share " + s.getJob().getId());
+            }
+            //scheduler.signalGrowth(jobid, s);
+            // sadly, the pid is almost always null here
+            //logger.info(methodName, jobid, 
+            //            "New process arrives for share", s.toString(), "PID", pid);
+        }
+            
+        // gone stuff in in the right side of the map
+        Map<DuccId, IDuccProcess> rproc = diffmap.getRight();
+        for ( IDuccProcess p : rproc .values()) {
+            // these processes are done.  look up the job and tell it process complete.
+            Share s = scheduler.getShare(p.getDuccId());
+            IRmJob j = scheduler.getJob(jobid);
+            if ( j == null ) {
+                throw new SchedulingException(jobid, "Process completion arrives for share " + s.toString() +
+                                              " but job " + jobid + "cannot be found.");
+            }
+            scheduler.signalCompletion(j, s);
+            logger.info(methodName, jobid, 
+                         String.format("Process %5s", p.getPID()),
+                         "Completes in share", s.toString());
+        }
+
+        for( DuccMapValueDifference<IDuccProcess> pd: diffmap ) {
+            IDuccProcess pl = pd.getLeft();
+            IDuccProcess pr = pd.getRight();
+
+            Share sl = scheduler.getShare(pl.getDuccId());
+            Share sr = scheduler.getShare(pr.getDuccId());
+
+            String shareL = ( sl == null ) ? "<none>" : sl.toString();
+            String shareR = ( sr == null ) ? "<none>" : sr.toString();
+
+            ITimeWindow initL = pl.getTimeWindowInit();
+            ITimeWindow initR = pr.getTimeWindowInit();
+            long init_timeL = (initL == null) ? 0 : initL.getElapsedMillis();
+            long init_timeR = (initR == null) ? 0 : initR.getElapsedMillis();
+
+            /** extreme debugging only*/
+            if ( logger.isTrace() ) {
+                logger.trace(methodName, jobid, 
+                             "\n\tReconciling. incoming.(pid, mem, state, share, initTime)", 
+                             pl.getPID(),
+                             pl.getResidentMemory(),
+                             pl.getProcessState(),
+                             shareL,
+                             init_timeL,
+                             "\n\tReconciling. existing.(pid, mem, state, share, initTime)", 
+                             pr.getPID(),
+                             pr.getResidentMemory(),
+                             pr.getProcessState(),
+                             shareR,
+                             init_timeR
+                             );
+            } else {
+                if ( (pr.getPID() == null) && (pl.getPID() != null) ) {
+                    logger.info(methodName, jobid, 
+                                String.format("Process %5s", pl.getPID()),
+                                "PID assignement for share", shareL);
+                }
+                if ( pl.getProcessState() != pr.getProcessState() ) {
+                    logger.info(methodName, jobid, 
+                                String.format("Process %5s", pl.getPID()),
+                                "State update:", pr.getProcessState(), "-->", pl.getProcessState());
+                }
+            }
+
+            long mem = pl.getResidentMemory();
+            ProcessState state = pl.getProcessState();
+            String pid = pl.getPID();                        
+            Share s = scheduler.getShare(pl.getDuccId());
+            if ( pl.isActive() ) {
+                
+                if ( s == null ) {
+                    // this can happen if a node dies and the share is purged so it's ok.
+                    logger.warn(methodName, jobid, "Update for share from process", pl.getPID(), pl.getDuccId(), "but cannot find share.");
+                    continue;
+                }
+                
+//                 if ( s.isPurged() ) {
+//                     IRmJob j = scheduler.getJob(jobid);
+//                     scheduler.signalCompletion(j, s);
+//                     logger.info(methodName, jobid, "Process", pl.getPID(), "marked complete because it is purged. State:", state);
+//                 }
+
+                if ( ! s.update(jobid, mem, state, pl.getTimeWindowInit(), pl.getTimeWindowRun(), pid) ) {
+                    // TODO: probably change to just a warning and cancel the job - for now I want an attention-getter
+                    throw new SchedulingException(jobid, "Process update arrives for share " + s.toString() +
+                                                  " but jobid " + jobid + " does not match job in share " + s.getJob().getId());
+                }
+                // logger.debug(methodName, jobid, "Process update to process ", pid, "mem", mem, "state", state, "is assigned for share", s.toString());
+
+            } else if ( pl.isComplete() ) {
+                if ( s != null ) {              // in some final states the share is already gone, not an error (e.g. Stopped)
+                    IRmJob j = scheduler.getJob(jobid);
+                    scheduler.signalCompletion(j, s);
+                    logger.debug(methodName, jobid, "Process", pl.getPID(), " completed due to state", state);
+                }
+            } else {
+                logger.debug(methodName, jobid, "Process", pl.getPID(), "ignoring update because of state", state);
+            }
+                    
+        }            
+
+    }
+
+    public void eventArrives(DuccWorkMap jobMap)
+    {
+    	String methodName = "eventArrives";
+
+        logger.debug(methodName, null, "Got Orchestrator");
+
+        if ( jobMap.size() == 0 ) {
+            logger.debug(methodName, null, "No state from Orchestrator");
+            return;
+        }
+
+        if ( !scheduler.ready() ) {
+            logger.info(methodName, null, "Orchestrator event is discarded because scheduler is not yet ready.");
+            return;
+        }
+
+        @SuppressWarnings("unchecked")
+		DuccMapDifference<DuccId, IDuccWork> diffmap = DuccCollectionUtils.difference(jobMap, localMap);        
+
+        for ( IDuccWork w : jobMap.values() ) {
+        	//IDuccWork j = (IDuccWork) w;
+            logger.debug(methodName, w.getDuccId(), "Arrives in JmStateEvent state =", w.getStateObject());
+        }
+
+        //
+        // First handle new stuff
+        //
+        Map<DuccId, IDuccWork> jobs = diffmap.getLeft();
+        for ( IDuccWork w : jobs.values() ) {
+
+            if ( w.isSchedulable() ) {
+                logger.debug(methodName, w.getDuccId(), "Incoming, state = ", w.getStateObject());
+                try {
+                    if ( jobArrives(w) ) {                // if not ... something is fubar and we have to ignore it for now
+                        localMap.addDuccWork(w);
+                    } 
+                } catch ( Exception e ) {
+                    logger.error(methodName, w.getDuccId(), "Can't receive job because of exception", e);
+                }
+            } else {
+                logger.debug(methodName, w.getDuccId(), "Received non-schedulable job, state = ", w.getStateObject());
+            }
+        }
+        
+        jobs = diffmap.getRight();
+        for ( IDuccWork w :jobs.values() ) {
+            logger.debug(methodName, w.getDuccId(), "Gone");
+            jobRemoved(w.getDuccId());
+        }
+
+        // Todo : manage stuff that is diffs, not new, not deletions
+        for( DuccMapValueDifference<IDuccWork> jd: diffmap ) {
+            IDuccWork r = jd.getRight();
+            IDuccWork l = jd.getLeft();
+            logger.debug(methodName, l.getDuccId(), "Reconciling, incoming state = ", l.getStateObject(), " my state = ", r.getStateObject());
+
+            if ( ! l.isSchedulable() ) {
+                logger.debug(methodName, l.getDuccId(), "Removing unschedulable job state = ", r.getStateObject());
+                jobRemoved(r.getDuccId());
+            } else {
+
+                localMap.addDuccWork(l);           // still schedulable, and we already know about it, just sync the state
+
+                // 
+                // Terrible ugliness here because the states should be so hard to get a handle on.
+                //
+                // Nonetheless, if this code hits, it's because the same job keeps coming back in
+                // WaitingForResources so it's not in the left-side map.  We need to insure the
+                // scheduler runs an epoch next opportunity it has.
+                //
+                // NO LONGER NEEDED because scheduler always runs without checking if there's work to do.
+                //
+                // Object s = l.getStateObject();
+                // if ( s instanceof JobState ) {
+                //     if ( ((JobState) s)  == JobState.WaitingForResources ) {
+                //         logger.debug(methodName, l.getDuccId(), "Job: force epoch");
+                //         scheduler.signalForceEpoch();
+                //         continue;
+                //     }
+                //   }
+
+                // if ( s instanceof ReservationState ) {
+                //     if ( ((ReservationState) s)  == ReservationState.WaitingForResources ) {
+                //         logger.debug(methodName, l.getDuccId(), "Reservation: force epoch");
+                //         scheduler.signalForceEpoch();
+                //         continue;
+                //     }
+                //   }
+
+
+                switch ( l.getDuccType() ) {
+                  case Job:    
+                      jobUpdate(l);
+                      reconcileProcesses(l.getDuccId(), l, r);
+                      break;
+                  case Service:
+                  case Pop:
+                  case Reservation:
+                      // for the moment, these guyes have nothing to reconcile.
+                      break;
+                  case Undefined:
+                      throw new SchedulingException(l.getDuccId(), "Work arrives as type Undefined - should have been filtered out by now.");                      
+                }
+            }
+           
+        }
+
+        logger.debug(methodName, null, "Done with JmStateDuccEvent with some jobs processed");
+
+    }
+
+    /**
+     * If no state has changed, we just resend that last one.
+     */
+    Map<DuccId, IRmJobState> previousJobState = new HashMap<DuccId, IRmJobState>();
+
+    /**
+     * Here's where we make a IRmStateEvent from the JobManagerUpdate so the caller can publish it.
+     */    
+    public RmStateDuccEvent createState(JobManagerUpdate jmu)
+    {
+        String methodName = "createState";
+        //ArrayList<IRmJobState> rmJobState = null;
+        Map<DuccId, IRmJobState> rmJobState = null;
+
+
+        if ( jmu == null ) {                     // no changes
+            logger.debug(methodName, null, "Publishing old RM state");
+            rmJobState = previousJobState;
+        } else {
+            // TODO: create a new rmJobState; remember to set previousJobState
+            logger.debug(methodName, null, "Publishing new RM state");
+
+            rmJobState = new HashMap<DuccId, IRmJobState>();
+
+            // Must handle all jobs that ar refused here in JMC because nobody else knows about them
+            Map<IRmJob, IRmJob> refused = new HashMap<IRmJob, IRmJob>();
+            synchronized(refusedJobs) {
+                refused.putAll(refusedJobs);
+                refusedJobs.clear();
+            }
+            
+            for ( IRmJob j : refused.values() ) {
+                RmJobState rjs = new RmJobState(j.getId(), j.getRefusalReason());
+                rjs.setDuccType(j.getDuccType());
+                rmJobState.put(j.getId(), rjs);
+            }
+
+            // Now handle the jobs that made it into the scheduler proper
+            Map<DuccId, IRmJob> jobs = jmu.getAllJobs();
+            Map<DuccId, HashMap<Share, Share>> shrunken = jmu.getShrunkenShares();
+            Map<DuccId, HashMap<Share, Share>> expanded = jmu.getExpandedShares();
+//          for ( DuccId id : expanded.keySet() ) {
+//              logger// .info(methodName, id, "Fetched these expanded shares:", expanded.get(id));
+//          }
+
+            /**
+             * Convert RM internal state into the simplified externally published state.
+             */
+            for (IRmJob j : jobs.values()) {
+
+                if ( j.isRefused() ) {
+                    RmJobState rjs = new RmJobState(j.getId(), j.getRefusalReason());
+                    rjs.setDuccType(j.getDuccType());
+                    rmJobState.put(j.getId(), rjs);
+                    jobRemoved(j.getId());
+                    logger.warn(methodName, j.getId(), "Refusal: ", j.getRefusalReason());
+                    continue;
+                }
+
+                Map<DuccId, IResource> all_shares      = new HashMap<DuccId, IResource>();
+                Map<DuccId, IResource> shrunken_shares = new HashMap<DuccId, IResource>();
+                Map<DuccId, IResource> expanded_shares = new HashMap<DuccId, IResource>();
+                Map<Share, Share> shares;
+
+                shares = j.getAssignedShares();
+                if ( shares != null ) {
+                    for ( Share s : shares.values() ) {
+                        Resource r = new Resource(s.getId(), s.getNodeIdentity(), s.isPurged(), s.getShareOrder());
+                        all_shares.put(s.getId(), r);
+                        //logger.debug(methodName, j.getId(), "Assigned:", s.toString());
+                    }
+                }
+
+                shares = shrunken.get(j.getId());
+                if ( shares != null ) {
+                    for ( Share s : shares.values() ) {
+                        Resource r = new Resource(s.getId(), s.getNodeIdentity(), s.isPurged(), s.getShareOrder());
+                        shrunken_shares.put(s.getId(), r);
+                        //logger.debug(methodName, j.getId(), "Shrunken:", s.toString());
+                    }
+                }
+
+                shares = expanded.get(j.getId());
+                if ( shares != null ) {
+                    for ( Share s : shares.values() ) {
+                        Resource r = new Resource(s.getId(), s.getNodeIdentity(), s.isPurged(), s.getShareOrder());
+                        expanded_shares.put(s.getId(), r);
+                        //logger.debug(methodName, j.getId(), "Expanded:", s.toString());
+                    }
+                }
+                
+                RmJobState rjs = new RmJobState(j.getId(), all_shares, shrunken_shares, expanded_shares);
+                rjs.setDuccType(j.getDuccType());
+                rmJobState.put(j.getId(), rjs);
+            }
+
+            previousJobState = rmJobState;
+        }        
+
+        RmStateDuccEvent response = new RmStateDuccEvent(rmJobState);
+        try {
+            logger.debug(methodName, null, response.toString() );            
+        } catch (Exception e) {
+            logger.error(methodName, null, e);
+        }
+        
+        return response;
+
+    }
+
+}
+

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm;
+
+import java.util.Map;
+
+import org.apache.uima.ducc.common.ANodeStability;
+import org.apache.uima.ducc.common.Node;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.rm.scheduler.ISchedulerMain;
+import org.apache.uima.ducc.rm.scheduler.SchedConstants;
+
+
+public class NodeStability
+    extends ANodeStability
+    implements SchedConstants
+{
+    ISchedulerMain scheduler;
+    ResourceManagerComponent rm;
+    DuccLogger     logger = DuccLogger.getLogger(NodeStability.class, COMPONENT_NAME);
+
+    public NodeStability(ResourceManagerComponent rm, int nodeStabilityLimit, int agentMetricsRate)
+    {        
+        super(nodeStabilityLimit, agentMetricsRate);
+        this.rm = rm;
+        this.scheduler = rm.getScheduler();
+    }
+
+    public void nodeDeath(Map<Node, Node> nodes)
+    {
+        String methodName = "nodeDeath";
+
+        scheduler.nodeDeath(nodes);
+        for ( Node n : nodes.keySet() ) {
+            logger.debug(methodName, null, "*** ! Notification of node death:", n.getNodeIdentity().getName());
+        }
+    }
+
+    public void missedNode(Node n, int c)
+    {
+    	String methodName = "missedNode";
+        logger.warn(methodName, null, "*** Missed heartbeat ***", n.getNodeIdentity().getName(), "count[", c, "]");
+    }
+
+    public void nodeArrives(Node n)
+    {
+    	String methodName = "nodeArrives";
+        if ( ! rm.isSchedulerReady() ) {
+            logger.warn(methodName, null, "Ignoring node update, scheduler is still booting.");
+            return;
+        } else {
+            // logger.info(methodName, null, n.getNodeIdentity().getName());
+            scheduler.nodeArrives(n);          // tell RM
+            super.nodeArrives(n);              // tell heartbeat monitor
+        }
+    }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStatus.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStatus.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStatus.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStatus.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+
+public class NodeStatus {
+
+	private static String dir_home = System.getenv("DUCC_HOME");
+	private static String dir_resources = "resources";
+	private static String ducc_properties_filename = dir_home+File.separator+dir_resources+File.separator+"nodes.offline";
+	
+	private static NodeStatus instance = new NodeStatus();
+	
+	private static SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM dd HH:mm:ss zzz yyyy");
+	
+	public static NodeStatus getInstance() {
+		return instance;
+	}
+	
+	private Properties properties = new Properties();
+	
+	public NodeStatus() {
+		load();
+	}
+	
+	private String normalize(String nodeName) {
+		String retVal = nodeName;
+		if(nodeName != null) {
+			retVal = nodeName.trim();
+		}
+		return retVal;
+	}
+	
+	private void load() {
+		try {
+			File file = new File(ducc_properties_filename);
+			FileInputStream fis;
+			fis = new FileInputStream(file);
+			properties.load(fis);
+			fis.close();
+		} 
+		catch(Throwable t) {
+			handle(t);
+		} 
+	}
+	
+	private void store() {
+		try {
+			File file = new File(ducc_properties_filename);
+			FileOutputStream fos;
+			fos = new FileOutputStream(file);
+			properties.store(fos, "");
+			fos.close();
+		} 
+		catch(Throwable t) {
+			handle(t);
+		} 
+	}
+	
+	public boolean isOffline(String nodeName) {
+		boolean retVal = false;
+		String name = normalize(nodeName);
+		if(name != null) {
+			load();
+			retVal = properties.containsKey(name);
+		}
+		return retVal;
+	}
+	
+	public boolean varyOffline(String nodeName) {
+		boolean retVal = false;
+		String name = normalize(nodeName);
+		if(nodeName != null) {
+			load();
+	        Date date = new Date(System.currentTimeMillis());
+			properties.put(name,""+sdf.format(date));
+			store();
+		}
+		return retVal;
+	}
+	
+	
+	public boolean varyOnline(String nodeName) {
+		boolean retVal = false;
+		String name = normalize(nodeName);
+		if(nodeName != null) {
+			load();
+			properties.remove(name);
+			store();
+		}
+		return retVal;
+	}
+	
+	private void handle(Throwable t) {
+		t.printStackTrace();
+	}
+	
+	//
+	
+	private static void _query(String nodeName) {
+		NodeStatus nodeStatus = NodeStatus.getInstance();
+		String state = "offline";
+		if(!nodeStatus.isOffline(nodeName)) {
+			state = "!offline";
+		}
+		System.out.println(nodeName+"="+state);
+	}
+	
+	private static void _offline(String nodeName) {
+		NodeStatus nodeStatus = NodeStatus.getInstance();
+		nodeStatus.varyOffline(nodeName);
+	}
+	
+	private static void _online(String nodeName) {
+		NodeStatus nodeStatus = NodeStatus.getInstance();
+		nodeStatus.varyOnline(nodeName);
+	}
+	
+	public static void main(String[] args) {
+		_offline("bluej9999");
+		_online("bluej8888");
+		_offline("bluej7777");
+		_online("bluej7777");
+		_query("bluej7777");
+		_query("bluej8888");
+		_query("bluej9999");
+		_query("bluej0000");
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStatus.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManager.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManager.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManager.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm;
+
+import org.apache.uima.ducc.transport.event.RmStateDuccEvent;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+
+public interface ResourceManager 
+{
+	public RmStateDuccEvent getState() throws Exception;
+
+    //	public void onNodeMetricsUpdate(Node node);
+	// public void nodeArrives(Node node);
+
+	public void onOrchestratorStateUpdate(DuccWorkMap map);
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.camel.CamelContext;
+import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties;
+import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties.DaemonName;
+import org.apache.uima.ducc.common.component.AbstractDuccComponent;
+import org.apache.uima.ducc.common.main.DuccService;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
+import org.apache.uima.ducc.rm.scheduler.ISchedulerMain;
+import org.apache.uima.ducc.rm.scheduler.JobManagerUpdate;
+import org.apache.uima.ducc.rm.scheduler.SchedConstants;
+import org.apache.uima.ducc.rm.scheduler.Scheduler;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.RmStateDuccEvent;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+
+
+public class ResourceManagerComponent 
+    extends AbstractDuccComponent
+    implements ResourceManager,
+               SchedConstants,
+               Runnable
+{
+    private static DuccLogger logger = DuccLogger.getLogger(ResourceManagerComponent.class, COMPONENT_NAME);
+
+    int nodeStability;                // number of heartbeats from agent metrics we are allowed to miss before purging node
+    int initStability;                // number of heartbeats from agent metrics we must wait for during init befor starting
+    int nodeMetricsUpdateRate;
+    boolean schedulerReady = false;
+
+    ISchedulerMain scheduler;
+    JobManagerConverter converter;
+
+    // These guys are used to manage my own epoch
+    int schedulingRatio = 6;
+    int schedulingEpoch = 60000;
+    DuccEventDispatcher eventDispatcher;
+    String stateEndpoint;
+
+    public ResourceManagerComponent(CamelContext context) {
+        super("ResourceManager", context);
+        this.scheduler = new Scheduler();
+    }
+
+    public ISchedulerMain getScheduler()
+    {
+        return this.scheduler;
+    }
+
+    public boolean isSchedulerReady()
+    {
+        return schedulerReady;
+    }
+
+    public void start(DuccService service, String[] args)
+        throws Exception
+    {
+        super.start(service, args);
+        DuccDaemonRuntimeProperties.getInstance().boot(DaemonName.ResourceManager,getProcessJmxUrl());
+
+        converter = new JobManagerConverter(scheduler);
+
+        initStability         = SystemPropertyResolver.getIntProperty("ducc.rm.init.stability", DEFAULT_INIT_STABILITY_COUNT);
+        nodeStability         = SystemPropertyResolver.getIntProperty("ducc.rm.node.stability", DEFAULT_STABILITY_COUNT);
+        nodeMetricsUpdateRate = SystemPropertyResolver.getIntProperty("ducc.agent.node.metrics.publish.rate", DEFAULT_NODE_METRICS_RATE);
+        schedulingRatio       = SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.ratio", DEFAULT_SCHEDULING_RATIO);
+        schedulingEpoch       = SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.rate", DEFAULT_SCHEDULING_RATE);
+        scheduler.init();
+
+        startStabilityTimer();
+
+        // Start the main processing loop
+        Thread rmThread = new Thread(this);
+        rmThread.setDaemon(true);
+        rmThread.start();
+
+        schedulerReady = true;
+    }
+
+    public RmStateDuccEvent getState() throws Exception 
+    {
+        String methodName = "getState";        
+        JobManagerUpdate jobManagerUpdate = null;
+
+        try {
+            logger.info(methodName, null, "-------------------- Entering scheduling loop --------------------");
+            jobManagerUpdate = scheduler.schedule();                        
+            logger.info(methodName, null, "-------------------- Scheduling loop returns  --------------------");
+        } catch (Exception e1) {
+            logger.error(methodName, null, "Error running scheduler:", e1);
+        }
+        
+        try {
+        	if ( jobManagerUpdate != null ) { 
+        		return converter.createState(jobManagerUpdate);
+        	}
+        } catch ( Exception e ) {
+            logger.error(methodName, null, "Error converting state for Orchestrator", e);
+        }
+        return null;
+    }
+
+    public void setTransportConfiguration(DuccEventDispatcher eventDispatcher, String endpoint)
+    {
+        this.eventDispatcher = eventDispatcher;
+        this.stateEndpoint = endpoint;
+    }
+
+    public void run()
+    {
+        while ( true ) {
+            runScheduler();
+        }
+    }
+
+    long epoch_counter = 0;
+    public void runScheduler()
+    //public void runScheduler()
+    {
+        String methodName = "runScheduler";
+        JobManagerUpdate jobManagerUpdate;
+
+        while ( true ) {
+
+            try {
+                Thread.sleep(schedulingEpoch);                               // and linger a while
+                //wait();
+            } catch (InterruptedException e) {
+            	logger.info(methodName, null, "Scheduling wait interrupted, executing out-of-band epoch.");
+            }
+            
+            try {
+                // logger.info(methodName, null, "Publishing RM state to", stateEndpoint);
+                logger.info(methodName, null, "--------", ++epoch_counter, "------- Entering scheduling loop --------------------");
+
+                jobManagerUpdate = scheduler.schedule();          
+                if ( jobManagerUpdate != null ) {             // returns null while waiting for node stability
+                    RmStateDuccEvent ev = converter.createState(jobManagerUpdate);
+                    eventDispatcher.dispatch(stateEndpoint, ev, "");  // tell the world what is scheduled (note empty string)
+                }
+
+                logger.info(methodName, null, "--------", epoch_counter, "------- Scheduling loop returns  --------------------");
+            } catch (Throwable e1) {
+            	logger.fatal(methodName, null, e1);
+            }
+            
+        }
+    }
+    
+//     public void nodeArrives(Node n)
+//     {
+//         String methodName = "nodeArrives";        
+//         try {
+//             if ( ! schedulerReady ) {
+//                 logger.warn(methodName, null, "Ignoring node update, scheduler is still booting.");
+//                 return;
+//             }
+
+//             scheduler.nodeArrives(n);
+//         } catch ( Exception e ) {
+//             logger.error(methodName, null, "Exception processing Agent event for node", n, ":\n", e);
+//         }
+//     }
+
+    int stabilityCount = 0;
+    Timer stabilityTimer = new Timer();
+    protected void startStabilityTimer() 
+    {
+    	String methodName = "startStabilityTimer";
+    	logger.info(methodName, null, "Starting stability timer[", nodeMetricsUpdateRate, "] init stability[", initStability, "]");
+        stabilityTimer.schedule(new StabilityTask(), nodeMetricsUpdateRate);
+    }
+
+    private class StabilityTask
+        extends TimerTask
+    {
+        public void run()
+        {
+            if ( ++stabilityCount < initStability ) {
+                logger.info("NodeStability", null, "NodeStability wait:  Countdown", stabilityCount, ":", initStability);
+                stabilityTimer.schedule(new StabilityTask(), nodeMetricsUpdateRate);
+            } else {
+                stabilityTimer = null;              // done with it, discard it
+                scheduler.start();
+                logger.info("NodeStability", null, "Initial node stability reached: scheduler started.");
+            }
+        }
+    }
+
+    public void onOrchestratorStateUpdate(DuccWorkMap map)
+    {
+        String methodName = "onJobManagerStateUpdate";
+        try {
+            logger.info(methodName, null, "-------> OR state arrives");
+            converter.eventArrives(map);
+            //if ( ((epoch_counter++) % schedulingRatio) == 0 ) {
+            //    notify();
+            //}
+        } catch ( Throwable e ) {
+            logger.error(methodName, null, "Excepton processing Orchestrator event:", e);
+        }
+    }
+
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/config/ResourceManagerConfiguration.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/config/ResourceManagerConfiguration.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/config/ResourceManagerConfiguration.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/config/ResourceManagerConfiguration.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.config;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
+import org.apache.uima.ducc.rm.NodeStability;
+import org.apache.uima.ducc.rm.ResourceManager;
+import org.apache.uima.ducc.rm.ResourceManagerComponent;
+import org.apache.uima.ducc.rm.event.ResourceManagerEventListener;
+import org.apache.uima.ducc.rm.scheduler.SchedConstants;
+import org.apache.uima.ducc.transport.DuccTransportConfiguration;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+
+@Configuration
+    @Import({DuccTransportConfiguration.class,CommonConfiguration.class})
+
+    public class ResourceManagerConfiguration 
+    implements SchedConstants
+    {
+        @Autowired CommonConfiguration common;
+        @Autowired DuccTransportConfiguration resourceManagerTransport;
+
+        DuccLogger logger = DuccLogger.getLogger(this.getClass(), COMPONENT_NAME);
+        
+        public ResourceManagerEventListener resourceManagerDelegateListener(ResourceManagerComponent rm) {
+            ResourceManagerEventListener jmel =  new ResourceManagerEventListener(rm);
+            int nodeStability = SystemPropertyResolver.getIntProperty("ducc.rm.node.stability", DEFAULT_STABILITY_COUNT);
+            int agentMetricsRate = SystemPropertyResolver.getIntProperty("ducc.agent.node.metrics.publish.rate", DEFAULT_NODE_METRICS_RATE);
+            NodeStability ns = new NodeStability(rm, nodeStability, agentMetricsRate);            
+            jmel.setEndpoint(common.rmStateUpdateEndpoint);
+            jmel.setNodeStability(ns);
+            ns.start();
+            return jmel;
+        }
+        
+        public RouteBuilder routeBuilderForEndpoint(final String endpoint, final ResourceManagerEventListener delegate) {
+            return new RouteBuilder() {
+                public void configure() {
+                    from(endpoint)
+                        .bean(delegate);
+                }
+            };
+        }
+
+
+        // test and debug only - user routeBuilderForEndpoint normally
+        public RouteBuilder routeBuilderForJmEndpoint(final String endpoint, final ResourceManagerEventListener delegate) {
+            System.out.println("Starting JM endpoint " + endpoint + "  ???????????????????????");
+            return new RouteBuilder() {
+                public void configure() {
+                    from(endpoint)
+                        .threads(10)
+                        .bean(delegate);
+                }
+            };
+        }
+        
+        @Bean 
+        public ResourceManagerComponent resourceManager() 
+        throws Throwable 
+        {
+            String methodName = "resourceManager";
+            ResourceManagerComponent rm = null;
+            try {            
+                rm = new ResourceManagerComponent(common.camelContext());
+
+                // rm.init();
+
+                rm.setTransportConfiguration(resourceManagerTransport.duccEventDispatcher(common.rmStateUpdateEndpoint, rm.getContext()), 
+                                             common.rmStateUpdateEndpoint);
+        
+                //  Instantiate Resource Manager delegate listener. This listener will receive
+                //  incoming messages. 
+                ResourceManagerEventListener delegateListener = this.resourceManagerDelegateListener(rm);
+
+                //  Inject a dispatcher into the listener in case it needs to send
+                //  a message to another component. 
+                delegateListener.setDuccEventDispatcher(resourceManagerTransport.duccEventDispatcher(common.orchestratorStateUpdateEndpoint,rm.getContext()));
+
+                //  Inject Camel Router that will generate state updates at regular intervals
+                // jrc rm.getContext().addRoutes(this.routeBuilderForRMStateUpdate(rm, common.rmStateUpdateEndpoint, Integer.parseInt(common.rmStatePublishRate)));
+
+                //  Inject Camel Router that will handle Job Manager state update messages
+                // rm.getContext().addRoutes(this.routeBuilderForEndpoint(common.orchestratorStateUpdateEndpoint, delegateListener));
+                rm.getContext().addRoutes(this.routeBuilderForEndpoint(common.orchestratorAbbreviatedStateUpdateEndpoint, delegateListener));
+
+                //  Inject Camel Router that will handle Agent Node inventory update messages
+                // rm.getContext().addRoutes(this.routeBuilderForEndpoint(common.nodeInventoryEndpoint,delegateListener));
+
+                //  Inject Camel Router that will handle Node Metrics messages
+                rm.getContext().addRoutes(this.routeBuilderForEndpoint(common.nodeMetricsEndpoint, delegateListener));
+                
+                return rm;
+            } catch ( Throwable t ) {
+                logger.fatal(methodName, null, t);
+                throw new IllegalStateException("Can't start RM: " + t.getMessage());
+            }
+
+        }
+
+        public class ResourceManagerStateUpdateProcessor implements Processor {
+            private ResourceManager resourceManager;
+            public ResourceManagerStateUpdateProcessor(ResourceManager resourceManager) {
+                this.resourceManager = resourceManager;
+            }
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody(resourceManager.getState()); //new RmStateDuccEvent());
+            }
+                
+        }
+
+        public class NodeInventoryProcessor implements Processor {
+
+            public void process(Exchange exchange) throws Exception {
+                //                      System.out.println("... transport received Event. Body Type:"+exchange.getIn().getBody().getClass().getName());
+                //Destination replyTo = exchange.getIn().getHeader("JMSReplyTo", Destination.class); 
+                //                      System.out.println("... transport - value of replyTo:" + replyTo);
+            }
+                
+        }
+
+        public class NodeMetricsProcessor implements Processor {
+
+            public void process(Exchange exchange) throws Exception {
+                //                      System.out.println("... transport received Event. Body Type:"+exchange.getIn().getBody().getClass().getName());
+                //Destination replyTo = exchange.getIn().getHeader("JMSReplyTo", Destination.class); 
+                //                      System.out.println("... transport - value of replyTo:" + replyTo);
+            }
+                
+        }
+
+    }

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/config/ResourceManagerConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/ResourceManagerEventListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/ResourceManagerEventListener.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/ResourceManagerEventListener.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/ResourceManagerEventListener.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.event;
+
+import org.apache.camel.Body;
+import org.apache.uima.ducc.common.ANodeStability;
+import org.apache.uima.ducc.rm.ResourceManager;
+import org.apache.uima.ducc.rm.scheduler.SchedConstants;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.NodeInventoryUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.NodeMetricsUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.OrchestratorAbbreviatedStateDuccEvent;
+import org.apache.uima.ducc.transport.event.OrchestratorStateDuccEvent;
+import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener;
+
+
+public class ResourceManagerEventListener 
+    implements DuccEventDelegateListener,
+               SchedConstants
+{
+    //private static DuccLogger logger = DuccLogger.getLogger(ResourceManagerEventListener.class, COMPONENT_NAME);
+
+	private String targetEndpoint;
+    private ResourceManager rm;
+    private DuccEventDispatcher eventDispatcher;
+    private ANodeStability nodeStability;
+
+    public DuccEventDispatcher getEventDispatcher() {
+		return eventDispatcher;
+	}
+
+	public void setEventDispatcher(DuccEventDispatcher eventDispatcher) {
+		this.eventDispatcher = eventDispatcher;
+	}
+
+    public void setNodeStability(ANodeStability ns)
+    {
+        this.nodeStability = ns;
+    }
+
+	public String getTargetEndpoint() {
+		return targetEndpoint;
+	}
+
+	public void setTargetEndpoint(String targetEndpoint) {
+		this.targetEndpoint = targetEndpoint;
+	}
+        
+    public ResourceManagerEventListener(ResourceManager rm) 
+    {
+        this.rm = rm;
+    }
+
+    public void setDuccEventDispatcher( DuccEventDispatcher eventDispatcher ) 
+    {
+        this.eventDispatcher = eventDispatcher;
+    }
+
+    public void setEndpoint( String endpoint ) 
+    {
+        this.targetEndpoint = endpoint;
+    }
+
+    /**
+     * Receives {@code NodeMetricsUpdateDuccEvent} events from transport. 
+     * 
+     * @param duccEvent
+     * @throws Exception
+     */
+    public void onNodeMetricsEvent(@Body NodeMetricsUpdateDuccEvent duccEvent) throws Exception 
+    {
+        //rm.nodeArrives(duccEvent.getNode());
+        nodeStability.nodeArrives(duccEvent.getNode());
+    }
+
+    public void onNodeInventoryUpdateEvent(@Body NodeInventoryUpdateDuccEvent duccEvent) throws Exception {
+    }
+
+    /**
+     * Receives {@code OrchestratorDuccEvent} events from transport.
+     * 
+     * @param duccEvent
+     * @throws Exception
+     */
+    public void onOrchestratorStateUpdateEvent(@Body OrchestratorStateDuccEvent duccEvent) throws Exception 
+    {
+    	//String methodName = "onOrchestratorStateUpdateEvent";
+        //logger.info(methodName, null, "Event arrives");
+        rm.onOrchestratorStateUpdate(duccEvent.getWorkMap());
+    }
+
+    /**
+     * Receives {@code OrchestratedAbbreviatedDuccEvent} events from transport.
+     * 
+     * @param duccEvent
+     * @throws Exception
+     */
+    public void onOrchestratedAbbreviatedStateUpdateEvent(@Body OrchestratorAbbreviatedStateDuccEvent duccEvent) throws Exception 
+    {
+    	//String methodName = "onOrchestratorAbbreviatedStateUpdateEvent";
+        //logger.info(methodName, null, "Event arrives");
+        rm.onOrchestratorStateUpdate(duccEvent.getWorkMap());
+    }
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/ResourceManagerEventListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IEntity.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IEntity.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IEntity.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IEntity.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+import java.util.Comparator;
+
+
+/**
+ * A SchedulingEntity is one of a collection of similar objects that compete for the same set
+ * of resources.  For instance -
+ *
+ * The collection of ResourceClasses is a set of ScheduingEntities competing for all the resources
+ * that are still left after scheduling higher priority work.
+ *
+ * A collection of Users is a set of SchedulingEntities competing for all the resouces assigned
+ * to a specific ResourceClass.
+ *
+ * A collection of Jobs is a set of SchedulingEntities competing for all the resources assigned to
+ * a specific user.
+ * 
+ * Note that this interface does NOT fully define any concrete entity so we don't have or want a
+ * full beany-like get/set interface.  
+ */
+interface IEntity
+{
+    int    getShareWeight();           // the fair-share weight of this entity within its collection
+                                       // the setter isn't required, must come some other way
+
+    String getName();                  // the name / id of the entity (for messages)
+                                       // the setter isn't required, name must be set some other way
+
+    void   initWantedByOrder(ResourceClass rc);
+    int[]  getWantedByOrder();         // the number of resources of each order wanted by this entity
+                                       // setter isn't required, often an entity will produce this by
+                                       //   calculation anyway
+
+    int[]  getGivenByOrder();          // the number of resources actually allocated after scheduling for this entity.
+    void   setGivenByOrder(int[] gbo); // the scheduler uses this to set the allocation after each
+                                       //    scheduling round
+
+    int    calculateCap(int order, int basis); // The entity must work out any caps that may restrict the counts
+
+    long   getTimestamp();                   // for tiebreaks
+
+    Comparator<IEntity> getApportionmentSorter();
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IEntity.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IJobManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IJobManager.java?rev=1427967&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IJobManager.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IJobManager.java Wed Jan  2 19:43:37 2013
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.rm.scheduler;
+
+import org.apache.uima.ducc.transport.event.rm.IRmStateEvent;
+
+/**
+ *Interface to the job manager
+ */
+
+public interface IJobManager
+{
+    IRmStateEvent createState(JobManagerUpdate jmu);
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IJobManager.java
------------------------------------------------------------------------------
    svn:eol-style = native