You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:37:56 UTC

svn commit: r1427956 [2/4] - in /uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator: main/ main/java/ main/java/org/ main/java/org/apache/ main/java/org/apache/uima/ main/java/org/apache/uima/ducc/ main/java/org/apache/uima/ducc/orchestrator/ main/jav...

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorComponent.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorComponent.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorComponent.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,874 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+
+import org.apache.camel.CamelContext;
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties;
+import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties.DaemonName;
+import org.apache.uima.ducc.common.component.AbstractDuccComponent;
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.crypto.Crypto;
+import org.apache.uima.ducc.common.crypto.CryptoException;
+import org.apache.uima.ducc.common.crypto.Crypto.AccessType;
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.main.DuccService;
+import org.apache.uima.ducc.common.system.SystemState;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
+import org.apache.uima.ducc.common.utils.LinuxUtils;
+import org.apache.uima.ducc.common.utils.TimeStamp;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.orchestrator.OrchestratorConstants.StartType;
+import org.apache.uima.ducc.orchestrator.maintenance.MaintenanceThread;
+import org.apache.uima.ducc.orchestrator.maintenance.NodeAccounting;
+import org.apache.uima.ducc.transport.event.CancelJobDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelReservationDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelServiceDuccEvent;
+import org.apache.uima.ducc.transport.event.JdStateDuccEvent;
+import org.apache.uima.ducc.transport.event.NodeInventoryUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.OrchestratorAbbreviatedStateDuccEvent;
+import org.apache.uima.ducc.transport.event.OrchestratorStateDuccEvent;
+import org.apache.uima.ducc.transport.event.RmStateDuccEvent;
+import org.apache.uima.ducc.transport.event.SmStateDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitJobDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitReservationDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitServiceDuccEvent;
+import org.apache.uima.ducc.transport.event.cli.JobReplyProperties;
+import org.apache.uima.ducc.transport.event.cli.JobRequestProperties;
+import org.apache.uima.ducc.transport.event.cli.JobSpecificationProperties;
+import org.apache.uima.ducc.transport.event.cli.ReservationReplyProperties;
+import org.apache.uima.ducc.transport.event.cli.ReservationRequestProperties;
+import org.apache.uima.ducc.transport.event.cli.SpecificationProperties;
+import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+import org.apache.uima.ducc.transport.event.common.DuccWorkReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccReservationMap;
+import org.apache.uima.ducc.transport.event.common.IDuccWork;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation;
+import org.apache.uima.ducc.transport.event.common.IRationale;
+import org.apache.uima.ducc.transport.event.common.Rationale;
+import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.JobCompletionType;
+import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.ReservationCompletionType;
+import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
+import org.apache.uima.ducc.transport.event.common.IDuccState.ReservationState;
+import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
+import org.apache.uima.ducc.transport.event.common.IResourceState.ProcessDeallocationType;
+import org.apache.uima.ducc.transport.event.jd.DriverStatusReport;
+import org.apache.uima.ducc.transport.event.rm.IRmJobState;
+import org.apache.uima.ducc.transport.event.sm.ServiceMap;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+@Configuration
+@Import({CommonConfiguration.class})
+public class OrchestratorComponent extends AbstractDuccComponent 
+implements Orchestrator {
+	//	Springframework magic to inject instance of {@link CommonConfiguration}
+	//	This injected instance is the one handed to jobFactory.create(..) and
+	//	reservationFactory.create(..) in the submit handlers of this class.
+	@Autowired CommonConfiguration common;
+	//	Component-wide logger; every call passes the calling method's name as its first argument.
+	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(OrchestratorComponent.class.getName());
+	
+	//	Shared orchestrator singleton. It must be initialized before the fields
+	//	below, because several of their initializers read from it.
+	private OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+	//	Message catalog used for trace/info/error text throughout this class.
+	private Messages messages = orchestratorCommonArea.getSystemMessages();
+	//	Map of all known work (jobs, services, reservations) held by the common area.
+	private DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
+	//	Receives the reconcile* calls and jobTerminate requests issued by this class.
+	private StateManager stateManager = StateManager.getInstance();
+	//private HealthMonitor healthMonitor = HealthMonitor.getInstance();
+	//private MqReaper mqReaper = MqReaper.getInstance();
+	private JobFactory jobFactory = JobFactory.getInstance();
+	private ReservationFactory reservationFactory = ReservationFactory.getInstance();
+	//	Configuration obtained from the common area (as opposed to the injected
+	//	"common" above); read for orchestratorStartType, signatureRequired and jdHostClass.
+	private CommonConfiguration commonConfiguration = orchestratorCommonArea.getCommonConfiguration();
+	//	Job driver host manager; re-assigned from JobDriverHostManager.getInstance() in start().
+	private JobDriverHostManager hostManager = orchestratorCommonArea.getHostManager();
+	//	Used for job state changes and for recording job completion.
+	private StateJobAccounting stateJobAccounting = StateJobAccounting.getInstance();
+	
+	/**
+	 * Create the component, passing the name "Orchestrator" and the given
+	 * Camel context to the superclass constructor.
+	 *
+	 * @param context the Camel context handed to {@link AbstractDuccComponent}
+	 */
+	public OrchestratorComponent(CamelContext context) {
+		super("Orchestrator", context);
+	}
+	
+	private void force(IDuccWorkJob job, IRationale rationale){
+		String methodName = "force";
+		if(!job.isCompleted()) {
+			stateJobAccounting.stateChange(job, JobState.Completed);
+			job.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+			stateJobAccounting.complete(job, JobCompletionType.CanceledBySystem, rationale);
+			OrchestratorCommonArea.getInstance().getProcessAccounting().deallocateAndStop(job,ProcessDeallocationType.JobCanceled);
+			logger.info(methodName, job.getDuccId(),JobCompletionType.CanceledBySystem);
+		}
+	}
+	
+	/*
+	private void cancel(IDuccWorkJob job) {
+		String methodName = "cancel";
+		if(!job.isFinished()) {
+			stateJobAccounting.stateChange(job, JobState.Completing);
+			job.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+			stateJobAccounting.complete(job, JobCompletionType.CanceledBySystem);
+			OrchestratorCommonArea.getInstance().getProcessAccounting().deallocateAndStop(job,ProcessDeallocationType.JobCanceled);
+			logger.info(methodName, job.getDuccId(),JobCompletionType.CanceledBySystem);
+		}
+	}
+	*/
+	
+	private void cancel(IDuccWorkReservation reservation) {
+		String methodName = "cancel";
+		reservation.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+		reservation.stateChange(ReservationState.Completed);
+		reservation.complete(ReservationCompletionType.CanceledBySystem);
+		logger.info(methodName, reservation.getDuccId(), ReservationCompletionType.CanceledBySystem);
+	}
+	
+	private StartType getStartTypeProperty() 
+	{
+		String methodName = "getStartTypeProperty";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		StartType startType = StartType.warm;
+		String property = commonConfiguration.orchestratorStartType;
+		if(property != null) {
+			String startTypeProperty = property.trim().toLowerCase();
+			if(startTypeProperty.equals("cold")) {
+				startType = StartType.cold;
+			}
+			else if(startTypeProperty.equals("warm")) {
+				startType = StartType.warm;
+			}
+			else if(startTypeProperty.equals("hot")) {
+				startType = StartType.hot;
+			}
+			else {
+				logger.warn(methodName, null, "ducc.orchestrator.start.type value in ducc.properties not recognized: "+property);
+			}
+		}
+		else {
+			logger.warn(methodName, null, "ducc.orchestrator.start.type not found in ducc.properties");
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return startType;
+	}
+	
+	private void resolveSignatureRequired() throws CryptoException 
+	{
+		String methodName = "resolveSignatureRequired";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		String property = commonConfiguration.signatureRequired;
+		if(property != null) {
+			String signatureRequiredProperty = property.trim().toLowerCase();
+			if(signatureRequiredProperty.equals("on")) {
+				orchestratorCommonArea.setSignatureRequired();
+				logger.info(methodName, null, "ducc.signature.required: "+property);
+			}
+			else if(signatureRequiredProperty.equals("off")) {
+				orchestratorCommonArea.resetSignatureRequired();
+				logger.info(methodName, null, "ducc.signature.required: "+property);
+			}
+			else {
+				logger.warn(methodName, null, "ducc.signature.required value in ducc.properties not recognized: "+property);
+			}
+		}
+		else {
+			logger.warn(methodName, null, "ducc.signature.required not found in ducc.properties");
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return;
+	}
+	
+	private StartType getStartTypeOverride(String[] args) 
+	{
+		String methodName = "getStartTypeOverride";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		StartType startType = null;
+		// override start type if specified on command line
+		if(args != null) {
+			for( String arg : args) {
+				logger.debug(methodName, null, "arg: "+arg);
+				String flag = arg.trim();
+				while(flag.startsWith("-")) {
+						flag = flag.replaceFirst("-", "");
+				}
+				if(flag.equals(StartType.cold.toString())) {
+					startType = StartType.cold;
+				}
+				else if(flag.equals(StartType.warm.toString())) {
+					startType = StartType.warm;
+				}
+				else if(flag.equals(StartType.hot.toString())) {
+					startType = StartType.hot;
+				}
+				else {
+					logger.warn(methodName, null, "unrecognized arg: "+arg);
+				}
+			}
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return startType;
+	}
+	
+	/**
+	 * Determine the effective start type and log where it came from.
+	 * <p>
+	 * Precedence: a command line override first, then the value from
+	 * ducc.properties, then {@code OrchestratorConstants.startTypeDefault}.
+	 * The logged line has the form "start type: &lt;type&gt;, &lt;origin&gt;"
+	 * where origin is "override", "property" or "default".
+	 * <p>
+	 * Note: getStartTypeProperty() in this class always returns a non-null
+	 * value, so the "default" branch is only a safety net.
+	 *
+	 * @param args the command line arguments, may be {@code null}
+	 * @return the start type to use, never {@code null} as long as the
+	 *         default constant is set
+	 */
+	private StartType getStartType(String[] args) 
+	{
+		String methodName = "getStartType";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		StartType startType = OrchestratorConstants.startTypeDefault;
+		StartType property = getStartTypeProperty();
+		StartType override = getStartTypeOverride(args) ;
+		String origin;
+		if(override != null) {
+			startType = override;
+			origin = "override";
+		}
+		else if(property != null) {
+			startType = property;
+			origin = "property";
+		}
+		else {
+			origin = "default";
+		}
+		// the value is appended exactly once; the original default branch
+		// appended it twice and logged e.g. "warmwarm, default"
+		StringBuffer sb = new StringBuffer();
+		sb.append("start type: ");
+		sb.append(startType);
+		sb.append(", "+origin);
+		logger.info(methodName, null, sb);
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return startType;
+	}
+		
+	
+	/**
+	 * Start the Orchestrator.
+	 * <p>
+	 * Resolves the start type (cold, warm or hot), then walks the work map
+	 * once under its lock and treats each entry according to that type:
+	 * <ul>
+	 * <li>cold: every job/service is forced to completion and every
+	 *     reservation is canceled;</li>
+	 * <li>warm: every job/service is forced to completion, but only
+	 *     reservations whose scheduling class equals the configured
+	 *     jdHostClass are canceled;</li>
+	 * <li>hot: jobs/services are left alone, and the nodes of reservations
+	 *     in the jdHostClass are added to the host manager.</li>
+	 * </ul>
+	 * Afterwards the state is checkpointed if anything changed, the job
+	 * driver host manager is initialized, the signature requirement is
+	 * resolved and the maintenance thread is started.
+	 * <p>
+	 * NOTE(review): any Throwable raised by these steps is only logged;
+	 * super.start(..) and the daemon boot registration still run.
+	 *
+	 * @param service the hosting service, passed through to super.start
+	 * @param args    command line arguments; may carry a start type override
+	 * @throws Exception as declared by the superclass start method
+	 */
+	public void start(DuccService service, String[] args) throws Exception {
+		String methodName = "start";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		try {
+			StartType startType = getStartType(args);
+			logger.info(methodName, null, "##### "+startType+" #####");
+			// set whenever a work item is altered, to trigger one checkpoint below
+			boolean saveState = false;
+			long t0 = System.currentTimeMillis();
+			synchronized(workMap) {
+				Iterator<IDuccWork> iterator = workMap.values().iterator();
+				while(iterator.hasNext()) {
+					IDuccWork duccWork = iterator.next();
+					switch(duccWork.getDuccType()) {
+					case Job:
+					case Service:
+						// jobs and services are both handled as IDuccWorkJob
+						IDuccWorkJob job = (IDuccWorkJob) duccWork;
+						switch(startType) {
+						case cold:
+							force(job, new Rationale("system cold start"));
+							saveState = true;
+							break;
+						case warm:
+							force(job, new Rationale("system warm start"));
+							saveState = true;
+							break;
+						case hot:
+							// hot start: leave jobs and services untouched
+							break;
+						}
+						break;
+					case Reservation:
+						IDuccWorkReservation reservation = (IDuccWorkReservation) duccWork;
+						switch(startType) {
+						case cold:
+							cancel(reservation);
+							saveState = true;
+							break;
+						case warm:
+							// warm start cancels only reservations in the job driver host class
+							if(commonConfiguration.jdHostClass.equals(reservation.getSchedulingInfo().getSchedulingClass())) {
+								cancel(reservation);
+								saveState = true;
+							}
+							break;
+						case hot:
+							// hot start keeps job driver host reservations and
+							// hands each of their nodes to the host manager
+							if(commonConfiguration.jdHostClass.equals(reservation.getSchedulingInfo().getSchedulingClass())) {
+								IDuccReservationMap map = reservation.getReservationMap();
+								Iterator<Entry<DuccId, IDuccReservation>> entries = map.entrySet().iterator();
+								while(entries.hasNext()) {
+									Entry<DuccId, IDuccReservation> entry = entries.next();
+									NodeIdentity node = entry.getValue().getNodeIdentity();
+									hostManager.addNode(node);
+								}
+							}
+							break;
+						}
+						break;
+					}
+				}
+			}
+			// report the time spent holding the work map lock only when it exceeds the limit
+			long t1 = System.currentTimeMillis();
+			long elapsed = t1 - t0;
+			if(elapsed > Constants.SYNC_LIMIT) {
+				logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+			}
+			if(saveState) {
+				OrchestratorCheckpoint.getInstance().saveState();
+			}
+			// cold and warm call init(); hot calls conditional() instead
+			switch(startType) {
+			case cold:
+			case warm:
+				hostManager = JobDriverHostManager.getInstance();
+				hostManager.init();
+				break;
+			case hot:
+				hostManager = JobDriverHostManager.getInstance();
+				hostManager.conditional();
+				break;
+			}
+			resolveSignatureRequired();
+			MaintenanceThread.getInstance().start();
+		} 
+		catch(Throwable t) {
+			// logged only; startup continues with super.start below
+			logger.error(methodName, null, t);
+		}
+		super.start(service, args);
+		DuccDaemonRuntimeProperties.getInstance().boot(DaemonName.Orchestrator,getProcessJmxUrl());
+		logger.trace(methodName, null, messages.fetch("exit"));
+	}
+	
+	/**
+	 * Job Driver State Reconciliation
+	 */
+	public void reconcileJdState(JdStateDuccEvent duccEvent) {
+		String methodName = "reconcileJdState";
+		DriverStatusReport dsr = duccEvent.getState();
+		DuccId duccId = null;
+		if(dsr != null) {
+			duccId = dsr.getDuccId();
+		}
+		logger.trace(methodName, null, messages.fetch("enter"));
+		if(dsr != null) {
+			logger.info(methodName, duccId, dsr.getLogReport());
+			stateManager.reconcileState(dsr);
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+	}
+	/**
+	 * Resources Manager State Reconciliation.
+	 * <p>
+	 * Hands the job state map carried by the event to the state manager.
+	 * An exception raised during reconciliation is logged and not rethrown.
+	 *
+	 * @param duccEvent the resource manager state event
+	 */
+	public void reconcileRmState(RmStateDuccEvent duccEvent) {
+		final String location = "reconcileRmState";
+		logger.trace(location, null, messages.fetch("enter"));
+		Map<DuccId, IRmJobState> rmJobStates = duccEvent.getJobState();
+		try {
+			stateManager.reconcileState(rmJobStates);
+		}
+		catch(Exception failure) {
+			logger.error(location, null, failure);
+		}
+		logger.trace(location, null, messages.fetch("exit"));
+	}
+	/**
+	 * Services Manager State Reconciliation.
+	 * <p>
+	 * Hands the service map carried by the event to the state manager.
+	 *
+	 * @param duccEvent the services manager state event
+	 */
+	public void reconcileSmState(SmStateDuccEvent duccEvent) {
+		final String location = "reconcileSmState";
+		logger.trace(location, null, messages.fetch("enter"));
+		ServiceMap services = duccEvent.getServiceMap();
+		stateManager.reconcileState(services);
+		logger.trace(location, null, messages.fetch("exit"));
+	}
+	/**
+	 * Node Inventory State Reconciliation.
+	 * <p>
+	 * Hands the process map carried by the event to the state manager and
+	 * then passes the same map to the node accounting heartbeat.
+	 *
+	 * @param duccEvent the node inventory update event
+	 */
+	public void reconcileNodeInventory(NodeInventoryUpdateDuccEvent duccEvent) {
+		final String location = "reconcileNodeInventory";
+		logger.trace(location, null, messages.fetch("enter"));
+		HashMap<DuccId, IDuccProcess> inventory = duccEvent.getProcesses();
+		stateManager.reconcileState(inventory);
+		NodeAccounting.getInstance().heartbeat(inventory);
+		logger.trace(location, null, messages.fetch("exit"));
+	}
+	/**
+	 * Publish Orchestrator State.
+	 * <p>
+	 * Takes a deep copy of the work map, logs the active job, reservation
+	 * and service counts of that copy at debug level, and attaches the copy
+	 * to a new state event. Any Throwable raised on the way is logged, and
+	 * the event is returned in whatever state it had reached.
+	 *
+	 * @return the state event, never {@code null}
+	 */
+	public OrchestratorStateDuccEvent getState() {
+		final String location = "getState";
+		logger.trace(location, null, messages.fetch("enter"));
+		OrchestratorStateDuccEvent stateEvent = new OrchestratorStateDuccEvent();
+		try {
+			long copyStart = System.currentTimeMillis();
+			DuccWorkMap snapshot = workMap.deepCopy();
+			long copyMillis = System.currentTimeMillis() - copyStart;
+			if(copyMillis > Constants.SYNC_LIMIT) {
+				logger.debug(location, null, "elapsed msecs: "+copyMillis);
+			}
+			int jobCount = snapshot.getJobCount();
+			int reservationCount = snapshot.getReservationCount();
+			int serviceCount = snapshot.getServiceCount();
+			StringBuffer summary = new StringBuffer();
+			summary.append(messages.fetch("publishing state"));
+			summary.append(" ");
+			summary.append(messages.fetchLabel("active job count"));
+			summary.append(jobCount);
+			summary.append(" ");
+			summary.append(messages.fetchLabel("active reservation count"));
+			summary.append(reservationCount);
+			summary.append(" ");
+			summary.append(messages.fetchLabel("active service count"));
+			summary.append(serviceCount);
+			logger.debug(location, null, summary.toString());
+			stateEvent.setWorkMap(snapshot);
+			//stateManager.prune(workMapCopy);
+			//healthMonitor.cancelNonViableJobs();
+			//mqReaper.removeUnusedJdQueues(workMapCopy);
+		}
+		catch(Throwable failure) {
+			logger.error(location, null, failure);
+		}
+		logger.trace(location, null, messages.fetch("exit"));
+		return stateEvent;
+	}
+	/**
+	 * Publish Orchestrator Abbreviated State.
+	 * <p>
+	 * Takes a deep copy of the work map, logs the active job, reservation
+	 * and service counts of that copy at debug level, and attaches the copy
+	 * to a new abbreviated state event. Two durations are measured
+	 * independently, the deep copy and the setWorkMap call; each is logged
+	 * only when it exceeds {@code Constants.SYNC_LIMIT}. Any Throwable
+	 * raised on the way is logged, and the event is returned in whatever
+	 * state it had reached.
+	 *
+	 * @return the abbreviated state event, never {@code null}
+	 */
+	public OrchestratorAbbreviatedStateDuccEvent getAbbreviatedState() {
+		String methodName = "getAbbreviatedState";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		OrchestratorAbbreviatedStateDuccEvent orchestratorAbbreviatedStateDuccEvent = new OrchestratorAbbreviatedStateDuccEvent();
+		try {
+			long t0 = System.currentTimeMillis();
+			DuccWorkMap workMapCopy = workMap.deepCopy();
+			long t1 = System.currentTimeMillis();
+			long elapsed = t1 - t0;
+			if(elapsed > Constants.SYNC_LIMIT) {
+				logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+			}
+			int activeJobs = workMapCopy.getJobCount();
+			int activeReservations = workMapCopy.getReservationCount();
+			int activeServices = workMapCopy.getServiceCount();
+			logger.debug(methodName, null, messages.fetch("publishing state")+" "+
+											messages.fetchLabel("active job count")+activeJobs
+											+" "+
+											messages.fetchLabel("active reservation count")+activeReservations
+											+" "+
+											messages.fetchLabel("active service count")+activeServices
+											);
+			long t2 = System.currentTimeMillis();
+			orchestratorAbbreviatedStateDuccEvent.setWorkMap(workMapCopy);
+			long t3 = System.currentTimeMillis();
+			long elapsed2 = t3 - t2;
+			// compare the duration that is actually reported; the original
+			// tested the deep copy duration here and logged this one
+			if(elapsed2 > Constants.SYNC_LIMIT) {
+				logger.debug(methodName, null, "elapsed msecs: "+elapsed2);
+			}
+			//stateManager.prune(workMapCopy);
+			//healthMonitor.cancelNonViableJobs();
+			//mqReaper.removeUnusedJdQueues(workMapCopy);
+		}
+		catch(Throwable t) {
+			logger.error(methodName, null, t);
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return orchestratorAbbreviatedStateDuccEvent;
+	}
+	/**
+	 * Append an error message to the list of submit errors stored in the
+	 * given properties, creating and storing that list on first use.
+	 *
+	 * @param properties    the request properties that carry the error list
+	 * @param error_message the message to append
+	 */
+	@SuppressWarnings("unchecked")
+	private void submitError(Properties properties, String error_message) {
+		String errorsKey = SpecificationProperties.key_submit_errors;
+		Object existing = properties.get(errorsKey);
+		ArrayList<String> errors;
+		if(existing != null) {
+			errors = (ArrayList<String>) existing;
+		}
+		else {
+			errors = new ArrayList<String>();
+			properties.put(errorsKey, errors);
+		}
+		errors.add(error_message);
+	}
+	/**
+	 * Decide whether a request must be rejected because of its signature.
+	 * <p>
+	 * When signatures are not required the request is always accepted.
+	 * Otherwise the signature stored in the properties is decrypted with
+	 * the key found under the requesting user's home directory (or under
+	 * the daemon's own "user.home" when the run mode is "Test") and must
+	 * equal the user name.
+	 * <p>
+	 * Verification fails closed: once a signature is required, any problem
+	 * while checking it (missing user, missing or undecryptable signature,
+	 * any other Throwable) is logged and the request is reported as
+	 * invalid. The original code reported such requests as valid.
+	 *
+	 * @param properties the request properties carrying user and signature
+	 * @return {@code true} when the request must be rejected
+	 */
+	private boolean isSignatureInvalid(Properties properties) {
+		String methodName = "isSignatureInvalid";
+		boolean retVal = false;
+		try {
+			if(orchestratorCommonArea.isSignatureRequired()) {
+				// from here on the request is invalid until proven otherwise
+				retVal = true;
+				String user = properties.getProperty(SpecificationProperties.key_user);
+				String userHome = LinuxUtils.getUserHome(user);
+				String runmode = DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_runmode);
+				if(runmode != null) {
+					if(runmode.equals("Test")) {
+						userHome = System.getProperty("user.home");
+					}
+				}
+				Crypto crypto = new Crypto(userHome,AccessType.READER);
+				String signature = (String)crypto.decrypt((byte[])properties.get(SpecificationProperties.key_signature));
+				boolean valid = (user != null) && user.equals(signature);
+				if(!valid) {
+					logger.warn(methodName, null, "user:"+user+" signature:"+signature+" valid:n");
+				}
+				else {
+					logger.debug(methodName, null, "user:"+user+" signature:"+signature+" valid:y");
+				}
+				retVal = !valid;
+			}
+		}
+		catch(Throwable t) {
+			// retVal keeps the value set above: invalid if a signature was required
+			logger.error(methodName, null, t);
+		}
+		return retVal;
+	}
+	/**
+	 * Handle Job Submit
+	 */
+	
+	public void startJob(SubmitJobDuccEvent duccEvent) {
+		String methodName = "startJob";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		try {
+			JobRequestProperties properties = (JobRequestProperties) duccEvent.getProperties();
+			int jdNodes = hostManager.nodes();
+			if(isSignatureInvalid(properties)) {
+				String error_message = messages.fetch(" type=authentication error, text=signature not valid.");
+				logger.error(methodName, null, error_message);
+				submitError(properties, error_message);
+			}
+			else if(jdNodes <= 0) {
+				String error_message = messages.fetch(" type=system error, text=job driver node unavailable.");
+				logger.error(methodName, null, error_message);
+				submitError(properties, error_message);
+			}
+			else if(!SystemState.getInstance().isAcceptJobs()) {
+				String error_message = messages.fetch(" type=system error, text=system is not accepting new work at this time.");
+				logger.error(methodName, null, error_message);
+				submitError(properties, error_message);
+			}
+			else {
+				if(Validate.request(duccEvent)) {
+					DuccWorkJob duccWorkJob = jobFactory.create(common,properties);
+					long t0 = System.currentTimeMillis();
+					workMap.addDuccWork(duccWorkJob);
+					long t1 = System.currentTimeMillis();
+					long elapsed = t1 - t0;
+					if(elapsed > Constants.SYNC_LIMIT) {
+						logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+					}
+					// state: Received
+					stateJobAccounting.stateChange(duccWorkJob, JobState.Received);
+					OrchestratorCheckpoint.getInstance().saveState();
+					// state: WaitingForDriver
+					stateJobAccounting.stateChange(duccWorkJob, JobState.WaitingForDriver);
+					OrchestratorCheckpoint.getInstance().saveState();
+					// prepare for reply to submitter
+					properties.put(JobRequestProperties.key_id, duccWorkJob.getId());
+					duccEvent.setProperties(properties);
+				}
+				else {
+					logger.info(methodName, null, messages.fetch("TODO")+" prepare error reply");
+					//TODO
+				}
+			}
+		}
+		catch(Throwable t) {
+			logger.error(methodName, null, messages.fetch("TODO")+" prepare error reply",t);
+			//TODO
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return;
+	}
+	/**
+	 * Handle Job Cancel
+	 */
+	
+	public void stopJob(CancelJobDuccEvent duccEvent) {
+		String methodName = "stopJob";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		Properties properties = duccEvent.getProperties();
+		if(isSignatureInvalid(properties)) {
+			String error_message = messages.fetch(" type=authentication error, text=signature not valid.");
+			logger.error(methodName, null, error_message);
+			submitError(properties, error_message);
+		}
+		else if(Validate.request(duccEvent)) {
+			// update state
+			String jobId = properties.getProperty(JobRequestProperties.key_id);
+			long t0 = System.currentTimeMillis();
+			DuccWorkJob duccWorkJob = (DuccWorkJob) workMap.findDuccWork(DuccType.Job,jobId);
+			long t1 = System.currentTimeMillis();
+			long elapsed = t1 - t0;
+			if(elapsed > Constants.SYNC_LIMIT) {
+				logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+			}
+			if(duccWorkJob != null) {
+				String userid = properties.getProperty(JobSpecificationProperties.key_user);
+				IRationale rationale = new Rationale("job canceled by userid "+userid);
+				stateManager.jobTerminate(duccWorkJob, JobCompletionType.CanceledByUser, rationale, ProcessDeallocationType.JobCanceled);
+				OrchestratorCheckpoint.getInstance().saveState();
+				// prepare for reply to canceler
+				properties.put(JobReplyProperties.key_message, JobReplyProperties.msg_canceled);
+				duccEvent.setProperties(properties);
+				logger.info(methodName, duccWorkJob.getDuccId(), messages.fetchLabel("job state")+duccWorkJob.getJobState());
+			}
+			
+			else {
+				// prepare undefined reply 
+				properties.put(JobReplyProperties.key_message, JobReplyProperties.msg_not_found);
+				duccEvent.setProperties(properties);
+				logger.info(methodName, null, jobId+" : "+messages.fetch("job not found"));
+			}
+		}
+		else {
+			logger.info(methodName, null, messages.fetch("TODO")+" prepare error reply");
+			//TODO
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return;
+	}
+	
+	
+	/**
+	 * Handle Reservation Submit.
+	 * <p>
+	 * A request with an invalid signature is rejected with an error recorded
+	 * in its properties. A validated request is turned into a reservation,
+	 * added to the work map and moved through the Received and
+	 * WaitingForResources states (with a checkpoint after each). The calling
+	 * thread then polls once per second until the reservation is no longer
+	 * pending, logging a progress line every sixth poll. The reply carries
+	 * the reservation id and a node list of the form
+	 * "node [count]; node [count];" sorted by node name, or "none" when no
+	 * nodes were assigned.
+	 * <p>
+	 * NOTE(review): the wait loop has no timeout; it ends only when the
+	 * reservation stops being pending or the thread is interrupted.
+	 * If the thread is interrupted while waiting, the interrupt flag is
+	 * restored before the failure is logged, so that callers further up
+	 * can still observe it.
+	 *
+	 * @param duccEvent the submit event; its properties double as the reply
+	 */
+	public void startReservation(SubmitReservationDuccEvent duccEvent) {
+		String methodName = "startReservation";
+		logger.trace(methodName, null, messages.fetch("enter"));	
+		try {
+			Properties properties = duccEvent.getProperties();
+			if(isSignatureInvalid(properties)) {
+				String error_message = messages.fetch(" type=authentication error, text=signature not valid.");
+				logger.error(methodName, null, error_message);
+				submitError(properties, error_message);
+			}
+			else if(Validate.request(duccEvent)) {
+				DuccWorkReservation duccWorkReservation = reservationFactory.create(common,(ReservationRequestProperties)properties);
+				long t0 = System.currentTimeMillis();
+				workMap.addDuccWork(duccWorkReservation);
+				long t1 = System.currentTimeMillis();
+				long elapsed = t1 - t0;
+				if(elapsed > Constants.SYNC_LIMIT) {
+					logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+				}
+				// state: Received
+				duccWorkReservation.stateChange(ReservationState.Received);
+				OrchestratorCheckpoint.getInstance().saveState();
+				// state: WaitingForResources
+				duccWorkReservation.stateChange(ReservationState.WaitingForResources);
+				OrchestratorCheckpoint.getInstance().saveState();
+				// poll until the reservation is no longer pending
+				int counter = 0;
+				while(duccWorkReservation.isPending()) {
+					counter++;
+					if(counter > 5) {
+						counter = 0;
+						logger.info(methodName, duccWorkReservation.getDuccId(), "waiting for allocation...");
+					}
+					Thread.sleep(1000);
+				}
+				// prepare for reply to submitter
+				properties.put(ReservationRequestProperties.key_id, duccWorkReservation.getId());
+				// node list
+				properties.put(ReservationRequestProperties.key_node_list, "none");
+				if(!duccWorkReservation.getReservationMap().isEmpty()) {
+					// count the reservations per node; TreeMap keeps node names sorted
+					TreeMap<String,Integer> nodeMap = new TreeMap<String,Integer>(); 
+					IDuccReservationMap map = duccWorkReservation.getReservationMap();
+					for (Entry<DuccId, IDuccReservation> entry : map.entrySet()) { 
+						String node = entry.getValue().getNodeIdentity().getName();
+						Integer seen = nodeMap.get(node);
+						int count = (seen == null) ? 1 : seen.intValue() + 1;
+						nodeMap.put(node,Integer.valueOf(count));
+					}
+					StringBuffer sb = new StringBuffer();
+					for (Entry<String,Integer> entry : nodeMap.entrySet()) { 
+						sb.append(" "+entry.getKey()+" "+"["+entry.getValue()+"]"+";");
+					}
+					properties.put(ReservationRequestProperties.key_node_list,sb.toString().trim());
+				}
+				duccEvent.setProperties(properties);
+			}
+			else {
+				logger.info(methodName, null, messages.fetch("TODO")+" prepare error reply");
+				//TODO
+			}
+		}
+		catch(InterruptedException e) {
+			// keep the interrupt visible to the caller instead of swallowing it
+			Thread.currentThread().interrupt();
+			logger.error(methodName, null, messages.fetch("TODO")+" prepare error reply",e);
+			//TODO
+		}
+		catch(Exception e) {
+			logger.error(methodName, null, messages.fetch("TODO")+" prepare error reply",e);
+			//TODO
+		}
+		
+		
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return;
+	}
+	
+	
+	public void stopReservation(CancelReservationDuccEvent duccEvent) {
+		String methodName = "stopReservation";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		Properties properties = duccEvent.getProperties();
+		if(isSignatureInvalid(properties)) {
+			String error_message = messages.fetch(" type=authentication error, text=signature not valid.");
+			logger.error(methodName, null, error_message);
+			submitError(properties, error_message);
+		}
+		else {
+			// update state
+			String id = properties.getProperty(ReservationRequestProperties.key_id);
+			long t0 = System.currentTimeMillis();
+			DuccWorkReservation duccWorkReservation = (DuccWorkReservation) workMap.findDuccWork(DuccType.Reservation,id);
+			long t1 = System.currentTimeMillis();
+			long elapsed = t1 - t0;
+			if(elapsed > Constants.SYNC_LIMIT) {
+				logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+			}
+			if(Validate.request(duccEvent,duccWorkReservation)) {
+				if(duccWorkReservation != null) {
+					String cancelUser = properties.getProperty(SpecificationProperties.key_user);
+					duccWorkReservation.getStandardInfo().setCancelUser(cancelUser);
+					duccWorkReservation.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+					duccWorkReservation.stateChange(ReservationState.Completed);
+					duccWorkReservation.complete(ReservationCompletionType.CanceledByUser);
+					String u1 = duccWorkReservation.getStandardInfo().getUser();
+					String u2 = duccWorkReservation.getStandardInfo().getCancelUser();
+					if(u1 != null) {
+						if(u2 != null) {
+							if(!u1.equals(u2)) {
+								duccWorkReservation.complete(ReservationCompletionType.CanceledByAdmin);
+							}
+						}
+					}
+					OrchestratorCheckpoint.getInstance().saveState();
+					// prepare for reply to canceler
+					properties.put(ReservationReplyProperties.key_message, ReservationReplyProperties.msg_canceled);
+					duccEvent.setProperties(properties);
+					logger.info(methodName, duccWorkReservation.getDuccId(), messages.fetchLabel("reservation state")+duccWorkReservation.getReservationState());
+				}
+				else {
+					// prepare undefined reply 
+					properties.put(ReservationReplyProperties.key_message, ReservationReplyProperties.msg_not_found);
+					duccEvent.setProperties(properties);
+					logger.info(methodName, null, id+" : "+messages.fetch("reservation not found"));
+				}
+			}
+			else {
+				properties.put(ReservationReplyProperties.key_message, ReservationReplyProperties.msg_user_not_authorized);
+				duccEvent.setProperties(properties);
+				logger.info(methodName, null, id+" : "+messages.fetch("not authorized"));
+			}
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return;
+	}
+	
+	/**
+	 * Handle Service Submit.
+	 * <p>
+	 * The request is rejected through submitError when its signature is invalid,
+	 * when the host manager supplies no node, or when the system is not accepting
+	 * new work. Otherwise, if the request validates, a work entry is created from
+	 * the request properties, added to the work map, moved through the Received
+	 * and WaitingForServices states (a checkpoint is saved after each), and its id
+	 * is stored in the event properties as the reply. Any Throwable raised along
+	 * the way is logged and not rethrown.
+	 *
+	 * @param duccEvent the service submit event; its properties are updated in place
+	 */
+	public void startService(SubmitServiceDuccEvent duccEvent) {
+		String methodName = "startService";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		try {
+			JobRequestProperties properties = (JobRequestProperties) duccEvent.getProperties();
+			NodeIdentity nodeIdentity = hostManager.getNode();
+			// Select the rejection text, if any; the checks run in their original order.
+			String rejection = null;
+			if(isSignatureInvalid(properties)) {
+				rejection = " type=authentication error, text=signature not valid.";
+			}
+			else if(nodeIdentity == null) {
+				rejection = " type=system error, text=job driver node unavailable.";
+			}
+			else if(!SystemState.getInstance().isAcceptJobs()) {
+				rejection = " type=system error, text=system is not accepting new work at this time.";
+			}
+			if(rejection != null) {
+				String error_message = messages.fetch(rejection);
+				logger.error(methodName, null, error_message);
+				submitError(properties, error_message);
+			}
+			else {
+				logger.debug(methodName, null, messages.fetch("job driver host")+" "+messages.fetchLabel("IP")+nodeIdentity.getIp()+" "+messages.fetchLabel("name")+nodeIdentity.getName());
+				if(!Validate.request(duccEvent)) {
+					logger.info(methodName, null, messages.fetch("TODO")+" prepare error reply");
+					//TODO
+				}
+				else {
+					DuccWorkJob duccWorkJob = jobFactory.create(common,properties);
+					// Time the work map insertion and report it when it exceeds the limit.
+					long started = System.currentTimeMillis();
+					workMap.addDuccWork(duccWorkJob);
+					long elapsed = System.currentTimeMillis() - started;
+					if(elapsed > Constants.SYNC_LIMIT) {
+						logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+					}
+					OrchestratorCheckpoint checkpoint;
+					// state: Received
+					stateJobAccounting.stateChange(duccWorkJob, JobState.Received);
+					checkpoint = OrchestratorCheckpoint.getInstance();
+					checkpoint.saveState();
+					// state: WaitingForServices
+					stateJobAccounting.stateChange(duccWorkJob, JobState.WaitingForServices);
+					checkpoint = OrchestratorCheckpoint.getInstance();
+					checkpoint.saveState();
+					// prepare for reply to submitter
+					properties.put(JobRequestProperties.key_id, duccWorkJob.getId());
+					duccEvent.setProperties(properties);
+				}
+			}
+		}
+		catch(Throwable t) {
+			logger.error(methodName, null, messages.fetch("TODO")+" prepare error reply",t);
+			//TODO
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return;
+	}
+	
+	/**
+	 * Handle Service Cancel.
+	 * <p>
+	 * An invalid signature is reported through submitError. A request that fails
+	 * validation is only logged (error reply still TODO). Otherwise the service is
+	 * looked up by the id found in the event properties: when present it is
+	 * terminated as CanceledByUser and a checkpoint is saved; in either case a
+	 * reply message (canceled / not found) is stored in the event properties.
+	 *
+	 * @param duccEvent the service cancel event; its properties are updated in place
+	 */
+	public void stopService(CancelServiceDuccEvent duccEvent) {
+		String methodName = "stopService";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		Properties properties = duccEvent.getProperties();
+		if(isSignatureInvalid(properties)) {
+			String error_message = messages.fetch(" type=authentication error, text=signature not valid.");
+			logger.error(methodName, null, error_message);
+			submitError(properties, error_message);
+		}
+		else if(!Validate.request(duccEvent)) {
+			logger.info(methodName, null, messages.fetch("TODO")+" prepare error reply");
+			//TODO
+		}
+		else {
+			// update state
+			String jobId = properties.getProperty(JobRequestProperties.key_id);
+			long started = System.currentTimeMillis();
+			DuccWorkJob duccWorkJob = (DuccWorkJob) workMap.findDuccWork(DuccType.Service,jobId);
+			long elapsed = System.currentTimeMillis() - started;
+			if(elapsed > Constants.SYNC_LIMIT) {
+				logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+			}
+			if(duccWorkJob == null) {
+				// prepare undefined reply
+				properties.put(JobReplyProperties.key_message, JobReplyProperties.msg_not_found);
+				duccEvent.setProperties(properties);
+				logger.info(methodName, null, jobId+" : "+messages.fetch("service not found"));
+			}
+			else {
+				String userid = properties.getProperty(JobSpecificationProperties.key_user);
+				IRationale rationale = new Rationale("service canceled by "+userid);
+				stateManager.jobTerminate(duccWorkJob, JobCompletionType.CanceledByUser, rationale, ProcessDeallocationType.JobCanceled);
+				OrchestratorCheckpoint.getInstance().saveState();
+				// prepare for reply to canceler
+				properties.put(JobReplyProperties.key_message, JobReplyProperties.msg_canceled);
+				duccEvent.setProperties(properties);
+				logger.info(methodName, duccWorkJob.getDuccId(), messages.fetchLabel("service state")+duccWorkJob.getJobState());
+			}
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return;
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorComponent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorConstants.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorConstants.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorConstants.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorConstants.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import java.io.Serializable;
+
+public interface OrchestratorConstants extends Serializable {
+	
+		public static StartType startTypeDefault = StartType.warm;
+	
+		public enum StartType {
+			cold, // Recover: All is lost					JD host: employ new
+			warm, // Recover: Reservations only (default) 	JD host: employ new
+			hot , // Recover: Reservations and Jobs, 		JD host: employ current
+		}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorConstants.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.TimeStamp;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
+import org.apache.uima.ducc.transport.event.common.IDuccWork;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkExecutable;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.ITimeWindow;
+import org.apache.uima.ducc.transport.event.common.TimeWindow;
+import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+import org.apache.uima.ducc.transport.event.common.IResourceState.ProcessDeallocationType;
+import org.apache.uima.ducc.transport.event.common.IResourceState.ResourceState;
+import org.apache.uima.ducc.transport.event.jd.DriverStatusReport;
+
+
+public class ProcessAccounting {
+	
+	// Logger for this class, obtained from the orchestrator logger components.
+	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(ProcessAccounting.class.getName());
+	
+	// Shared orchestrator state; the message catalog and the work map are taken from it.
+	private OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+	private Messages messages = orchestratorCommonArea.getSystemMessages();
+	// Besides holding the work entries, workMap is the monitor this class
+	// synchronizes on whenever it reads or updates processToJobMap.
+	private DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
+	
+	// Maps a process id (key) to the id of the job it was registered under (value).
+	private ConcurrentHashMap<DuccId,DuccId> processToJobMap = new ConcurrentHashMap<DuccId,DuccId>();
+	
+	// Used to change job state (see setStatus, which moves a Service from Initializing to Running).
+	private StateJobAccounting stateJobAccounting = StateJobAccounting.getInstance();
+	
+	/**
+	 * Creates an accounting instance that starts with an empty process-to-job map.
+	 */
+	public ProcessAccounting() {
+		super();
+	}
+	
+	/**
+	 * Creates an accounting instance backed by the supplied process-to-job map.
+	 *
+	 * @param processToJobMap the map to use; it is kept by reference, not copied
+	 */
+	public ProcessAccounting(ConcurrentHashMap<DuccId,DuccId> processToJobMap) {
+		super();
+		this.processToJobMap = processToJobMap;
+	}
+	
+	/**
+	 * Returns the live process-to-job map (the internal instance, not a copy).
+	 *
+	 * @return the map from process id to owning job id
+	 */
+	public ConcurrentHashMap<DuccId,DuccId> getProcessToJobMap() {
+		return processToJobMap;
+	}
+	
+	/**
+	 * Replaces the process-to-job map with the given instance (kept by reference).
+	 *
+	 * @param replacement the map to use from now on
+	 */
+	private void setProcessToJobMap(ConcurrentHashMap<DuccId,DuccId> replacement) {
+		processToJobMap = replacement;
+	}
+	
+	/**
+	 * Looks up the job id registered for a process.
+	 * <p>
+	 * The lookup is done while holding the workMap monitor; if acquiring it and
+	 * reading took longer than Constants.SYNC_LIMIT the elapsed time is logged.
+	 *
+	 * @param processId the process id to look up
+	 * @return the job id registered for the process, or null when there is none
+	 */
+	public DuccId getJobId(DuccId processId) {
+		String methodName = "getJobId";
+		DuccId jobId;
+		long started = System.currentTimeMillis();
+		synchronized(workMap) {
+			jobId = processToJobMap.get(processId);
+		}
+		long elapsed = System.currentTimeMillis() - started;
+		if(elapsed > Constants.SYNC_LIMIT) {
+			logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+		}
+		return jobId;
+	}
+	
+	/**
+	 * Returns the number of processes currently registered in the process-to-job map.
+	 * <p>
+	 * The size is read while holding the workMap monitor; if that took longer
+	 * than Constants.SYNC_LIMIT the elapsed time is logged.
+	 *
+	 * @return the number of registered processes
+	 */
+	public int processCount() {
+		String methodName = "processCount";
+		int count;
+		long started = System.currentTimeMillis();
+		synchronized(workMap) {
+			count = processToJobMap.size();
+		}
+		long elapsed = System.currentTimeMillis() - started;
+		if(elapsed > Constants.SYNC_LIMIT) {
+			logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+		}
+		return count;
+	}
+	
+	/**
+	 * Registers a process under a job unless the process is already registered.
+	 * <p>
+	 * An existing registration is left untouched and a warning is logged.
+	 *
+	 * @param processId the process to register
+	 * @param jobId     the job the process belongs to
+	 * @return true when the process was newly registered, false when it already existed
+	 */
+	public boolean addProcess(DuccId processId, DuccId jobId) {
+		String methodName = "addProcess";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		boolean added;
+		long started = System.currentTimeMillis();
+		synchronized(workMap) {
+			// putIfAbsent returns null only when no mapping existed beforehand
+			// (the map does not permit null values).
+			added = (processToJobMap.putIfAbsent(processId, jobId) == null);
+			if(added) {
+				logger.info(methodName, jobId, processId, messages.fetch("added"));
+			}
+			else {
+				logger.warn(methodName, jobId, processId, messages.fetch("exists"));
+			}
+		}
+		long elapsed = System.currentTimeMillis() - started;
+		if(elapsed > Constants.SYNC_LIMIT) {
+			logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return added;
+	}
+	
+	/**
+	 * Removes a process registration, if one exists.
+	 *
+	 * @param processId the process to unregister
+	 * @return true when a registration was removed, false when none was found
+	 */
+	public boolean removeProcess(DuccId processId) {
+		String methodName = "removeProcess";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		boolean removed;
+		long started = System.currentTimeMillis();
+		synchronized(workMap) {
+			// remove returns the previous value, or null when the key was not mapped
+			// (the map does not permit null values).
+			DuccId jobId = processToJobMap.remove(processId);
+			removed = (jobId != null);
+			if(removed) {
+				logger.info(methodName, jobId, processId, messages.fetch("removed"));
+			}
+			else {
+				logger.info(methodName, null, processId, messages.fetch("not found"));
+			}
+		}
+		long elapsed = System.currentTimeMillis() - started;
+		if(elapsed > Constants.SYNC_LIMIT) {
+			logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return removed;
+	}
+	
+	/**
+	 * Null-safe string equality.
+	 *
+	 * @param a first string, may be null
+	 * @param b second string, may be null
+	 * @return true when both are null or when a equals b
+	 */
+	private boolean compare(String a, String b) {
+		return (a == null) ? (b == null) : a.equals(b);
+	}
+	
+	/**
+	 * Null-safe comparison of two time windows by their start and end values.
+	 *
+	 * @param a first window, may be null
+	 * @param b second window, may be null
+	 * @return true when both are null, or when both are non-null and have equal
+	 *         start and equal end strings; false otherwise
+	 */
+	private boolean compare(ITimeWindow a, ITimeWindow b) {
+		if((a == null) || (b == null)) {
+			// Equal only when both are missing.
+			return (a == null) && (b == null);
+		}
+		return compare(a.getStart(),b.getStart()) && compare(a.getEnd(),b.getEnd());
+	}
+	
+	/**
+	 * Copies the initialization time window reported by the inventory onto the
+	 * tracked process when the inventory has one and it differs from the tracked
+	 * one, then logs whichever of its start/end values are set.
+	 *
+	 * @param inventoryProcess the process as reported by the inventory
+	 * @param process          the tracked process to update
+	 */
+	public void copyTimeInit(IDuccProcess inventoryProcess, IDuccProcess process) {
+		String methodName = "copyTimeInit";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		DuccId processId = inventoryProcess.getDuccId();
+		DuccId jobId = getJobId(processId);
+		ITimeWindow reported = inventoryProcess.getTimeWindowInit();
+		if((reported != null) && !compare(reported,process.getTimeWindowInit())) {
+			process.setTimeWindowInit(reported);
+			// Values are read back from the tracked process, as before.
+			String startMillis = process.getTimeWindowInit().getStart();
+			if(startMillis != null) {
+				String startText = TimeStamp.simpleFormat(startMillis);
+				logger.info(methodName, jobId, processId, messages.fetchLabel("initialization start")+startText);
+			}
+			String endMillis = process.getTimeWindowInit().getEnd();
+			if(endMillis != null) {
+				String endText = TimeStamp.simpleFormat(endMillis);
+				logger.info(methodName, jobId, processId, messages.fetchLabel("initialization end")+endText);
+			}
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return;
+	}
+	
+	/**
+	 * Copies the run time window reported by the inventory onto the tracked
+	 * process when the inventory has one and it differs from the tracked one,
+	 * then logs whichever of its start/end values are set.
+	 *
+	 * @param inventoryProcess the process as reported by the inventory
+	 * @param process          the tracked process to update
+	 */
+	public void copyTimeRun(IDuccProcess inventoryProcess, IDuccProcess process) {
+		String methodName = "copyTimeRun";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		DuccId processId = inventoryProcess.getDuccId();
+		DuccId jobId = getJobId(processId);
+		ITimeWindow reported = inventoryProcess.getTimeWindowRun();
+		if((reported != null) && !compare(reported,process.getTimeWindowRun())) {
+			process.setTimeWindowRun(reported);
+			// Values are read back from the tracked process, as before.
+			String startMillis = process.getTimeWindowRun().getStart();
+			if(startMillis != null) {
+				String startText = TimeStamp.simpleFormat(startMillis);
+				logger.info(methodName, jobId, processId, messages.fetchLabel("run start")+startText);
+			}
+			String endMillis = process.getTimeWindowRun().getEnd();
+			if(endMillis != null) {
+				String endText = TimeStamp.simpleFormat(endMillis);
+				logger.info(methodName, jobId, processId, messages.fetchLabel("run end")+endText);
+			}
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return;
+	}
+	
+	/**
+	 * Marks the tracked process Deallocated when the inventory reports a terminal
+	 * process state (Stopped, Failed, FailedInitialization, InitializationTimeout,
+	 * Killed) while the tracked resource state is still Allocated.
+	 * <p>
+	 * For Stopped, Failed and Killed the inventory's stop reason (when non-null)
+	 * and the matching deallocation type are also recorded; for
+	 * FailedInitialization and InitializationTimeout neither is set. In every
+	 * other situation only a debug line is written.
+	 *
+	 * @param job              the job owning the process (used for logging)
+	 * @param inventoryProcess the process as reported by the inventory
+	 * @param process          the tracked process to update
+	 */
+	private void setResourceStateAndReason(IDuccWorkJob job, IDuccProcess inventoryProcess, IDuccProcess process) {
+		String methodName = "setResourceStateAndReason";
+		logger.trace(methodName, job.getDuccId(), messages.fetch("enter"));
+		boolean deallocated = false;
+		switch(inventoryProcess.getProcessState()) {
+		case Stopped:
+		case Failed:
+		case FailedInitialization:
+		case InitializationTimeout:
+		case Killed:
+			switch(process.getResourceState()) {
+			case Allocated:
+				deallocated = true;
+				process.setResourceState(ResourceState.Deallocated);
+				logger.info(methodName, job.getDuccId(), process.getDuccId(), messages.fetchLabel("process state")+inventoryProcess.getProcessState()+" => "+messages.fetchLabel("resource state")+process.getResourceState());
+				String reason = inventoryProcess.getReasonForStoppingProcess();
+				// Only these three states carry a reason and a deallocation type.
+				ProcessDeallocationType deallocationType = null;
+				switch(inventoryProcess.getProcessState()) {
+				case Stopped:
+					deallocationType = ProcessDeallocationType.AutonomousStop;
+					break;
+				case Failed:
+					deallocationType = ProcessDeallocationType.Failed;
+					break;
+				case Killed:
+					deallocationType = ProcessDeallocationType.Killed;
+					break;
+				}
+				if(deallocationType != null) {
+					if(reason != null) {
+						process.setReasonForStoppingProcess(reason);
+					}
+					process.setProcessDeallocationType(deallocationType);
+				}
+				break;
+			}
+			break;
+		}
+		if(!deallocated) {
+			logger.debug(methodName, job.getDuccId(), process.getDuccId(), messages.fetchLabel("process state")+inventoryProcess.getProcessState()+" => "+messages.fetchLabel("resource state")+process.getResourceState());
+		}
+		logger.trace(methodName, job.getDuccId(), messages.fetch("exit"));
+	}
+	
+	public void copyInventoryProcessState(IDuccWorkJob job, IDuccProcess inventoryProcess, IDuccProcess process) {
+		String methodName = "copyInventoryProcessState";
+		logger.trace(methodName, job.getDuccId(), messages.fetch("enter"));
+		if(!compare(inventoryProcess.getProcessState().toString(),process.getProcessState().toString())) {
+			switch((JobState)job.getStateObject()) {
+			//case Initializing:
+			//	logger.info(methodName, jobId, processId, messages.fetchLabel("process state ignored")+inventoryProcess.getProcessState());
+			//	break;
+			default:
+				process.advanceProcessState(inventoryProcess.getProcessState());
+				if ( inventoryProcess.getProcessJmxUrl() != null && process.getProcessJmxUrl() == null) {
+					process.setProcessJmxUrl(inventoryProcess.getProcessJmxUrl());
+				}
+				if ( inventoryProcess.getGarbageCollectionStats() != null ) {
+					process.setGarbageCollectionStats(inventoryProcess.getGarbageCollectionStats());
+					logger.trace(methodName, job.getDuccId(), process.getDuccId(), "GC Stats Count:"+process.getGarbageCollectionStats().getCollectionCount());
+				}
+				process.setResidentMemory(inventoryProcess.getResidentMemory());
+				logger.trace(methodName, job.getDuccId(), process.getDuccId(), "Resident Memory:"+process.getResidentMemory());
+				process.setCpuTime(inventoryProcess.getCpuTime());
+				logger.trace(methodName, job.getDuccId(), process.getDuccId(), "Cpu Time:"+process.getCpuTime());
+				logger.info(methodName, job.getDuccId(), process.getDuccId(), messages.fetchLabel("process state")+process.getProcessState());
+				break;
+			}
+		}
+		logger.trace(methodName, job.getDuccId(), messages.fetch("exit"));
+	}
+	
+	/**
+	 * Closes the initialization time window of a process.
+	 * <p>
+	 * When the process has no initialization window, a zero-length window stamped
+	 * with the current time is created and stored as the process's
+	 * <em>initialization</em> window. (Previously it was stored as the run window,
+	 * which overwrote any run window and left the initialization window null.)
+	 * When the window's end value is smaller than its start value, the end is set
+	 * to the current time.
+	 * NOTE(review): presumably an unset end yields a long below the start - confirm
+	 * against ITimeWindow.getEndLong().
+	 *
+	 * @param job     the job owning the process (currently unused)
+	 * @param process the tracked process whose initialization window is closed
+	 */
+	private void initStop(IDuccWorkJob job, IDuccProcess process) {
+		String ts = TimeStamp.getCurrentMillis();
+		ITimeWindow twi = process.getTimeWindowInit();
+		if(twi == null) {
+			twi = new TimeWindow();
+			twi.setStart(ts);
+			twi.setEnd(ts);
+			// This is the INIT window; it must not replace the run window.
+			process.setTimeWindowInit(twi);
+		}
+		long i0 = twi.getStartLong();
+		long i1 = twi.getEndLong();
+		if(i1 < i0) {
+			twi.setEnd(ts);
+		}
+	}
+	
+	/**
+	 * Closes both the initialization and the run time windows of a process.
+	 * <p>
+	 * A missing initialization window is replaced by a zero-length window stamped
+	 * with the current time and stored as the <em>initialization</em> window.
+	 * (Previously it was stored as the run window, so the run window aliased the
+	 * synthesized init object and the init window stayed null.) An
+	 * initialization window whose end is smaller than its start is ended now.
+	 * <p>
+	 * A missing run window is replaced by a zero-length window located at the end
+	 * of initialization. For a run window of non-zero length, a start earlier
+	 * than the end of initialization is moved to that end, and an end smaller
+	 * than the start is set to the current time.
+	 * NOTE(review): presumably an unset end yields a long below the start - confirm
+	 * against ITimeWindow.getEndLong().
+	 *
+	 * @param job     the job owning the process (currently unused)
+	 * @param process the tracked process whose windows are closed
+	 */
+	private void runStop(IDuccWorkJob job, IDuccProcess process) {
+		String ts = TimeStamp.getCurrentMillis();
+		ITimeWindow twi = process.getTimeWindowInit();
+		if(twi == null) {
+			twi = new TimeWindow();
+			twi.setStart(ts);
+			twi.setEnd(ts);
+			// This is the INIT window; it must not replace the run window.
+			process.setTimeWindowInit(twi);
+		}
+		long i0 = twi.getStartLong();
+		long i1 = twi.getEndLong();
+		if(i1 < i0) {
+			twi.setEnd(ts);
+			i1 = twi.getEndLong();
+		}
+		ITimeWindow twr = process.getTimeWindowRun();
+		if(twr == null) {
+			twr = new TimeWindow();
+			twr.setStart(twi.getEnd());
+			twr.setEnd(twi.getEnd());
+			process.setTimeWindowRun(twr);
+		}
+		long r0 = twr.getStartLong();
+		long r1 = twr.getEndLong();
+		if(r0 != r1) {
+			if(r0 < i1) {
+				// Run cannot begin before initialization ended.
+				twr.setStart(twi.getEnd());
+				r0 = twr.getStartLong();
+			}
+			if(r1 < r0) {
+				twr.setEnd(ts);
+			}
+		}
+	}
+
+	/**
+	 * Updates the tracked process's initialization and run time windows
+	 * according to the process state reported by the inventory.
+	 * <ul>
+	 * <li>Starting / Initializing: copy the initialization window.</li>
+	 * <li>Running: copy the initialization window, close it, copy the run window.</li>
+	 * <li>Stopped / Failed / FailedInitialization / InitializationTimeout / Killed:
+	 *     copy both windows, then close both.</li>
+	 * <li>Undefined: nothing.</li>
+	 * </ul>
+	 *
+	 * @param job              the job owning the process
+	 * @param inventoryProcess the process as reported by the inventory
+	 * @param process          the tracked process to update
+	 */
+	public void updateProcessTime(IDuccWorkJob job, IDuccProcess inventoryProcess, IDuccProcess process) {
+		String methodName = "updateProcessTime";
+		logger.trace(methodName, job.getDuccId(), messages.fetch("enter"));
+		ProcessState reportedState = inventoryProcess.getProcessState();
+		switch(reportedState) {
+		// Process Manager sent request to start the Process,
+		// or Process Agent is initializing process
+		case Starting:
+		case Initializing:
+			copyTimeInit(inventoryProcess, process);
+			break;
+		// Process Agent is processing work items
+		case Running:
+			copyTimeInit(inventoryProcess, process);
+			initStop(job, process);
+			copyTimeRun(inventoryProcess, process);
+			break;
+		// Process Agent reports process stopped, failed, failed initialization
+		// or initialization timeout; or Agent forcefully killed the process
+		case Stopped:
+		case Failed:
+		case FailedInitialization:
+		case InitializationTimeout:
+		case Killed:
+			copyTimeInit(inventoryProcess, process);
+			copyTimeRun(inventoryProcess, process);
+			runStop(job, process);
+			break;
+		case Undefined:
+			break;
+		}
+		logger.trace(methodName, job.getDuccId(), messages.fetch("exit"));
+	}
+	
+	/**
+	 * Applies a process status reported by the inventory to the tracked process.
+	 * <p>
+	 * While holding the workMap monitor: the owning job is resolved through the
+	 * process-to-job map, the tracked process is looked up in the work's process
+	 * map (falling back to the job driver's process map), and then the PID,
+	 * resource state, process state and init/run times are synchronized. A
+	 * process reported Running is marked initialized, and a Service that is still
+	 * Initializing is moved to Running. Every lookup failure is logged as a
+	 * warning and otherwise ignored.
+	 * <p>
+	 * The resource-state, process-state and time updates need the owning job;
+	 * when the work is executable but not an IDuccWorkJob they are skipped with a
+	 * warning instead of dereferencing a null job.
+	 * NOTE(review): "not a job" is a new message key - confirm it is acceptable
+	 * to the Messages catalog.
+	 *
+	 * @param inventoryProcess the process as reported by the inventory
+	 */
+	public void setStatus(IDuccProcess inventoryProcess) {
+		String methodName = "setStatus";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		DuccId processId = inventoryProcess.getDuccId();
+		logger.debug(methodName, null, processId, messages.fetchLabel("node")+inventoryProcess.getNodeIdentity().getName()+" "+messages.fetchLabel("PID")+inventoryProcess.getPID());
+		long t0 = System.currentTimeMillis();
+		synchronized(workMap) {
+			if(processToJobMap.containsKey(processId)) {
+				DuccId jobId = getJobId(processId);
+				IDuccWork duccWork = workMap.findDuccWork(jobId);
+				if(duccWork != null) {
+					if(duccWork instanceof IDuccWorkExecutable) {
+						IDuccWorkExecutable duccWorkExecutable = (IDuccWorkExecutable) duccWork;
+						IDuccWorkJob job = null;
+						if(duccWork instanceof IDuccWorkJob) { 
+							job = (IDuccWorkJob)duccWork;
+						}
+						IDuccProcessMap processMap = duccWorkExecutable.getProcessMap();
+						IDuccProcess process = processMap.get(processId);
+						if(process == null) {
+							// Not a worker process: it may be the job driver's process.
+							if(job != null) { 
+								process = job.getDriver().getProcessMap().get(processId);
+							}
+						}
+						if(process != null) {
+							// PID
+							String iPID = inventoryProcess.getPID();
+							String pPID = process.getPID();
+							if(!compare(iPID, pPID)) {
+								process.setPID(iPID);
+								logger.info(methodName, jobId, processId, messages.fetchLabel("pPID")+pPID+" "+messages.fetchLabel("iPID")+iPID);
+							}
+							if(job != null) {
+								// Scheduler State
+								setResourceStateAndReason(job, inventoryProcess, process);
+								// Process State
+								copyInventoryProcessState(job, inventoryProcess, process);
+								// Process Init & Run times
+								updateProcessTime(job, inventoryProcess, process);
+							}
+							else {
+								// The helpers above dereference the job; without one they cannot run.
+								logger.warn(methodName, jobId, processId, messages.fetch("not a job"));
+							}
+							// Process Initialization State
+							switch(inventoryProcess.getProcessState()) {
+							case Running:
+								process.setInitialized();
+								if(job != null) {
+									switch(job.getDuccType()) {
+									case Service:
+										switch(job.getJobState()) {
+										case Initializing:
+											stateJobAccounting.stateChange(job, JobState.Running);
+											break;
+										}
+										break;
+									}
+								}
+							}
+						}
+						else {
+							logger.warn(methodName, jobId, processId, messages.fetch("process not found job's process table"));
+						}
+					}
+					else {
+						logger.warn(methodName, jobId, processId, messages.fetch("not executable"));
+					}
+				}
+				else {
+					logger.warn(methodName, jobId, processId, messages.fetch("job ID not found"));
+				}
+			}
+			else {
+				logger.warn(methodName, null, processId, messages.fetch("ID not found in process map"));
+			}
+		}
+		long t1 = System.currentTimeMillis();
+		long elapsed = t1 - t0;
+		if(elapsed > Constants.SYNC_LIMIT) {
+			logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return;
+	}
+	
+	/**
+	 * Copies work item accounting from a job driver status report into the
+	 * scheduling info of the given job.
+	 * <p>
+	 * Each counter (total, completed, dispatched, error, retry, preempt) is
+	 * converted to a string and written only when it differs from the stored
+	 * value; the queued-CAS map, limbo map, most recent work item start and
+	 * per-work-item statistics are always overwritten.
+	 *
+	 * @param jdStatusReport the status report received from the job driver
+	 * @param duccWorkJob    the job whose scheduling info is updated
+	 * @return always false: retVal is initialized to false and never reassigned
+	 */
+	public boolean setStatus(DriverStatusReport jdStatusReport, DuccWorkJob duccWorkJob) {
+		String methodName = "setStatus";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		// NOTE(review): never set to true below - confirm whether callers expect a "changed" indicator.
+		boolean retVal = false;
+		// Counters: update the stored string only when the reported value differs.
+		String jdTotalWorkItems = ""+jdStatusReport.getWorkItemsTotal();
+		if(!compare(jdTotalWorkItems,duccWorkJob.getSchedulingInfo().getWorkItemsTotal())) {
+			duccWorkJob.getSchedulingInfo().setWorkItemsTotal(jdTotalWorkItems);
+		}
+		String jdCompletedWorkItems = ""+jdStatusReport.getWorkItemsProcessingCompleted();
+		if(!compare(jdCompletedWorkItems,duccWorkJob.getSchedulingInfo().getWorkItemsCompleted())) {
+			duccWorkJob.getSchedulingInfo().setWorkItemsCompleted(jdCompletedWorkItems);
+		}
+		String jdDispatchedWorkItems = ""+jdStatusReport.getWorkItemsDispatched();
+		if(!compare(jdDispatchedWorkItems,duccWorkJob.getSchedulingInfo().getWorkItemsDispatched())) {
+			duccWorkJob.getSchedulingInfo().setWorkItemsDispatched(jdDispatchedWorkItems);
+		}
+		String jdErrorWorkItems = ""+jdStatusReport.getWorkItemsProcessingError();
+		if(!compare(jdErrorWorkItems,duccWorkJob.getSchedulingInfo().getWorkItemsError())) {
+			duccWorkJob.getSchedulingInfo().setWorkItemsError(jdErrorWorkItems);
+		}
+		String jdRetryWorkItems = ""+jdStatusReport.getWorkItemsRetry();
+		if(!compare(jdRetryWorkItems,duccWorkJob.getSchedulingInfo().getWorkItemsRetry())) {
+			duccWorkJob.getSchedulingInfo().setWorkItemsRetry(jdRetryWorkItems);
+		}
+		String jdPreemptWorkItems = ""+jdStatusReport.getWorkItemsPreempted();
+		if(!compare(jdPreemptWorkItems,duccWorkJob.getSchedulingInfo().getWorkItemsPreempt())) {
+			duccWorkJob.getSchedulingInfo().setWorkItemsPreempt(jdPreemptWorkItems);
+		}
+		
+		// The remaining values are overwritten unconditionally.
+		duccWorkJob.getSchedulingInfo().setCasQueuedMap(jdStatusReport.getCasQueuedMap());
+		duccWorkJob.getSchedulingInfo().setLimboMap(jdStatusReport.getLimboMap());
+		
+		duccWorkJob.getSchedulingInfo().setMostRecentWorkItemStart(jdStatusReport.getMostRecentStart());
+		
+		duccWorkJob.getSchedulingInfo().setPerWorkItemStatistics(jdStatusReport.getPerWorkItemStatistics());
+
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return retVal;
+	}
+	
+	/**
+	 * Deallocates every process of one process map.
+	 * <p>
+	 * A process whose resource state is Allocated becomes Deallocated and records
+	 * the given deallocation type. In addition, Allocated and Deallocated
+	 * processes are advanced to processState when one is supplied. Processes in
+	 * any other resource state are left untouched.
+	 *
+	 * @param job                     the job owning the processes (used for logging)
+	 * @param processDeallocationType the deallocation type to record
+	 * @param processState            the state to advance processes to, or null for none
+	 * @param processMap              the processes to handle; null means nothing to do
+	 * @param type                    text logged for each newly deallocated process
+	 */
+	private void deallocate(IDuccWorkJob job, ProcessDeallocationType processDeallocationType, ProcessState processState, IDuccProcessMap processMap, String type) {
+		String methodName = "deallocate";
+		logger.trace(methodName, job.getDuccId(), messages.fetch("enter"));
+		if(processMap != null) {
+			for(IDuccProcess process : processMap.values()) {
+				switch(process.getResourceState()) {
+				case Allocated:
+					process.setResourceState(ResourceState.Deallocated);
+					process.setProcessDeallocationType(processDeallocationType);
+					logger.info(methodName, job.getDuccId(), process.getDuccId(), type);
+					if(processState != null) {
+						logger.debug(methodName, job.getDuccId(), process.getProcessState()+" -> "+processState);
+						process.advanceProcessState(processState);
+					}
+					break;
+				case Deallocated:
+					// Already deallocated: only the process state may still need advancing.
+					if(processState != null) {
+						logger.debug(methodName, job.getDuccId(), process.getProcessState()+" -> "+processState);
+						process.advanceProcessState(processState);
+					}
+					break;
+				}
+			}
+		}
+		logger.trace(methodName, job.getDuccId(), messages.fetch("exit"));
+		return;
+	}
+	
+	/**
+	 * Deallocates the worker processes of a job or service and, for a Job only,
+	 * the processes of its driver as well.
+	 *
+	 * @param job                     the job or service whose processes are deallocated
+	 * @param processDeallocationType the deallocation type to record
+	 * @param processState            the state to advance processes to, or null for none
+	 */
+	private void deallocate(IDuccWorkJob job, ProcessDeallocationType processDeallocationType, ProcessState processState) {
+		String methodName = "deallocate";
+		logger.trace(methodName, job.getDuccId(), messages.fetch("enter"));
+		IDuccProcessMap workers = job.getProcessMap();
+		deallocate(job,processDeallocationType,processState,workers,"worker");
+		switch(job.getDuccType()) {
+		case Job:
+			IDuccProcessMap drivers = job.getDriver().getProcessMap();
+			deallocate(job,processDeallocationType,processState,drivers,"driver");
+			break;
+		case Service:
+			// A service has no driver processes to deallocate here.
+			break;
+		}
+		logger.trace(methodName, job.getDuccId(), messages.fetch("exit"));
+		return;
+	}
+	
+	/**
+	 * Deallocates all processes of the job without changing their process state.
+	 *
+	 * @param job                     the job whose processes are deallocated
+	 * @param processDeallocationType the deallocation type to record
+	 */
+	public void deallocate(IDuccWorkJob job, ProcessDeallocationType processDeallocationType) {
+		final ProcessState noStateChange = null;
+		deallocate(job,processDeallocationType,noStateChange);
+	}
+	
+	/**
+	 * Deallocates all processes of the job and advances them to the Stopped state.
+	 *
+	 * @param job                     the job whose processes are deallocated
+	 * @param processDeallocationType the deallocation type to record
+	 */
+	public void deallocateAndStop(IDuccWorkJob job, ProcessDeallocationType processDeallocationType) {
+		final ProcessState stopped = ProcessState.Stopped;
+		deallocate(job,processDeallocationType,stopped);
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ReservationFactory.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ReservationFactory.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ReservationFactory.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ReservationFactory.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.TimeStamp;
+import org.apache.uima.ducc.common.utils.id.IDuccIdFactory;
+import org.apache.uima.ducc.orchestrator.utilities.MemorySpecification;
+import org.apache.uima.ducc.transport.event.cli.ReservationRequestProperties;
+import org.apache.uima.ducc.transport.event.cli.ReservationSpecificationProperties;
+import org.apache.uima.ducc.transport.event.common.DuccSchedulingInfo;
+import org.apache.uima.ducc.transport.event.common.DuccStandardInfo;
+import org.apache.uima.ducc.transport.event.common.DuccWorkReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
+
+
+/**
+ * Factory that builds a {@link DuccWorkReservation} from the properties of a
+ * reservation request. A single shared instance is available through
+ * {@link #getInstance()}.
+ */
+public class ReservationFactory {
+	private static ReservationFactory reservationFactory = new ReservationFactory();
+	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(ReservationFactory.class.getName());
+	
+	/**
+	 * @return the shared factory instance created when this class is initialized
+	 */
+	public static ReservationFactory getInstance() {
+		return reservationFactory;
+	}
+	
+	private OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+	private Messages messages = orchestratorCommonArea.getSystemMessages();
+	private IDuccIdFactory reservationDuccIdFactory = orchestratorCommonArea.getReservationDuccIdFactory();
+	
+	/**
+	 * Creates a reservation with a freshly issued id, type {@code Reservation},
+	 * and standard and scheduling info filled in from the request properties.
+	 * The resulting values are logged at info level.
+	 * 
+	 * @param common not used by this method
+	 * @param reservationRequestProperties source of the user, description,
+	 *        scheduling class, instance memory size and number of instances
+	 * @return the newly built reservation
+	 */
+	public DuccWorkReservation create(CommonConfiguration common, ReservationRequestProperties reservationRequestProperties) {
+		final String location = "create";
+		logger.trace(location, null, messages.fetch("enter"));
+		DuccWorkReservation reservation = new DuccWorkReservation();
+		reservation.setDuccId(reservationDuccIdFactory.next());
+		reservation.setDuccType(DuccType.Reservation);
+		// Each info object is attached to the reservation first and populated afterwards.
+		DuccStandardInfo standardInfo = new DuccStandardInfo();
+		reservation.setStandardInfo(standardInfo);
+		fillStandardInfo(standardInfo, reservationRequestProperties);
+		DuccSchedulingInfo schedulingInfo = new DuccSchedulingInfo();
+		reservation.setSchedulingInfo(schedulingInfo);
+		fillSchedulingInfo(schedulingInfo, reservationRequestProperties);
+		logSummary(location, reservation, standardInfo, schedulingInfo);
+		logger.trace(location, null, messages.fetch("exit"));
+		return reservation;
+	}
+	
+	/** Copies user and description from the request, stamps the submission time and clears the completion time. */
+	private void fillStandardInfo(DuccStandardInfo standardInfo, ReservationRequestProperties request) {
+		standardInfo.setUser(request.getProperty(ReservationSpecificationProperties.key_user));
+		standardInfo.setDateOfSubmission(TimeStamp.getCurrentMillis());
+		standardInfo.setDateOfCompletion(null);
+		standardInfo.setDescription(request.getProperty(ReservationSpecificationProperties.key_description));
+	}
+	
+	/** Copies scheduling class, memory size/units (parsed by MemorySpecification) and instance count from the request. */
+	private void fillSchedulingInfo(DuccSchedulingInfo schedulingInfo, ReservationRequestProperties request) {
+		schedulingInfo.setSchedulingClass(request.getProperty(ReservationSpecificationProperties.key_scheduling_class));
+		String requestedMemory = request.getProperty(ReservationSpecificationProperties.key_instance_memory_size);
+		MemorySpecification memory = new MemorySpecification(requestedMemory);
+		schedulingInfo.setShareMemorySize(memory.getSize());
+		schedulingInfo.setShareMemoryUnits(memory.getMemoryUnits());
+		schedulingInfo.setInstancesCount(request.getProperty(ReservationSpecificationProperties.key_number_of_instances));
+	}
+	
+	/**
+	 * Logs the populated values, one info line each. The scheduling priority is
+	 * reported although this class never sets it, so the logged value is
+	 * whatever a new DuccSchedulingInfo holds.
+	 */
+	private void logSummary(String location, DuccWorkReservation reservation, DuccStandardInfo standardInfo, DuccSchedulingInfo schedulingInfo) {
+		logger.info(location, reservation.getDuccId(), messages.fetchLabel("user")+standardInfo.getUser());
+		logger.info(location, reservation.getDuccId(), messages.fetchLabel("description")+standardInfo.getDescription());
+		logger.info(location, reservation.getDuccId(), messages.fetchLabel("class")+schedulingInfo.getSchedulingClass());
+		logger.info(location, reservation.getDuccId(), messages.fetchLabel("priority")+schedulingInfo.getSchedulingPriority());
+		logger.info(location, reservation.getDuccId(), messages.fetchLabel("memory")+schedulingInfo.getShareMemorySize()+schedulingInfo.getShareMemoryUnits());
+		logger.info(location, reservation.getDuccId(), messages.fetchLabel("instances")+schedulingInfo.getInstancesCount());
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ReservationFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateJobAccounting.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateJobAccounting.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateJobAccounting.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateJobAccounting.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.IRationale;
+import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.JobCompletionType;
+import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
+
+
+public class StateJobAccounting {
+
+	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(StateJobAccounting.class.getName());
+	
+	private static StateJobAccounting instance = new StateJobAccounting();
+	
+	public static StateJobAccounting getInstance() {
+		return instance;
+	}
+	
+	public boolean stateChange(IDuccWorkJob job, JobState state) {
+		String methodName = "stateChange";
+		boolean retVal = false;
+		JobState prev = job.getJobState();
+		JobState next = state;
+		switch(prev) {
+		case Completing:
+			retVal = stateChangeFromCompleting(prev, next);
+			break;
+		case Completed:
+			switch(next) {
+			case Completing:
+				next = prev;
+				break;
+			default:
+				break;
+			}
+			retVal = stateChangeFromCompleted(prev, next);
+			break;
+		case Initializing:
+			retVal = stateChangeFromInitializing(prev, next);
+			break;
+		case Received:
+			retVal = stateChangeFromReceived(prev, next);
+			break;
+		case Running:
+			retVal = stateChangeFromRunning(prev, next);
+			break;
+		case Undefined:
+			retVal = stateChangeFromUndefined(prev, next);
+			break;
+		case WaitingForDriver:
+			retVal = stateChangeFromWaitingForDriver(prev, next);
+			break;
+		case WaitingForResources:
+			retVal = stateChangeFromWaitingForResources(prev, next);
+			break;	
+		case WaitingForServices:
+			retVal = stateChangeFromWaitingForServices(prev, next);
+			break;
+		}
+		if(retVal) {
+			job.setJobState(state);
+			logger.info(methodName, job.getDuccId(),"current["+next+"] previous["+prev+"]");
+		}
+		else {
+			try {
+				throw new RuntimeException();
+			} 
+			catch(Exception e) {
+				logger.error(methodName, job.getDuccId(),"current["+prev+"] requested["+next+"]"+" ignored", e);
+			}
+		}
+		return retVal;
+	}
+	
+	private boolean stateChangeFromCompleting(JobState prev, JobState next) {
+		boolean retVal = false;
+		switch(next) {
+		case Completing:							break;
+		case Completed:				retVal = true;	break;
+		case Initializing:							break;
+		case Received:								break;
+		case Running:								break;
+		case Undefined:								break;
+		case WaitingForDriver:						break;
+		case WaitingForResources:					break;
+		case WaitingForServices:					break;
+		}
+		return retVal;
+	}
+	
+	private boolean stateChangeFromCompleted(JobState prev, JobState next) {
+		boolean retVal = false;
+		switch(next) {
+		case Completing:							break;
+		case Completed:								break;
+		case Initializing:							break;
+		case Received:								break;
+		case Running:								break;
+		case Undefined:								break;
+		case WaitingForDriver:						break;
+		case WaitingForResources:					break;
+		case WaitingForServices:					break;
+		}
+		return retVal;
+	}
+	
+	private boolean stateChangeFromInitializing(JobState prev, JobState next) {
+		boolean retVal = false;
+		switch(next) {
+		case Completing:			retVal = true;	break;
+		case Completed:				retVal = true;	break;
+		case Initializing:							break;
+		case Received:								break;
+		case Running:				retVal = true;	break;
+		case Undefined:								break;
+		case WaitingForDriver:						break;
+		case WaitingForResources:					break;
+		case WaitingForServices:					break;
+		}
+		return retVal;
+	}
+	
+	private boolean stateChangeFromReceived(JobState prev, JobState next) {
+		boolean retVal = false;
+		switch(next) {
+		case Completing:			retVal = true;	break;
+		case Completed:				retVal = true;	break;
+		case Initializing:							break;
+		case Received:								break;
+		case Running:								break;
+		case Undefined:								break;
+		case WaitingForDriver:		retVal = true;	break;
+		case WaitingForResources:					break;
+		case WaitingForServices:	retVal = true;	break;
+		}
+		return retVal;
+	}
+	
+	private boolean stateChangeFromRunning(JobState prev, JobState next) {
+		boolean retVal = false;
+		switch(next) {
+		case Completing:			retVal = true;	break;
+		case Completed:				retVal = true;	break;
+		case Initializing:							break;
+		case Received:								break;
+		case Running:								break;
+		case Undefined:								break;
+		case WaitingForDriver:						break;
+		case WaitingForResources:					break;
+		case WaitingForServices:					break;
+		}
+		return retVal;
+	}
+	
+	private boolean stateChangeFromUndefined(JobState prev, JobState next) {
+		boolean retVal = false;
+		switch(next) {
+		case Completing:							break;
+		case Completed:								break;
+		case Initializing:							break;
+		case Received:				retVal = true;	break;
+		case Running:								break;
+		case Undefined:								break;
+		case WaitingForDriver:						break;
+		case WaitingForResources:					break;
+		case WaitingForServices:					break;
+		}
+		return retVal;
+	}
+	
+	private boolean stateChangeFromWaitingForDriver(JobState prev, JobState next) {
+		boolean retVal = false;
+		switch(next) {
+		case Completing:			retVal = true;	break;
+		case Completed:				retVal = true;	break;
+		case Initializing:							break;
+		case Received:								break;
+		case Running:								break;
+		case Undefined:								break;
+		case WaitingForDriver:						break;
+		case WaitingForResources:					break;
+		case WaitingForServices:	retVal = true;	break;
+		}
+		return retVal;
+	}
+	
+	private boolean stateChangeFromWaitingForResources(JobState prev, JobState next) {
+		boolean retVal = false;
+		switch(next) {
+		case Completing:			retVal = true;	break;
+		case Completed:				retVal = true;	break;
+		case Initializing:			retVal = true;	break;
+		case Received:								break;
+		case Running:								break;
+		case Undefined:								break;
+		case WaitingForDriver:						break;
+		case WaitingForResources:					break;
+		case WaitingForServices:					break;
+		}
+		return retVal;
+	}
+	
+	private boolean stateChangeFromWaitingForServices(JobState prev, JobState next) {
+		boolean retVal = false;
+		switch(next) {
+		case Completing:			retVal = true;	break;
+		case Completed:				retVal = true;	break;
+		case Initializing:							break;
+		case Received:								break;
+		case Running:								break;
+		case Undefined:								break;
+		case WaitingForDriver:						break;
+		case WaitingForResources:	retVal = true;	break;
+		case WaitingForServices:					break;
+		}
+		return retVal;
+	}
+
+	public boolean complete(IDuccWorkJob job, JobCompletionType completionType, IRationale completionRationale) {
+		String methodName = "complete";
+		boolean retVal = false;
+		switch(job.getCompletionType()) {
+		case Undefined:
+			retVal = true;
+			break;
+		}
+		if(retVal) {
+			job.setCompletion(completionType,completionRationale);
+			logger.info(methodName, job.getDuccId(), completionType+" "+completionRationale);
+		}
+		else {
+			logger.info(methodName, job.getDuccId(), completionType+" "+"ignored");
+		}
+		return retVal;
+	}
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateJobAccounting.java
------------------------------------------------------------------------------
    svn:eol-style = native