You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:37:56 UTC

svn commit: r1427956 [3/4] - in /uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator: main/ main/java/ main/java/org/ main/java/org/apache/ main/java/org/apache/uima/ main/java/org/apache/uima/ducc/ main/java/org/apache/uima/ducc/orchestrator/ main/jav...

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateManager.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateManager.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateManager.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,1124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.TimeStamp;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.common.DuccProcess;
+import org.apache.uima.ducc.transport.event.common.DuccReservation;
+import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+import org.apache.uima.ducc.transport.event.common.DuccWorkPopDriver;
+import org.apache.uima.ducc.transport.event.common.DuccWorkReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessWorkItems;
+import org.apache.uima.ducc.transport.event.common.IDuccReservationMap;
+import org.apache.uima.ducc.transport.event.common.IDuccUimaDeploymentDescriptor;
+import org.apache.uima.ducc.transport.event.common.IDuccWork;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkService;
+import org.apache.uima.ducc.transport.event.common.IRationale;
+import org.apache.uima.ducc.transport.event.common.Rationale;
+import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.JobCompletionType;
+import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.ReservationCompletionType;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
+import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
+import org.apache.uima.ducc.transport.event.common.IDuccState.ReservationState;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+import org.apache.uima.ducc.transport.event.common.IResourceState.ProcessDeallocationType;
+import org.apache.uima.ducc.transport.event.common.IResourceState.ResourceState;
+import org.apache.uima.ducc.transport.event.common.history.HistoryPersistenceManager;
+import org.apache.uima.ducc.transport.event.jd.DriverStatusReport;
+import org.apache.uima.ducc.transport.event.jd.DuccProcessWorkItemsMap;
+import org.apache.uima.ducc.transport.event.rm.IResource;
+import org.apache.uima.ducc.transport.event.rm.IRmJobState;
+import org.apache.uima.ducc.transport.event.sm.ServiceDependency;
+import org.apache.uima.ducc.transport.event.sm.ServiceMap;
+import org.apache.uima.ducc.transport.event.sm.IService.ServiceState;
+
+
+public class StateManager {
+	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(StateManager.class.getName()); // logger obtained from DuccLoggerComponents, keyed by this class name
+	
+	private static StateManager stateManager = new StateManager(); // the single instance handed out by getInstance()
+	
+	/**
+	 * Provides access to the shared StateManager.
+	 *
+	 * @return the instance created when this class was initialized
+	 */
+	public static StateManager getInstance() {
+		return StateManager.stateManager;
+	}
+	
+	private OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance(); // source of the shared maps and helpers used throughout this class
+	private Messages messages = orchestratorCommonArea.getSystemMessages(); // message lookup used for log text (fetch / fetchLabel)
+	private DuccWorkMap workMap = orchestratorCommonArea.getWorkMap(); // work items looked up by DuccId; also the monitor the reconcileState methods synchronize on
+	private ConcurrentHashMap<DuccId,DriverStatusReport> driverStatusReportMap = orchestratorCommonArea.getDriverStatusReportMap(); // driver status reports keyed by job DuccId
+	private StateJobAccounting stateJobAccounting = StateJobAccounting.getInstance(); // performs the job state transitions requested by this class
+	
+	HistoryPersistenceManager hpm = orchestratorCommonArea.getHistoryPersistencemanager(); // used by isSaved to persist jobs, services and reservations; package-private
+	
+	private boolean jobDriverTerminated(DuccWorkJob duccWorkJob) {
+		String methodName = "jobDriverTerminated";
+		boolean status = true;
+		logger.trace(methodName, null, messages.fetch("enter"));
+		IDuccProcessMap processMap = duccWorkJob.getDriver().getProcessMap();
+		Iterator<DuccId> processMapIterator = processMap.keySet().iterator();
+		while(processMapIterator.hasNext()) {
+			DuccId duccId = processMapIterator.next();
+			IDuccProcess process = processMap.get(duccId);
+			if(process.isActive()) {
+				logger.debug(methodName, duccId,  messages.fetch("processes active"));
+				status = false;
+			}
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return status;
+	}
+	
+	private boolean jobProcessesTerminated(DuccWorkJob duccWorkJob) {
+		String methodName = "jobProcessesTerminated";
+		boolean status = true;
+		logger.trace(methodName, null, messages.fetch("enter"));
+		IDuccProcessMap processMap = duccWorkJob.getProcessMap();
+		Iterator<DuccId> processMapIterator = processMap.keySet().iterator();
+		while(processMapIterator.hasNext()) {
+			DuccId duccId = processMapIterator.next();
+			IDuccProcess process = processMap.get(duccId);
+			if(process.isActive()) {
+				logger.debug(methodName, duccId,  messages.fetch("processes active"));
+				status = false;
+			}
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return status;
+	}
+	
+	private boolean allProcessesTerminated(DuccWorkJob duccWorkJob) {
+		String methodName = "allProcessesTerminated";
+		boolean status = false;
+		logger.trace(methodName, null, messages.fetch("enter"));
+		switch(duccWorkJob.getDuccType()) {
+		case Job:
+			if(jobDriverTerminated(duccWorkJob)) {
+				if(jobProcessesTerminated(duccWorkJob)) {
+					status = true;
+					if(duccWorkJob.getStandardInfo().getDateOfShutdownProcessesMillis() <= 0) {
+						duccWorkJob.getStandardInfo().setDateOfShutdownProcesses(TimeStamp.getCurrentMillis());
+					}
+				}
+			}
+			break;
+		case Service:
+			if(jobProcessesTerminated(duccWorkJob)) {
+				status = true;
+				if(duccWorkJob.getStandardInfo().getDateOfShutdownProcessesMillis() <= 0) {
+					duccWorkJob.getStandardInfo().setDateOfShutdownProcesses(TimeStamp.getCurrentMillis());
+				}
+			}
+			break;
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return status;
+	}
+	
+	private long SECONDS = 1000; // one second expressed in milliseconds
+	private long MINUTES = 60 * SECONDS; // one minute expressed in milliseconds
+	private long AgeTime = 1 * MINUTES; // threshold that isAgedOut compares elapsed milliseconds against
+	
+	/**
+	 * Decides whether a work item finished long enough ago to be dropped from the work map.
+	 * The item counts as aged out only when more than AgeTime milliseconds have passed since
+	 * both its completion time and its process shutdown time.
+	 * If reading the times throws, the error is logged and whatever was decided up to that
+	 * point is returned (true when nothing had been compared yet).
+	 *
+	 * @param duccWork the work item to examine
+	 * @return true when aged out, false when either timestamp is still within AgeTime
+	 */
+	private boolean isAgedOut(IDuccWork duccWork) {
+		String methodName = "isAgedOut";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		boolean retVal = true;
+		long endMillis = 0;
+		long nowMillis = 0;
+		long elapsed = 0;
+		try {
+			// first check: time since completion
+			endMillis = duccWork.getStandardInfo().getDateOfCompletionMillis();
+			nowMillis = System.currentTimeMillis();
+			elapsed = (nowMillis - endMillis);
+			if(elapsed <= AgeTime) {
+				retVal = false;
+			}
+			// second check: time since the processes were shut down, against the same "now"
+			endMillis = duccWork.getStandardInfo().getDateOfShutdownProcessesMillis();
+			elapsed = (nowMillis - endMillis);
+			if(elapsed <= AgeTime) {
+				retVal = false;
+			}
+		}
+		catch(Exception e) {
+			// report both values under their own labels (the message used to print endMillis twice as "nowMillis")
+			logger.error(methodName, null, "endMillis:"+endMillis+" "+"nowMillis:"+nowMillis+" ", e);
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return retVal;
+	}
+	
+	/**
+	 * Persists a job or service through the history persistence manager.
+	 * Jobs are saved with jobSave and services with serviceSave; any other type is not saved.
+	 * A failure is logged against the work's id rather than propagated.
+	 *
+	 * @param duccWorkJob the job or service to save
+	 * @return true when a save was performed without throwing, false otherwise
+	 */
+	public boolean isSaved(IDuccWorkJob duccWorkJob) {
+		final String methodName = "isSaved";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		boolean saved = false;
+		try {
+			switch(duccWorkJob.getDuccType()) {
+			case Job:
+				hpm.jobSave(duccWorkJob);
+				saved = true;
+				break;
+			case Service:
+				IDuccWorkService service = (IDuccWorkService)duccWorkJob;
+				hpm.serviceSave(service);
+				saved = true;
+				break;
+			default:
+				break;
+			}
+		}
+		catch(Exception e) {
+			logger.error(methodName, duccWorkJob.getDuccId(), e);
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return saved;
+	}
+	
+	/**
+	 * Persists a reservation through the history persistence manager.
+	 * A failure is logged against the reservation's id rather than propagated.
+	 *
+	 * @param duccWorkReservation the reservation to save
+	 * @return true when the save completed without throwing, false otherwise
+	 */
+	public boolean isSaved(IDuccWorkReservation duccWorkReservation) {
+		String methodName = "isSaved";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		boolean retVal = false;
+		try {
+			hpm.reservationSave(duccWorkReservation);
+			retVal = true;
+		}
+		catch(Exception e) {
+			// report the failure, as the job/service overload of isSaved does, instead of hiding it
+			logger.error(methodName, duccWorkReservation.getDuccId(), e);
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return retVal;
+	}
+	
+	/**
+	 * Removes finished work from the given work map.
+	 * A job or service that is completing and has no active processes is first moved to
+	 * Completed. A completed job or service is removed once all its processes have terminated,
+	 * it has been saved to history and it has aged out; its driver status report and its
+	 * process accounting entries are removed with it. A completed reservation is removed once
+	 * it has been saved and has aged out. When anything was removed a checkpoint is saved.
+	 *
+	 * @param workMap the work map to prune (shadows the field of the same name)
+	 * @return the number of removals performed, counting each removed work item and each removed process
+	 */
+	public int prune(DuccWorkMap workMap) {
+		String methodName = "prune";
+		int changes = 0;
+		logger.trace(methodName, null, messages.fetch("enter"));
+		long t0 = System.currentTimeMillis();
+		Iterator<DuccId> workMapIterator = workMap.keySet().iterator();
+		while(workMapIterator.hasNext()) {
+			DuccId duccId = workMapIterator.next();
+			IDuccWork duccWork = workMap.findDuccWork(duccId);
+			if(duccWork == null) {
+				// must be checked before getDuccType() is called; an id without work has nothing to prune
+				logger.debug(methodName, duccId, messages.fetch("not found"));
+				continue;
+			}
+			switch(duccWork.getDuccType()) {
+			case Job:
+			case Service:
+				DuccWorkJob duccWorkJob = (DuccWorkJob)duccWork;
+				if(duccWorkJob.isCompleting() && allProcessesTerminated(duccWorkJob)) {
+					stateJobAccounting.stateChange(duccWorkJob, JobState.Completed);
+				}
+				if(duccWorkJob.isCompleted() && allProcessesTerminated(duccWorkJob) && isSaved(duccWorkJob) && isAgedOut(duccWorkJob)) {
+					workMap.removeDuccWork(duccId);
+					driverStatusReportMap.remove(duccId);
+					logger.info(methodName, duccId, messages.fetch("removed job"));
+					changes ++;
+					// drop the accounting entry of every process that belonged to the removed work
+					IDuccProcessMap processMap = duccWorkJob.getProcessMap();
+					Iterator<DuccId> processMapIterator = processMap.keySet().iterator();
+					while(processMapIterator.hasNext()) {
+						DuccId processDuccId = processMapIterator.next();
+						orchestratorCommonArea.getProcessAccounting().removeProcess(processDuccId);
+						logger.info(methodName, duccId, messages.fetch("removed process")+" "+processDuccId.toString());
+						changes ++;
+					}
+					logger.info(methodName, duccId, messages.fetch("processes inactive"));
+				}
+				else {
+					logger.debug(methodName, duccId, messages.fetch("processes active"));
+				}
+				break;
+			case Reservation:
+				DuccWorkReservation duccWorkReservation = (DuccWorkReservation)duccWork;
+				if(duccWorkReservation.isCompleted() && isSaved(duccWorkReservation) && isAgedOut(duccWorkReservation)) {
+					workMap.removeDuccWork(duccId);
+					logger.info(methodName, duccId, messages.fetch("removed reservation"));
+					changes ++;
+				}
+				break;
+			}
+		}
+		long t1 = System.currentTimeMillis();
+		long elapsed = t1 - t0;
+		if(elapsed > Constants.SYNC_LIMIT) {
+			logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+		}
+		logger.debug(methodName, null, "processToWorkMap.size()="+orchestratorCommonArea.getProcessAccounting().processCount());
+		if(changes > 0) {
+			OrchestratorCheckpoint.getInstance().saveState();
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return changes;
+	}
+	
+	private int stateChange(DuccWorkJob duccWorkJob, JobState state) {
+		stateJobAccounting.stateChange(duccWorkJob, state);
+		return 1;
+	}
+	
+	private int stateChange(DuccWorkReservation duccWorkReservation, ReservationState state) {
+		duccWorkReservation.stateChange(state);
+		return 1;
+	}
+	
+	/**
+	 * Stamps the given JMX URL onto every process in the job driver's process map.
+	 * Nothing happens when the URL is null or the driver has no process map.
+	 *
+	 * @param job the job whose driver processes are updated
+	 * @param jdJmxUrl the JMX URL to record, may be null
+	 */
+	private void setJdJmxUrl(DuccWorkJob job, String jdJmxUrl) {
+		if(jdJmxUrl == null) {
+			return;
+		}
+		IDuccProcessMap driverProcesses = job.getDriver().getProcessMap();
+		if(driverProcesses == null) {
+			return;
+		}
+		for(IDuccProcess driverProcess : driverProcesses.values()) {
+			driverProcess.setProcessJmxUrl(jdJmxUrl);
+		}
+	}
+	
+	/**
+	 * Copies the per-process work item figures from a driver status report onto the
+	 * matching processes of the job.
+	 * A process id present in the report but absent from the job's process map is logged and
+	 * skipped, so that the remaining processes are still updated. Any other failure is logged
+	 * and ends the copy.
+	 *
+	 * @param job the job whose processes receive the figures
+	 * @param jdStatusReport the driver status report supplying the figures
+	 */
+	private void copyProcessWorkItemsReport(DuccWorkJob job, DriverStatusReport jdStatusReport) {
+		String methodName = "copyProcessWorkItemsReport";
+		try {
+			IDuccProcessMap processMap = job.getProcessMap();
+			DuccProcessWorkItemsMap pwiMap = jdStatusReport.getDuccProcessWorkItemsMap();
+			Iterator<DuccId> iterator = pwiMap.keySet().iterator();
+			while(iterator.hasNext()) {
+				DuccId processId = iterator.next();
+				IDuccProcess process = processMap.get(processId);
+				if(process == null) {
+					// without this guard the resulting exception would abandon every remaining process
+					logger.warn(methodName, job.getDuccId(), processId, "not in process map");
+					continue;
+				}
+				IDuccProcessWorkItems pwi = pwiMap.get(processId);
+				process.setProcessWorkItems(pwi);
+			}
+		}
+		catch(Throwable t) {
+			logger.error(methodName, job.getDuccId(), t);
+		}
+	}
+	
+	private void copyDriverWorkItemsReport(DuccWorkJob job, DriverStatusReport jdStatusReport) {
+		String methodName = "copyDriverWorkItemsReport";
+		try {
+			DuccProcessWorkItemsMap pwiMap = jdStatusReport.getDuccProcessWorkItemsMap();
+			IDuccProcessWorkItems pwi = pwiMap.getTotals();
+			DuccWorkPopDriver driver = job.getDriver();
+			IDuccProcessMap processMap = driver.getProcessMap();
+			if(processMap != null) {
+				Iterator<DuccId> iterator = processMap.keySet().iterator();
+				while(iterator.hasNext()) {
+					DuccId processId = iterator.next();
+					IDuccProcess process = processMap.get(processId);
+					process.setProcessWorkItems(pwi);
+				}
+			}
+		}
+		catch(Throwable t) {
+			logger.error(methodName, job.getDuccId(), t);
+		}
+	}
+	
+	/**
+	 * JD reconciliation: applies a job driver status report to the matching job in the work map.
+	 * While holding the workMap monitor it copies the driver JMX URL, the deployment descriptor
+	 * and the work item reports onto the job, records the report in driverStatusReportMap unless
+	 * the job is already Completed, advances the job state according to the reported driver
+	 * state, and deallocates idle and failed processes. A checkpoint is saved when one of the
+	 * deallocation helpers reports a change. When no work is found for the report's id a
+	 * warning is logged and nothing else happens.
+	 *
+	 * @param jdStatusReport the driver status report to apply
+	 */
+	public void reconcileState(DriverStatusReport jdStatusReport) {
+		String methodName = "reconcileState (JD)";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		int changes = 0;
+		long t0 = System.currentTimeMillis();
+		synchronized(workMap) {
+			DuccId duccId = jdStatusReport.getDuccId();
+			DuccWorkJob duccWorkJob = (DuccWorkJob) workMap.findDuccWork(duccId);
+			if(duccWorkJob != null) {
+				String jdJmxUrl = jdStatusReport.getJdJmxUrl();
+				setJdJmxUrl(duccWorkJob, jdJmxUrl);
+				IDuccUimaDeploymentDescriptor uimaDeploymentDescriptor = jdStatusReport.getUimaDeploymentDescriptor();
+				if(uimaDeploymentDescriptor != null) {
+					boolean copyDD = true; // always true as written, so a non-null descriptor is always copied
+					if(copyDD) {
+						duccWorkJob.setUimaDeployableConfiguration(uimaDeploymentDescriptor);
+					}
+				}
+				// copy work item figures onto the job's processes, then the totals onto the driver's processes
+				copyProcessWorkItemsReport(duccWorkJob, jdStatusReport);
+				copyDriverWorkItemsReport(duccWorkJob, jdStatusReport);
+				// remember this report for the job unless the job has already reached Completed
+				switch(duccWorkJob.getJobState()) {
+				case Completed:
+					break;
+				case Completing:
+				default:
+					driverStatusReportMap.put(duccId, jdStatusReport);
+					break;
+				}
+				// advance the job state according to the state the driver reports
+				switch(jdStatusReport.getDriverState()) {
+				case NotRunning:
+					break;
+				case Initializing:	
+					switch(duccWorkJob.getJobState()) {
+					case WaitingForDriver:
+						stateJobAccounting.stateChange(duccWorkJob, JobState.WaitingForServices);
+						break;
+					case Initializing:
+						break;
+					}
+					break;
+				case Running:
+				case Idle:	
+					if(jdStatusReport.isKillJob()) {
+						jobTerminate(duccWorkJob, JobCompletionType.CanceledByDriver, jdStatusReport.getJobCompletionRationale(), ProcessDeallocationType.JobFailure);
+						break; // leaves the driver-state switch, skipping the state promotion below
+					}
+					switch(duccWorkJob.getJobState()) {
+					case WaitingForDriver:
+						stateJobAccounting.stateChange(duccWorkJob, JobState.WaitingForServices);
+						break;
+					case Initializing:
+						stateJobAccounting.stateChange(duccWorkJob, JobState.Running);
+						break;
+					}
+					break;
+				case Completed:
+					if(!duccWorkJob.isFinished()) {
+						stateJobAccounting.stateChange(duccWorkJob, JobState.Completing);
+						deallocateJobDriver(duccWorkJob, jdStatusReport); // return value is not used here
+					}
+					duccWorkJob.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis()); // runs on every Completed report, whether or not the job was already finished
+					switch(jdStatusReport.getJobCompletionType()) {
+					case EndOfJob:
+						duccWorkJob.setCompletion(JobCompletionType.EndOfJob, new Rationale("state manager detected normal completion"));
+						try {
+							int errors = Integer.parseInt(duccWorkJob.getSchedulingInfo().getWorkItemsError());
+							if(errors > 0) {
+								duccWorkJob.setCompletion(JobCompletionType.Error, new Rationale("state manager detected errors="+errors)); // replaces the EndOfJob completion set just above
+							}
+						}
+						catch(Exception e) { // deliberately ignored: on any failure here the EndOfJob completion set above stands
+						}
+						break;
+					default:
+						duccWorkJob.setCompletion(jdStatusReport.getJobCompletionType(),jdStatusReport.getJobCompletionRationale());
+						break;
+					}
+					break;
+				case Undefined:
+					break;
+				}
+				OrchestratorCommonArea.getInstance().getProcessAccounting().setStatus(jdStatusReport,duccWorkJob);
+				if(deallocateIdleProcesses(duccWorkJob, jdStatusReport)) {
+					changes++;
+				}
+				if(deallocateFailedProcesses(duccWorkJob, jdStatusReport)) {
+					changes++;
+				}
+			}
+			else {
+				logger.warn(methodName, duccId, messages.fetch("not found"));
+			}
+		}
+		long t1 = System.currentTimeMillis();
+		long elapsed = t1 - t0;
+		if(elapsed > Constants.SYNC_LIMIT) {
+			logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+		}		
+		if(changes > 0) { // only the two deallocation helpers contribute to changes; state transitions above are not counted
+			OrchestratorCheckpoint.getInstance().saveState();
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+	}
+	
+	public boolean isExcessCapacity(DuccWorkJob job, DriverStatusReport jdStatusReport) {
+		String methodName = "isExcessCapacity";
+		boolean retVal = false;
+		if(jdStatusReport != null) {
+			IDuccProcessMap processMap = job.getProcessMap();
+			int threads_per_share = Integer.parseInt(job.getSchedulingInfo().getThreadsPerShare());
+			long capacity = processMap.getUsableProcessCount() * threads_per_share;
+			long total = jdStatusReport.getWorkItemsTotal();
+			long done = jdStatusReport.getWorkItemsProcessingCompleted();
+			long error = jdStatusReport.getWorkItemsProcessingError();
+			long todo = total - (done + error);
+			if(capacity > 0) {
+				if(todo < capacity) {
+				retVal = true;
+				}
+			}
+			logger.info(methodName, job.getDuccId(), "todo:"+todo+" "+"capacity:"+capacity+" "+"excess:"+retVal);
+		}
+		else {
+			logger.info(methodName, job.getDuccId(), "todo:"+"?"+" "+"capacity:"+"?"+" "+"excess:"+retVal);
+		}
+		return retVal;
+	}
+	
+	private boolean deallocateIdleProcesses(DuccWorkJob job, DriverStatusReport jdStatusReport) {
+		String methodName = "deallocateIdleProcesses";
+		boolean retVal = false;
+		if(!jdStatusReport.isPending()  && !jdStatusReport.isWorkItemPendingProcessAssignment()) {
+			IDuccProcessMap processMap = job.getProcessMap();
+			Iterator<DuccId> iterator = processMap.keySet().iterator();
+			boolean excessCapacity = isExcessCapacity(job, jdStatusReport);
+			while(iterator.hasNext() && excessCapacity) {
+				DuccId duccId = iterator.next();
+				IDuccProcess process = processMap.get(duccId);
+				if(!process.isDeallocated()) {
+					String nodeIP = process.getNodeIdentity().getIp();
+					String PID = process.getPID();
+					if(!jdStatusReport.isOperating(nodeIP, PID)) {
+						process.setResourceState(ResourceState.Deallocated);
+						process.setProcessDeallocationType(ProcessDeallocationType.Voluntary);
+						logger.info(methodName, job.getDuccId(), process.getDuccId(), "deallocated");
+						retVal = true;
+						excessCapacity = isExcessCapacity(job, jdStatusReport);
+					}
+				}
+			}
+		}
+		return retVal;
+	}
+	
+	/**
+	 * Deallocates every job process that the driver status report lists for killing.
+	 * Processes already deallocated are left alone; ids that are not in the job's process
+	 * map are logged as warnings.
+	 *
+	 * @param job the job whose processes are deallocated
+	 * @param jdStatusReport the driver status report supplying the ids to kill
+	 * @return true when at least one process was deallocated, false otherwise
+	 */
+	private boolean deallocateFailedProcesses(DuccWorkJob job, DriverStatusReport jdStatusReport) {
+		String methodName = "deallocateFailedProcesses";
+		boolean retVal = false;
+		IDuccProcessMap processMap = job.getProcessMap();
+		Iterator<DuccId> iterator = jdStatusReport.getKillDuccIds();
+		while (iterator.hasNext()) {
+			DuccId duccId = iterator.next();
+			IDuccProcess process = processMap.get(duccId);
+			if(process != null) {
+				if(!process.isDeallocated()) {
+					process.setResourceState(ResourceState.Deallocated);
+					process.setProcessDeallocationType(ProcessDeallocationType.Exception);
+					logger.info(methodName, job.getDuccId(), process.getDuccId(), "deallocated");
+					// tell the caller something changed, as deallocateIdleProcesses does
+					retVal = true;
+				}
+			}
+			else {
+				logger.warn(methodName, job.getDuccId(), duccId, "not in process map");
+			}
+		}
+		return retVal;
+	}
+	
+	
+	/**
+	 * Deallocates every process in the job driver's process map that is not already
+	 * deallocated, marking the deallocation as voluntary.
+	 *
+	 * @param job the job whose driver processes are deallocated
+	 * @param jdStatusReport the driver status report that triggered the deallocation (not read here)
+	 * @return true when at least one driver process was deallocated, false otherwise
+	 */
+	private boolean deallocateJobDriver(DuccWorkJob job, DriverStatusReport jdStatusReport) {
+		String methodName = "deallocateJobDriver";
+		boolean retVal = false;
+		IDuccProcessMap processMap = job.getDriver().getProcessMap();
+		Iterator<DuccId> iterator = processMap.keySet().iterator();
+		while (iterator.hasNext()) {
+			DuccId duccId = iterator.next();
+			IDuccProcess process = processMap.get(duccId);
+			if(process != null) {
+				if(!process.isDeallocated()) {
+					process.setResourceState(ResourceState.Deallocated);
+					process.setProcessDeallocationType(ProcessDeallocationType.Voluntary);
+					logger.info(methodName, job.getDuccId(), process.getDuccId(), "deallocated");
+					// report the change instead of always returning false
+					retVal = true;
+				}
+			}
+			else {
+				logger.warn(methodName, job.getDuccId(), duccId, "not in process map");
+			}
+		}
+		return retVal;
+	}
+	
+	/**
+	 * RM reconciliation: applies the resource manager's per-work resource state to the work map.
+	 * For every entry, while holding the workMap monitor, the matching job, service or
+	 * reservation has its pending resource additions and removals applied and its state advanced
+	 * according to its current state, whether the allocation was refused, and which resources
+	 * it holds. Jobs and services additionally have purged resources processed first.
+	 * Entries with no matching work are logged at debug level and skipped. A checkpoint is saved
+	 * when any change was counted.
+	 *
+	 * @param rmResourceStateMap resource state keyed by the DuccId of the work it describes
+	 * @throws Exception declared on the signature; this body has no explicit throw, so it is
+	 *         presumably raised by a helper it calls (to be confirmed)
+	 */
+	public void reconcileState(Map<DuccId, IRmJobState> rmResourceStateMap) throws Exception {
+		String methodName = "reconcileState (RM)";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		logger.debug(methodName, null, messages.fetchLabel("size")+rmResourceStateMap.size());
+		int changes = 0;
+		long t0 = System.currentTimeMillis();
+		synchronized(workMap) {
+			Iterator<DuccId> rmResourceStateIterator = rmResourceStateMap.keySet().iterator();
+			while(rmResourceStateIterator.hasNext()) {
+				DuccId duccId = rmResourceStateIterator.next();
+				IRmJobState rmResourceState = rmResourceStateMap.get(duccId);
+				if(rmResourceState.getPendingAdditions() != null) {
+					logger.debug(methodName, duccId, messages.fetchLabel("pending additions")+rmResourceState.getPendingAdditions().size());
+				}
+				if(rmResourceState.getPendingRemovals() != null) {
+					logger.debug(methodName, duccId, messages.fetchLabel("pending removals")+rmResourceState.getPendingRemovals().size());
+				}
+				IDuccWork duccWork = workMap.findDuccWork(duccId);
+				if(duccWork== null) {
+					logger.debug(methodName, duccId, messages.fetch("not found"));
+				}
+				else {
+					logger.debug(methodName, duccId, messages.fetchLabel("type")+duccWork.getDuccType());
+					switch(duccWork.getDuccType()) {
+					case Job:
+						logger.debug(methodName, duccId, messages.fetch("processing job..."));
+						DuccWorkJob duccWorkJob = (DuccWorkJob) duccWork;
+						processPurger(duccWorkJob,rmResourceState.getResources()); // return value is not added to changes
+						changes += processMapResourcesAdd(duccWorkJob,rmResourceState.getPendingAdditions());
+						changes += processMapResourcesDel(duccWorkJob,rmResourceState.getPendingRemovals());
+						JobState jobState = duccWorkJob.getJobState(); // read after the resource updates above
+						logger.debug(methodName, duccId, messages.fetchLabel("job state")+jobState);
+						switch(jobState) {
+						case Received:
+						case WaitingForDriver:
+							logger.warn(methodName, duccId, messages.fetchLabel("unexpected state")+jobState);
+							break;
+						case WaitingForServices:
+							logger.debug(methodName, duccId, messages.fetchLabel("unexpected state")+jobState);
+							break;
+						case WaitingForResources:
+							if(rmResourceState.isRefused()) {
+								duccWorkJob.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+								duccWorkJob.setCompletionType(JobCompletionType.ResourcesUnavailable);
+								duccWorkJob.setCompletionRationale(new Rationale("resource manager refused allocation: "+rmResourceState.getReason()));
+								changes += stateChange(duccWorkJob,JobState.Completed);
+								logger.warn(methodName, duccId, messages.fetchLabel("refused")+rmResourceState.getReason());
+							}
+							if(duccWorkJob.getProcessMap().size() > 0) { // NOTE(review): not an else - also evaluated after a refusal, unlike the Reservation case; confirm intended
+								changes += stateChange(duccWorkJob,JobState.Initializing);
+								logger.info(methodName, duccId, messages.fetchLabel("resources count")+duccWorkJob.getProcessMap().size());
+							}
+							break;
+						case Initializing:
+						case Running:
+							if(duccWorkJob.getProcessMap().size() == 0) { // every process is gone, so go back to waiting for resources
+								changes += stateChange(duccWorkJob,JobState.WaitingForResources);
+								logger.info(methodName, duccId, messages.fetchLabel("resources count")+duccWorkJob.getProcessMap().size());
+							}
+							break;
+						case Completing:
+						case Completed:
+							logger.debug(methodName, duccId, messages.fetchLabel("unsuitable state")+jobState);
+							break;
+						case Undefined:
+							logger.warn(methodName, duccId, messages.fetchLabel("unsuitable state")+jobState);
+							break;
+						}
+						break;
+					case Reservation:
+						logger.debug(methodName, duccId, messages.fetch("processing reservation..."));
+						DuccWorkReservation duccWorkReservation = (DuccWorkReservation) duccWork;
+						changes += reservationMapResourcesAdd(duccWorkReservation,rmResourceState.getPendingAdditions());
+						changes += reservationMapResourcesDel(duccWorkReservation,rmResourceState.getPendingRemovals());
+						ReservationState reservationState = duccWorkReservation.getReservationState();
+						switch(reservationState) {
+						case Received:
+							logger.warn(methodName, duccId, messages.fetchLabel("unexpected state")+reservationState);
+							break;
+						case WaitingForResources:
+							if(rmResourceState.isRefused()) {
+								duccWorkReservation.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+								duccWorkReservation.setCompletionType(ReservationCompletionType.ResourcesUnavailable);
+								duccWorkReservation.setCompletionRationale(new Rationale("resource manager refused allocation: "+rmResourceState.getReason()));
+								changes += stateChange(duccWorkReservation,ReservationState.Completed);
+								logger.warn(methodName, duccId, messages.fetchLabel("refused")+rmResourceState.getReason());
+							}
+							else {
+								if(rmResourceState.getResources() != null) {
+									if(!rmResourceState.getResources().isEmpty()) { // resources have arrived, so the reservation becomes Assigned
+										changes += stateChange(duccWorkReservation,ReservationState.Assigned);
+										logger.info(methodName, duccId, messages.fetchLabel("resources count")+rmResourceState.getResources().size());
+									}
+								}
+								else {
+									logger.info(methodName, duccId, messages.fetch("waiting...no resources?"));
+								}
+							}
+							break;
+						case Assigned:
+							if(rmResourceState.getResources() != null) {
+								if(rmResourceState.getResources().isEmpty()) { // an assigned reservation with no resources left is moved to Completed
+									changes += stateChange(duccWorkReservation,ReservationState.Completed);
+									logger.info(methodName, duccId, messages.fetchLabel("resources count")+rmResourceState.getResources().size());
+								}
+							}
+							else {
+								logger.info(methodName, duccId, messages.fetch("assigned...no resources?"));
+							}
+							break;
+						case Completed:
+							logger.debug(methodName, duccId, messages.fetchLabel("unsuitable state")+reservationState);
+							break;
+						case Undefined:
+							logger.warn(methodName, duccId, messages.fetchLabel("unsuitable state")+reservationState);
+							break;
+						}
+						break;
+					case Service:
+						logger.debug(methodName, duccId, messages.fetch("processing service..."));
+						DuccWorkJob duccWorkService = (DuccWorkJob) duccWork; // services are handled through the same DuccWorkJob type as jobs
+						processPurger(duccWorkService,rmResourceState.getResources()); // return value is not added to changes
+						changes += processMapResourcesAdd(duccWorkService,rmResourceState.getPendingAdditions());
+						changes += processMapResourcesDel(duccWorkService,rmResourceState.getPendingRemovals());
+						JobState serviceState = duccWorkService.getJobState();
+						logger.debug(methodName, duccId, messages.fetchLabel("service state")+serviceState);
+						switch(serviceState) {
+						case Received:
+							logger.warn(methodName, duccId, messages.fetchLabel("unexpected state")+serviceState);
+							break;
+						case WaitingForServices:
+							logger.debug(methodName, duccId, messages.fetchLabel("unexpected state")+serviceState);
+							break;
+						case WaitingForResources:
+							if(rmResourceState.isRefused()) {
+								duccWorkService.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+								duccWorkService.setCompletionType(JobCompletionType.ResourcesUnavailable);
+								duccWorkService.setCompletionRationale(new Rationale("resource manager refused allocation: "+rmResourceState.getReason()));
+								changes += stateChange(duccWorkService,JobState.Completed);
+								logger.warn(methodName, duccId, messages.fetchLabel("refused")+rmResourceState.getReason());
+							}
+							if(duccWorkService.getProcessMap().size() > 0) { // NOTE(review): not an else - also evaluated after a refusal, unlike the Reservation case; confirm intended
+								changes += stateChange(duccWorkService,JobState.Initializing);
+								logger.info(methodName, duccId, messages.fetchLabel("resources count")+duccWorkService.getProcessMap().size());
+							}
+							break;
+						case Initializing:
+						case Running:
+							if(duccWorkService.getProcessMap().size() == 0) { // every process is gone, so go back to waiting for resources
+								changes += stateChange(duccWorkService,JobState.WaitingForResources);
+								logger.info(methodName, duccId, messages.fetchLabel("resources count")+duccWorkService.getProcessMap().size());
+							}
+							break;
+						case Completing:
+						case Completed:
+							logger.debug(methodName, duccId, messages.fetchLabel("unsuitable state")+serviceState);
+							break;
+						case Undefined:
+							logger.warn(methodName, duccId, messages.fetchLabel("unsuitable state")+serviceState);
+							break;
+						}
+						break;
+					}
+				}
+			}
+			if(changes > 0) { // the checkpoint is saved while still holding the workMap monitor
+				OrchestratorCheckpoint.getInstance().saveState();
+			}
+		}
+		long t1 = System.currentTimeMillis();
+		long elapsed = t1 - t0;
+		if(elapsed > Constants.SYNC_LIMIT) {
+			logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+		}		logger.trace(methodName, null, messages.fetch("exit"));
+	}
+	
+	/**
+	 * Marks as stopped and deallocated every job process whose resource is flagged as purged.
+	 * Processes that are already defunct are left alone. A purged resource whose id has no
+	 * entry in the job's process map is logged as a warning and skipped.
+	 *
+	 * @param job the job whose processes are examined; nothing is done when null
+	 * @param map the resources to inspect, keyed by DuccId; nothing is done when null
+	 * @return the number of processes that were marked as purged
+	 */
+	private int processPurger(DuccWorkJob job,Map<DuccId, IResource> map) {
+		String methodName = "processPurger";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		int changes = 0;
+		if(job != null) {
+			if(map != null) {
+				Iterator<DuccId> iterator = map.keySet().iterator();
+				while(iterator.hasNext()) {
+					DuccId duccId = iterator.next();
+					IResource resource = map.get(duccId);
+					if(resource.isPurged()) {
+						IDuccProcess process = job.getProcessMap().get(duccId);
+						if(process == null) {
+							// guard the lookup so one unknown id cannot abort the caller's reconciliation
+							logger.warn(methodName, job.getDuccId(), duccId, "not in process map");
+						}
+						else if(!process.isDefunct()) {
+							String rState = process.getResourceState().toString();
+							String pState = process.getProcessState().toString();
+							// record the states as they were before the purge is applied
+							logger.info(methodName, job.getDuccId(), duccId, "rState:"+rState+" "+"pState:"+pState);
+							process.setResourceState(ResourceState.Deallocated);
+							process.setProcessDeallocationType(ProcessDeallocationType.Purged);
+							process.advanceProcessState(ProcessState.Stopped);
+							changes++;
+						}
+					}
+				}
+			}
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return changes;
+	}
+	
+	/**
+	 * Creates a job process for each resource in the map that the job does not yet track.
+	 * <p>
+	 * A freshly added process is immediately marked Deallocated (Voluntary) and Stopped when
+	 * the job is already Completing/Completed, or when isExcessCapacity reports true for it.
+	 *
+	 * @param duccWorkJob the job receiving the resources
+	 * @param resourceMap resources keyed by process id; when null only a message is logged
+	 * @return the number of processes added to the job's process map
+	 */
+	private int processMapResourcesAdd(DuccWorkJob duccWorkJob,Map<DuccId,IResource> resourceMap) {
+		String methodName = "processMapResourcesAdd";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		int changes = 0;
+		if(resourceMap != null) {
+			IDuccProcessMap jobProcesses = duccWorkJob.getProcessMap();
+			for(DuccId duccId : resourceMap.keySet()) {
+				NodeIdentity nodeId = resourceMap.get(duccId).getNodeId();
+				if(jobProcesses.containsKey(duccId)) {
+					// already tracked: report and move on to the next resource
+					logger.warn(methodName, duccWorkJob.getDuccId(), messages.fetch("resource exists")
+						+" "+messages.fetchLabel("process")+duccId.getFriendly()
+						+" "+messages.fetchLabel("unique")+duccId.getUnique()
+						+" "+messages.fetchLabel("name")+nodeId.getName()
+						+" "+messages.fetchLabel("ip")+nodeId.getIp());
+					continue;
+				}
+				DuccProcess process = new DuccProcess(duccId, nodeId, ProcessType.Job_Uima_AS_Process);
+				orchestratorCommonArea.getProcessAccounting().addProcess(duccId, duccWorkJob.getDuccId());
+				jobProcesses.addProcess(process);
+				process.setResourceState(ResourceState.Allocated);
+				logger.info(methodName, duccWorkJob.getDuccId(), messages.fetch("resource added")
+											+" "+messages.fetchLabel("process")+duccId.getFriendly()
+											+" "+messages.fetchLabel("unique")+duccId.getUnique()
+											+" "+messages.fetchLabel("name")+nodeId.getName()
+											+" "+messages.fetchLabel("ip")+nodeId.getIp());
+				changes++;
+				// decide whether the allocation just recorded is of any use;
+				// a non-null key means it is surplus and names the message to log
+				String surplusMessageKey = null;
+				switch(duccWorkJob.getJobState()) {
+				case Completing:
+				case Completed:
+					// the job is finishing, nothing left to run on this resource
+					surplusMessageKey = "resource allocated for completed job";
+					break;
+				default:
+					// the job is still active but may already hold more than it needs
+					if(isExcessCapacity(duccWorkJob,driverStatusReportMap.get(duccId))) {
+						surplusMessageKey = "resource allocated for over capacity job";
+					}
+					break;
+				}
+				if(surplusMessageKey != null) {
+					process.setResourceState(ResourceState.Deallocated);
+					process.setProcessDeallocationType(ProcessDeallocationType.Voluntary);
+					process.advanceProcessState(ProcessState.Stopped);
+					logger.warn(methodName, duccWorkJob.getDuccId(), 
+							messages.fetch(surplusMessageKey)
+							+" "+
+							messages.fetchLabel("process")+duccId.getFriendly()
+							);
+				}
+			}
+		}
+		else {
+			logger.info(methodName, duccWorkJob.getDuccId(), messages.fetch("no map found"));
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return changes;
+	}
+	
+	/**
+	 * Marks as Deallocated (Forced) every job process named in the resource map.
+	 * <p>
+	 * Processes are only flagged, never removed from the job's process map; the removal of
+	 * defunct processes that once lived here is disabled, so the change counter is never
+	 * incremented.
+	 *
+	 * @param duccWorkJob the job giving up the resources
+	 * @param resourceMap resources keyed by process id; when null only a message is logged
+	 * @return the change count, which this implementation always leaves at 0
+	 */
+	private int processMapResourcesDel(DuccWorkJob duccWorkJob,Map<DuccId,IResource> resourceMap) {
+		String methodName = "processMapResourcesDel";
+		logger.trace(methodName, duccWorkJob.getDuccId(), messages.fetch("enter"));
+		int changes = 0;
+		if(resourceMap != null) {
+			IDuccProcessMap jobProcesses = duccWorkJob.getProcessMap();
+			logger.debug(methodName, duccWorkJob.getDuccId(), messages.fetchLabel("size")+jobProcesses.size());
+			for(DuccId duccId : resourceMap.keySet()) {
+				NodeIdentity nodeId = resourceMap.get(duccId).getNodeId();
+				logger.info(methodName, duccWorkJob.getDuccId(), messages.fetch("resource processing")
+					+" "+messages.fetchLabel("process")+duccId.getFriendly()
+					+" "+messages.fetchLabel("unique")+duccId.getUnique()
+					+" "+messages.fetchLabel("name")+nodeId.getName()
+					+" "+messages.fetchLabel("ip")+nodeId.getIp());
+				if(!jobProcesses.containsKey(duccId)) {
+					logger.info(methodName, duccWorkJob.getDuccId(), messages.fetch("resource not found")
+						+" "+messages.fetchLabel("process")+duccId.getFriendly()
+						+" "+messages.fetchLabel("unique")+duccId.getUnique()
+						+" "+messages.fetchLabel("name")+nodeId.getName()
+						+" "+messages.fetchLabel("ip")+nodeId.getIp());
+					continue;
+				}
+				IDuccProcess process = jobProcesses.get(duccId);
+				process.setResourceState(ResourceState.Deallocated);
+				process.setProcessDeallocationType(ProcessDeallocationType.Forced);
+				logger.info(methodName, duccWorkJob.getDuccId(), messages.fetch("resource deallocated")
+					+" "+messages.fetchLabel("process")+duccId.getFriendly()
+					+" "+messages.fetchLabel("unique")+duccId.getUnique()
+					+" "+messages.fetchLabel("name")+nodeId.getName()
+					+" "+messages.fetchLabel("ip")+nodeId.getIp());
+			}
+		}
+		else {
+			logger.info(methodName, duccWorkJob.getDuccId(), messages.fetch("no map found"));
+		}
+		logger.trace(methodName, duccWorkJob.getDuccId(), messages.fetch("exit"));
+		return changes;
+	}
+
+	/**
+	 * Adds to the reservation every resource in the map that it does not already hold.
+	 *
+	 * @param duccWorkReservation the reservation receiving the resources
+	 * @param resourceMap resources keyed by id; nothing is added when null
+	 * @return the number of reservations added
+	 */
+	private int reservationMapResourcesAdd(DuccWorkReservation duccWorkReservation,Map<DuccId,IResource> resourceMap) {
+		String methodName = "reservationMapResourcesAdd";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		int changes = 0;
+		IDuccReservationMap held = duccWorkReservation.getReservationMap();
+		if(resourceMap != null) {
+			for(DuccId duccId : resourceMap.keySet()) {
+				IResource resource = resourceMap.get(duccId);
+				NodeIdentity nodeId = resource.getNodeId();
+				int shares = resource.countShares();
+				if(held.containsKey(duccId)) {
+					logger.debug(methodName, duccId, messages.fetch("duplicate resource")+" "+messages.fetchLabel("name")+nodeId.getName()+" "+messages.fetchLabel("ip")+nodeId.getIp());
+					continue;
+				}
+				held.addReservation(new DuccReservation(duccId, nodeId, shares));
+				logger.info(methodName, duccId, messages.fetch("add resource")+" "+messages.fetchLabel("name")+nodeId.getName()+" "+messages.fetchLabel("ip")+nodeId.getIp());
+				changes++;
+			}
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return changes;
+	}
+	
+	/**
+	 * Removes from the reservation every resource in the map that it currently holds.
+	 *
+	 * @param duccWorkReservation the reservation giving up the resources
+	 * @param resourceMap resources keyed by id; nothing is removed when null
+	 * @return the number of reservations removed
+	 */
+	private int reservationMapResourcesDel(DuccWorkReservation duccWorkReservation,Map<DuccId,IResource> resourceMap) {
+		// was "processMapResourcesDel" (copy/paste), which attributed these log records to the wrong method
+		String methodName = "reservationMapResourcesDel";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		int changes = 0;
+		IDuccReservationMap reservationMap = duccWorkReservation.getReservationMap();
+		if(resourceMap != null) {
+			for(DuccId duccId : resourceMap.keySet()) {
+				NodeIdentity nodeId = resourceMap.get(duccId).getNodeId();
+				if(reservationMap.containsKey(duccId)) {
+					reservationMap.removeReservation(duccId);
+					logger.info(methodName, duccId, messages.fetch("remove resource")+" "+messages.fetchLabel("name")+nodeId.getName()+" "+messages.fetchLabel("ip")+nodeId.getIp());
+					changes++;
+				}
+				else {
+					logger.debug(methodName, duccId, messages.fetch("not found resource")+" "+messages.fetchLabel("name")+nodeId.getName()+" "+messages.fetchLabel("ip")+nodeId.getIp());
+				}
+			}
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return changes;
+	}
+	
+	/**
+	 * SM reconciliation helper: flattens the messages of a service dependency into one string.
+	 * <p>
+	 * Each entry is rendered as <code>endpoint:message;</code> in the iteration order of the map.
+	 *
+	 * @param sd the service dependency whose messages are rendered
+	 * @return the concatenated entries (empty string for an empty map), or null when the
+	 *         dependency has no message map
+	 */
+	private String getServiceDependencyMessages(ServiceDependency sd) {
+		// named to avoid hiding the class-level "messages" used for log text elsewhere
+		Map<String, String> endpointMessages = sd.getMessages();
+		if(endpointMessages == null) {
+			return null;
+		}
+		StringBuilder text = new StringBuilder();
+		for(Map.Entry<String, String> entry : endpointMessages.entrySet()) {
+			text.append(entry.getKey()).append(":").append(entry.getValue()).append(";");
+		}
+		return text.toString();
+	}
+	
+	public void reconcileState(ServiceMap serviceMap) {
+		String methodName = "reconcileState (SM)";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		int changes = 0;
+		Iterator<DuccId> serviceMapIterator = serviceMap.keySet().iterator();
+		long t0 = System.currentTimeMillis();
+		synchronized(workMap) {
+			while(serviceMapIterator.hasNext()) {
+				DuccId duccId = serviceMapIterator.next();
+				ServiceDependency services = serviceMap.get(duccId);
+				DuccWorkJob duccWorkJob = (DuccWorkJob) workMap.findDuccWork(duccId);
+				if(duccWorkJob != null) {
+					JobState jobState = duccWorkJob.getJobState();
+					ServiceState serviceState = services.getState();
+					switch(jobState) {
+					case Received:
+						logger.warn(methodName, duccId, messages.fetchLabel("unexpected job state")+jobState);
+						break;
+					case WaitingForDriver:
+						logger.warn(methodName, duccId, messages.fetchLabel("unexpected job state")+jobState);
+						break;
+					case WaitingForServices:
+						switch(serviceState) {
+						case Waiting:
+						case Initializing:
+							break;
+						case Available:
+							stateJobAccounting.stateChange(duccWorkJob, JobState.WaitingForResources);
+							changes++;
+							logger.warn(methodName, duccId, messages.fetchLabel("job state")+jobState+" "+messages.fetchLabel("services state")+serviceState);
+							break;
+						case NotAvailable:
+                        case Stopping:
+							stateJobAccounting.stateChange(duccWorkJob, JobState.Completing);
+							duccWorkJob.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+							String sdm = getServiceDependencyMessages(services);  
+							IRationale rationale = new Rationale();
+							if(sdm != null) {
+								rationale = new Rationale("service manager reported "+sdm);
+							}
+							stateJobAccounting.complete(duccWorkJob, JobCompletionType.ServicesUnavailable, rationale);
+							changes++;
+							logger.warn(methodName, duccId, messages.fetchLabel("job state")+jobState+" "+messages.fetchLabel("services state")+serviceState);
+							break;
+						case Undefined:
+							logger.warn(methodName, duccId, messages.fetchLabel("job state")+jobState+" "+messages.fetchLabel("services state")+serviceState);
+							break;
+						}
+						break;
+					case WaitingForResources:
+					case Initializing:
+					case Running:
+						logger.warn(methodName, duccId, messages.fetchLabel("job state")+jobState+" "+messages.fetchLabel("services state")+serviceState);
+						break;
+					case Completed:
+						logger.debug(methodName, duccId, messages.fetchLabel("job state")+jobState+" "+messages.fetchLabel("services state")+serviceState);
+						break;
+					case Undefined:
+						logger.warn(methodName, duccId, messages.fetchLabel("unexpected job state")+jobState);
+						break;
+					}
+				}
+				else {
+					logger.debug(methodName, duccId, messages.fetch("job not found"));
+				}
+			}
+			if(changes > 0) {
+				OrchestratorCheckpoint.getInstance().saveState();
+			}
+		}
+		long t1 = System.currentTimeMillis();
+		long elapsed = t1 - t0;
+		if(elapsed > Constants.SYNC_LIMIT) {
+			logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+		}		
+		logger.trace(methodName, null, messages.fetch("exit"));
+	}
+	
+	/**
+	 * Node Inventory reconciliation.
+	 * <p>
+	 * Applies the process states reported by node inventory to process accounting and, for
+	 * processes of type Pop that belong to a job, terminates or completes that job according
+	 * to the reported process state. The work map is locked for the whole pass.
+	 *
+	 * @param inventoryProcessMap reported processes keyed by process id
+	 */
+	public void reconcileState(HashMap<DuccId, IDuccProcess> inventoryProcessMap) {
+		String methodName = "reconcileState (Node Inventory)";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		Iterator<DuccId> iterator = inventoryProcessMap.keySet().iterator();
+		long t0 = System.currentTimeMillis();
+		synchronized(workMap) {
+			while(iterator.hasNext()) {
+				DuccId processId = iterator.next();
+				IDuccProcess inventoryProcess = inventoryProcessMap.get(processId);
+				switch(inventoryProcess.getProcessType()) {
+				case Pop:
+					// look up which unit of work owns this process; unknown processes are ignored
+					DuccId jobId = OrchestratorCommonArea.getInstance().getProcessAccounting().getJobId(processId);
+					if(jobId != null) {
+						IDuccWork duccWork = workMap.findDuccWork(jobId);
+						if(duccWork != null) {
+							// only work of type Job is handled; other types fall out of the switch
+							switch(duccWork.getDuccType()) {
+							case Job:
+								DuccWorkJob job = (DuccWorkJob) duccWork;
+								OrchestratorCommonArea.getInstance().getProcessAccounting().setStatus(inventoryProcess);
+								switch(inventoryProcess.getProcessState()) {
+								case Failed:
+									// a failed process with friendly id 0 is reported as a driver failure,
+									// any other failed process as an ordinary process failure
+									if(inventoryProcess.getDuccId().getFriendly() == 0) {
+										jobTerminate(job, JobCompletionType.DriverProcessFailed, new Rationale(inventoryProcess.getReasonForStoppingProcess()), inventoryProcess.getProcessDeallocationType());
+									}
+									else {
+										jobTerminate(job, JobCompletionType.ProcessFailure, new Rationale(inventoryProcess.getReasonForStoppingProcess()), inventoryProcess.getProcessDeallocationType());
+									}
+									break;
+								default:
+									// a process reported complete releases the job's processes and completes the job
+									if(inventoryProcess.isComplete()) {
+										OrchestratorCommonArea.getInstance().getProcessAccounting().deallocate(job,ProcessDeallocationType.Stopped);
+										completeJob(job, new Rationale("state manager reported as normal completion"));
+									}
+									break;
+								}
+								break;
+							}
+						}
+					}
+					break;
+				case Service:
+					// nothing is reconciled for processes of type Service
+					break;
+				case Job_Uima_AS_Process:
+					// only the reported status is recorded; no job-level decision is taken here
+					OrchestratorCommonArea.getInstance().getProcessAccounting().setStatus(inventoryProcess);
+					break;
+				}
+			}
+		}
+		long t1 = System.currentTimeMillis();
+		long elapsed = t1 - t0;
+		// only passes that held the lock longer than SYNC_LIMIT are reported
+		if(elapsed > Constants.SYNC_LIMIT) {
+			logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+		}		
+		logger.trace(methodName, null, messages.fetch("exit"));
+	}
+	
+	private void completeJob(DuccWorkJob job, IRationale rationale) {
+		switch(job.getCompletionType()) {
+		case Undefined:
+			job.setCompletion(JobCompletionType.EndOfJob, rationale);
+			job.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+		case EndOfJob:
+			if(job.getProcessFailureCount() > 0) {
+				job.setCompletion(JobCompletionType.Error, rationale);
+			}
+			else if(job.getProcessInitFailureCount() > 0) {
+				job.setCompletion(JobCompletionType.Error, rationale);
+			}
+			else {
+				try {
+					if(Integer.parseInt(job.getSchedulingInfo().getWorkItemsError()) > 0) {
+						job.setCompletion(JobCompletionType.Error, rationale);
+					}
+				}
+				catch(Exception e) {
+				}
+			}
+			break;
+		default:
+			break;
+		}
+		switch(job.getJobState()) {
+		case Completing:
+		case Completed:
+			break;
+		default:
+			if(job.getProcessMap().getAliveProcessCount() == 0) {
+				stateJobAccounting.stateChange(job, JobState.Completing);
+			}
+		}
+	}
+	
+	/**
+	 * Terminates a job that is not yet finished; a finished job is left untouched.
+	 * <p>
+	 * The job is moved to Completing, its completion type and rationale are recorded, its
+	 * processes are deallocated with the given type and the completion time is stamped.
+	 *
+	 * @param job the job to terminate
+	 * @param jobCompletionType the completion type to record
+	 * @param rationale the reason recorded with the completion type
+	 * @param processDeallocationType the deallocation type applied to the job's processes
+	 */
+	public void jobTerminate(IDuccWorkJob job, JobCompletionType jobCompletionType, IRationale rationale, ProcessDeallocationType processDeallocationType) {
+		if(job.isFinished()) {
+			return;
+		}
+		stateJobAccounting.stateChange(job, JobState.Completing);
+		stateJobAccounting.complete(job, jobCompletionType, rationale);
+		OrchestratorCommonArea.getInstance().getProcessAccounting().deallocate(job,processDeallocationType);
+		job.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+	}
+	
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Validate.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Validate.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Validate.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Validate.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import java.util.ArrayList;
+import java.util.Properties;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.orchestrator.authentication.DuccWebAdministrators;
+import org.apache.uima.ducc.orchestrator.utilities.MemorySpecification;
+import org.apache.uima.ducc.transport.event.CancelJobDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelReservationDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelServiceDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitJobDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitReservationDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitServiceDuccEvent;
+import org.apache.uima.ducc.transport.event.cli.JobRequestProperties;
+import org.apache.uima.ducc.transport.event.cli.JobSpecificationProperties;
+import org.apache.uima.ducc.transport.event.cli.ReservationRequestProperties;
+import org.apache.uima.ducc.transport.event.common.DuccWorkReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccSchedulingInfo;
+
+
+/**
+ * Static validation of submit and cancel requests.
+ * <p>
+ * Problems found are appended to the request properties themselves, as lists stored under
+ * the submit-errors and submit-warnings keys, and each one is also logged.
+ */
+public class Validate {
+	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(Validate.class.getName());
+	
+	// shared instance rather than a private copy; every lookup re-reads the administrators file
+	private static final DuccWebAdministrators duccWebAdministrators = DuccWebAdministrators.getInstance();
+	
+	/**
+	 * Appends a reason to the list stored in the properties under the given key, creating the
+	 * list on first use, and logs the reason under the given method name.
+	 */
+	@SuppressWarnings("unchecked")
+	private static void addReason(Properties properties, String key, String reason, String methodName) {
+		ArrayList<String> value = (ArrayList<String>) properties.get(key);
+		if(value == null) {
+			value = new ArrayList<String>();
+			properties.put(key, value);
+		}
+		value.add(reason);
+		logger.info(methodName, null, reason);
+	}
+	
+	/** Records a submit error in the properties and logs it. */
+	private static void addError(Properties properties, String reason) {
+		addReason(properties, JobSpecificationProperties.key_submit_errors, reason, "addError");
+	}
+	
+	/** Records a submit warning in the properties and logs it. */
+	private static void addWarning(Properties properties, String reason) {
+		addReason(properties, JobSpecificationProperties.key_submit_warnings, reason, "addWarning");
+	}
+	
+	/** Formats a reason as <code>type: key=value</code>. */
+	private static String createReason(String type, String key, String value) {
+		return type+": "+key+"="+value;
+	}
+	 
+	/**
+	 * Checks that the property, when present, is an integer within the given bounds.
+	 * <p>
+	 * An absent property only yields a warning naming the default; the default is not stored
+	 * into the properties here. Any parse failure inside the check, including unparsable
+	 * bounds, is reported as a non-integer value.
+	 *
+	 * @param retVal the validation result so far; returned unchanged when no error is found
+	 * @param properties the request properties, which also receive errors and warnings
+	 * @param key the property to check
+	 * @param defaultValue the default named in the warning when the property is absent
+	 * @param minValue the smallest accepted value, as a decimal string
+	 * @param maxValue the largest accepted value, as a decimal string
+	 * @return false when the value is out of range or not an integer, otherwise retVal
+	 */
+	public static boolean integer(boolean retVal, Properties properties, String key, String defaultValue, String minValue, String maxValue) {
+		String value = (String)properties.get(key);
+		if(value == null) {
+			String reason = createReason("default",key,defaultValue);
+			addWarning(properties,reason);
+		}
+		else {
+			try {
+				int specified_value = Integer.parseInt(value);
+				int min_value = Integer.parseInt(minValue);
+				int max_value = Integer.parseInt(maxValue);
+				if(specified_value < min_value) {
+					String reason = createReason("invalid, under "+minValue,key,value);
+					addError(properties,reason);
+					retVal = false;
+				}
+				if(specified_value > max_value) {
+					String reason = createReason("invalid, above "+maxValue,key,value);
+					addError(properties,reason);
+					retVal = false;
+				}
+			}
+			catch(Exception e) {
+				String reason = createReason("invalid, non-integer",key,value);
+				addError(properties,reason);
+				retVal = false;
+			}
+		}
+		return retVal;
+	}
+	
+	/**
+	 * Validates a job submission; currently only the process thread count is checked.
+	 *
+	 * @return true when no error was found
+	 */
+	public static boolean request(SubmitJobDuccEvent duccEvent) {
+		boolean retVal = true;
+		JobRequestProperties properties = (JobRequestProperties) duccEvent.getProperties();
+		//
+		retVal = integer(retVal,
+				properties,
+				JobSpecificationProperties.key_process_thread_count,
+				IDuccSchedulingInfo.defaultThreadsPerShare,
+				IDuccSchedulingInfo.minThreadsPerShare,
+				IDuccSchedulingInfo.maxThreadsPerShare);
+		//TODO		
+		return retVal;
+	}
+	
+	/**
+	 * Validates a job cancellation; no checks are implemented yet.
+	 *
+	 * @return always true
+	 */
+	public static boolean request(CancelJobDuccEvent duccEvent) {
+		boolean retVal = true;
+		//TODO
+		return retVal;
+	}
+	
+	/**
+	 * Validates a reservation submission: the memory size must be parsable, the number of
+	 * instances must be given, and a missing scheduling class only yields a warning.
+	 *
+	 * @return true when no error was found
+	 */
+	public static boolean request(SubmitReservationDuccEvent duccEvent) {
+		boolean retVal = true;
+		ReservationRequestProperties properties = (ReservationRequestProperties) duccEvent.getProperties();
+		String key;
+		String value;
+		// memory size (this check previously read the number-of-instances key by mistake)
+		key = ReservationRequestProperties.key_instance_memory_size;
+		String memorySize = (String) properties.get(key);
+		MemorySpecification memorySpecification = new MemorySpecification(memorySize);
+		value = memorySpecification.getSize();
+		if(value == null) {
+			// report what was submitted; the parsed size is null on this path
+			String reason = createReason("invalid", key, memorySize);
+			addError(properties,reason);
+			retVal = false;
+		}
+		// number of machines
+		key = ReservationRequestProperties.key_number_of_instances;
+		value = (String) properties.get(key);
+		if(value == null) {
+			String reason = createReason("invalid", key, value);
+			addError(properties,reason);
+			retVal = false;
+		}
+		// scheduling class
+		key = ReservationRequestProperties.key_scheduling_class;
+		value = (String) properties.get(key);
+		if(value == null) {
+			String reason = createReason("using", key, "default");
+			addWarning(properties,reason);
+		}
+		return retVal;
+	}
+	
+	/**
+	 * Authorizes a reservation cancellation: the requesting user must own the reservation or
+	 * be listed as an administrator. A refusal is recorded as a warning in the request
+	 * properties and logged.
+	 *
+	 * @param duccEvent the cancel request, whose properties carry the requesting user
+	 * @param duccWorkReservation the reservation to be cancelled
+	 * @return true when the requester may cancel the reservation
+	 */
+	public static boolean request(CancelReservationDuccEvent duccEvent, DuccWorkReservation duccWorkReservation) {
+		String location = "request";
+		boolean retVal = false;
+		Properties properties = duccEvent.getProperties();
+		String userid = properties.getProperty(JobSpecificationProperties.key_user);
+		String ownerid = duccWorkReservation.getStandardInfo().getUser();
+		if((userid != null) && (userid.equals(ownerid))) {
+			retVal = true;
+		}
+		else if(duccWebAdministrators.isAdministrator(userid)) {
+			retVal = true;
+		}
+		else {
+			String reason;
+			if(userid == null) {
+				reason = createReason("reservation cancel invalid",JobSpecificationProperties.key_user,"unspecified");
+			}
+			else {
+				reason = createReason("reservation cancel unauthorized",JobSpecificationProperties.key_user,userid);
+			}
+			addWarning(properties,reason);
+			logger.warn(location, duccWorkReservation.getDuccId(), reason);
+		}
+		return retVal;
+	}
+	
+	/**
+	 * Validates a service submission; currently only the process thread count is checked.
+	 *
+	 * @return true when no error was found
+	 */
+	public static boolean request(SubmitServiceDuccEvent duccEvent) {
+		boolean retVal = true;
+		JobRequestProperties properties = (JobRequestProperties) duccEvent.getProperties();
+		//
+		retVal = integer(retVal,
+				properties,
+				JobSpecificationProperties.key_process_thread_count,
+				IDuccSchedulingInfo.defaultThreadsPerShare,
+				IDuccSchedulingInfo.minThreadsPerShare,
+				IDuccSchedulingInfo.maxThreadsPerShare);
+		//TODO		
+		return retVal;
+	}
+	
+	/**
+	 * Validates a service cancellation; no checks are implemented yet.
+	 *
+	 * @return always true
+	 */
+	public static boolean request(CancelServiceDuccEvent duccEvent) {
+		boolean retVal = true;
+		//TODO
+		return retVal;
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Validate.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/authentication/DuccWebAdministrators.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/authentication/DuccWebAdministrators.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/authentication/DuccWebAdministrators.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/authentication/DuccWebAdministrators.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.authentication;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.TreeMap;
+
+import org.apache.uima.ducc.common.IDuccEnv;
+
+
+/**
+ * Answers whether a userid is listed in the administrators file.
+ * <p>
+ * The file is read as a properties file whose keys are the authorized userids. It is
+ * re-read on every query, so edits to the file take effect without a restart.
+ */
+public class DuccWebAdministrators {
+
+	private final String fileName = IDuccEnv.DUCC_ADMINISTRATORS_FILE;
+	
+	private static final DuccWebAdministrators duccWebAdministrators = new DuccWebAdministrators();
+	
+	/**
+	 * @return the shared instance
+	 */
+	public static DuccWebAdministrators getInstance() {
+		return duccWebAdministrators;
+	}
+	
+	/**
+	 * Reads the administrators file.
+	 *
+	 * @return the loaded properties; empty or partial when the file cannot be read
+	 */
+	private Properties load() {
+		Properties properties = new Properties();
+		FileInputStream fis = null;
+		try {
+			fis = new FileInputStream(new File(fileName));
+			properties.load(fis);
+		}
+		catch(IOException e) {
+			e.printStackTrace();
+		}
+		finally {
+			// always release the file handle, also when load() fails part way through
+			if(fis != null) {
+				try {
+					fis.close();
+				}
+				catch(IOException e) {
+					e.printStackTrace();
+				}
+			}
+		}
+		return properties;
+	}
+	
+	/**
+	 * @return an iterator over the authorized userids in ascending order
+	 */
+	public Iterator<String> getSortedAuthorizedUserids() {
+		// the map is used as a sorted set: each userid is both key and value
+		TreeMap<String,String> map = new TreeMap<String,String>();
+		Enumeration<?> enumeration = load().propertyNames();
+		while(enumeration.hasMoreElements()) {
+			String name = (String)enumeration.nextElement();
+			map.put(name, name);
+		}
+		return map.keySet().iterator();
+	}
+	
+	/**
+	 * @return the path of the administrators file
+	 */
+	public String getAuthorizationFileName() {
+		return fileName;
+	}
+	
+	/**
+	 * Tells whether the userid is listed in the administrators file; surrounding whitespace
+	 * is ignored on both sides of the comparison.
+	 *
+	 * @param userid the userid to look up; null is never an administrator
+	 * @return true only when the userid is listed
+	 */
+	public boolean isAdministrator(String userid) {
+		boolean retVal = false;
+		try {
+			if(userid != null) {
+				String candidate = userid.trim();
+				for(Object key : load().keySet()) {
+					if(candidate.equals(((String)key).trim())) {
+						retVal = true;
+						break;
+					}
+				}
+			}
+		}
+		catch(Exception ignored) {
+			// fail closed: any problem reading or interpreting the file means "not an administrator"
+		}
+		return retVal;
+	}
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/authentication/DuccWebAdministrators.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/config/OrchestratorConfiguration.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/config/OrchestratorConfiguration.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/config/OrchestratorConfiguration.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/config/OrchestratorConfiguration.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.config;
+
+import org.apache.camel.Body;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jetty.JettyHttpComponent;
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.config.DuccBlastGuardPredicate;
+import org.apache.uima.ducc.common.exception.DuccRuntimeException;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.XStreamUtils;
+import org.apache.uima.ducc.orchestrator.ORTracer;
+import org.apache.uima.ducc.orchestrator.Orchestrator;
+import org.apache.uima.ducc.orchestrator.OrchestratorCommonArea;
+import org.apache.uima.ducc.orchestrator.OrchestratorComponent;
+import org.apache.uima.ducc.orchestrator.event.OrchestratorEventListener;
+import org.apache.uima.ducc.orchestrator.monitor.Xmon;
+import org.apache.uima.ducc.orchestrator.monitor.Xmon.ExchangeType;
+import org.apache.uima.ducc.orchestrator.monitor.Xmon.LifeStatus;
+import org.apache.uima.ducc.transport.DuccTransportConfiguration;
+import org.apache.uima.ducc.transport.event.CancelJobDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelJobReplyDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelReservationDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelReservationReplyDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelServiceDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelServiceReplyDuccEvent;
+import org.apache.uima.ducc.transport.event.OrchestratorAbbreviatedStateDuccEvent;
+import org.apache.uima.ducc.transport.event.OrchestratorStateDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitJobDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitJobReplyDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitReservationDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitReservationReplyDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitServiceDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitServiceReplyDuccEvent;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+
+@Configuration
+@Import({DuccTransportConfiguration.class,CommonConfiguration.class})
+
+public class OrchestratorConfiguration {
+	//	Springframework magic to inject instance of {@link CommonConfiguration}
+	@Autowired CommonConfiguration common;
+	//	Springframework magic to inject instance of {@link DuccTransportConfiguration}
+	@Autowired DuccTransportConfiguration orchestratorTransport;
+
+	private DuccLogger duccLogger = DuccLoggerComponents.getOrLogger(OrchestratorConfiguration.class.getName());
+	
+	/**
+	 * Creates a Camel route builder that handles messages arriving at the given endpoint.
+	 * Each message is passed through the exchange monitors and delegated to the provided
+	 * {@code OrchestratorEventListener}.
+	 *   
+	 * @param endpoint - endpoint where the Orchestrator expects to receive messages
+	 * @param delegate - {@code OrchestratorEventListener} instance to delegate incoming messages 
+	 * @return
+	 */
+	
+	public RouteBuilder routeBuilderForEndpoint(final String endpoint, final OrchestratorEventListener delegate) {
+
+		return new RouteBuilder() {
+			
+            public void configure() {
+    			Xmon xmStart = new Xmon(LifeStatus.Start, ExchangeType.Receive);
+    			Xmon xmEnded = new Xmon(LifeStatus.Ended, ExchangeType.Receive);
+    			Xmon xmError = new Xmon(LifeStatus.Error, ExchangeType.Receive);
+            	onException(Exception.class).handled(true).process(xmError);
+            	from(endpoint)
+            	.process(xmStart)
+            	.bean(delegate)
+            	.process(xmEnded)
+            	;
+            }
+        };
+    }
+	
+	/**
+	 * Creates Camel router that will handle incoming request messages. Each message will
+	 * be unmarshalled using xstream and delegated to provided {@code OrchestratorEventListener}.
+	 *   
+	 * @param endpoint - endpoint where the job manager expects to receive messages
+	 * @param delegate - {@code OrchestratorEventListener} instance to delegate incoming messages 
+	 * @return
+	 */
+	/*
+	public RouteBuilder routeBuilderForReplyEndpoint(final String endpoint, final OrchestratorEventListener delegate) {
+
+		return new RouteBuilder() {
+            public void configure() {
+            	from(endpoint)
+            	.unmarshal().xstream()
+            	.process(new TransportProcessor())  // intermediate processing before delegating to event listener
+            	.bean(delegate)
+            	.process(new OrchestratorReplyProcessor())   // inject reply object
+            	.marshal().xstream()
+            	;
+            }
+        };
+    }
+	*/
+  private RouteBuilder routeBuilder(final CamelContext context, final OrchestratorEventListener delegate) throws Exception {
+    
+    return new RouteBuilder() {
+          public void configure() {
+            
+            JettyHttpComponent jettyComponent = new JettyHttpComponent();
+            
+			Xmon xmStart = new Xmon(LifeStatus.Start, ExchangeType.Receive);
+			Xmon xmEnded = new Xmon(LifeStatus.Ended, ExchangeType.Reply);
+			//ExchangeMonitor xmError = new ExchangeMonitor(LifeStatus.Error, ExchangeType.Receive);
+			
+            context.addComponent("jetty", jettyComponent);
+            onException(Throwable.class).maximumRedeliveries(0).handled(false).process(new ErrorProcessor());
+            
+            from("jetty://http://0.0.0.0:"+common.duccORHttpPort+"/or")
+            .unmarshal().xstream()
+            
+            .process(xmStart)
+            .bean(delegate)
+            .process(new OrchestratorReplyProcessor())   // inject reply object
+            .process(xmEnded)
+            .process(new Processor() {
+              
+              public void process(Exchange exchange) throws Exception {
+                exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
+                exchange.getOut().setHeader("content-type", "text/xml");
+                Object o = exchange.getIn().getBody();
+                if ( o != null ) {
+                  String body = XStreamUtils.marshall(o);
+                  exchange.getOut().setBody(body);
+                  exchange.getOut().setHeader("content-length", body.length());
+                } else {
+                  duccLogger.warn("RouteBuilder.configure", null, new DuccRuntimeException("Orchestrator Has Not Provided a Reply Object."));
+                  exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 500);
+                } 
+              }
+            })
+            ;
+          }
+        };
+  }
+  
+	/**
+	 * Camel Processor that swaps a request event in the message body for the
+	 * matching reply event, carrying over the request's properties. Bodies of
+	 * any other type are left untouched.
+	 */
+	private class OrchestratorReplyProcessor implements Processor {
+		
+		private OrchestratorReplyProcessor() {
+		}
+		
+		public void process(Exchange exchange) throws Exception {
+			// the type tests below all refer to the body as it was on entry
+			Object request = exchange.getIn().getBody();
+			// job: submit / cancel
+			if(request instanceof SubmitJobDuccEvent) {
+				SubmitJobDuccEvent submit = exchange.getIn().getBody(SubmitJobDuccEvent.class);
+				SubmitJobReplyDuccEvent reply = new SubmitJobReplyDuccEvent();
+				reply.setProperties(submit.getProperties());
+				exchange.getIn().setBody(reply);
+			}
+			if(request instanceof CancelJobDuccEvent) {
+				CancelJobDuccEvent cancel = exchange.getIn().getBody(CancelJobDuccEvent.class);
+				CancelJobReplyDuccEvent reply = new CancelJobReplyDuccEvent();
+				reply.setProperties(cancel.getProperties());
+				exchange.getIn().setBody(reply);
+			}
+			// reservation: submit / cancel
+			if(request instanceof SubmitReservationDuccEvent) {
+				SubmitReservationDuccEvent submit = exchange.getIn().getBody(SubmitReservationDuccEvent.class);
+				SubmitReservationReplyDuccEvent reply = new SubmitReservationReplyDuccEvent();
+				reply.setProperties(submit.getProperties());
+				exchange.getIn().setBody(reply);
+			}
+			if(request instanceof CancelReservationDuccEvent) {
+				CancelReservationDuccEvent cancel = exchange.getIn().getBody(CancelReservationDuccEvent.class);
+				CancelReservationReplyDuccEvent reply = new CancelReservationReplyDuccEvent();
+				reply.setProperties(cancel.getProperties());
+				exchange.getIn().setBody(reply);
+			}
+			// service: submit / cancel
+			if(request instanceof SubmitServiceDuccEvent) {
+				SubmitServiceDuccEvent submit = exchange.getIn().getBody(SubmitServiceDuccEvent.class);
+				SubmitServiceReplyDuccEvent reply = new SubmitServiceReplyDuccEvent();
+				reply.setProperties(submit.getProperties());
+				exchange.getIn().setBody(reply);
+			}
+			if(request instanceof CancelServiceDuccEvent) {
+				CancelServiceDuccEvent cancel = exchange.getIn().getBody(CancelServiceDuccEvent.class);
+				CancelServiceReplyDuccEvent reply = new CancelServiceReplyDuccEvent();
+				reply.setProperties(cancel.getProperties());
+				exchange.getIn().setBody(reply);
+			}
+		}
+	}
+	
+	/**
+	 * Creates Camel router that will publish Orchestrator state at regular intervals.
+	 * 
+	 * @param targetEndpointToReceiveOrchestratorStateUpdate - endpoint where to publish Orchestrator state 
+	 * @param statePublishRate - how often to publish state
+	 * @return
+	 * @throws Exception
+	 */
+	private RouteBuilder routeBuilderForOrchestratorStatePost(final Orchestrator orchestrator, final String targetEndpointToReceiveOrchestratorStateUpdate, final int statePublishRate) throws Exception {
+		final OrchestratorStateProcessor orchestratorp =  // an object responsible for generating the state 
+			new OrchestratorStateProcessor(orchestrator);
+		
+		return new RouteBuilder() {
+		      public void configure() {		            
+				//Xmon xmStart = new Xmon(LifeStatus.Start, ExchangeType.Send, OrchestratorStateDuccEvent.class);
+				//Xmon xmEnded = new Xmon(LifeStatus.Ended, ExchangeType.Send, OrchestratorStateDuccEvent.class);
+				//Xmon xmError = new Xmon(LifeStatus.Error, ExchangeType.Send, OrchestratorStateDuccEvent.class);
+				//onException(Exception.class).handled(true).process(xmError);
+		        String route = "StatePost";
+		    	ORTracer ort1 = new ORTracer(route+":error");
+		    	ORTracer ort2 = new ORTracer(route+":begin");
+		    	ORTracer ort3 = new ORTracer(route+":done:orchestratorp");
+		    	ORTracer ort4 = new ORTracer(route+":done:to");
+		    	
+		    	final Predicate blastFilter = new DuccBlastGuardPredicate(duccLogger);
+		    	
+				onException(Exception.class).handled(true).process(ort1);
+		        from("timer:orchestratorStateDumpTimer?fixedRate=true&period=" + statePublishRate)
+		              // This route uses a filter to prevent sudden bursts of messages which
+		        	  // may flood DUCC daemons causing chaos. The filter disposes any event
+		        	  // that appears in a window of 1 sec or less.
+		        	  .filter(blastFilter)	
+		              //.process(xmStart)
+		        	  .process(ort2)
+		        	  .process(orchestratorp)
+		        	  //.process(xmEnded)
+		        	  .process(ort3)
+		        	  .to(targetEndpointToReceiveOrchestratorStateUpdate)
+		        	  .process(ort4)
+		        	  ;
+		      }
+		    };
+	}
+	
+	/**
+	 * Camel Processor responsible for generating Orchestrator's state.
+	 * 
+	 */
+	private class OrchestratorStateProcessor implements Processor {
+		// source of the state placed into each exchange
+		private final Orchestrator orchestrator;
+		
+		private OrchestratorStateProcessor(Orchestrator orchestrator) {
+			this.orchestrator = orchestrator;
+		}
+		
+		public void process(Exchange exchange) throws Exception {
+			// obtain the current state, then make it the body of the in message
+			OrchestratorStateDuccEvent currentState = orchestrator.getState();
+			exchange.getIn().setBody(currentState);
+		}
+	}
+	
+	
+	/**
+	 * Creates Camel router that will publish Orchestrator abbreviated state at regular intervals.
+	 * 
+	 * @param targetEndpointToReceiveOrchestratorAbbreviatedStateUpdate - endpoint where to publish state 
+	 * @param statePublishRate - how often to publish state
+	 * @return
+	 * @throws Exception
+	 */
+	private RouteBuilder routeBuilderForOrchestratorAbbreviatedStatePost(final Orchestrator orchestrator, final String targetEndpointToReceiveOrchestratorAbbreviatedStateUpdate, final int statePublishRate) throws Exception {
+		final OrchestratorAbbreviatedStateProcessor orchestratorp =  // an object responsible for generating the state 
+			new OrchestratorAbbreviatedStateProcessor(orchestrator);
+		
+		return new RouteBuilder() {
+		      public void configure() {  
+				//Xmon xmStart = new Xmon(LifeStatus.Start, ExchangeType.Send, OrchestratorAbbreviatedStateDuccEvent.class);
+				//Xmon xmEnded = new Xmon(LifeStatus.Ended, ExchangeType.Send, OrchestratorAbbreviatedStateDuccEvent.class);
+				//Xmon xmError = new Xmon(LifeStatus.Error, ExchangeType.Send, OrchestratorAbbreviatedStateDuccEvent.class);
+				//onException(Exception.class).handled(true).process(xmError);
+		    	String route = "AbbreviatedStatePost";
+		    	ORTracer ort1 = new ORTracer(route+":error");
+		    	ORTracer ort2 = new ORTracer(route+":begin");
+		    	ORTracer ort3 = new ORTracer(route+":done:orchestratorp");
+		    	ORTracer ort4 = new ORTracer(route+":done:to");
+		    	
+		    	final Predicate blastFilter = new DuccBlastGuardPredicate(duccLogger);
+		    	
+				onException(Exception.class).handled(true).process(ort1);
+		        from("timer:orchestratorAbbreviatedStateDumpTimer?fixedRate=true&period=" + statePublishRate)
+		        	// This route uses a filter to prevent sudden bursts of messages which
+		        	// may flood DUCC daemons causing chaos. The filter disposes any event
+		        	// that appears in a window of 1 sec or less.
+		        	.filter(blastFilter)		
+		        	//.process(xmStart)
+		        	.process(ort2)
+		        	.process(orchestratorp)
+		        	//.process(xmEnded)
+                    .process(ort3)
+                    .to(targetEndpointToReceiveOrchestratorAbbreviatedStateUpdate)
+                    .process(ort4)
+		        	;
+		      }
+		    };
+	}
+	
+	/**
+	 * Camel Processor responsible for generating Orchestrator's abbreviated state.
+	 * 
+	 */
+	private class OrchestratorAbbreviatedStateProcessor implements Processor {
+		// source of the abbreviated state placed into each exchange
+		private final Orchestrator orchestrator;
+		
+		private OrchestratorAbbreviatedStateProcessor(Orchestrator orchestrator) {
+			this.orchestrator = orchestrator;
+		}
+		
+		public void process(Exchange exchange) throws Exception {
+			// obtain the current abbreviated state, then make it the body of the in message
+			OrchestratorAbbreviatedStateDuccEvent currentState = orchestrator.getAbbreviatedState();
+			exchange.getIn().setBody(currentState);
+		}
+	}
+	
+	/**
+	 * Instantiate a listener to which Camel will route a body of the incoming message.
+	 * The listener should provide a method for each object class it expects to receive.
+	 * Camel uses introspection to analyze given listener and find a match based on
+	 * what is in the incoming message. 
+	 * 
+	 * @return
+	 */
+	public OrchestratorEventListener orchestratorDelegateListener(OrchestratorComponent orchestrator) {
+		// a fresh listener bound to the given component is created on every call
+		return new OrchestratorEventListener(orchestrator);
+	}
+
+	@Bean 
+	public OrchestratorComponent orchestrator() throws Exception {
+		OrchestratorCommonArea.initialize(common);
+		OrchestratorComponent orchestrator = new OrchestratorComponent(common.camelContext());
+        //	Instantiate JobManagerEventListener delegate listener. This listener will receive
+        //	incoming messages. 
+        OrchestratorEventListener delegateListener = this.orchestratorDelegateListener(orchestrator);
+		//	Inject a dispatcher into the listener in case it needs to send
+		//  a message to another component
+		delegateListener.setDuccEventDispatcher(orchestratorTransport.duccEventDispatcher(common.pmRequestEndpoint, orchestrator.getContext()));
+//		orchestrator.getContext().addRoutes(this.routeBuilderForReplyEndpoint(common.orchestratorRequestEndpoint, delegateListener));
+    orchestrator.getContext().addRoutes(this.routeBuilder(orchestrator.getContext(), delegateListener));
+		orchestrator.getContext().addRoutes(this.routeBuilderForEndpoint(common.rmStateUpdateEndpoint, delegateListener));
+		orchestrator.getContext().addRoutes(this.routeBuilderForEndpoint(common.smStateUpdateEndpoint, delegateListener));
+		orchestrator.getContext().addRoutes(this.routeBuilderForEndpoint(common.jdStateUpdateEndpoint,delegateListener));
+		orchestrator.getContext().addRoutes(this.routeBuilderForEndpoint(common.nodeInventoryEndpoint,delegateListener));
+		orchestrator.getContext().addRoutes(this.routeBuilderForOrchestratorStatePost(orchestrator, common.orchestratorStateUpdateEndpoint, Integer.parseInt(common.orchestratorStatePublishRate)));
+		orchestrator.getContext().addRoutes(this.routeBuilderForOrchestratorAbbreviatedStatePost(orchestrator, common.orchestratorAbbreviatedStateUpdateEndpoint, Integer.parseInt(common.orchestratorAbbreviatedStatePublishRate)));
+		return orchestrator;
+	}
+  public class ErrorProcessor implements Processor {
+
+    public void process(Exchange exchange) throws Exception {
+      // the caused by exception is stored in a property on the exchange
+      Throwable caused = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
+      exchange.getOut().setBody(caused); //XStreamUtils.marshall(caused));
+      caused.printStackTrace();
+    }
+  }
+  /**
+   * Handler that prints the class name of a received {@code SubmitJobDuccEvent} to
+   * standard output and then waits on its own monitor for up to 2000 ms.
+   * NOTE(review): nothing in this file references this class; it looks like leftover
+   * test scaffolding -- confirm before relying on it.
+   */
+  public class ServiceRequestHandler {
+    /**
+     * @param jobSubmit - message body, bound by Camel through the {@code @Body} annotation
+     * @throws Exception e.g. InterruptedException when the wait is interrupted
+     */
+    public void handleRequest(@Body SubmitJobDuccEvent jobSubmit) throws Exception {
+ //   public void handleRequest(@Body ErrorProcessor jobSubmit) throws Exception {
+      System.out.println("ServiceRequestHandler Received Request of type: "+jobSubmit.getClass().getName());
+       // wait() must be called while holding the monitor; returns after at most 2000 ms
+       synchronized(this) {
+         this.wait(2000);
+       }
+    }
+  }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/config/OrchestratorConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/JdStateEventLogger.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/JdStateEventLogger.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/JdStateEventLogger.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/JdStateEventLogger.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.event;
+
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.orchestrator.OrchestratorCommonArea;
+import org.apache.uima.ducc.transport.event.JdStateDuccEvent;
+
+
+/**
+ * Logging helper for received {@code JdStateDuccEvent}s. At present it only
+ * traces entry to and exit from the receiver.
+ */
+public class JdStateEventLogger {
+	
+	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(JdStateEventLogger.class.getName());
+	
+	private static final OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+	private static final Messages messages = orchestratorCommonArea.getSystemMessages();
+	
+	/**
+	 * Traces "enter" and "exit"; the event itself is not inspected.
+	 * 
+	 * @param jdStateDuccEvent the received event (unused)
+	 */
+	public static void receiver(JdStateDuccEvent jdStateDuccEvent) {
+		final String location = "receiver";
+		logger.trace(location, null, messages.fetch("enter"));
+		logger.trace(location, null, messages.fetch("exit"));
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/JdStateEventLogger.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/NodeInventoryEventLogger.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/NodeInventoryEventLogger.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/NodeInventoryEventLogger.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/NodeInventoryEventLogger.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.event;
+
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.orchestrator.OrchestratorCommonArea;
+import org.apache.uima.ducc.transport.event.NodeInventoryUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+
+
+/**
+ * Logging helper for received {@code NodeInventoryUpdateDuccEvent}s: writes one
+ * debug line per process found in the event.
+ */
+public class NodeInventoryEventLogger {
+	
+	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(NodeInventoryEventLogger.class.getName());
+	
+	private static final OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+	private static final Messages messages = orchestratorCommonArea.getSystemMessages();
+	
+	/**
+	 * Logs the number of processes in the event and, for each process, its
+	 * process state, resource state, node identity and PID. The job id shown
+	 * with each line is looked up through the process accounting.
+	 * 
+	 * @param nodeInventoryUpdateDuccEvent the received event; nothing but the
+	 *        enter/exit traces is logged when it carries no process map
+	 */
+	public static void receiver(NodeInventoryUpdateDuccEvent nodeInventoryUpdateDuccEvent) {
+		final String location = "receiver";
+		logger.trace(location, null, messages.fetch("enter"));
+		HashMap<DuccId, IDuccProcess> inventory = nodeInventoryUpdateDuccEvent.getProcesses();
+		if(inventory != null) {
+			logger.debug(location, null, inventory.size());
+			for(DuccId processId : inventory.keySet()) {
+				DuccId jobId = orchestratorCommonArea.getProcessAccounting().getJobId(processId);
+				IDuccProcess process = inventory.get(processId);
+				// ""+x renders a null value as the text "null"
+				String processState = ""+process.getProcessState();
+				String nodeIdentity = ""+process.getNodeIdentity();
+				String pid = ""+process.getPID();
+				String resourceState = ""+process.getResourceState();
+				logger.debug(location, jobId, processId, processState+" "+resourceState+" "+nodeIdentity+" "+pid);
+			}
+		}
+		logger.trace(location, null, messages.fetch("exit"));
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/NodeInventoryEventLogger.java
------------------------------------------------------------------------------
    svn:eol-style = native