You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:37:56 UTC
svn commit: r1427956 [2/4] - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator: main/ main/java/
main/java/org/ main/java/org/apache/ main/java/org/apache/uima/
main/java/org/apache/uima/ducc/
main/java/org/apache/uima/ducc/orchestrator/ main/jav...
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorComponent.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorComponent.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorComponent.java Wed Jan 2 19:37:55 2013
@@ -0,0 +1,874 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+
+import org.apache.camel.CamelContext;
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties;
+import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties.DaemonName;
+import org.apache.uima.ducc.common.component.AbstractDuccComponent;
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.crypto.Crypto;
+import org.apache.uima.ducc.common.crypto.CryptoException;
+import org.apache.uima.ducc.common.crypto.Crypto.AccessType;
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.main.DuccService;
+import org.apache.uima.ducc.common.system.SystemState;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
+import org.apache.uima.ducc.common.utils.LinuxUtils;
+import org.apache.uima.ducc.common.utils.TimeStamp;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.orchestrator.OrchestratorConstants.StartType;
+import org.apache.uima.ducc.orchestrator.maintenance.MaintenanceThread;
+import org.apache.uima.ducc.orchestrator.maintenance.NodeAccounting;
+import org.apache.uima.ducc.transport.event.CancelJobDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelReservationDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelServiceDuccEvent;
+import org.apache.uima.ducc.transport.event.JdStateDuccEvent;
+import org.apache.uima.ducc.transport.event.NodeInventoryUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.OrchestratorAbbreviatedStateDuccEvent;
+import org.apache.uima.ducc.transport.event.OrchestratorStateDuccEvent;
+import org.apache.uima.ducc.transport.event.RmStateDuccEvent;
+import org.apache.uima.ducc.transport.event.SmStateDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitJobDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitReservationDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitServiceDuccEvent;
+import org.apache.uima.ducc.transport.event.cli.JobReplyProperties;
+import org.apache.uima.ducc.transport.event.cli.JobRequestProperties;
+import org.apache.uima.ducc.transport.event.cli.JobSpecificationProperties;
+import org.apache.uima.ducc.transport.event.cli.ReservationReplyProperties;
+import org.apache.uima.ducc.transport.event.cli.ReservationRequestProperties;
+import org.apache.uima.ducc.transport.event.cli.SpecificationProperties;
+import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+import org.apache.uima.ducc.transport.event.common.DuccWorkReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccReservationMap;
+import org.apache.uima.ducc.transport.event.common.IDuccWork;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation;
+import org.apache.uima.ducc.transport.event.common.IRationale;
+import org.apache.uima.ducc.transport.event.common.Rationale;
+import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.JobCompletionType;
+import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.ReservationCompletionType;
+import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
+import org.apache.uima.ducc.transport.event.common.IDuccState.ReservationState;
+import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
+import org.apache.uima.ducc.transport.event.common.IResourceState.ProcessDeallocationType;
+import org.apache.uima.ducc.transport.event.jd.DriverStatusReport;
+import org.apache.uima.ducc.transport.event.rm.IRmJobState;
+import org.apache.uima.ducc.transport.event.sm.ServiceMap;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+@Configuration
+@Import({CommonConfiguration.class})
+public class OrchestratorComponent extends AbstractDuccComponent
+implements Orchestrator {
+ // Spring injects the {@link CommonConfiguration} instance here (via @Autowired); it is handed to the job and reservation factories when work is created
+ @Autowired CommonConfiguration common;
+ private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(OrchestratorComponent.class.getName()); // component logger
+
+ private OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance(); // singleton from which most collaborators below are obtained
+ private Messages messages = orchestratorCommonArea.getSystemMessages(); // message lookup used for log and reply texts
+ private DuccWorkMap workMap = orchestratorCommonArea.getWorkMap(); // jobs, services and reservations known to this component
+ private StateManager stateManager = StateManager.getInstance(); // target of all reconcile* calls and of job termination
+ //private HealthMonitor healthMonitor = HealthMonitor.getInstance();
+ //private MqReaper mqReaper = MqReaper.getInstance();
+ private JobFactory jobFactory = JobFactory.getInstance(); // creates DuccWorkJob instances for job and service submissions
+ private ReservationFactory reservationFactory = ReservationFactory.getInstance(); // creates DuccWorkReservation instances
+ private CommonConfiguration commonConfiguration = orchestratorCommonArea.getCommonConfiguration(); // read for the start type, signature setting and JD host class
+ private JobDriverHostManager hostManager = orchestratorCommonArea.getHostManager(); // reassigned in start() from JobDriverHostManager.getInstance()
+ private StateJobAccounting stateJobAccounting = StateJobAccounting.getInstance(); // records job state changes and completions
+
+ /**
+  * Creates the component, passing the fixed component name "Orchestrator"
+  * and the given Camel context to the superclass constructor.
+  *
+  * @param context the Camel context forwarded to {@link AbstractDuccComponent}
+  */
+ public OrchestratorComponent(CamelContext context) {
+     super("Orchestrator", context);
+ }
+
+ private void force(IDuccWorkJob job, IRationale rationale){
+ String methodName = "force";
+ if(!job.isCompleted()) {
+ stateJobAccounting.stateChange(job, JobState.Completed);
+ job.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+ stateJobAccounting.complete(job, JobCompletionType.CanceledBySystem, rationale);
+ OrchestratorCommonArea.getInstance().getProcessAccounting().deallocateAndStop(job,ProcessDeallocationType.JobCanceled);
+ logger.info(methodName, job.getDuccId(),JobCompletionType.CanceledBySystem);
+ }
+ }
+
+ /*
+ private void cancel(IDuccWorkJob job) {
+ String methodName = "cancel";
+ if(!job.isFinished()) {
+ stateJobAccounting.stateChange(job, JobState.Completing);
+ job.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+ stateJobAccounting.complete(job, JobCompletionType.CanceledBySystem);
+ OrchestratorCommonArea.getInstance().getProcessAccounting().deallocateAndStop(job,ProcessDeallocationType.JobCanceled);
+ logger.info(methodName, job.getDuccId(),JobCompletionType.CanceledBySystem);
+ }
+ }
+ */
+
+ private void cancel(IDuccWorkReservation reservation) {
+ String methodName = "cancel";
+ reservation.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+ reservation.stateChange(ReservationState.Completed);
+ reservation.complete(ReservationCompletionType.CanceledBySystem);
+ logger.info(methodName, reservation.getDuccId(), ReservationCompletionType.CanceledBySystem);
+ }
+
+ private StartType getStartTypeProperty()
+ {
+ String methodName = "getStartTypeProperty";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ StartType startType = StartType.warm;
+ String property = commonConfiguration.orchestratorStartType;
+ if(property != null) {
+ String startTypeProperty = property.trim().toLowerCase();
+ if(startTypeProperty.equals("cold")) {
+ startType = StartType.cold;
+ }
+ else if(startTypeProperty.equals("warm")) {
+ startType = StartType.warm;
+ }
+ else if(startTypeProperty.equals("hot")) {
+ startType = StartType.hot;
+ }
+ else {
+ logger.warn(methodName, null, "ducc.orchestrator.start.type value in ducc.properties not recognized: "+property);
+ }
+ }
+ else {
+ logger.warn(methodName, null, "ducc.orchestrator.start.type not found in ducc.properties");
+ }
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return startType;
+ }
+
+ private void resolveSignatureRequired() throws CryptoException
+ {
+ String methodName = "resolveSignatureRequired";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ String property = commonConfiguration.signatureRequired;
+ if(property != null) {
+ String signatureRequiredProperty = property.trim().toLowerCase();
+ if(signatureRequiredProperty.equals("on")) {
+ orchestratorCommonArea.setSignatureRequired();
+ logger.info(methodName, null, "ducc.signature.required: "+property);
+ }
+ else if(signatureRequiredProperty.equals("off")) {
+ orchestratorCommonArea.resetSignatureRequired();
+ logger.info(methodName, null, "ducc.signature.required: "+property);
+ }
+ else {
+ logger.warn(methodName, null, "ducc.signature.required value in ducc.properties not recognized: "+property);
+ }
+ }
+ else {
+ logger.warn(methodName, null, "ducc.signature.required not found in ducc.properties");
+ }
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return;
+ }
+
+ private StartType getStartTypeOverride(String[] args)
+ {
+ String methodName = "getStartTypeOverride";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ StartType startType = null;
+ // override start type if specified on command line
+ if(args != null) {
+ for( String arg : args) {
+ logger.debug(methodName, null, "arg: "+arg);
+ String flag = arg.trim();
+ while(flag.startsWith("-")) {
+ flag = flag.replaceFirst("-", "");
+ }
+ if(flag.equals(StartType.cold.toString())) {
+ startType = StartType.cold;
+ }
+ else if(flag.equals(StartType.warm.toString())) {
+ startType = StartType.warm;
+ }
+ else if(flag.equals(StartType.hot.toString())) {
+ startType = StartType.hot;
+ }
+ else {
+ logger.warn(methodName, null, "unrecognized arg: "+arg);
+ }
+ }
+ }
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return startType;
+ }
+
+ /**
+  * Resolves the effective start type and logs where it came from.
+  * Precedence: a command line override, then the configured property, then
+  * {@code OrchestratorConstants.startTypeDefault}.
+  *
+  * @param args the command line arguments, may be null
+  * @return the start type to use
+  */
+ private StartType getStartType(String[] args)
+ {
+     String methodName = "getStartType";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     StartType startType = OrchestratorConstants.startTypeDefault;
+     StartType property = getStartTypeProperty();
+     StartType override = getStartTypeOverride(args);
+     // origin names the source of the chosen value in the log line
+     String origin = "default";
+     if(override != null) {
+         startType = override;
+         origin = "override";
+     }
+     else if(property != null) {
+         startType = property;
+         origin = "property";
+     }
+     // The start type is written exactly once; the previous code appended it
+     // twice on the default path, yielding e.g. "start type: warmwarm, default".
+     logger.info(methodName, null, "start type: "+startType+", "+origin);
+     logger.trace(methodName, null, messages.fetch("exit"));
+     return startType;
+ }
+
+
+ /**
+  * Starts the Orchestrator.
+  * The work map is walked under its lock and each entry is handled according
+  * to the resolved start type:
+  * jobs and services are forced to completion on cold and warm starts and left
+  * alone on hot starts; reservations are canceled on cold starts, canceled on
+  * warm starts only when their scheduling class is the JD host class, and on hot
+  * starts the nodes of JD host class reservations are added to the host manager.
+  * State is checkpointed if anything was changed, the host manager is
+  * (re)initialized, the signature setting is resolved and the maintenance
+  * thread is started. Any Throwable raised by these steps is logged and does
+  * not prevent the superclass start and daemon boot registration that follow.
+  *
+  * @param service the service passed through to the superclass
+  * @param args    command line arguments, also used to look for a start type override
+  * @throws Exception as declared; in this body only the superclass start and the
+  *         boot registration sit outside the catch-all block
+  */
+ public void start(DuccService service, String[] args) throws Exception {
+     final String methodName = "start";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     try {
+         final StartType startType = getStartType(args);
+         logger.info(methodName, null, "##### "+startType+" #####");
+         boolean stateChanged = false;
+         final long lockRequested = System.currentTimeMillis();
+         synchronized(workMap) {
+             for(IDuccWork work : workMap.values()) {
+                 switch(work.getDuccType()) {
+                 case Job:
+                 case Service: {
+                     final IDuccWorkJob job = (IDuccWorkJob) work;
+                     // only cold and warm starts terminate jobs and services
+                     String reason = null;
+                     switch(startType) {
+                     case cold:
+                         reason = "system cold start";
+                         break;
+                     case warm:
+                         reason = "system warm start";
+                         break;
+                     case hot:
+                         break;
+                     }
+                     if(reason != null) {
+                         force(job, new Rationale(reason));
+                         stateChanged = true;
+                     }
+                     break;
+                 }
+                 case Reservation: {
+                     final IDuccWorkReservation reservation = (IDuccWorkReservation) work;
+                     switch(startType) {
+                     case cold:
+                         cancel(reservation);
+                         stateChanged = true;
+                         break;
+                     case warm:
+                         if(commonConfiguration.jdHostClass.equals(reservation.getSchedulingInfo().getSchedulingClass())) {
+                             cancel(reservation);
+                             stateChanged = true;
+                         }
+                         break;
+                     case hot:
+                         if(commonConfiguration.jdHostClass.equals(reservation.getSchedulingInfo().getSchedulingClass())) {
+                             // keep the nodes of surviving JD host reservations known to the host manager
+                             for(Entry<DuccId, IDuccReservation> entry : reservation.getReservationMap().entrySet()) {
+                                 hostManager.addNode(entry.getValue().getNodeIdentity());
+                             }
+                         }
+                         break;
+                     }
+                     break;
+                 }
+                 }
+             }
+         }
+         final long elapsed = System.currentTimeMillis() - lockRequested;
+         if(elapsed > Constants.SYNC_LIMIT) {
+             logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+         }
+         if(stateChanged) {
+             OrchestratorCheckpoint.getInstance().saveState();
+         }
+         switch(startType) {
+         case cold:
+         case warm:
+             hostManager = JobDriverHostManager.getInstance();
+             hostManager.init();
+             break;
+         case hot:
+             hostManager = JobDriverHostManager.getInstance();
+             hostManager.conditional();
+             break;
+         }
+         resolveSignatureRequired();
+         MaintenanceThread.getInstance().start();
+     }
+     catch(Throwable t) {
+         logger.error(methodName, null, t);
+     }
+     super.start(service, args);
+     DuccDaemonRuntimeProperties.getInstance().boot(DaemonName.Orchestrator,getProcessJmxUrl());
+     logger.trace(methodName, null, messages.fetch("exit"));
+ }
+
+ /**
+  * Job Driver State Reconciliation.
+  * When the event carries a driver status report, its log report is written
+  * to the log and the report is handed to the state manager; an event without
+  * a report is ignored.
+  *
+  * @param duccEvent the job driver state event
+  */
+ public void reconcileJdState(JdStateDuccEvent duccEvent) {
+     final String methodName = "reconcileJdState";
+     final DriverStatusReport report = duccEvent.getState();
+     final DuccId duccId = (report == null) ? null : report.getDuccId();
+     logger.trace(methodName, null, messages.fetch("enter"));
+     if(report != null) {
+         logger.info(methodName, duccId, report.getLogReport());
+         stateManager.reconcileState(report);
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+ }
+ /**
+  * Resources Manager State Reconciliation.
+  * Passes the job state map carried by the event to the state manager;
+  * an Exception raised by the reconciliation is logged and not propagated.
+  *
+  * @param duccEvent the resource manager state event
+  */
+ public void reconcileRmState(RmStateDuccEvent duccEvent) {
+     final String methodName = "reconcileRmState";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     final Map<DuccId, IRmJobState> rmJobStates = duccEvent.getJobState();
+     try {
+         stateManager.reconcileState(rmJobStates);
+     }
+     catch(Exception e) {
+         logger.error(methodName, null, e);
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+ }
+ /**
+  * Services Manager State Reconciliation.
+  * Passes the service map carried by the event to the state manager.
+  *
+  * @param duccEvent the services manager state event
+  */
+ public void reconcileSmState(SmStateDuccEvent duccEvent) {
+     final String methodName = "reconcileSmState";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     final ServiceMap services = duccEvent.getServiceMap();
+     stateManager.reconcileState(services);
+     logger.trace(methodName, null, messages.fetch("exit"));
+ }
+ /**
+  * Node Inventory State Reconciliation.
+  * Passes the process map carried by the event first to the state manager
+  * and then to node accounting as a heartbeat.
+  *
+  * @param duccEvent the node inventory update event
+  */
+ public void reconcileNodeInventory(NodeInventoryUpdateDuccEvent duccEvent) {
+     final String methodName = "reconcileNodeInventory";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     final HashMap<DuccId, IDuccProcess> inventory = duccEvent.getProcesses();
+     stateManager.reconcileState(inventory);
+     NodeAccounting.getInstance().heartbeat(inventory);
+     logger.trace(methodName, null, messages.fetch("exit"));
+ }
+ /**
+  * Publish Orchestrator State.
+  * Builds a state event carrying a deep copy of the work map. The job,
+  * reservation and service counts of the copy are written to the debug log.
+  * Any Throwable is logged, in which case the event is returned as far as it
+  * had been populated.
+  *
+  * @return the orchestrator state event
+  */
+ public OrchestratorStateDuccEvent getState() {
+     final String methodName = "getState";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     final OrchestratorStateDuccEvent stateEvent = new OrchestratorStateDuccEvent();
+     try {
+         final long copyStarted = System.currentTimeMillis();
+         final DuccWorkMap snapshot = workMap.deepCopy();
+         final long elapsed = System.currentTimeMillis() - copyStarted;
+         if(elapsed > Constants.SYNC_LIMIT) {
+             logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+         }
+         final int activeJobs = snapshot.getJobCount();
+         final int activeReservations = snapshot.getReservationCount();
+         final int activeServices = snapshot.getServiceCount();
+         final StringBuilder summary = new StringBuilder();
+         summary.append(messages.fetch("publishing state")).append(" ");
+         summary.append(messages.fetchLabel("active job count")).append(activeJobs).append(" ");
+         summary.append(messages.fetchLabel("active reservation count")).append(activeReservations).append(" ");
+         summary.append(messages.fetchLabel("active service count")).append(activeServices);
+         logger.debug(methodName, null, summary.toString());
+         stateEvent.setWorkMap(snapshot);
+         //stateManager.prune(workMapCopy);
+         //healthMonitor.cancelNonViableJobs();
+         //mqReaper.removeUnusedJdQueues(workMapCopy);
+     }
+     catch(Throwable t) {
+         logger.error(methodName, null, t);
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+     return stateEvent;
+ }
+ /**
+  * Publish Orchestrator Abbreviated State.
+  * Builds an abbreviated state event from a deep copy of the work map. The
+  * job, reservation and service counts of the copy are written to the debug
+  * log. Both the deep copy and the hand-over of the copy to the event are
+  * timed, and each duration is logged when it exceeds
+  * {@code Constants.SYNC_LIMIT}. Any Throwable is logged, in which case the
+  * event is returned as far as it had been populated.
+  *
+  * @return the abbreviated orchestrator state event
+  */
+ public OrchestratorAbbreviatedStateDuccEvent getAbbreviatedState() {
+     String methodName = "getAbbreviatedState";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     OrchestratorAbbreviatedStateDuccEvent orchestratorAbbreviatedStateDuccEvent = new OrchestratorAbbreviatedStateDuccEvent();
+     try {
+         long t0 = System.currentTimeMillis();
+         DuccWorkMap workMapCopy = workMap.deepCopy();
+         long t1 = System.currentTimeMillis();
+         long elapsed = t1 - t0;
+         if(elapsed > Constants.SYNC_LIMIT) {
+             logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+         }
+         int activeJobs = workMapCopy.getJobCount();
+         int activeReservations = workMapCopy.getReservationCount();
+         int activeServices = workMapCopy.getServiceCount();
+         logger.debug(methodName, null, messages.fetch("publishing state")+" "+
+                 messages.fetchLabel("active job count")+activeJobs
+                 +" "+
+                 messages.fetchLabel("active reservation count")+activeReservations
+                 +" "+
+                 messages.fetchLabel("active service count")+activeServices
+                 );
+         long t2 = System.currentTimeMillis();
+         orchestratorAbbreviatedStateDuccEvent.setWorkMap(workMapCopy);
+         long t3 = System.currentTimeMillis();
+         long elapsed2 = t3 - t2;
+         // Gate on the duration that is actually reported: the previous code
+         // tested the deep-copy time (elapsed) while logging elapsed2.
+         if(elapsed2 > Constants.SYNC_LIMIT) {
+             logger.debug(methodName, null, "elapsed msecs: "+elapsed2);
+         }
+         //stateManager.prune(workMapCopy);
+         //healthMonitor.cancelNonViableJobs();
+         //mqReaper.removeUnusedJdQueues(workMapCopy);
+     }
+     catch(Throwable t) {
+         logger.error(methodName, null, t);
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+     return orchestratorAbbreviatedStateDuccEvent;
+ }
+ @SuppressWarnings("unchecked")
+ private void submitError(Properties properties, String error_message) {
+ String key = SpecificationProperties.key_submit_errors;
+ ArrayList<String> value_submit_errors = (ArrayList<String>) properties.get(key);
+ if(value_submit_errors == null) {
+ value_submit_errors = new ArrayList<String>();
+ properties.put(key, value_submit_errors);
+ }
+ value_submit_errors.add(error_message);
+ }
+ /**
+  * Checks the signature carried by a request.
+  * When the common area reports that signatures are not required, the request
+  * is never considered invalid. Otherwise the signature property is decrypted
+  * with a reader {@link Crypto} rooted at the user's home directory (or at the
+  * "user.home" system property when the configured run mode is "Test") and
+  * must equal the user named in the request.
+  * <p>
+  * The check fails closed: if verification cannot be carried out (for example
+  * a missing user, a missing or undecryptable signature, or any other
+  * Throwable), the error is logged and the signature is reported as invalid.
+  * Previously such failures were reported as a valid signature.
+  *
+  * @param properties the request properties holding the user and signature
+  * @return true when the request must be rejected, false otherwise
+  */
+ private boolean isSignatureInvalid(Properties properties) {
+     String methodName = "isSignatureInvalid";
+     boolean retVal = false;
+     try {
+         if(orchestratorCommonArea.isSignatureRequired()) {
+             String user = properties.getProperty(SpecificationProperties.key_user);
+             String userHome = LinuxUtils.getUserHome(user);
+             String runmode = DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_runmode);
+             if(runmode != null && runmode.equals("Test")) {
+                 userHome = System.getProperty("user.home");
+             }
+             Crypto crypto = new Crypto(userHome,AccessType.READER);
+             String signature = (String)crypto.decrypt((byte[])properties.get(SpecificationProperties.key_signature));
+             // null-safe: a request without a user can never carry a valid signature
+             boolean valid = user != null && user.equals(signature);
+             if(valid) {
+                 logger.debug(methodName, null, "user:"+user+" signature:"+signature+" valid:y");
+             }
+             else {
+                 logger.warn(methodName, null, "user:"+user+" signature:"+signature+" valid:n");
+             }
+             retVal = !valid;
+         }
+     }
+     catch(Throwable t) {
+         logger.error(methodName, null, t);
+         // fail closed: an unverifiable signature must not be accepted
+         retVal = true;
+     }
+     return retVal;
+ }
+ /**
+  * Handle Job Submit.
+  * The submission is rejected, with the reason recorded in the request
+  * properties, when the signature is invalid, when the host manager reports
+  * no job driver nodes, or when the system is not accepting jobs. Otherwise a
+  * validated request is turned into a job, added to the work map, moved
+  * through the Received and WaitingForDriver states with a checkpoint after
+  * each, and the new job id is placed into the properties for the reply.
+  * Any Throwable is logged and not propagated.
+  *
+  * @param duccEvent the job submit event; its properties are updated in place
+  */
+ public void startJob(SubmitJobDuccEvent duccEvent) {
+     final String methodName = "startJob";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     try {
+         final JobRequestProperties properties = (JobRequestProperties) duccEvent.getProperties();
+         final int jdNodes = hostManager.nodes();
+         // work out whether, and why, the submission has to be turned down
+         boolean rejected = true;
+         String error_message = null;
+         if(isSignatureInvalid(properties)) {
+             error_message = messages.fetch(" type=authentication error, text=signature not valid.");
+         }
+         else if(jdNodes <= 0) {
+             error_message = messages.fetch(" type=system error, text=job driver node unavailable.");
+         }
+         else if(!SystemState.getInstance().isAcceptJobs()) {
+             error_message = messages.fetch(" type=system error, text=system is not accepting new work at this time.");
+         }
+         else {
+             rejected = false;
+         }
+         if(rejected) {
+             logger.error(methodName, null, error_message);
+             submitError(properties, error_message);
+         }
+         else if(!Validate.request(duccEvent)) {
+             logger.info(methodName, null, messages.fetch("TODO")+" prepare error reply");
+             //TODO
+         }
+         else {
+             final DuccWorkJob duccWorkJob = jobFactory.create(common,properties);
+             final long addStarted = System.currentTimeMillis();
+             workMap.addDuccWork(duccWorkJob);
+             final long elapsed = System.currentTimeMillis() - addStarted;
+             if(elapsed > Constants.SYNC_LIMIT) {
+                 logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+             }
+             // state: Received
+             stateJobAccounting.stateChange(duccWorkJob, JobState.Received);
+             OrchestratorCheckpoint.getInstance().saveState();
+             // state: WaitingForDriver
+             stateJobAccounting.stateChange(duccWorkJob, JobState.WaitingForDriver);
+             OrchestratorCheckpoint.getInstance().saveState();
+             // prepare for reply to submitter
+             properties.put(JobRequestProperties.key_id, duccWorkJob.getId());
+             duccEvent.setProperties(properties);
+         }
+     }
+     catch(Throwable t) {
+         logger.error(methodName, null, messages.fetch("TODO")+" prepare error reply",t);
+         //TODO
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+ }
+ /**
+ * Handle Job Cancel
+ */
+
+ public void stopJob(CancelJobDuccEvent duccEvent) {
+ String methodName = "stopJob";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ Properties properties = duccEvent.getProperties();
+ if(isSignatureInvalid(properties)) {
+ String error_message = messages.fetch(" type=authentication error, text=signature not valid.");
+ logger.error(methodName, null, error_message);
+ submitError(properties, error_message);
+ }
+ else if(Validate.request(duccEvent)) {
+ // update state
+ String jobId = properties.getProperty(JobRequestProperties.key_id);
+ long t0 = System.currentTimeMillis();
+ DuccWorkJob duccWorkJob = (DuccWorkJob) workMap.findDuccWork(DuccType.Job,jobId);
+ long t1 = System.currentTimeMillis();
+ long elapsed = t1 - t0;
+ if(elapsed > Constants.SYNC_LIMIT) {
+ logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+ }
+ if(duccWorkJob != null) {
+ String userid = properties.getProperty(JobSpecificationProperties.key_user);
+ IRationale rationale = new Rationale("job canceled by userid "+userid);
+ stateManager.jobTerminate(duccWorkJob, JobCompletionType.CanceledByUser, rationale, ProcessDeallocationType.JobCanceled);
+ OrchestratorCheckpoint.getInstance().saveState();
+ // prepare for reply to canceler
+ properties.put(JobReplyProperties.key_message, JobReplyProperties.msg_canceled);
+ duccEvent.setProperties(properties);
+ logger.info(methodName, duccWorkJob.getDuccId(), messages.fetchLabel("job state")+duccWorkJob.getJobState());
+ }
+
+ else {
+ // prepare undefined reply
+ properties.put(JobReplyProperties.key_message, JobReplyProperties.msg_not_found);
+ duccEvent.setProperties(properties);
+ logger.info(methodName, null, jobId+" : "+messages.fetch("job not found"));
+ }
+ }
+ else {
+ logger.info(methodName, null, messages.fetch("TODO")+" prepare error reply");
+ //TODO
+ }
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return;
+ }
+
+
+ /**
+  * Handle Reservation Submit.
+  * A request with an invalid signature is rejected with the reason recorded
+  * in the request properties. A validated request is turned into a
+  * reservation, added to the work map and moved through the Received and
+  * WaitingForResources states with a checkpoint after each. The method then
+  * polls once per second, with no timeout, until the reservation is no longer
+  * pending, logging a progress message on every sixth poll. Finally the
+  * reservation id and the node list are placed into the properties for the
+  * reply; the node list is "none" when nothing was allocated, otherwise one
+  * "name [count];" item per node in sorted order.
+  * <p>
+  * If the waiting thread is interrupted, the interrupt status is restored so
+  * callers can observe it; the failure is logged like any other exception.
+  *
+  * @param duccEvent the reservation submit event; its properties are updated in place
+  */
+ public void startReservation(SubmitReservationDuccEvent duccEvent) {
+     String methodName = "startReservation";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     try {
+         Properties properties = duccEvent.getProperties();
+         if(isSignatureInvalid(properties)) {
+             String error_message = messages.fetch(" type=authentication error, text=signature not valid.");
+             logger.error(methodName, null, error_message);
+             submitError(properties, error_message);
+         }
+         else if(Validate.request(duccEvent)) {
+             DuccWorkReservation duccWorkReservation = reservationFactory.create(common,(ReservationRequestProperties)properties);
+             long t0 = System.currentTimeMillis();
+             workMap.addDuccWork(duccWorkReservation);
+             long t1 = System.currentTimeMillis();
+             long elapsed = t1 - t0;
+             if(elapsed > Constants.SYNC_LIMIT) {
+                 logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+             }
+             // state: Received
+             duccWorkReservation.stateChange(ReservationState.Received);
+             OrchestratorCheckpoint.getInstance().saveState();
+             // state: WaitingForResources
+             duccWorkReservation.stateChange(ReservationState.WaitingForResources);
+             OrchestratorCheckpoint.getInstance().saveState();
+             // wait for the reservation to leave the pending state
+             int counter = 0;
+             while(duccWorkReservation.isPending()) {
+                 counter++;
+                 if(counter > 5) {
+                     counter = 0;
+                     logger.info(methodName, duccWorkReservation.getDuccId(), "waiting for allocation...");
+                 }
+                 Thread.sleep(1000);
+             }
+             // prepare for reply to submitter
+             properties.put(ReservationRequestProperties.key_id, duccWorkReservation.getId());
+             // node list
+             properties.put(ReservationRequestProperties.key_node_list, "none");
+             IDuccReservationMap map = duccWorkReservation.getReservationMap();
+             if(!map.isEmpty()) {
+                 // count allocations per node name; TreeMap keeps the names sorted
+                 Map<String,Integer> nodeMap = new TreeMap<String,Integer>();
+                 for(Entry<DuccId, IDuccReservation> entry : map.entrySet()) {
+                     String node = entry.getValue().getNodeIdentity().getName();
+                     Integer previous = nodeMap.get(node);
+                     nodeMap.put(node, (previous == null) ? 1 : previous + 1);
+                 }
+                 StringBuilder sb = new StringBuilder();
+                 for(Entry<String,Integer> entry : nodeMap.entrySet()) {
+                     sb.append(" "+entry.getKey()+" "+"["+entry.getValue()+"]"+";");
+                 }
+                 properties.put(ReservationRequestProperties.key_node_list,sb.toString().trim());
+             }
+             duccEvent.setProperties(properties);
+         }
+         else {
+             logger.info(methodName, null, messages.fetch("TODO")+" prepare error reply");
+             //TODO
+         }
+     }
+     catch(InterruptedException e) {
+         // restore the interrupt status that Thread.sleep cleared
+         Thread.currentThread().interrupt();
+         logger.error(methodName, null, messages.fetch("TODO")+" prepare error reply",e);
+         //TODO
+     }
+     catch(Exception e) {
+         logger.error(methodName, null, messages.fetch("TODO")+" prepare error reply",e);
+         //TODO
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+     return;
+ }
+
+
+ public void stopReservation(CancelReservationDuccEvent duccEvent) {
+ String methodName = "stopReservation";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ Properties properties = duccEvent.getProperties();
+ if(isSignatureInvalid(properties)) {
+ String error_message = messages.fetch(" type=authentication error, text=signature not valid.");
+ logger.error(methodName, null, error_message);
+ submitError(properties, error_message);
+ }
+ else {
+ // update state
+ String id = properties.getProperty(ReservationRequestProperties.key_id);
+ long t0 = System.currentTimeMillis();
+ DuccWorkReservation duccWorkReservation = (DuccWorkReservation) workMap.findDuccWork(DuccType.Reservation,id);
+ long t1 = System.currentTimeMillis();
+ long elapsed = t1 - t0;
+ if(elapsed > Constants.SYNC_LIMIT) {
+ logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+ }
+ if(Validate.request(duccEvent,duccWorkReservation)) {
+ if(duccWorkReservation != null) {
+ String cancelUser = properties.getProperty(SpecificationProperties.key_user);
+ duccWorkReservation.getStandardInfo().setCancelUser(cancelUser);
+ duccWorkReservation.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+ duccWorkReservation.stateChange(ReservationState.Completed);
+ duccWorkReservation.complete(ReservationCompletionType.CanceledByUser);
+ String u1 = duccWorkReservation.getStandardInfo().getUser();
+ String u2 = duccWorkReservation.getStandardInfo().getCancelUser();
+ if(u1 != null) {
+ if(u2 != null) {
+ if(!u1.equals(u2)) {
+ duccWorkReservation.complete(ReservationCompletionType.CanceledByAdmin);
+ }
+ }
+ }
+ OrchestratorCheckpoint.getInstance().saveState();
+ // prepare for reply to canceler
+ properties.put(ReservationReplyProperties.key_message, ReservationReplyProperties.msg_canceled);
+ duccEvent.setProperties(properties);
+ logger.info(methodName, duccWorkReservation.getDuccId(), messages.fetchLabel("reservation state")+duccWorkReservation.getReservationState());
+ }
+ else {
+ // prepare undefined reply
+ properties.put(ReservationReplyProperties.key_message, ReservationReplyProperties.msg_not_found);
+ duccEvent.setProperties(properties);
+ logger.info(methodName, null, id+" : "+messages.fetch("reservation not found"));
+ }
+ }
+ else {
+ properties.put(ReservationReplyProperties.key_message, ReservationReplyProperties.msg_user_not_authorized);
+ duccEvent.setProperties(properties);
+ logger.info(methodName, null, id+" : "+messages.fetch("not authorized"));
+ }
+ }
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return;
+ }
+
+ /**
+  * Handle Service Submit.
+  * The submission is rejected, with the reason recorded in the request
+  * properties, when the signature is invalid, when the host manager returns
+  * no node, or when the system is not accepting jobs. Otherwise a validated
+  * request is turned into a job, added to the work map, moved through the
+  * Received and WaitingForServices states with a checkpoint after each, and
+  * the new id is placed into the properties for the reply.
+  * Any Throwable is logged and not propagated.
+  *
+  * @param duccEvent the service submit event; its properties are updated in place
+  */
+ public void startService(SubmitServiceDuccEvent duccEvent) {
+     final String methodName = "startService";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     try {
+         final JobRequestProperties properties = (JobRequestProperties) duccEvent.getProperties();
+         final NodeIdentity nodeIdentity = hostManager.getNode();
+         // work out whether, and why, the submission has to be turned down
+         boolean rejected = true;
+         String error_message = null;
+         if(isSignatureInvalid(properties)) {
+             error_message = messages.fetch(" type=authentication error, text=signature not valid.");
+         }
+         else if(nodeIdentity == null) {
+             error_message = messages.fetch(" type=system error, text=job driver node unavailable.");
+         }
+         else if(!SystemState.getInstance().isAcceptJobs()) {
+             error_message = messages.fetch(" type=system error, text=system is not accepting new work at this time.");
+         }
+         else {
+             rejected = false;
+         }
+         if(rejected) {
+             logger.error(methodName, null, error_message);
+             submitError(properties, error_message);
+         }
+         else {
+             logger.debug(methodName, null, messages.fetch("job driver host")+" "+messages.fetchLabel("IP")+nodeIdentity.getIp()+" "+messages.fetchLabel("name")+nodeIdentity.getName());
+             if(!Validate.request(duccEvent)) {
+                 logger.info(methodName, null, messages.fetch("TODO")+" prepare error reply");
+                 //TODO
+             }
+             else {
+                 final DuccWorkJob duccWorkJob = jobFactory.create(common,properties);
+                 final long addStarted = System.currentTimeMillis();
+                 workMap.addDuccWork(duccWorkJob);
+                 final long elapsed = System.currentTimeMillis() - addStarted;
+                 if(elapsed > Constants.SYNC_LIMIT) {
+                     logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+                 }
+                 // state: Received
+                 stateJobAccounting.stateChange(duccWorkJob, JobState.Received);
+                 OrchestratorCheckpoint.getInstance().saveState();
+                 // state: WaitingForServices
+                 stateJobAccounting.stateChange(duccWorkJob, JobState.WaitingForServices);
+                 OrchestratorCheckpoint.getInstance().saveState();
+                 // prepare for reply to submitter
+                 properties.put(JobRequestProperties.key_id, duccWorkJob.getId());
+                 duccEvent.setProperties(properties);
+             }
+         }
+     }
+     catch(Throwable t) {
+         logger.error(methodName, null, messages.fetch("TODO")+" prepare error reply",t);
+         //TODO
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+ }
+
+ /**
+ * Handle Service Cancel
+ */
+
+ public void stopService(CancelServiceDuccEvent duccEvent) {
+     // Outline: reject a request with an invalid signature; otherwise, once the
+     // request validates, look the service up by id, terminate it as
+     // CanceledByUser, save a checkpoint and put a "canceled" (or "not found")
+     // message into the reply properties.
+     // NOTE(review): unlike the reservation cancel path earlier in this class, no
+     // "not authorized" check is visible here - confirm whether one is needed.
+     final String methodName = "stopService";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     Properties properties = duccEvent.getProperties();
+     if(isSignatureInvalid(properties)) {
+         String error_message = messages.fetch(" type=authentication error, text=signature not valid.");
+         logger.error(methodName, null, error_message);
+         submitError(properties, error_message);
+     }
+     else if(!Validate.request(duccEvent)) {
+         // A request that fails validation is only logged; no error reply is built yet.
+         logger.info(methodName, null, messages.fetch("TODO")+" prepare error reply");
+         //TODO
+     }
+     else {
+         String serviceId = properties.getProperty(JobRequestProperties.key_id);
+         // Time the work-map lookup and report it when it exceeds the limit.
+         long before = System.currentTimeMillis();
+         DuccWorkJob service = (DuccWorkJob) workMap.findDuccWork(DuccType.Service,serviceId);
+         long elapsed = System.currentTimeMillis() - before;
+         if(elapsed > Constants.SYNC_LIMIT) {
+             logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+         }
+         if(service == null) {
+             // prepare undefined reply
+             properties.put(JobReplyProperties.key_message, JobReplyProperties.msg_not_found);
+             duccEvent.setProperties(properties);
+             logger.info(methodName, null, serviceId+" : "+messages.fetch("service not found"));
+         }
+         else {
+             // update state
+             String userid = properties.getProperty(JobSpecificationProperties.key_user);
+             IRationale rationale = new Rationale("service canceled by "+userid);
+             stateManager.jobTerminate(service, JobCompletionType.CanceledByUser, rationale, ProcessDeallocationType.JobCanceled);
+             OrchestratorCheckpoint.getInstance().saveState();
+             // prepare for reply to canceler
+             properties.put(JobReplyProperties.key_message, JobReplyProperties.msg_canceled);
+             duccEvent.setProperties(properties);
+             logger.info(methodName, service.getDuccId(), messages.fetchLabel("service state")+service.getJobState());
+         }
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorComponent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorConstants.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorConstants.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorConstants.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorConstants.java Wed Jan 2 19:37:55 2013
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import java.io.Serializable;
+
+public interface OrchestratorConstants extends Serializable {
+
+ public static StartType startTypeDefault = StartType.warm;
+
+ public enum StartType {
+ cold, // Recover: All is lost JD host: employ new
+ warm, // Recover: Reservations only (default) JD host: employ new
+ hot , // Recover: Reservations and Jobs, JD host: employ current
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorConstants.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java Wed Jan 2 19:37:55 2013
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.TimeStamp;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
+import org.apache.uima.ducc.transport.event.common.IDuccWork;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkExecutable;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.ITimeWindow;
+import org.apache.uima.ducc.transport.event.common.TimeWindow;
+import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+import org.apache.uima.ducc.transport.event.common.IResourceState.ProcessDeallocationType;
+import org.apache.uima.ducc.transport.event.common.IResourceState.ResourceState;
+import org.apache.uima.ducc.transport.event.jd.DriverStatusReport;
+
+
+public class ProcessAccounting {
+
+ private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(ProcessAccounting.class.getName());
+
+ private OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+ private Messages messages = orchestratorCommonArea.getSystemMessages();
+ private DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
+
+ private ConcurrentHashMap<DuccId,DuccId> processToJobMap = new ConcurrentHashMap<DuccId,DuccId>();
+
+ private StateJobAccounting stateJobAccounting = StateJobAccounting.getInstance();
+
+ /**
+  * Creates an instance that keeps the initially empty process-to-job map
+  * created by the field initializer.
+  */
+ public ProcessAccounting() {
+ }
+
+ /**
+  * Creates an instance that uses the supplied process-to-job map.
+  * The map is stored by reference, not copied.
+  */
+ public ProcessAccounting(ConcurrentHashMap<DuccId,DuccId> processToJobMap) {
+     setProcessToJobMap(processToJobMap);
+ }
+
+ /**
+  * Returns the process-to-job map itself (not a copy).
+  */
+ public ConcurrentHashMap<DuccId,DuccId> getProcessToJobMap() {
+     return processToJobMap;
+ }
+
+ /**
+  * Replaces the process-to-job map with the given one.
+  */
+ private void setProcessToJobMap(ConcurrentHashMap<DuccId,DuccId> processToJobMap) {
+     this.processToJobMap = processToJobMap;
+ }
+
+ /**
+  * Returns the job id mapped to the given process id, or null when the
+  * process-to-job map has no entry for it.
+  * The lookup runs while holding the work map's monitor; a lookup that takes
+  * longer than Constants.SYNC_LIMIT milliseconds is reported at debug level.
+  */
+ public DuccId getJobId(DuccId processId) {
+     final String methodName = "getJobId";
+     final long started = System.currentTimeMillis();
+     DuccId jobId;
+     synchronized(workMap) {
+         jobId = processToJobMap.get(processId);
+     }
+     final long elapsed = System.currentTimeMillis() - started;
+     if(elapsed > Constants.SYNC_LIMIT) {
+         logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+     }
+     return jobId;
+ }
+
+ /**
+  * Returns the number of entries in the process-to-job map.
+  * The size is read while holding the work map's monitor; a read that takes
+  * longer than Constants.SYNC_LIMIT milliseconds is reported at debug level.
+  */
+ public int processCount() {
+     final String methodName = "processCount";
+     final long started = System.currentTimeMillis();
+     int count;
+     synchronized(workMap) {
+         count = processToJobMap.size();
+     }
+     final long elapsed = System.currentTimeMillis() - started;
+     if(elapsed > Constants.SYNC_LIMIT) {
+         logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+     }
+     return count;
+ }
+
+ /**
+  * Records that a process belongs to a job, unless the process id is already mapped.
+  *
+  * @param processId the process to register
+  * @param jobId the job the process belongs to
+  * @return true when a new entry was added, false when the process id was
+  *         already present (the existing entry is left untouched and a
+  *         warning is logged)
+  */
+ public boolean addProcess(DuccId processId, DuccId jobId) {
+     final String methodName = "addProcess";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     boolean added;
+     final long started = System.currentTimeMillis();
+     synchronized(workMap) {
+         added = !processToJobMap.containsKey(processId);
+         if(added) {
+             processToJobMap.put(processId, jobId);
+             logger.info(methodName, jobId, processId, messages.fetch("added"));
+         }
+         else {
+             logger.warn(methodName, jobId, processId, messages.fetch("exists"));
+         }
+     }
+     final long elapsed = System.currentTimeMillis() - started;
+     if(elapsed > Constants.SYNC_LIMIT) {
+         logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+     return added;
+ }
+
+ /**
+  * Removes the entry for a process from the process-to-job map.
+  *
+  * @param processId the process to forget
+  * @return true when an entry existed and was removed, false when the
+  *         process id was not present
+  */
+ public boolean removeProcess(DuccId processId) {
+     final String methodName = "removeProcess";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     boolean removed;
+     final long started = System.currentTimeMillis();
+     synchronized(workMap) {
+         removed = processToJobMap.containsKey(processId);
+         if(!removed) {
+             logger.info(methodName, null, processId, messages.fetch("not found"));
+         }
+         else {
+             DuccId jobId = processToJobMap.remove(processId);
+             logger.info(methodName, jobId, processId, messages.fetch("removed"));
+         }
+     }
+     final long elapsed = System.currentTimeMillis() - started;
+     if(elapsed > Constants.SYNC_LIMIT) {
+         logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+     return removed;
+ }
+
+ /**
+  * Null-safe string equality: two nulls are equal, a null and a non-null
+  * are not, and two non-nulls are compared with equals.
+  */
+ private boolean compare(String a, String b) {
+     if(a != null) {
+         return a.equals(b);
+     }
+     return b == null;
+ }
+
+ /**
+  * Null-safe time-window equality: two nulls are equal, a null and a
+  * non-null are not, and two non-nulls are equal when both their start
+  * values and their end values match.
+  */
+ private boolean compare(ITimeWindow a, ITimeWindow b) {
+     if(a == null || b == null) {
+         // At least one side is null, so they match only when both are.
+         return a == b;
+     }
+     return compare(a.getStart(),b.getStart()) && compare(a.getEnd(),b.getEnd());
+ }
+
+ /**
+  * Copies the initialization time window from the inventory process onto the
+  * tracked process when the inventory has one and it differs (by start or
+  * end) from the tracked one. After copying, the window's start and end
+  * are logged, each only if set.
+  *
+  * @param inventoryProcess the process as reported in the inventory
+  * @param process the tracked process to update
+  */
+ public void copyTimeInit(IDuccProcess inventoryProcess, IDuccProcess process) {
+     final String methodName = "copyTimeInit";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     DuccId processId = inventoryProcess.getDuccId();
+     DuccId jobId = getJobId(processId);
+     ITimeWindow reported = inventoryProcess.getTimeWindowInit();
+     if(reported != null && !compare(reported,process.getTimeWindowInit())) {
+         process.setTimeWindowInit(reported);
+         String startMillis = process.getTimeWindowInit().getStart();
+         if(startMillis != null) {
+             logger.info(methodName, jobId, processId, messages.fetchLabel("initialization start")+TimeStamp.simpleFormat(startMillis));
+         }
+         String endMillis = process.getTimeWindowInit().getEnd();
+         if(endMillis != null) {
+             logger.info(methodName, jobId, processId, messages.fetchLabel("initialization end")+TimeStamp.simpleFormat(endMillis));
+         }
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+ }
+
+ /**
+  * Copies the run time window from the inventory process onto the tracked
+  * process when the inventory has one and it differs (by start or end) from
+  * the tracked one. After copying, the window's start and end are logged,
+  * each only if set.
+  *
+  * @param inventoryProcess the process as reported in the inventory
+  * @param process the tracked process to update
+  */
+ public void copyTimeRun(IDuccProcess inventoryProcess, IDuccProcess process) {
+     final String methodName = "copyTimeRun";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     DuccId processId = inventoryProcess.getDuccId();
+     DuccId jobId = getJobId(processId);
+     ITimeWindow reported = inventoryProcess.getTimeWindowRun();
+     if(reported != null && !compare(reported,process.getTimeWindowRun())) {
+         process.setTimeWindowRun(reported);
+         String startMillis = process.getTimeWindowRun().getStart();
+         if(startMillis != null) {
+             logger.info(methodName, jobId, processId, messages.fetchLabel("run start")+TimeStamp.simpleFormat(startMillis));
+         }
+         String endMillis = process.getTimeWindowRun().getEnd();
+         if(endMillis != null) {
+             logger.info(methodName, jobId, processId, messages.fetchLabel("run end")+TimeStamp.simpleFormat(endMillis));
+         }
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+ }
+
+ private void setResourceStateAndReason(IDuccWorkJob job, IDuccProcess inventoryProcess, IDuccProcess process) {
+ String methodName = "setResourceStateAndReason";
+ logger.trace(methodName, job.getDuccId(), messages.fetch("enter"));
+ switch(inventoryProcess.getProcessState()) {
+ case Stopped:
+ case Failed:
+ case FailedInitialization:
+ case InitializationTimeout:
+ case Killed:
+ switch(process.getResourceState()) {
+ case Allocated:
+ process.setResourceState(ResourceState.Deallocated);
+ logger.info(methodName, job.getDuccId(), process.getDuccId(), messages.fetchLabel("process state")+inventoryProcess.getProcessState()+" => "+messages.fetchLabel("resource state")+process.getResourceState());
+ String reason = inventoryProcess.getReasonForStoppingProcess();
+ switch(inventoryProcess.getProcessState()) {
+ case Stopped:
+ if(reason != null) {
+ process.setReasonForStoppingProcess(reason);
+ }
+ process.setProcessDeallocationType(ProcessDeallocationType.AutonomousStop);
+ break;
+ case Failed:
+ if(reason != null) {
+ process.setReasonForStoppingProcess(reason);
+ }
+ process.setProcessDeallocationType(ProcessDeallocationType.Failed);
+ break;
+ case Killed:
+ if(reason != null) {
+ process.setReasonForStoppingProcess(reason);
+ }
+ process.setProcessDeallocationType(ProcessDeallocationType.Killed);
+ break;
+ }
+ break;
+ default:
+ logger.debug(methodName, job.getDuccId(), process.getDuccId(), messages.fetchLabel("process state")+inventoryProcess.getProcessState()+" => "+messages.fetchLabel("resource state")+process.getResourceState());
+ break;
+ }
+ break;
+ default:
+ logger.debug(methodName, job.getDuccId(), process.getDuccId(), messages.fetchLabel("process state")+inventoryProcess.getProcessState()+" => "+messages.fetchLabel("resource state")+process.getResourceState());
+ break;
+ }
+ logger.trace(methodName, job.getDuccId(), messages.fetch("exit"));
+ }
+
+ public void copyInventoryProcessState(IDuccWorkJob job, IDuccProcess inventoryProcess, IDuccProcess process) {
+ String methodName = "copyInventoryProcessState";
+ logger.trace(methodName, job.getDuccId(), messages.fetch("enter"));
+ if(!compare(inventoryProcess.getProcessState().toString(),process.getProcessState().toString())) {
+ switch((JobState)job.getStateObject()) {
+ //case Initializing:
+ // logger.info(methodName, jobId, processId, messages.fetchLabel("process state ignored")+inventoryProcess.getProcessState());
+ // break;
+ default:
+ process.advanceProcessState(inventoryProcess.getProcessState());
+ if ( inventoryProcess.getProcessJmxUrl() != null && process.getProcessJmxUrl() == null) {
+ process.setProcessJmxUrl(inventoryProcess.getProcessJmxUrl());
+ }
+ if ( inventoryProcess.getGarbageCollectionStats() != null ) {
+ process.setGarbageCollectionStats(inventoryProcess.getGarbageCollectionStats());
+ logger.trace(methodName, job.getDuccId(), process.getDuccId(), "GC Stats Count:"+process.getGarbageCollectionStats().getCollectionCount());
+ }
+ process.setResidentMemory(inventoryProcess.getResidentMemory());
+ logger.trace(methodName, job.getDuccId(), process.getDuccId(), "Resident Memory:"+process.getResidentMemory());
+ process.setCpuTime(inventoryProcess.getCpuTime());
+ logger.trace(methodName, job.getDuccId(), process.getDuccId(), "Cpu Time:"+process.getCpuTime());
+ logger.info(methodName, job.getDuccId(), process.getDuccId(), messages.fetchLabel("process state")+process.getProcessState());
+ break;
+ }
+ }
+ logger.trace(methodName, job.getDuccId(), messages.fetch("exit"));
+ }
+
+ /**
+  * Ensures the process has an initialization time window and that the
+  * window's end value is not less than its start value.
+  * <p>
+  * A missing window is created with start and end both set to the current
+  * time and stored as the process's <em>initialization</em> window. (It was
+  * previously stored through setTimeWindowRun, which left the initialization
+  * window null and overwrote the run window.)
+  *
+  * @param job unused
+  * @param process the tracked process whose initialization window is closed
+  */
+ private void initStop(IDuccWorkJob job, IDuccProcess process) {
+     String ts = TimeStamp.getCurrentMillis();
+     ITimeWindow twi = process.getTimeWindowInit();
+     if(twi == null) {
+         twi = new TimeWindow();
+         twi.setStart(ts);
+         twi.setEnd(ts);
+         process.setTimeWindowInit(twi);
+     }
+     long i0 = twi.getStartLong();
+     long i1 = twi.getEndLong();
+     // Presumably an end below the start means the end was never recorded - confirm
+     // against getEndLong; in that case the window is closed at the current time.
+     if(i1 < i0) {
+         twi.setEnd(ts);
+     }
+ }
+
+ /**
+  * Ensures the process has both an initialization and a run time window,
+  * and that the two are consistent: neither window's end is less than its
+  * start, and the run window does not start before the initialization
+  * window ends.
+  * <p>
+  * A missing initialization window is created at the current time and
+  * stored as the <em>initialization</em> window. (It was previously stored
+  * through setTimeWindowRun, which left the initialization window null and
+  * made the run window the same object.) A missing run window is created
+  * with start and end both equal to the initialization window's end.
+  *
+  * @param job unused
+  * @param process the tracked process whose windows are closed
+  */
+ private void runStop(IDuccWorkJob job, IDuccProcess process) {
+     String ts = TimeStamp.getCurrentMillis();
+     ITimeWindow twi = process.getTimeWindowInit();
+     if(twi == null) {
+         twi = new TimeWindow();
+         twi.setStart(ts);
+         twi.setEnd(ts);
+         process.setTimeWindowInit(twi);
+     }
+     long i0 = twi.getStartLong();
+     long i1 = twi.getEndLong();
+     // Close an initialization window whose end is below its start.
+     if(i1 < i0) {
+         twi.setEnd(ts);
+         i1 = twi.getEndLong();
+     }
+     ITimeWindow twr = process.getTimeWindowRun();
+     if(twr == null) {
+         twr = new TimeWindow();
+         twr.setStart(twi.getEnd());
+         twr.setEnd(twi.getEnd());
+         process.setTimeWindowRun(twr);
+     }
+     long r0 = twr.getStartLong();
+     long r1 = twr.getEndLong();
+     // A run window with equal start and end is left untouched.
+     if(r0 != r1) {
+         if(r0 < i1) {
+             // The run may not start before initialization ended.
+             twr.setStart(twi.getEnd());
+             r0 = twr.getStartLong();
+         }
+         if(r1 < r0) {
+             twr.setEnd(ts);
+         }
+     }
+ }
+
+ /**
+  * Updates the tracked process's time windows according to the process
+  * state reported in the inventory:
+  * Starting or Initializing copies the initialization window;
+  * Running copies the initialization window, closes it, then copies the run window;
+  * the ended states (Stopped, Failed, FailedInitialization,
+  * InitializationTimeout, Killed) copy both windows and then close them;
+  * Undefined changes nothing.
+  */
+ public void updateProcessTime(IDuccWorkJob job, IDuccProcess inventoryProcess, IDuccProcess process) {
+     final String methodName = "updateProcessTime";
+     logger.trace(methodName, job.getDuccId(), messages.fetch("enter"));
+     final ProcessState reported = inventoryProcess.getProcessState();
+     switch(reported) {
+     case Starting:
+     case Initializing:
+         copyTimeInit(inventoryProcess, process);
+         break;
+     case Running:
+         copyTimeInit(inventoryProcess, process);
+         initStop(job, process);
+         copyTimeRun(inventoryProcess, process);
+         break;
+     case Stopped:
+     case Failed:
+     case FailedInitialization:
+     case InitializationTimeout:
+     case Killed:
+         copyTimeInit(inventoryProcess, process);
+         copyTimeRun(inventoryProcess, process);
+         runStop(job, process);
+         break;
+     case Undefined:
+     default:
+         break;
+     }
+     logger.trace(methodName, job.getDuccId(), messages.fetch("exit"));
+ }
+
+ /**
+  * Applies one inventory process report to the matching tracked process.
+  * <p>
+  * While holding the work map's monitor, the process id is resolved to its
+  * job through the process-to-job map. The tracked process is looked up in
+  * the work's process map and, failing that, in the job driver's process
+  * map. Its PID, resource state, process state and time windows are then
+  * updated from the report. When the report says Running, the process is
+  * marked initialized and a Service that is still Initializing is moved to
+  * Running. Each lookup failure is logged as a warning and ends processing.
+  * <p>
+  * Two null cases that previously raised a NullPointerException inside the
+  * synchronized block are guarded. A job without a driver is not searched
+  * for a driver process. Work that is executable but not an IDuccWorkJob
+  * gets only the PID update, because the remaining steps need the job.
+  *
+  * @param inventoryProcess the process as reported in the inventory
+  */
+ public void setStatus(IDuccProcess inventoryProcess) {
+     String methodName = "setStatus";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     DuccId processId = inventoryProcess.getDuccId();
+     logger.debug(methodName, null, processId, messages.fetchLabel("node")+inventoryProcess.getNodeIdentity().getName()+" "+messages.fetchLabel("PID")+inventoryProcess.getPID());
+     long t0 = System.currentTimeMillis();
+     synchronized(workMap) {
+         if(!processToJobMap.containsKey(processId)) {
+             logger.warn(methodName, null, processId, messages.fetch("ID not found in process map"));
+         }
+         else {
+             DuccId jobId = getJobId(processId);
+             IDuccWork duccWork = workMap.findDuccWork(jobId);
+             if(duccWork == null) {
+                 logger.warn(methodName, jobId, processId, messages.fetch("job ID not found"));
+             }
+             else if(!(duccWork instanceof IDuccWorkExecutable)) {
+                 logger.warn(methodName, jobId, processId, messages.fetch("not executable"));
+             }
+             else {
+                 IDuccWorkExecutable duccWorkExecutable = (IDuccWorkExecutable) duccWork;
+                 IDuccWorkJob job = null;
+                 if(duccWork instanceof IDuccWorkJob) {
+                     job = (IDuccWorkJob)duccWork;
+                 }
+                 IDuccProcessMap processMap = duccWorkExecutable.getProcessMap();
+                 IDuccProcess process = processMap.get(processId);
+                 // Fall back to the driver's processes, but only when there is a driver.
+                 if(process == null && job != null && job.getDriver() != null) {
+                     process = job.getDriver().getProcessMap().get(processId);
+                 }
+                 if(process == null) {
+                     logger.warn(methodName, jobId, processId, messages.fetch("process not found job's process table"));
+                 }
+                 else {
+                     // PID
+                     String iPID = inventoryProcess.getPID();
+                     String pPID = process.getPID();
+                     if(!compare(iPID, pPID)) {
+                         process.setPID(iPID);
+                         logger.info(methodName, jobId, processId, messages.fetchLabel("pPID")+pPID+" "+messages.fetchLabel("iPID")+iPID);
+                     }
+                     if(job == null) {
+                         // The remaining steps dereference the job; skip them instead of failing.
+                         logger.warn(methodName, jobId, processId, messages.fetch("not a job"));
+                     }
+                     else {
+                         // Scheduler State
+                         setResourceStateAndReason(job, inventoryProcess, process);
+                         // Process State
+                         copyInventoryProcessState(job, inventoryProcess, process);
+                         // Process Init & Run times
+                         updateProcessTime(job, inventoryProcess, process);
+                         // Process Initialization State
+                         switch(inventoryProcess.getProcessState()) {
+                         case Running:
+                             process.setInitialized();
+                             switch(job.getDuccType()) {
+                             case Service:
+                                 switch(job.getJobState()) {
+                                 case Initializing:
+                                     stateJobAccounting.stateChange(job, JobState.Running);
+                                     break;
+                                 default:
+                                     break;
+                                 }
+                                 break;
+                             default:
+                                 break;
+                             }
+                             break;
+                         default:
+                             break;
+                         }
+                     }
+                 }
+             }
+         }
+     }
+     long t1 = System.currentTimeMillis();
+     long elapsed = t1 - t0;
+     if(elapsed > Constants.SYNC_LIMIT) {
+         logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+     return;
+ }
+
+ /**
+  * Copies work item counters and related data from a job driver status
+  * report into the job's scheduling info. Each counter (total, completed,
+  * dispatched, error, retry, preempted) is written only when its string
+  * form differs from the stored value; the CAS queued map, limbo map, most
+  * recent work item start and per work item statistics are always written.
+  * <p>
+  * NOTE(review): the return value is always false, whether or not anything
+  * changed - confirm whether callers expect a "changed" flag.
+  *
+  * @param jdStatusReport the report received from the job driver
+  * @param duccWorkJob the job whose scheduling info is updated
+  * @return always false
+  */
+ public boolean setStatus(DriverStatusReport jdStatusReport, DuccWorkJob duccWorkJob) {
+     final String methodName = "setStatus";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     // Counters: converted to strings and stored only on change.
+     String total = ""+jdStatusReport.getWorkItemsTotal();
+     if(!compare(total,duccWorkJob.getSchedulingInfo().getWorkItemsTotal())) {
+         duccWorkJob.getSchedulingInfo().setWorkItemsTotal(total);
+     }
+     String completed = ""+jdStatusReport.getWorkItemsProcessingCompleted();
+     if(!compare(completed,duccWorkJob.getSchedulingInfo().getWorkItemsCompleted())) {
+         duccWorkJob.getSchedulingInfo().setWorkItemsCompleted(completed);
+     }
+     String dispatched = ""+jdStatusReport.getWorkItemsDispatched();
+     if(!compare(dispatched,duccWorkJob.getSchedulingInfo().getWorkItemsDispatched())) {
+         duccWorkJob.getSchedulingInfo().setWorkItemsDispatched(dispatched);
+     }
+     String failed = ""+jdStatusReport.getWorkItemsProcessingError();
+     if(!compare(failed,duccWorkJob.getSchedulingInfo().getWorkItemsError())) {
+         duccWorkJob.getSchedulingInfo().setWorkItemsError(failed);
+     }
+     String retried = ""+jdStatusReport.getWorkItemsRetry();
+     if(!compare(retried,duccWorkJob.getSchedulingInfo().getWorkItemsRetry())) {
+         duccWorkJob.getSchedulingInfo().setWorkItemsRetry(retried);
+     }
+     String preempted = ""+jdStatusReport.getWorkItemsPreempted();
+     if(!compare(preempted,duccWorkJob.getSchedulingInfo().getWorkItemsPreempt())) {
+         duccWorkJob.getSchedulingInfo().setWorkItemsPreempt(preempted);
+     }
+     // Everything else is copied unconditionally.
+     duccWorkJob.getSchedulingInfo().setCasQueuedMap(jdStatusReport.getCasQueuedMap());
+     duccWorkJob.getSchedulingInfo().setLimboMap(jdStatusReport.getLimboMap());
+     duccWorkJob.getSchedulingInfo().setMostRecentWorkItemStart(jdStatusReport.getMostRecentStart());
+     duccWorkJob.getSchedulingInfo().setPerWorkItemStatistics(jdStatusReport.getPerWorkItemStatistics());
+     logger.trace(methodName, null, messages.fetch("exit"));
+     return false;
+ }
+
+ private void deallocate(IDuccWorkJob job, ProcessDeallocationType processDeallocationType, ProcessState processState, IDuccProcessMap processMap, String type) {
+ String methodName = "deallocate";
+ logger.trace(methodName, job.getDuccId(), messages.fetch("enter"));
+ if(processMap != null) {
+ Collection<IDuccProcess> processCollection = processMap.values();
+ Iterator<IDuccProcess> iterator = processCollection.iterator();
+ while(iterator.hasNext()) {
+ IDuccProcess process = iterator.next();
+ switch(process.getResourceState()) {
+ case Allocated:
+ process.setResourceState(ResourceState.Deallocated);
+ process.setProcessDeallocationType(processDeallocationType);
+ logger.info(methodName, job.getDuccId(), process.getDuccId(), type);
+ if(processState != null) {
+ logger.debug(methodName, job.getDuccId(), process.getProcessState()+" -> "+processState);
+ process.advanceProcessState(processState);
+ }
+ break;
+ case Deallocated:
+ if(processState != null) {
+ logger.debug(methodName, job.getDuccId(), process.getProcessState()+" -> "+processState);
+ process.advanceProcessState(processState);
+ }
+ break;
+ }
+ }
+ }
+ logger.trace(methodName, job.getDuccId(), messages.fetch("exit"));
+ return;
+ }
+
+ private void deallocate(IDuccWorkJob job, ProcessDeallocationType processDeallocationType, ProcessState processState) {
+ String methodName = "deallocate";
+ logger.trace(methodName, job.getDuccId(), messages.fetch("enter"));
+ deallocate(job,processDeallocationType,processState,job.getProcessMap(),"worker");
+ switch(job.getDuccType()) {
+ case Job:
+ deallocate(job,processDeallocationType,processState,job.getDriver().getProcessMap(),"driver");
+ break;
+ case Service:
+ break;
+ }
+ logger.trace(methodName, job.getDuccId(), messages.fetch("exit"));
+ return;
+ }
+
+ /**
+  * Deallocates the job's processes with the given deallocation type and
+  * leaves their process states unchanged.
+  */
+ public void deallocate(IDuccWorkJob job, ProcessDeallocationType processDeallocationType) {
+     final ProcessState keepCurrentState = null;
+     deallocate(job,processDeallocationType,keepCurrentState);
+ }
+
+ /**
+  * Deallocates the job's processes with the given deallocation type and
+  * advances their process states to Stopped.
+  */
+ public void deallocateAndStop(IDuccWorkJob job, ProcessDeallocationType processDeallocationType) {
+     final ProcessState stopped = ProcessState.Stopped;
+     deallocate(job,processDeallocationType,stopped);
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ReservationFactory.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ReservationFactory.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ReservationFactory.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ReservationFactory.java Wed Jan 2 19:37:55 2013
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.TimeStamp;
+import org.apache.uima.ducc.common.utils.id.IDuccIdFactory;
+import org.apache.uima.ducc.orchestrator.utilities.MemorySpecification;
+import org.apache.uima.ducc.transport.event.cli.ReservationRequestProperties;
+import org.apache.uima.ducc.transport.event.cli.ReservationSpecificationProperties;
+import org.apache.uima.ducc.transport.event.common.DuccSchedulingInfo;
+import org.apache.uima.ducc.transport.event.common.DuccStandardInfo;
+import org.apache.uima.ducc.transport.event.common.DuccWorkReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
+
+
+/**
+ * Singleton that builds {@link DuccWorkReservation} objects from reservation
+ * request properties.
+ */
+public class ReservationFactory {
+    private static ReservationFactory reservationFactory = new ReservationFactory();
+    private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(ReservationFactory.class.getName());
+
+    /** Returns the single shared factory instance. */
+    public static ReservationFactory getInstance() {
+        return reservationFactory;
+    }
+
+    private OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+    private Messages messages = orchestratorCommonArea.getSystemMessages();
+    private IDuccIdFactory reservationDuccIdFactory = orchestratorCommonArea.getReservationDuccIdFactory();
+
+    /**
+     * Builds a reservation with a fresh id, type Reservation, and standard
+     * and scheduling info filled from the request, then logs what was set.
+     * The scheduling priority is logged but not set by this method.
+     *
+     * @param common unused
+     * @param reservationRequestProperties the submitted request
+     * @return the new reservation
+     */
+    public DuccWorkReservation create(CommonConfiguration common, ReservationRequestProperties reservationRequestProperties) {
+        final String methodName = "create";
+        logger.trace(methodName, null, messages.fetch("enter"));
+        DuccWorkReservation reservation = new DuccWorkReservation();
+        // id, type
+        reservation.setDuccId(reservationDuccIdFactory.next());
+        reservation.setDuccType(DuccType.Reservation);
+        DuccStandardInfo standardInfo = attachStandardInfo(reservation, reservationRequestProperties);
+        DuccSchedulingInfo schedulingInfo = attachSchedulingInfo(reservation, reservationRequestProperties);
+        logger.info(methodName, reservation.getDuccId(), messages.fetchLabel("user")+standardInfo.getUser());
+        logger.info(methodName, reservation.getDuccId(), messages.fetchLabel("description")+standardInfo.getDescription());
+        logger.info(methodName, reservation.getDuccId(), messages.fetchLabel("class")+schedulingInfo.getSchedulingClass());
+        logger.info(methodName, reservation.getDuccId(), messages.fetchLabel("priority")+schedulingInfo.getSchedulingPriority());
+        logger.info(methodName, reservation.getDuccId(), messages.fetchLabel("memory")+schedulingInfo.getShareMemorySize()+schedulingInfo.getShareMemoryUnits());
+        logger.info(methodName, reservation.getDuccId(), messages.fetchLabel("instances")+schedulingInfo.getInstancesCount());
+        logger.trace(methodName, null, messages.fetch("exit"));
+        return reservation;
+    }
+
+    /** Attaches standard info: user, submission time (now), no completion time, description. */
+    private DuccStandardInfo attachStandardInfo(DuccWorkReservation reservation, ReservationRequestProperties request) {
+        DuccStandardInfo standardInfo = new DuccStandardInfo();
+        reservation.setStandardInfo(standardInfo);
+        standardInfo.setUser(request.getProperty(ReservationSpecificationProperties.key_user));
+        standardInfo.setDateOfSubmission(TimeStamp.getCurrentMillis());
+        standardInfo.setDateOfCompletion(null);
+        standardInfo.setDescription(request.getProperty(ReservationSpecificationProperties.key_description));
+        return standardInfo;
+    }
+
+    /** Attaches scheduling info: class, memory size and units, instance count. */
+    private DuccSchedulingInfo attachSchedulingInfo(DuccWorkReservation reservation, ReservationRequestProperties request) {
+        DuccSchedulingInfo schedulingInfo = new DuccSchedulingInfo();
+        reservation.setSchedulingInfo(schedulingInfo);
+        schedulingInfo.setSchedulingClass(request.getProperty(ReservationSpecificationProperties.key_scheduling_class));
+        String memorySize = request.getProperty(ReservationSpecificationProperties.key_instance_memory_size);
+        MemorySpecification memorySpecification = new MemorySpecification(memorySize);
+        schedulingInfo.setShareMemorySize(memorySpecification.getSize());
+        schedulingInfo.setShareMemoryUnits(memorySpecification.getMemoryUnits());
+        schedulingInfo.setInstancesCount(request.getProperty(ReservationSpecificationProperties.key_number_of_instances));
+        return schedulingInfo;
+    }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ReservationFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateJobAccounting.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateJobAccounting.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateJobAccounting.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateJobAccounting.java Wed Jan 2 19:37:55 2013
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.IRationale;
+import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.JobCompletionType;
+import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
+
+
+public class StateJobAccounting {
+
+ private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(StateJobAccounting.class.getName());
+
+ private static StateJobAccounting instance = new StateJobAccounting();
+
+ public static StateJobAccounting getInstance() {
+ return instance;
+ }
+
+ public boolean stateChange(IDuccWorkJob job, JobState state) {
+ String methodName = "stateChange";
+ boolean retVal = false;
+ JobState prev = job.getJobState();
+ JobState next = state;
+ switch(prev) {
+ case Completing:
+ retVal = stateChangeFromCompleting(prev, next);
+ break;
+ case Completed:
+ switch(next) {
+ case Completing:
+ next = prev;
+ break;
+ default:
+ break;
+ }
+ retVal = stateChangeFromCompleted(prev, next);
+ break;
+ case Initializing:
+ retVal = stateChangeFromInitializing(prev, next);
+ break;
+ case Received:
+ retVal = stateChangeFromReceived(prev, next);
+ break;
+ case Running:
+ retVal = stateChangeFromRunning(prev, next);
+ break;
+ case Undefined:
+ retVal = stateChangeFromUndefined(prev, next);
+ break;
+ case WaitingForDriver:
+ retVal = stateChangeFromWaitingForDriver(prev, next);
+ break;
+ case WaitingForResources:
+ retVal = stateChangeFromWaitingForResources(prev, next);
+ break;
+ case WaitingForServices:
+ retVal = stateChangeFromWaitingForServices(prev, next);
+ break;
+ }
+ if(retVal) {
+ job.setJobState(state);
+ logger.info(methodName, job.getDuccId(),"current["+next+"] previous["+prev+"]");
+ }
+ else {
+ try {
+ throw new RuntimeException();
+ }
+ catch(Exception e) {
+ logger.error(methodName, job.getDuccId(),"current["+prev+"] requested["+next+"]"+" ignored", e);
+ }
+ }
+ return retVal;
+ }
+
+ private boolean stateChangeFromCompleting(JobState prev, JobState next) {
+ boolean retVal = false;
+ switch(next) {
+ case Completing: break;
+ case Completed: retVal = true; break;
+ case Initializing: break;
+ case Received: break;
+ case Running: break;
+ case Undefined: break;
+ case WaitingForDriver: break;
+ case WaitingForResources: break;
+ case WaitingForServices: break;
+ }
+ return retVal;
+ }
+
+ private boolean stateChangeFromCompleted(JobState prev, JobState next) {
+ boolean retVal = false;
+ switch(next) {
+ case Completing: break;
+ case Completed: break;
+ case Initializing: break;
+ case Received: break;
+ case Running: break;
+ case Undefined: break;
+ case WaitingForDriver: break;
+ case WaitingForResources: break;
+ case WaitingForServices: break;
+ }
+ return retVal;
+ }
+
+ private boolean stateChangeFromInitializing(JobState prev, JobState next) {
+ boolean retVal = false;
+ switch(next) {
+ case Completing: retVal = true; break;
+ case Completed: retVal = true; break;
+ case Initializing: break;
+ case Received: break;
+ case Running: retVal = true; break;
+ case Undefined: break;
+ case WaitingForDriver: break;
+ case WaitingForResources: break;
+ case WaitingForServices: break;
+ }
+ return retVal;
+ }
+
+ private boolean stateChangeFromReceived(JobState prev, JobState next) {
+ boolean retVal = false;
+ switch(next) {
+ case Completing: retVal = true; break;
+ case Completed: retVal = true; break;
+ case Initializing: break;
+ case Received: break;
+ case Running: break;
+ case Undefined: break;
+ case WaitingForDriver: retVal = true; break;
+ case WaitingForResources: break;
+ case WaitingForServices: retVal = true; break;
+ }
+ return retVal;
+ }
+
+ private boolean stateChangeFromRunning(JobState prev, JobState next) {
+ boolean retVal = false;
+ switch(next) {
+ case Completing: retVal = true; break;
+ case Completed: retVal = true; break;
+ case Initializing: break;
+ case Received: break;
+ case Running: break;
+ case Undefined: break;
+ case WaitingForDriver: break;
+ case WaitingForResources: break;
+ case WaitingForServices: break;
+ }
+ return retVal;
+ }
+
+ private boolean stateChangeFromUndefined(JobState prev, JobState next) {
+ boolean retVal = false;
+ switch(next) {
+ case Completing: break;
+ case Completed: break;
+ case Initializing: break;
+ case Received: retVal = true; break;
+ case Running: break;
+ case Undefined: break;
+ case WaitingForDriver: break;
+ case WaitingForResources: break;
+ case WaitingForServices: break;
+ }
+ return retVal;
+ }
+
+ private boolean stateChangeFromWaitingForDriver(JobState prev, JobState next) {
+ boolean retVal = false;
+ switch(next) {
+ case Completing: retVal = true; break;
+ case Completed: retVal = true; break;
+ case Initializing: break;
+ case Received: break;
+ case Running: break;
+ case Undefined: break;
+ case WaitingForDriver: break;
+ case WaitingForResources: break;
+ case WaitingForServices: retVal = true; break;
+ }
+ return retVal;
+ }
+
+ private boolean stateChangeFromWaitingForResources(JobState prev, JobState next) {
+ boolean retVal = false;
+ switch(next) {
+ case Completing: retVal = true; break;
+ case Completed: retVal = true; break;
+ case Initializing: retVal = true; break;
+ case Received: break;
+ case Running: break;
+ case Undefined: break;
+ case WaitingForDriver: break;
+ case WaitingForResources: break;
+ case WaitingForServices: break;
+ }
+ return retVal;
+ }
+
+ private boolean stateChangeFromWaitingForServices(JobState prev, JobState next) {
+ boolean retVal = false;
+ switch(next) {
+ case Completing: retVal = true; break;
+ case Completed: retVal = true; break;
+ case Initializing: break;
+ case Received: break;
+ case Running: break;
+ case Undefined: break;
+ case WaitingForDriver: break;
+ case WaitingForResources: retVal = true; break;
+ case WaitingForServices: break;
+ }
+ return retVal;
+ }
+
+ public boolean complete(IDuccWorkJob job, JobCompletionType completionType, IRationale completionRationale) {
+ String methodName = "complete";
+ boolean retVal = false;
+ switch(job.getCompletionType()) {
+ case Undefined:
+ retVal = true;
+ break;
+ }
+ if(retVal) {
+ job.setCompletion(completionType,completionRationale);
+ logger.info(methodName, job.getDuccId(), completionType+" "+completionRationale);
+ }
+ else {
+ logger.info(methodName, job.getDuccId(), completionType+" "+"ignored");
+ }
+ return retVal;
+ }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateJobAccounting.java
------------------------------------------------------------------------------
svn:eol-style = native