You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:37:56 UTC
svn commit: r1427956 [3/4] - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator: main/ main/java/
main/java/org/ main/java/org/apache/ main/java/org/apache/uima/
main/java/org/apache/uima/ducc/
main/java/org/apache/uima/ducc/orchestrator/ main/jav...
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateManager.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateManager.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateManager.java Wed Jan 2 19:37:55 2013
@@ -0,0 +1,1124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.TimeStamp;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.common.DuccProcess;
+import org.apache.uima.ducc.transport.event.common.DuccReservation;
+import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+import org.apache.uima.ducc.transport.event.common.DuccWorkPopDriver;
+import org.apache.uima.ducc.transport.event.common.DuccWorkReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessWorkItems;
+import org.apache.uima.ducc.transport.event.common.IDuccReservationMap;
+import org.apache.uima.ducc.transport.event.common.IDuccUimaDeploymentDescriptor;
+import org.apache.uima.ducc.transport.event.common.IDuccWork;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkService;
+import org.apache.uima.ducc.transport.event.common.IRationale;
+import org.apache.uima.ducc.transport.event.common.Rationale;
+import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.JobCompletionType;
+import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.ReservationCompletionType;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
+import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
+import org.apache.uima.ducc.transport.event.common.IDuccState.ReservationState;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+import org.apache.uima.ducc.transport.event.common.IResourceState.ProcessDeallocationType;
+import org.apache.uima.ducc.transport.event.common.IResourceState.ResourceState;
+import org.apache.uima.ducc.transport.event.common.history.HistoryPersistenceManager;
+import org.apache.uima.ducc.transport.event.jd.DriverStatusReport;
+import org.apache.uima.ducc.transport.event.jd.DuccProcessWorkItemsMap;
+import org.apache.uima.ducc.transport.event.rm.IResource;
+import org.apache.uima.ducc.transport.event.rm.IRmJobState;
+import org.apache.uima.ducc.transport.event.sm.ServiceDependency;
+import org.apache.uima.ducc.transport.event.sm.ServiceMap;
+import org.apache.uima.ducc.transport.event.sm.IService.ServiceState;
+
+
+public class StateManager {
+ /** Logger shared by all methods of this class. */
+ private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(StateManager.class.getName());
+
+ /** The single shared instance, created when the class is initialized. */
+ private static StateManager stateManager = new StateManager();
+
+ /**
+  * Returns the shared StateManager instance.
+  */
+ public static StateManager getInstance() {
+ return stateManager;
+ }
+
+ // Collaborators looked up once, when the instance is constructed.
+ private OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+ private Messages messages = orchestratorCommonArea.getSystemMessages();
+ private DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
+ // Most recent driver status report per job id (see the put in reconcileState and the remove in prune).
+ private ConcurrentHashMap<DuccId,DriverStatusReport> driverStatusReportMap = orchestratorCommonArea.getDriverStatusReportMap();
+ private StateJobAccounting stateJobAccounting = StateJobAccounting.getInstance();
+
+ // Used by the isSaved(...) methods to persist finished work before it is pruned.
+ HistoryPersistenceManager hpm = orchestratorCommonArea.getHistoryPersistencemanager();
+
+ private boolean jobDriverTerminated(DuccWorkJob duccWorkJob) {
+ String methodName = "jobDriverTerminated";
+ boolean status = true;
+ logger.trace(methodName, null, messages.fetch("enter"));
+ IDuccProcessMap processMap = duccWorkJob.getDriver().getProcessMap();
+ Iterator<DuccId> processMapIterator = processMap.keySet().iterator();
+ while(processMapIterator.hasNext()) {
+ DuccId duccId = processMapIterator.next();
+ IDuccProcess process = processMap.get(duccId);
+ if(process.isActive()) {
+ logger.debug(methodName, duccId, messages.fetch("processes active"));
+ status = false;
+ }
+ }
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return status;
+ }
+
+ private boolean jobProcessesTerminated(DuccWorkJob duccWorkJob) {
+ String methodName = "jobProcessesTerminated";
+ boolean status = true;
+ logger.trace(methodName, null, messages.fetch("enter"));
+ IDuccProcessMap processMap = duccWorkJob.getProcessMap();
+ Iterator<DuccId> processMapIterator = processMap.keySet().iterator();
+ while(processMapIterator.hasNext()) {
+ DuccId duccId = processMapIterator.next();
+ IDuccProcess process = processMap.get(duccId);
+ if(process.isActive()) {
+ logger.debug(methodName, duccId, messages.fetch("processes active"));
+ status = false;
+ }
+ }
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return status;
+ }
+
+ private boolean allProcessesTerminated(DuccWorkJob duccWorkJob) {
+ String methodName = "allProcessesTerminated";
+ boolean status = false;
+ logger.trace(methodName, null, messages.fetch("enter"));
+ switch(duccWorkJob.getDuccType()) {
+ case Job:
+ if(jobDriverTerminated(duccWorkJob)) {
+ if(jobProcessesTerminated(duccWorkJob)) {
+ status = true;
+ if(duccWorkJob.getStandardInfo().getDateOfShutdownProcessesMillis() <= 0) {
+ duccWorkJob.getStandardInfo().setDateOfShutdownProcesses(TimeStamp.getCurrentMillis());
+ }
+ }
+ }
+ break;
+ case Service:
+ if(jobProcessesTerminated(duccWorkJob)) {
+ status = true;
+ if(duccWorkJob.getStandardInfo().getDateOfShutdownProcessesMillis() <= 0) {
+ duccWorkJob.getStandardInfo().setDateOfShutdownProcesses(TimeStamp.getCurrentMillis());
+ }
+ }
+ break;
+ }
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return status;
+ }
+
+ // Time units, in milliseconds.
+ private long SECONDS = 1000;
+ private long MINUTES = 60 * SECONDS;
+ // Time that must have elapsed since completion and since process shutdown before work counts as aged out.
+ private long AgeTime = 1 * MINUTES;
+
+ /**
+  * Reports whether the work finished long enough ago to be pruned.
+  * The work is aged out only when more than AgeTime milliseconds have passed
+  * since both its completion time and its process-shutdown time.
+  * If reading the timestamps throws, the error is logged and the value computed
+  * so far (initially true) is returned.
+  *
+  * @param duccWork the work item to examine
+  * @return true when the work is aged out
+  */
+ private boolean isAgedOut(IDuccWork duccWork) {
+     String methodName = "isAgedOut";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     boolean retVal = true;
+     long endMillis = 0;
+     long nowMillis = 0;
+     long elapsed = 0;
+     try {
+         endMillis = duccWork.getStandardInfo().getDateOfCompletionMillis();
+         nowMillis = System.currentTimeMillis();
+         elapsed = (nowMillis - endMillis);
+         if(elapsed <= AgeTime) {
+             retVal = false;
+         }
+         endMillis = duccWork.getStandardInfo().getDateOfShutdownProcessesMillis();
+         elapsed = (nowMillis - endMillis);
+         if(elapsed <= AgeTime) {
+             retVal = false;
+         }
+     }
+     catch(Exception e) {
+         // label each value correctly (previously both were tagged "nowMillis" and endMillis was printed twice)
+         logger.error(methodName, null, "endMillis:"+endMillis+" "+"nowMillis:"+nowMillis+" ", e);
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+     return retVal;
+ }
+
+ /**
+  * Persists a job or service through the history persistence manager.
+  *
+  * @param duccWorkJob the job or service to save
+  * @return true when the save call completed without throwing; false when the
+  *         type is neither Job nor Service or when saving failed (the failure is logged)
+  */
+ public boolean isSaved(IDuccWorkJob duccWorkJob) {
+     final String methodName = "isSaved";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     // optimistic: cleared again for an unsupported type or a failed save
+     boolean saved = true;
+     try {
+         switch(duccWorkJob.getDuccType()) {
+         case Job:
+             hpm.jobSave(duccWorkJob);
+             break;
+         case Service:
+             hpm.serviceSave((IDuccWorkService)duccWorkJob);
+             break;
+         default:
+             saved = false;
+             break;
+         }
+     }
+     catch(Exception e) {
+         saved = false;
+         logger.error(methodName, duccWorkJob.getDuccId(), e);
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+     return saved;
+ }
+
+ /**
+  * Persists a reservation through the history persistence manager.
+  *
+  * @param duccWorkReservation the reservation to save
+  * @return true when the save call completed without throwing; false when it
+  *         failed (the failure is logged, matching the job/service overload)
+  */
+ public boolean isSaved(IDuccWorkReservation duccWorkReservation) {
+     String methodName = "isSaved";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     boolean retVal = false;
+     try {
+         hpm.reservationSave(duccWorkReservation);
+         retVal = true;
+     }
+     catch(Exception e) {
+         // do not swallow silently: a reservation that cannot be saved is never pruned,
+         // and without this log entry there is no trace of why
+         logger.error(methodName, duccWorkReservation.getDuccId(), e);
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+     return retVal;
+ }
+
+ /**
+  * Removes finished work from the given work map.
+  * <p>
+  * A job or service in Completing state whose processes have all terminated is moved
+  * to Completed. A completed job or service is removed once its processes have
+  * terminated, it has been saved to history and it has aged out; its driver status
+  * report and its process accounting entries are removed with it. A completed
+  * reservation is removed once it has been saved and has aged out.
+  * A checkpoint is saved when anything was removed.
+  *
+  * @param workMap the work map to prune (shadows the field of the same name)
+  * @return the number of removed work items plus removed process accounting entries
+  */
+ public int prune(DuccWorkMap workMap) {
+     String methodName = "prune";
+     int changes = 0;
+     logger.trace(methodName, null, messages.fetch("enter"));
+     long t0 = System.currentTimeMillis();
+     Iterator<DuccId> workMapIterator = workMap.keySet().iterator();
+     while(workMapIterator.hasNext()) {
+         DuccId duccId = workMapIterator.next();
+         IDuccWork duccWork = workMap.findDuccWork(duccId);
+         if(duccWork == null) {
+             // findDuccWork may return null (reconcileState guards against it as well);
+             // skip instead of failing on getDuccType() below
+             logger.debug(methodName, duccId, messages.fetch("not found"));
+             continue;
+         }
+         switch(duccWork.getDuccType()) {
+         case Job:
+         case Service:
+             DuccWorkJob duccWorkJob = (DuccWorkJob)duccWork;
+             if(duccWorkJob.isCompleting() && allProcessesTerminated(duccWorkJob)) {
+                 stateJobAccounting.stateChange(duccWorkJob, JobState.Completed);
+             }
+             if(duccWorkJob.isCompleted() && allProcessesTerminated(duccWorkJob) && isSaved(duccWorkJob) && isAgedOut(duccWorkJob)) {
+                 workMap.removeDuccWork(duccId);
+                 driverStatusReportMap.remove(duccId);
+                 logger.info(methodName, duccId, messages.fetch("removed job"));
+                 changes ++;
+                 // drop the accounting entry of every process that belonged to the removed job
+                 IDuccProcessMap processMap = duccWorkJob.getProcessMap();
+                 Iterator<DuccId> processMapIterator = processMap.keySet().iterator();
+                 while(processMapIterator.hasNext()) {
+                     DuccId processDuccId = processMapIterator.next();
+                     orchestratorCommonArea.getProcessAccounting().removeProcess(processDuccId);
+                     logger.info(methodName, duccId, messages.fetch("removed process")+" "+processDuccId.toString());
+                     changes ++;
+                 }
+                 logger.info(methodName, duccId, messages.fetch("processes inactive"));
+             }
+             else {
+                 logger.debug(methodName, duccId, messages.fetch("processes active"));
+             }
+             break;
+         case Reservation:
+             DuccWorkReservation duccWorkReservation = (DuccWorkReservation)duccWork;
+             if(duccWorkReservation.isCompleted() && isSaved(duccWorkReservation) && isAgedOut(duccWorkReservation)) {
+                 workMap.removeDuccWork(duccId);
+                 logger.info(methodName, duccId, messages.fetch("removed reservation"));
+                 changes ++;
+             }
+             break;
+         }
+     }
+     long t1 = System.currentTimeMillis();
+     long elapsed = t1 - t0;
+     if(elapsed > Constants.SYNC_LIMIT) {
+         logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+     }
+     logger.debug(methodName, null, "processToWorkMap.size()="+orchestratorCommonArea.getProcessAccounting().processCount());
+     if(changes > 0) {
+         OrchestratorCheckpoint.getInstance().saveState();
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+     return changes;
+ }
+
+ /**
+  * Requests a job state change through the job state accounting component.
+  *
+  * @return always 1, so callers can add the result to their change counter
+  */
+ private int stateChange(DuccWorkJob duccWorkJob, JobState state) {
+ stateJobAccounting.stateChange(duccWorkJob, state);
+ return 1;
+ }
+
+ /**
+  * Requests a reservation state change directly on the reservation.
+  *
+  * @return always 1, so callers can add the result to their change counter
+  */
+ private int stateChange(DuccWorkReservation duccWorkReservation, ReservationState state) {
+ duccWorkReservation.stateChange(state);
+ return 1;
+ }
+
+ /**
+  * Stores the job driver's JMX URL on every process in the job's driver process map.
+  * Does nothing when the URL is null or the driver has no process map.
+  *
+  * @param job      the job whose driver processes are updated
+  * @param jdJmxUrl the JMX URL reported by the job driver, may be null
+  */
+ private void setJdJmxUrl(DuccWorkJob job, String jdJmxUrl) {
+     if(jdJmxUrl == null) {
+         return;
+     }
+     IDuccProcessMap driverProcesses = job.getDriver().getProcessMap();
+     if(driverProcesses == null) {
+         return;
+     }
+     for(IDuccProcess driverProcess : driverProcesses.values()) {
+         driverProcess.setProcessJmxUrl(jdJmxUrl);
+     }
+ }
+
+ /**
+  * Copies the per-process work item statistics from the driver status report
+  * onto the matching processes of the job.
+  * A process id that appears in the report but not in the job's process map is
+  * logged and skipped, so the remaining processes are still updated.
+  * Any other failure is logged and ends the copy.
+  *
+  * @param job            the job whose processes receive the statistics
+  * @param jdStatusReport the driver status report supplying the statistics
+  */
+ private void copyProcessWorkItemsReport(DuccWorkJob job, DriverStatusReport jdStatusReport) {
+     String methodName = "copyProcessWorkItemsReport";
+     try {
+         IDuccProcessMap processMap = job.getProcessMap();
+         DuccProcessWorkItemsMap pwiMap = jdStatusReport.getDuccProcessWorkItemsMap();
+         Iterator<DuccId> iterator = pwiMap.keySet().iterator();
+         while(iterator.hasNext()) {
+             DuccId processId = iterator.next();
+             IDuccProcess process = processMap.get(processId);
+             if(process == null) {
+                 // previously this raised an NPE that aborted the copy for all remaining processes
+                 logger.warn(methodName, job.getDuccId(), processId, "not in process map");
+                 continue;
+             }
+             IDuccProcessWorkItems pwi = pwiMap.get(processId);
+             process.setProcessWorkItems(pwi);
+         }
+     }
+     catch(Throwable t) {
+         logger.error(methodName, job.getDuccId(), t);
+     }
+ }
+
+ private void copyDriverWorkItemsReport(DuccWorkJob job, DriverStatusReport jdStatusReport) {
+ String methodName = "copyDriverWorkItemsReport";
+ try {
+ DuccProcessWorkItemsMap pwiMap = jdStatusReport.getDuccProcessWorkItemsMap();
+ IDuccProcessWorkItems pwi = pwiMap.getTotals();
+ DuccWorkPopDriver driver = job.getDriver();
+ IDuccProcessMap processMap = driver.getProcessMap();
+ if(processMap != null) {
+ Iterator<DuccId> iterator = processMap.keySet().iterator();
+ while(iterator.hasNext()) {
+ DuccId processId = iterator.next();
+ IDuccProcess process = processMap.get(processId);
+ process.setProcessWorkItems(pwi);
+ }
+ }
+ }
+ catch(Throwable t) {
+ logger.error(methodName, job.getDuccId(), t);
+ }
+ }
+
+ /**
+ * JD reconciliation: applies a job driver status report to the matching job.
+ * <p>
+ * While holding the work map lock, the report's JMX URL, deployment descriptor
+ * and work item statistics are copied onto the job, the report is remembered
+ * (unless the job is already Completed), the job state is advanced according to
+ * the reported driver state, and idle or failed processes are deallocated.
+ * A checkpoint is saved when a deallocation helper reports a change.
+ *
+ * @param jdStatusReport the status report received from a job driver
+ */
+ public void reconcileState(DriverStatusReport jdStatusReport) {
+ String methodName = "reconcileState (JD)";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ // incremented only by the two deallocation helpers near the end of the synchronized block
+ int changes = 0;
+ long t0 = System.currentTimeMillis();
+ synchronized(workMap) {
+ DuccId duccId = jdStatusReport.getDuccId();
+ DuccWorkJob duccWorkJob = (DuccWorkJob) workMap.findDuccWork(duccId);
+ if(duccWorkJob != null) {
+ String jdJmxUrl = jdStatusReport.getJdJmxUrl();
+ setJdJmxUrl(duccWorkJob, jdJmxUrl);
+ IDuccUimaDeploymentDescriptor uimaDeploymentDescriptor = jdStatusReport.getUimaDeploymentDescriptor();
+ if(uimaDeploymentDescriptor != null) {
+ // copyDD is a constant switch; as written the descriptor is always copied
+ boolean copyDD = true;
+ if(copyDD) {
+ duccWorkJob.setUimaDeployableConfiguration(uimaDeploymentDescriptor);
+ }
+ }
+ // copy work item statistics onto job processes and driver processes
+ copyProcessWorkItemsReport(duccWorkJob, jdStatusReport);
+ copyDriverWorkItemsReport(duccWorkJob, jdStatusReport);
+ // remember the latest report for every job state except Completed
+ switch(duccWorkJob.getJobState()) {
+ case Completed:
+ break;
+ case Completing:
+ default:
+ driverStatusReportMap.put(duccId, jdStatusReport);
+ break;
+ }
+ // advance the job state according to the reported driver state
+ switch(jdStatusReport.getDriverState()) {
+ case NotRunning:
+ break;
+ case Initializing:
+ switch(duccWorkJob.getJobState()) {
+ case WaitingForDriver:
+ stateJobAccounting.stateChange(duccWorkJob, JobState.WaitingForServices);
+ break;
+ case Initializing:
+ break;
+ }
+ break;
+ case Running:
+ case Idle:
+ // a kill request from the driver takes precedence over any state advance
+ if(jdStatusReport.isKillJob()) {
+ jobTerminate(duccWorkJob, JobCompletionType.CanceledByDriver, jdStatusReport.getJobCompletionRationale(), ProcessDeallocationType.JobFailure);
+ break;
+ }
+ switch(duccWorkJob.getJobState()) {
+ case WaitingForDriver:
+ stateJobAccounting.stateChange(duccWorkJob, JobState.WaitingForServices);
+ break;
+ case Initializing:
+ stateJobAccounting.stateChange(duccWorkJob, JobState.Running);
+ break;
+ }
+ break;
+ case Completed:
+ if(!duccWorkJob.isFinished()) {
+ stateJobAccounting.stateChange(duccWorkJob, JobState.Completing);
+ // the return value of deallocateJobDriver is not used here
+ deallocateJobDriver(duccWorkJob, jdStatusReport);
+ }
+ // the completion date is refreshed on every report that carries driver state Completed
+ duccWorkJob.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+ switch(jdStatusReport.getJobCompletionType()) {
+ case EndOfJob:
+ duccWorkJob.setCompletion(JobCompletionType.EndOfJob, new Rationale("state manager detected normal completion"));
+ try {
+ // downgrade a normal end of job to Error when work item errors were recorded
+ int errors = Integer.parseInt(duccWorkJob.getSchedulingInfo().getWorkItemsError());
+ if(errors > 0) {
+ duccWorkJob.setCompletion(JobCompletionType.Error, new Rationale("state manager detected errors="+errors));
+ }
+ }
+ catch(Exception e) {
+ // NOTE(review): failures here (e.g. a non-numeric error count) are ignored and the
+ // EndOfJob completion stands - presumably intentional best effort; confirm
+ }
+ break;
+ default:
+ duccWorkJob.setCompletion(jdStatusReport.getJobCompletionType(),jdStatusReport.getJobCompletionRationale());
+ break;
+ }
+ break;
+ case Undefined:
+ break;
+ }
+ OrchestratorCommonArea.getInstance().getProcessAccounting().setStatus(jdStatusReport,duccWorkJob);
+ if(deallocateIdleProcesses(duccWorkJob, jdStatusReport)) {
+ changes++;
+ }
+ if(deallocateFailedProcesses(duccWorkJob, jdStatusReport)) {
+ changes++;
+ }
+ }
+ else {
+ logger.warn(methodName, duccId, messages.fetch("not found"));
+ }
+ }
+ long t1 = System.currentTimeMillis();
+ long elapsed = t1 - t0;
+ // report only reconciliations that took longer than the configured limit
+ if(elapsed > Constants.SYNC_LIMIT) {
+ logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+ }
+ // the checkpoint is written after the work map lock has been released
+ if(changes > 0) {
+ OrchestratorCheckpoint.getInstance().saveState();
+ }
+ logger.trace(methodName, null, messages.fetch("exit"));
+ }
+
+ public boolean isExcessCapacity(DuccWorkJob job, DriverStatusReport jdStatusReport) {
+ String methodName = "isExcessCapacity";
+ boolean retVal = false;
+ if(jdStatusReport != null) {
+ IDuccProcessMap processMap = job.getProcessMap();
+ int threads_per_share = Integer.parseInt(job.getSchedulingInfo().getThreadsPerShare());
+ long capacity = processMap.getUsableProcessCount() * threads_per_share;
+ long total = jdStatusReport.getWorkItemsTotal();
+ long done = jdStatusReport.getWorkItemsProcessingCompleted();
+ long error = jdStatusReport.getWorkItemsProcessingError();
+ long todo = total - (done + error);
+ if(capacity > 0) {
+ if(todo < capacity) {
+ retVal = true;
+ }
+ }
+ logger.info(methodName, job.getDuccId(), "todo:"+todo+" "+"capacity:"+capacity+" "+"excess:"+retVal);
+ }
+ else {
+ logger.info(methodName, job.getDuccId(), "todo:"+"?"+" "+"capacity:"+"?"+" "+"excess:"+retVal);
+ }
+ return retVal;
+ }
+
+ private boolean deallocateIdleProcesses(DuccWorkJob job, DriverStatusReport jdStatusReport) {
+ String methodName = "deallocateIdleProcesses";
+ boolean retVal = false;
+ if(!jdStatusReport.isPending() && !jdStatusReport.isWorkItemPendingProcessAssignment()) {
+ IDuccProcessMap processMap = job.getProcessMap();
+ Iterator<DuccId> iterator = processMap.keySet().iterator();
+ boolean excessCapacity = isExcessCapacity(job, jdStatusReport);
+ while(iterator.hasNext() && excessCapacity) {
+ DuccId duccId = iterator.next();
+ IDuccProcess process = processMap.get(duccId);
+ if(!process.isDeallocated()) {
+ String nodeIP = process.getNodeIdentity().getIp();
+ String PID = process.getPID();
+ if(!jdStatusReport.isOperating(nodeIP, PID)) {
+ process.setResourceState(ResourceState.Deallocated);
+ process.setProcessDeallocationType(ProcessDeallocationType.Voluntary);
+ logger.info(methodName, job.getDuccId(), process.getDuccId(), "deallocated");
+ retVal = true;
+ excessCapacity = isExcessCapacity(job, jdStatusReport);
+ }
+ }
+ }
+ }
+ return retVal;
+ }
+
+ /**
+  * Deallocates the job processes the driver status report lists to be killed.
+  * Processes already deallocated are left untouched; ids not present in the
+  * job's process map are logged as warnings.
+  *
+  * @param job            the job owning the processes
+  * @param jdStatusReport the driver status report supplying the ids to kill
+  * @return true when at least one process was deallocated, so that the caller
+  *         can count the change (previously this always returned false)
+  */
+ private boolean deallocateFailedProcesses(DuccWorkJob job, DriverStatusReport jdStatusReport) {
+     String methodName = "deallocateFailedProcesses";
+     boolean retVal = false;
+     IDuccProcessMap processMap = job.getProcessMap();
+     Iterator<DuccId> iterator = jdStatusReport.getKillDuccIds();
+     while (iterator.hasNext()) {
+         DuccId duccId = iterator.next();
+         IDuccProcess process = processMap.get(duccId);
+         if(process != null) {
+             if(!process.isDeallocated()) {
+                 process.setResourceState(ResourceState.Deallocated);
+                 process.setProcessDeallocationType(ProcessDeallocationType.Exception);
+                 logger.info(methodName, job.getDuccId(), process.getDuccId(), "deallocated");
+                 // report the change, as deallocateIdleProcesses does
+                 retVal = true;
+             }
+         }
+         else {
+             logger.warn(methodName, job.getDuccId(), duccId, "not in process map");
+         }
+     }
+     return retVal;
+ }
+
+
+ /**
+  * Voluntarily deallocates every process in the job's driver process map that
+  * is not already deallocated.
+  *
+  * @param job            the job whose driver processes are deallocated
+  * @param jdStatusReport the driver status report (not consulted by this method)
+  * @return true when at least one driver process was deallocated
+  *         (previously this always returned false)
+  */
+ private boolean deallocateJobDriver(DuccWorkJob job, DriverStatusReport jdStatusReport) {
+     String methodName = "deallocateJobDriver";
+     boolean retVal = false;
+     IDuccProcessMap processMap = job.getDriver().getProcessMap();
+     Iterator<DuccId> iterator = processMap.keySet().iterator();
+     while (iterator.hasNext()) {
+         DuccId duccId = iterator.next();
+         IDuccProcess process = processMap.get(duccId);
+         if(process != null) {
+             if(!process.isDeallocated()) {
+                 process.setResourceState(ResourceState.Deallocated);
+                 process.setProcessDeallocationType(ProcessDeallocationType.Voluntary);
+                 logger.info(methodName, job.getDuccId(), process.getDuccId(), "deallocated");
+                 // report the change, as deallocateIdleProcesses does
+                 retVal = true;
+             }
+         }
+         else {
+             logger.warn(methodName, job.getDuccId(), duccId, "not in process map");
+         }
+     }
+     return retVal;
+ }
+
+ /**
+ * RM reconciliation: applies the resource manager's per-work resource state.
+ * <p>
+ * While holding the work map lock, each entry of the map is matched to its work
+ * item. For jobs and services, purged processes are marked, pending resource
+ * additions and removals are applied to the process map, and the job state is
+ * moved between WaitingForResources and Initializing (or to Completed when the
+ * allocation was refused). For reservations, additions and removals are applied
+ * and the reservation state is moved between WaitingForResources, Assigned and
+ * Completed. A checkpoint is saved when anything changed.
+ *
+ * @param rmResourceStateMap resource state reported by the resource manager, keyed by work id
+ * @throws Exception declared by the signature; no checked exception is raised explicitly in the visible code
+ */
+ public void reconcileState(Map<DuccId, IRmJobState> rmResourceStateMap) throws Exception {
+ String methodName = "reconcileState (RM)";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ logger.debug(methodName, null, messages.fetchLabel("size")+rmResourceStateMap.size());
+ // number of state changes and resource map changes applied; drives the checkpoint below
+ int changes = 0;
+ long t0 = System.currentTimeMillis();
+ synchronized(workMap) {
+ Iterator<DuccId> rmResourceStateIterator = rmResourceStateMap.keySet().iterator();
+ while(rmResourceStateIterator.hasNext()) {
+ DuccId duccId = rmResourceStateIterator.next();
+ IRmJobState rmResourceState = rmResourceStateMap.get(duccId);
+ if(rmResourceState.getPendingAdditions() != null) {
+ logger.debug(methodName, duccId, messages.fetchLabel("pending additions")+rmResourceState.getPendingAdditions().size());
+ }
+ if(rmResourceState.getPendingRemovals() != null) {
+ logger.debug(methodName, duccId, messages.fetchLabel("pending removals")+rmResourceState.getPendingRemovals().size());
+ }
+ IDuccWork duccWork = workMap.findDuccWork(duccId);
+ if(duccWork== null) {
+ // the RM reported on work that is not in the work map; nothing to reconcile
+ logger.debug(methodName, duccId, messages.fetch("not found"));
+ }
+ else {
+ logger.debug(methodName, duccId, messages.fetchLabel("type")+duccWork.getDuccType());
+ switch(duccWork.getDuccType()) {
+ case Job:
+ logger.debug(methodName, duccId, messages.fetch("processing job..."));
+ DuccWorkJob duccWorkJob = (DuccWorkJob) duccWork;
+ // the return value of processPurger is not added to changes
+ processPurger(duccWorkJob,rmResourceState.getResources());
+ changes += processMapResourcesAdd(duccWorkJob,rmResourceState.getPendingAdditions());
+ changes += processMapResourcesDel(duccWorkJob,rmResourceState.getPendingRemovals());
+ JobState jobState = duccWorkJob.getJobState();
+ logger.debug(methodName, duccId, messages.fetchLabel("job state")+jobState);
+ switch(jobState) {
+ case Received:
+ case WaitingForDriver:
+ logger.warn(methodName, duccId, messages.fetchLabel("unexpected state")+jobState);
+ break;
+ case WaitingForServices:
+ logger.debug(methodName, duccId, messages.fetchLabel("unexpected state")+jobState);
+ break;
+ case WaitingForResources:
+ if(rmResourceState.isRefused()) {
+ duccWorkJob.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+ duccWorkJob.setCompletionType(JobCompletionType.ResourcesUnavailable);
+ duccWorkJob.setCompletionRationale(new Rationale("resource manager refused allocation: "+rmResourceState.getReason()));
+ changes += stateChange(duccWorkJob,JobState.Completed);
+ logger.warn(methodName, duccId, messages.fetchLabel("refused")+rmResourceState.getReason());
+ }
+ // NOTE(review): this check also runs after a refusal (no else, unlike the reservation
+ // branch below), so a refused job with a non-empty process map is asked to move to
+ // Initializing right after Completed - confirm stateJobAccounting rejects that transition
+ if(duccWorkJob.getProcessMap().size() > 0) {
+ changes += stateChange(duccWorkJob,JobState.Initializing);
+ logger.info(methodName, duccId, messages.fetchLabel("resources count")+duccWorkJob.getProcessMap().size());
+ }
+ break;
+ case Initializing:
+ case Running:
+ // all processes are gone: fall back to waiting for resources
+ if(duccWorkJob.getProcessMap().size() == 0) {
+ changes += stateChange(duccWorkJob,JobState.WaitingForResources);
+ logger.info(methodName, duccId, messages.fetchLabel("resources count")+duccWorkJob.getProcessMap().size());
+ }
+ break;
+ case Completing:
+ case Completed:
+ logger.debug(methodName, duccId, messages.fetchLabel("unsuitable state")+jobState);
+ break;
+ case Undefined:
+ logger.warn(methodName, duccId, messages.fetchLabel("unsuitable state")+jobState);
+ break;
+ }
+ break;
+ case Reservation:
+ logger.debug(methodName, duccId, messages.fetch("processing reservation..."));
+ DuccWorkReservation duccWorkReservation = (DuccWorkReservation) duccWork;
+ changes += reservationMapResourcesAdd(duccWorkReservation,rmResourceState.getPendingAdditions());
+ changes += reservationMapResourcesDel(duccWorkReservation,rmResourceState.getPendingRemovals());
+ ReservationState reservationState = duccWorkReservation.getReservationState();
+ switch(reservationState) {
+ case Received:
+ logger.warn(methodName, duccId, messages.fetchLabel("unexpected state")+reservationState);
+ break;
+ case WaitingForResources:
+ if(rmResourceState.isRefused()) {
+ duccWorkReservation.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+ duccWorkReservation.setCompletionType(ReservationCompletionType.ResourcesUnavailable);
+ duccWorkReservation.setCompletionRationale(new Rationale("resource manager refused allocation: "+rmResourceState.getReason()));
+ changes += stateChange(duccWorkReservation,ReservationState.Completed);
+ logger.warn(methodName, duccId, messages.fetchLabel("refused")+rmResourceState.getReason());
+ }
+ else {
+ // not refused: the reservation becomes Assigned once the RM reports at least one resource
+ if(rmResourceState.getResources() != null) {
+ if(!rmResourceState.getResources().isEmpty()) {
+ changes += stateChange(duccWorkReservation,ReservationState.Assigned);
+ logger.info(methodName, duccId, messages.fetchLabel("resources count")+rmResourceState.getResources().size());
+ }
+ }
+ else {
+ logger.info(methodName, duccId, messages.fetch("waiting...no resources?"));
+ }
+ }
+ break;
+ case Assigned:
+ // an assigned reservation whose resource map has become empty is completed
+ if(rmResourceState.getResources() != null) {
+ if(rmResourceState.getResources().isEmpty()) {
+ changes += stateChange(duccWorkReservation,ReservationState.Completed);
+ logger.info(methodName, duccId, messages.fetchLabel("resources count")+rmResourceState.getResources().size());
+ }
+ }
+ else {
+ logger.info(methodName, duccId, messages.fetch("assigned...no resources?"));
+ }
+ break;
+ case Completed:
+ logger.debug(methodName, duccId, messages.fetchLabel("unsuitable state")+reservationState);
+ break;
+ case Undefined:
+ logger.warn(methodName, duccId, messages.fetchLabel("unsuitable state")+reservationState);
+ break;
+ }
+ break;
+ case Service:
+ // services are handled like jobs, except that there is no WaitingForDriver case
+ logger.debug(methodName, duccId, messages.fetch("processing service..."));
+ DuccWorkJob duccWorkService = (DuccWorkJob) duccWork;
+ processPurger(duccWorkService,rmResourceState.getResources());
+ changes += processMapResourcesAdd(duccWorkService,rmResourceState.getPendingAdditions());
+ changes += processMapResourcesDel(duccWorkService,rmResourceState.getPendingRemovals());
+ JobState serviceState = duccWorkService.getJobState();
+ logger.debug(methodName, duccId, messages.fetchLabel("service state")+serviceState);
+ switch(serviceState) {
+ case Received:
+ logger.warn(methodName, duccId, messages.fetchLabel("unexpected state")+serviceState);
+ break;
+ case WaitingForServices:
+ logger.debug(methodName, duccId, messages.fetchLabel("unexpected state")+serviceState);
+ break;
+ case WaitingForResources:
+ if(rmResourceState.isRefused()) {
+ duccWorkService.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+ duccWorkService.setCompletionType(JobCompletionType.ResourcesUnavailable);
+ duccWorkService.setCompletionRationale(new Rationale("resource manager refused allocation: "+rmResourceState.getReason()));
+ changes += stateChange(duccWorkService,JobState.Completed);
+ logger.warn(methodName, duccId, messages.fetchLabel("refused")+rmResourceState.getReason());
+ }
+ // NOTE(review): same refused-then-Initializing sequence as in the Job branch - confirm
+ if(duccWorkService.getProcessMap().size() > 0) {
+ changes += stateChange(duccWorkService,JobState.Initializing);
+ logger.info(methodName, duccId, messages.fetchLabel("resources count")+duccWorkService.getProcessMap().size());
+ }
+ break;
+ case Initializing:
+ case Running:
+ if(duccWorkService.getProcessMap().size() == 0) {
+ changes += stateChange(duccWorkService,JobState.WaitingForResources);
+ logger.info(methodName, duccId, messages.fetchLabel("resources count")+duccWorkService.getProcessMap().size());
+ }
+ break;
+ case Completing:
+ case Completed:
+ logger.debug(methodName, duccId, messages.fetchLabel("unsuitable state")+serviceState);
+ break;
+ case Undefined:
+ logger.warn(methodName, duccId, messages.fetchLabel("unsuitable state")+serviceState);
+ break;
+ }
+ break;
+ }
+ }
+ }
+ // unlike the JD reconciliation, the checkpoint is written while the work map lock is still held
+ if(changes > 0) {
+ OrchestratorCheckpoint.getInstance().saveState();
+ }
+ }
+ long t1 = System.currentTimeMillis();
+ long elapsed = t1 - t0;
+ // report only reconciliations that took longer than the configured limit
+ if(elapsed > Constants.SYNC_LIMIT) {
+ logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+ } logger.trace(methodName, null, messages.fetch("exit"));
+ }
+
+ /**
+  * Marks job processes whose resources the resource manager reports as purged.
+  * Each such process that is not already defunct is set to Deallocated with
+  * deallocation type Purged and advanced to the Stopped process state.
+  * A purged resource with no matching process in the job's process map is
+  * logged and skipped instead of raising a NullPointerException.
+  *
+  * @param job the job owning the processes, may be null (then nothing is done)
+  * @param map the resources reported by the resource manager, may be null
+  * @return the number of processes marked as purged (previously always 0)
+  */
+ private int processPurger(DuccWorkJob job,Map<DuccId, IResource> map) {
+     String methodName = "processPurger";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     int changes = 0;
+     if(job != null && map != null) {
+         IDuccProcessMap processMap = job.getProcessMap();
+         for(Map.Entry<DuccId, IResource> entry : map.entrySet()) {
+             DuccId duccId = entry.getKey();
+             IResource resource = entry.getValue();
+             if(resource == null || !resource.isPurged()) {
+                 continue;
+             }
+             IDuccProcess process = processMap.get(duccId);
+             if(process == null) {
+                 // the RM may report a purged resource the job never registered as a process
+                 logger.warn(methodName, job.getDuccId(), duccId, "not in process map");
+                 continue;
+             }
+             if(!process.isDefunct()) {
+                 String rState = process.getResourceState().toString();
+                 String pState = process.getProcessState().toString();
+                 // "pState:" label fixed (the colon was missing)
+                 logger.info(methodName, job.getDuccId(), duccId, "rState:"+rState+" "+"pState:"+pState);
+                 process.setResourceState(ResourceState.Deallocated);
+                 process.setProcessDeallocationType(ProcessDeallocationType.Purged);
+                 process.advanceProcessState(ProcessState.Stopped);
+                 changes++;
+             }
+         }
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+     return changes;
+ }
+
+ /**
+  * Adds a process to the job's process map for every resource newly allocated
+  * by the resource manager. Resources whose id is already in the process map
+  * are logged and skipped.
+  * <p>
+  * A new process is deallocated again straight away when the job is already
+  * Completing or Completed, or when the job's latest driver status report shows
+  * excess capacity.
+  *
+  * @param duccWorkJob the job or service receiving the resources
+  * @param resourceMap pending additions from the resource manager, may be null
+  * @return the number of processes added to the process map
+  */
+ private int processMapResourcesAdd(DuccWorkJob duccWorkJob,Map<DuccId,IResource> resourceMap) {
+     String methodName = "processMapResourcesAdd";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     int changes = 0;
+     if(resourceMap == null) {
+         logger.info(methodName, duccWorkJob.getDuccId(), messages.fetch("no map found"));
+     }
+     else {
+         IDuccProcessMap processMap = duccWorkJob.getProcessMap();
+         Iterator<DuccId> resourceMapIterator = resourceMap.keySet().iterator();
+         while(resourceMapIterator.hasNext()) {
+             DuccId duccId = resourceMapIterator.next();
+             NodeIdentity nodeId = resourceMap.get(duccId).getNodeId();
+             if(!processMap.containsKey(duccId)) {
+                 DuccProcess process = new DuccProcess(duccId, nodeId, ProcessType.Job_Uima_AS_Process);
+                 orchestratorCommonArea.getProcessAccounting().addProcess(duccId, duccWorkJob.getDuccId());
+                 processMap.addProcess(process);
+                 process.setResourceState(ResourceState.Allocated);
+                 logger.info(methodName, duccWorkJob.getDuccId(), messages.fetch("resource added")
+                     +" "+messages.fetchLabel("process")+duccId.getFriendly()
+                     +" "+messages.fetchLabel("unique")+duccId.getUnique()
+                     +" "+messages.fetchLabel("name")+nodeId.getName()
+                     +" "+messages.fetchLabel("ip")+nodeId.getIp());
+                 changes++;
+                 // check on usefulness of recent allocation
+                 switch(duccWorkJob.getJobState()) {
+                 // allocation unnecessary if job is already completed
+                 case Completing:
+                 case Completed:
+                     process.setResourceState(ResourceState.Deallocated);
+                     process.setProcessDeallocationType(ProcessDeallocationType.Voluntary);
+                     process.advanceProcessState(ProcessState.Stopped);
+                     logger.warn(methodName, duccWorkJob.getDuccId(),
+                         messages.fetch("resource allocated for completed job")
+                         +" "+
+                         messages.fetchLabel("process")+duccId.getFriendly()
+                         );
+                     break;
+                 default:
+                     // allocation unnecessary if job has excess capacity.
+                     // The driver status reports are keyed by job id, so look the report up
+                     // with the job's id; the process id used before never matched an entry,
+                     // which left this check permanently disabled.
+                     if(isExcessCapacity(duccWorkJob,driverStatusReportMap.get(duccWorkJob.getDuccId()))) {
+                         process.setResourceState(ResourceState.Deallocated);
+                         process.setProcessDeallocationType(ProcessDeallocationType.Voluntary);
+                         process.advanceProcessState(ProcessState.Stopped);
+                         logger.warn(methodName, duccWorkJob.getDuccId(),
+                             messages.fetch("resource allocated for over capacity job")
+                             +" "+
+                             messages.fetchLabel("process")+duccId.getFriendly()
+                             );
+                     }
+                     break;
+                 }
+             }
+             else {
+                 logger.warn(methodName, duccWorkJob.getDuccId(), messages.fetch("resource exists")
+                     +" "+messages.fetchLabel("process")+duccId.getFriendly()
+                     +" "+messages.fetchLabel("unique")+duccId.getUnique()
+                     +" "+messages.fetchLabel("name")+nodeId.getName()
+                     +" "+messages.fetchLabel("ip")+nodeId.getIp());
+             }
+         }
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+     return changes;
+ }
+
+ /**
+  * Marks as Deallocated (deallocation type Forced) every process of the job
+  * whose id appears in {@code resourceMap}. Processes are not removed from the
+  * process map here.
+  *
+  * @param duccWorkJob the job losing the resources
+  * @param resourceMap resources keyed by process id; when null only an info message is logged
+  * @return the change counter, which this method never increments (always 0)
+  */
+ private int processMapResourcesDel(DuccWorkJob duccWorkJob,Map<DuccId,IResource> resourceMap) {
+     String methodName = "processMapResourcesDel";
+     logger.trace(methodName, duccWorkJob.getDuccId(), messages.fetch("enter"));
+     // Never incremented: the removal of defunct processes that used to count
+     // as a change is disabled, so callers always receive 0.
+     int changes = 0;
+     if(resourceMap != null) {
+         IDuccProcessMap jobProcesses = duccWorkJob.getProcessMap();
+         logger.debug(methodName, duccWorkJob.getDuccId(), messages.fetchLabel("size")+jobProcesses.size());
+         for(DuccId processId : resourceMap.keySet()) {
+             NodeIdentity node = resourceMap.get(processId).getNodeId();
+             logger.info(methodName, duccWorkJob.getDuccId(), messages.fetch("resource processing")
+                 +" "+messages.fetchLabel("process")+processId.getFriendly()
+                 +" "+messages.fetchLabel("unique")+processId.getUnique()
+                 +" "+messages.fetchLabel("name")+node.getName()
+                 +" "+messages.fetchLabel("ip")+node.getIp());
+             if(!jobProcesses.containsKey(processId)) {
+                 logger.info(methodName, duccWorkJob.getDuccId(), messages.fetch("resource not found")
+                     +" "+messages.fetchLabel("process")+processId.getFriendly()
+                     +" "+messages.fetchLabel("unique")+processId.getUnique()
+                     +" "+messages.fetchLabel("name")+node.getName()
+                     +" "+messages.fetchLabel("ip")+node.getIp());
+                 continue;
+             }
+             IDuccProcess process = jobProcesses.get(processId);
+             process.setResourceState(ResourceState.Deallocated);
+             process.setProcessDeallocationType(ProcessDeallocationType.Forced);
+             logger.info(methodName, duccWorkJob.getDuccId(), messages.fetch("resource deallocated")
+                 +" "+messages.fetchLabel("process")+processId.getFriendly()
+                 +" "+messages.fetchLabel("unique")+processId.getUnique()
+                 +" "+messages.fetchLabel("name")+node.getName()
+                 +" "+messages.fetchLabel("ip")+node.getIp());
+         }
+     }
+     else {
+         logger.info(methodName, duccWorkJob.getDuccId(), messages.fetch("no map found"));
+     }
+     logger.trace(methodName, duccWorkJob.getDuccId(), messages.fetch("exit"));
+     return changes;
+ }
+
+ /**
+  * Adds a DuccReservation to the reservation's map for every resource in
+  * {@code resourceMap} whose id is not already present.
+  *
+  * @param duccWorkReservation the reservation receiving the resources
+  * @param resourceMap resources keyed by id; a null map is treated as "nothing to add"
+  * @return the number of reservations added
+  */
+ private int reservationMapResourcesAdd(DuccWorkReservation duccWorkReservation,Map<DuccId,IResource> resourceMap) {
+     String methodName = "reservationMapResourcesAdd";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     int added = 0;
+     IDuccReservationMap reservations = duccWorkReservation.getReservationMap();
+     if(resourceMap != null) {
+         for(DuccId resourceId : resourceMap.keySet()) {
+             IResource resource = resourceMap.get(resourceId);
+             NodeIdentity node = resource.getNodeId();
+             int shares = resource.countShares();
+             if(reservations.containsKey(resourceId)) {
+                 logger.debug(methodName, resourceId, messages.fetch("duplicate resource")+" "+messages.fetchLabel("name")+node.getName()+" "+messages.fetchLabel("ip")+node.getIp());
+                 continue;
+             }
+             reservations.addReservation(new DuccReservation(resourceId, node, shares));
+             logger.info(methodName, resourceId, messages.fetch("add resource")+" "+messages.fetchLabel("name")+node.getName()+" "+messages.fetchLabel("ip")+node.getIp());
+             added++;
+         }
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+     return added;
+ }
+
+ /**
+  * Removes from the reservation's map every entry whose id appears in
+  * {@code resourceMap}.
+  *
+  * @param duccWorkReservation the reservation losing the resources
+  * @param resourceMap resources keyed by id; a null map is treated as "nothing to remove"
+  * @return the number of reservations removed
+  */
+ private int reservationMapResourcesDel(DuccWorkReservation duccWorkReservation,Map<DuccId,IResource> resourceMap) {
+     // Fixed copy/paste slip: log lines were attributed to "processMapResourcesDel".
+     String methodName = "reservationMapResourcesDel";
+     logger.trace(methodName, null, messages.fetch("enter"));
+     int changes = 0;
+     IDuccReservationMap reservationMap = duccWorkReservation.getReservationMap();
+     if(resourceMap != null) {
+         for(DuccId duccId : resourceMap.keySet()) {
+             NodeIdentity nodeId = resourceMap.get(duccId).getNodeId();
+             if(reservationMap.containsKey(duccId)) {
+                 reservationMap.removeReservation(duccId);
+                 logger.info(methodName, duccId, messages.fetch("remove resource")+" "+messages.fetchLabel("name")+nodeId.getName()+" "+messages.fetchLabel("ip")+nodeId.getIp());
+                 changes++;
+             }
+             else {
+                 logger.debug(methodName, duccId, messages.fetch("not found resource")+" "+messages.fetchLabel("name")+nodeId.getName()+" "+messages.fetchLabel("ip")+nodeId.getIp());
+             }
+         }
+     }
+     logger.trace(methodName, null, messages.fetch("exit"));
+     return changes;
+ }
+
+ /**
+  * SM reconciliation helper: flattens the per-endpoint messages of a service
+  * dependency into a single string of the form {@code endpoint:message;},
+  * one segment per entry.
+  *
+  * @param sd the service dependency whose messages are rendered
+  * @return the concatenated text (empty when the map is empty), or null when
+  *         the dependency has no message map
+  */
+ private String getServiceDependencyMessages(ServiceDependency sd) {
+     Map<String, String> endpointMessages = sd.getMessages();
+     if(endpointMessages == null) {
+         return null;
+     }
+     StringBuilder text = new StringBuilder();
+     for(Map.Entry<String, String> entry : endpointMessages.entrySet()) {
+         text.append(entry.getKey()).append(":").append(entry.getValue()).append(";");
+     }
+     return text.toString();
+ }
+
+ /**
+  * Reconciles job states with the service states in the given service map.
+  * For every id in {@code serviceMap} the matching job is looked up in the
+  * work map; only jobs in state WaitingForServices are changed:
+  * services Available moves the job to WaitingForResources, while
+  * NotAvailable or Stopping moves it to Completing and records completion
+  * type ServicesUnavailable. All other combinations are only logged.
+  * The orchestrator checkpoint is saved when at least one job changed.
+  *
+  * @param serviceMap service dependencies keyed by the id used to find the job
+  */
+ public void reconcileState(ServiceMap serviceMap) {
+ String methodName = "reconcileState (SM)";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ int changes = 0;
+ Iterator<DuccId> serviceMapIterator = serviceMap.keySet().iterator();
+ // t0 is taken before entering the synchronized block, so the elapsed time
+ // reported below includes any time spent waiting for the workMap lock.
+ long t0 = System.currentTimeMillis();
+ synchronized(workMap) {
+ while(serviceMapIterator.hasNext()) {
+ DuccId duccId = serviceMapIterator.next();
+ ServiceDependency services = serviceMap.get(duccId);
+ // NOTE(review): unchecked cast; assumes every id in the service map refers to a job - confirm.
+ DuccWorkJob duccWorkJob = (DuccWorkJob) workMap.findDuccWork(duccId);
+ if(duccWorkJob != null) {
+ JobState jobState = duccWorkJob.getJobState();
+ ServiceState serviceState = services.getState();
+ // Note: there is no case for JobState.Completing, so such jobs are skipped without logging.
+ switch(jobState) {
+ case Received:
+ logger.warn(methodName, duccId, messages.fetchLabel("unexpected job state")+jobState);
+ break;
+ case WaitingForDriver:
+ logger.warn(methodName, duccId, messages.fetchLabel("unexpected job state")+jobState);
+ break;
+ case WaitingForServices:
+ switch(serviceState) {
+ case Waiting:
+ case Initializing:
+ // services not ready yet: leave the job waiting
+ break;
+ case Available:
+ stateJobAccounting.stateChange(duccWorkJob, JobState.WaitingForResources);
+ changes++;
+ // NOTE(review): this transition is logged at warn level; confirm that is intended.
+ logger.warn(methodName, duccId, messages.fetchLabel("job state")+jobState+" "+messages.fetchLabel("services state")+serviceState);
+ break;
+ case NotAvailable:
+ case Stopping:
+ // services cannot be provided: complete the job as ServicesUnavailable
+ stateJobAccounting.stateChange(duccWorkJob, JobState.Completing);
+ duccWorkJob.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+ String sdm = getServiceDependencyMessages(services);
+ // default rationale is used unless the service dependency supplied messages
+ IRationale rationale = new Rationale();
+ if(sdm != null) {
+ rationale = new Rationale("service manager reported "+sdm);
+ }
+ stateJobAccounting.complete(duccWorkJob, JobCompletionType.ServicesUnavailable, rationale);
+ changes++;
+ logger.warn(methodName, duccId, messages.fetchLabel("job state")+jobState+" "+messages.fetchLabel("services state")+serviceState);
+ break;
+ case Undefined:
+ logger.warn(methodName, duccId, messages.fetchLabel("job state")+jobState+" "+messages.fetchLabel("services state")+serviceState);
+ break;
+ }
+ break;
+ case WaitingForResources:
+ case Initializing:
+ case Running:
+ logger.warn(methodName, duccId, messages.fetchLabel("job state")+jobState+" "+messages.fetchLabel("services state")+serviceState);
+ break;
+ case Completed:
+ logger.debug(methodName, duccId, messages.fetchLabel("job state")+jobState+" "+messages.fetchLabel("services state")+serviceState);
+ break;
+ case Undefined:
+ logger.warn(methodName, duccId, messages.fetchLabel("unexpected job state")+jobState);
+ break;
+ }
+ }
+ else {
+ logger.debug(methodName, duccId, messages.fetch("job not found"));
+ }
+ }
+ // persist only when at least one job actually changed state; done while still holding the lock
+ if(changes > 0) {
+ OrchestratorCheckpoint.getInstance().saveState();
+ }
+ }
+ long t1 = System.currentTimeMillis();
+ long elapsed = t1 - t0;
+ // report only when the pass took longer than Constants.SYNC_LIMIT
+ if(elapsed > Constants.SYNC_LIMIT) {
+ logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+ }
+ logger.trace(methodName, null, messages.fetch("exit"));
+ }
+
+ /**
+  * Node Inventory reconciliation.
+  * Walks the reported processes while holding the workMap lock and dispatches
+  * on process type:
+  * Pop - when process accounting maps the process to a known work item of
+  * type Job, the process status is recorded; a Failed process terminates the
+  * job, otherwise a process reporting isComplete() triggers deallocation
+  * (type Stopped) and job completion handling;
+  * Service - ignored;
+  * Job_Uima_AS_Process - the process status is recorded.
+  *
+  * @param inventoryProcessMap processes reported by node inventory, keyed by process id
+  */
+ public void reconcileState(HashMap<DuccId, IDuccProcess> inventoryProcessMap) {
+ String methodName = "reconcileState (Node Inventory)";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ Iterator<DuccId> iterator = inventoryProcessMap.keySet().iterator();
+ // t0 is taken before entering the synchronized block, so the elapsed time
+ // reported below includes any time spent waiting for the workMap lock.
+ long t0 = System.currentTimeMillis();
+ synchronized(workMap) {
+ while(iterator.hasNext()) {
+ DuccId processId = iterator.next();
+ IDuccProcess inventoryProcess = inventoryProcessMap.get(processId);
+ switch(inventoryProcess.getProcessType()) {
+ case Pop:
+ // processes that accounting cannot map to a job, or whose job is no longer in the work map, are skipped
+ DuccId jobId = OrchestratorCommonArea.getInstance().getProcessAccounting().getJobId(processId);
+ if(jobId != null) {
+ IDuccWork duccWork = workMap.findDuccWork(jobId);
+ if(duccWork != null) {
+ // only work of type Job is handled; other work types have no case and fall out of the switch
+ switch(duccWork.getDuccType()) {
+ case Job:
+ DuccWorkJob job = (DuccWorkJob) duccWork;
+ OrchestratorCommonArea.getInstance().getProcessAccounting().setStatus(inventoryProcess);
+ switch(inventoryProcess.getProcessState()) {
+ case Failed:
+ // friendly id 0 is reported as a driver failure - presumably the job driver process; verify
+ if(inventoryProcess.getDuccId().getFriendly() == 0) {
+ jobTerminate(job, JobCompletionType.DriverProcessFailed, new Rationale(inventoryProcess.getReasonForStoppingProcess()), inventoryProcess.getProcessDeallocationType());
+ }
+ else {
+ jobTerminate(job, JobCompletionType.ProcessFailure, new Rationale(inventoryProcess.getReasonForStoppingProcess()), inventoryProcess.getProcessDeallocationType());
+ }
+ break;
+ default:
+ // any non-Failed state: act only once the process reports itself complete
+ if(inventoryProcess.isComplete()) {
+ OrchestratorCommonArea.getInstance().getProcessAccounting().deallocate(job,ProcessDeallocationType.Stopped);
+ completeJob(job, new Rationale("state manager reported as normal completion"));
+ }
+ break;
+ }
+ break;
+ }
+ }
+ }
+ break;
+ case Service:
+ // nothing to reconcile for this process type
+ break;
+ case Job_Uima_AS_Process:
+ OrchestratorCommonArea.getInstance().getProcessAccounting().setStatus(inventoryProcess);
+ break;
+ }
+ }
+ }
+ long t1 = System.currentTimeMillis();
+ long elapsed = t1 - t0;
+ // report only when the pass took longer than Constants.SYNC_LIMIT
+ if(elapsed > Constants.SYNC_LIMIT) {
+ logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+ }
+ logger.trace(methodName, null, messages.fetch("exit"));
+ }
+
+ private void completeJob(DuccWorkJob job, IRationale rationale) {
+ switch(job.getCompletionType()) {
+ case Undefined:
+ job.setCompletion(JobCompletionType.EndOfJob, rationale);
+ job.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+ case EndOfJob:
+ if(job.getProcessFailureCount() > 0) {
+ job.setCompletion(JobCompletionType.Error, rationale);
+ }
+ else if(job.getProcessInitFailureCount() > 0) {
+ job.setCompletion(JobCompletionType.Error, rationale);
+ }
+ else {
+ try {
+ if(Integer.parseInt(job.getSchedulingInfo().getWorkItemsError()) > 0) {
+ job.setCompletion(JobCompletionType.Error, rationale);
+ }
+ }
+ catch(Exception e) {
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ switch(job.getJobState()) {
+ case Completing:
+ case Completed:
+ break;
+ default:
+ if(job.getProcessMap().getAliveProcessCount() == 0) {
+ stateJobAccounting.stateChange(job, JobState.Completing);
+ }
+ }
+ }
+
+ /**
+  * Terminates a job that is not yet finished: moves it to Completing, records
+  * the completion type and rationale, deallocates its processes with the given
+  * deallocation type and stamps the completion date. Finished jobs are left
+  * untouched.
+  *
+  * @param job the job to terminate
+  * @param jobCompletionType the completion type to record
+  * @param rationale the rationale to record with the completion
+  * @param processDeallocationType the deallocation type passed to process accounting
+  */
+ public void jobTerminate(IDuccWorkJob job, JobCompletionType jobCompletionType, IRationale rationale, ProcessDeallocationType processDeallocationType) {
+     if(job.isFinished()) {
+         return;
+     }
+     stateJobAccounting.stateChange(job, JobState.Completing);
+     stateJobAccounting.complete(job, jobCompletionType, rationale);
+     OrchestratorCommonArea.getInstance().getProcessAccounting().deallocate(job,processDeallocationType);
+     job.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
+ }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Validate.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Validate.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Validate.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Validate.java Wed Jan 2 19:37:55 2013
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import java.util.ArrayList;
+import java.util.Properties;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.orchestrator.authentication.DuccWebAdministrators;
+import org.apache.uima.ducc.orchestrator.utilities.MemorySpecification;
+import org.apache.uima.ducc.transport.event.CancelJobDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelReservationDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelServiceDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitJobDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitReservationDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitServiceDuccEvent;
+import org.apache.uima.ducc.transport.event.cli.JobRequestProperties;
+import org.apache.uima.ducc.transport.event.cli.JobSpecificationProperties;
+import org.apache.uima.ducc.transport.event.cli.ReservationRequestProperties;
+import org.apache.uima.ducc.transport.event.common.DuccWorkReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccSchedulingInfo;
+
+
+/**
+ * Static validators for orchestrator requests. Problems found are appended to
+ * the request's own properties as lists of error and warning strings (under
+ * {@code JobSpecificationProperties.key_submit_errors} and
+ * {@code key_submit_warnings}) and are also logged.
+ */
+public class Validate {
+    private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(Validate.class.getName());
+
+    // use the shared instance rather than constructing a private one
+    private static final DuccWebAdministrators duccWebAdministrators = DuccWebAdministrators.getInstance();
+
+    /**
+     * Appends {@code reason} to the list stored under {@code key}, creating the
+     * list on first use, and logs it under the given method name.
+     */
+    @SuppressWarnings("unchecked")
+    private static void appendMessage(Properties properties, String key, String reason, String methodName) {
+        // the list is stored as an ArrayList<String> by this class
+        ArrayList<String> value = (ArrayList<String>) properties.get(key);
+        if(value == null) {
+            value = new ArrayList<String>();
+            properties.put(key, value);
+        }
+        value.add(reason);
+        logger.info(methodName, null, reason);
+    }
+
+    /** Records {@code reason} as a submit error on the request properties. */
+    private static void addError(Properties properties, String reason) {
+        appendMessage(properties, JobSpecificationProperties.key_submit_errors, reason, "addError");
+    }
+
+    /** Records {@code reason} as a submit warning on the request properties. */
+    private static void addWarning(Properties properties, String reason) {
+        appendMessage(properties, JobSpecificationProperties.key_submit_warnings, reason, "addWarning");
+    }
+
+    /** Formats a reason as {@code type: key=value}. */
+    private static String createReason(String type, String key, String value) {
+        return type+": "+key+"="+value;
+    }
+
+    /**
+     * Validates that the property {@code key} holds an integer within
+     * [{@code minValue}, {@code maxValue}].
+     * A missing property only produces a warning naming the default; the
+     * default is not written into the properties here.
+     *
+     * @param retVal the validation result so far; returned unchanged when this check passes
+     * @param properties the request properties, which also collect errors and warnings
+     * @param key the property to check
+     * @param defaultValue the default reported in the warning when the property is absent
+     * @param minValue lower bound, as a decimal integer string
+     * @param maxValue upper bound, as a decimal integer string
+     * @return {@code false} when the value is non-integer or out of range, otherwise {@code retVal}
+     */
+    public static boolean integer(boolean retVal, Properties properties, String key, String defaultValue, String minValue, String maxValue) {
+        String value = (String)properties.get(key);
+        if(value == null) {
+            addWarning(properties, createReason("default",key,defaultValue));
+            return retVal;
+        }
+        try {
+            int specified_value = Integer.parseInt(value);
+            int min_value = Integer.parseInt(minValue);
+            int max_value = Integer.parseInt(maxValue);
+            if(specified_value < min_value) {
+                addError(properties, createReason("invalid, under "+minValue,key,value));
+                retVal = false;
+            }
+            if(specified_value > max_value) {
+                addError(properties, createReason("invalid, above "+maxValue,key,value));
+                retVal = false;
+            }
+        }
+        catch(Exception e) {
+            // any parse failure (value or bounds) is reported against the supplied value
+            addError(properties, createReason("invalid, non-integer",key,value));
+            retVal = false;
+        }
+        return retVal;
+    }
+
+    /** Checks the process thread count against the scheduling limits; shared by job and service submission. */
+    private static boolean validateThreadCount(boolean retVal, Properties properties) {
+        return integer(retVal,
+                properties,
+                JobSpecificationProperties.key_process_thread_count,
+                IDuccSchedulingInfo.defaultThreadsPerShare,
+                IDuccSchedulingInfo.minThreadsPerShare,
+                IDuccSchedulingInfo.maxThreadsPerShare);
+    }
+
+    /**
+     * Validates a job submission. Currently only the process thread count is checked.
+     *
+     * @return {@code true} when no error was recorded
+     */
+    public static boolean request(SubmitJobDuccEvent duccEvent) {
+        boolean retVal = true;
+        JobRequestProperties properties = (JobRequestProperties) duccEvent.getProperties();
+        //
+        retVal = validateThreadCount(retVal, properties);
+        //TODO
+        return retVal;
+    }
+
+    /** Validates a job cancel request. No checks are implemented yet; always returns {@code true}. */
+    public static boolean request(CancelJobDuccEvent duccEvent) {
+        boolean retVal = true;
+        //TODO
+        return retVal;
+    }
+
+    /**
+     * Validates a reservation submission: the memory size must be parseable by
+     * {@code MemorySpecification}, the number of instances must be present, and
+     * a missing scheduling class only produces a warning.
+     *
+     * @return {@code true} when no error was recorded
+     */
+    public static boolean request(SubmitReservationDuccEvent duccEvent) {
+        boolean retVal = true;
+        ReservationRequestProperties properties = (ReservationRequestProperties) duccEvent.getProperties();
+        String key;
+        String value;
+        // memory size
+        // Fix: this check previously read key_number_of_instances, so the
+        // memory size itself was never validated.
+        key = ReservationRequestProperties.key_instance_memory_size;
+        String memorySize = (String) properties.get(key);
+        MemorySpecification memorySpecification = new MemorySpecification(memorySize);
+        value = memorySpecification.getSize();
+        if(value == null) {
+            addError(properties, createReason("invalid", key, value));
+            retVal = false;
+        }
+        // number of machines
+        key = ReservationRequestProperties.key_number_of_instances;
+        value = (String) properties.get(key);
+        if(value == null) {
+            addError(properties, createReason("invalid", key, value));
+            retVal = false;
+        }
+        // scheduling class
+        key = ReservationRequestProperties.key_scheduling_class;
+        value = (String) properties.get(key);
+        if(value == null) {
+            addWarning(properties, createReason("using", key, "default"));
+        }
+        return retVal;
+    }
+
+    /**
+     * Authorizes a reservation cancel request: permitted when the requesting
+     * user owns the reservation or is listed as an administrator. A refusal is
+     * recorded as a warning on the request properties and logged.
+     *
+     * @param duccEvent the cancel request
+     * @param duccWorkReservation the reservation to be cancelled
+     * @return {@code true} when the request is permitted
+     */
+    public static boolean request(CancelReservationDuccEvent duccEvent, DuccWorkReservation duccWorkReservation) {
+        String location = "request";
+        Properties properties = duccEvent.getProperties();
+        String userid = properties.getProperty(JobSpecificationProperties.key_user);
+        String ownerid = duccWorkReservation.getStandardInfo().getUser();
+        if((userid != null) && (userid.equals(ownerid))) {
+            return true;
+        }
+        if(duccWebAdministrators.isAdministrator(userid)) {
+            return true;
+        }
+        String reason;
+        if(userid == null) {
+            reason = createReason("reservation cancel invalid",JobSpecificationProperties.key_user,"unspecified");
+        }
+        else {
+            reason = createReason("reservation cancel unauthorized",JobSpecificationProperties.key_user,userid);
+        }
+        addWarning(properties,reason);
+        logger.warn(location, duccWorkReservation.getDuccId(), reason);
+        return false;
+    }
+
+    /**
+     * Validates a service submission. Currently only the process thread count is checked.
+     *
+     * @return {@code true} when no error was recorded
+     */
+    public static boolean request(SubmitServiceDuccEvent duccEvent) {
+        boolean retVal = true;
+        JobRequestProperties properties = (JobRequestProperties) duccEvent.getProperties();
+        //
+        retVal = validateThreadCount(retVal, properties);
+        //TODO
+        return retVal;
+    }
+
+    /** Validates a service cancel request. No checks are implemented yet; always returns {@code true}. */
+    public static boolean request(CancelServiceDuccEvent duccEvent) {
+        boolean retVal = true;
+        //TODO
+        return retVal;
+    }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Validate.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/authentication/DuccWebAdministrators.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/authentication/DuccWebAdministrators.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/authentication/DuccWebAdministrators.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/authentication/DuccWebAdministrators.java Wed Jan 2 19:37:55 2013
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.authentication;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.TreeMap;
+
+import org.apache.uima.ducc.common.IDuccEnv;
+
+
+/**
+ * Access to the administrators file named by
+ * {@code IDuccEnv.DUCC_ADMINISTRATORS_FILE}. The file is read with
+ * {@code Properties.load} and each property name is treated as an authorized
+ * userid. The file is re-read on every query.
+ */
+public class DuccWebAdministrators {
+
+    private String fileName = IDuccEnv.DUCC_ADMINISTRATORS_FILE;
+
+    private static DuccWebAdministrators duccWebAdministrators = new DuccWebAdministrators();
+
+    /** @return the shared instance */
+    public static DuccWebAdministrators getInstance() {
+        return duccWebAdministrators;
+    }
+
+    /**
+     * Reads the administrators file.
+     *
+     * @return the loaded properties; empty (or partially filled) when the file
+     *         cannot be read, in which case the stack trace is printed
+     */
+    private Properties load() {
+        Properties properties = new Properties();
+        try {
+            File file = new File(fileName);
+            FileInputStream fis = new FileInputStream(file);
+            try {
+                properties.load(fis);
+            }
+            finally {
+                // close even when load() fails, so the file handle is not leaked
+                fis.close();
+            }
+        }
+        catch(IOException e) {
+            e.printStackTrace();
+        }
+        return properties;
+    }
+
+    /**
+     * @return an iterator over the userids found in the administrators file,
+     *         in ascending order; empty when the file is missing or empty
+     */
+    public Iterator<String> getSortedAuthorizedUserids() {
+        TreeMap<String,String> map = new TreeMap<String,String>();
+        Properties properties = load();
+        if(!properties.isEmpty()) {
+            Enumeration<?> enumeration = properties.propertyNames();
+            while(enumeration.hasMoreElements()) {
+                String name = (String)enumeration.nextElement();
+                map.put(name, name);
+            }
+        }
+        return map.keySet().iterator();
+    }
+
+    /** @return the path of the administrators file */
+    public String getAuthorizationFileName() {
+        return fileName;
+    }
+
+    /**
+     * Tells whether {@code userid} is listed in the administrators file.
+     * Both the given userid and the listed names are compared after trimming.
+     *
+     * @param userid the user to check; null is never an administrator
+     * @return {@code true} when the user is listed; {@code false} otherwise,
+     *         including when the check itself fails
+     */
+    public boolean isAdministrator(String userid) {
+        boolean retVal = false;
+        try {
+            if(userid != null) {
+                String candidate = userid.trim();
+                Properties properties = load();
+                Iterator<Object> iterator = properties.keySet().iterator();
+                while(iterator.hasNext()) {
+                    String authorizedUserid = ((String)(iterator.next())).trim();
+                    if(candidate.equals(authorizedUserid)) {
+                        retVal = true;
+                        break;
+                    }
+                }
+            }
+        }
+        catch(Exception ignored) {
+            // deliberate fail-closed: any problem while checking means "not an administrator"
+        }
+        return retVal;
+    }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/authentication/DuccWebAdministrators.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/config/OrchestratorConfiguration.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/config/OrchestratorConfiguration.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/config/OrchestratorConfiguration.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/config/OrchestratorConfiguration.java Wed Jan 2 19:37:55 2013
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.config;
+
+import org.apache.camel.Body;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jetty.JettyHttpComponent;
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.config.DuccBlastGuardPredicate;
+import org.apache.uima.ducc.common.exception.DuccRuntimeException;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.XStreamUtils;
+import org.apache.uima.ducc.orchestrator.ORTracer;
+import org.apache.uima.ducc.orchestrator.Orchestrator;
+import org.apache.uima.ducc.orchestrator.OrchestratorCommonArea;
+import org.apache.uima.ducc.orchestrator.OrchestratorComponent;
+import org.apache.uima.ducc.orchestrator.event.OrchestratorEventListener;
+import org.apache.uima.ducc.orchestrator.monitor.Xmon;
+import org.apache.uima.ducc.orchestrator.monitor.Xmon.ExchangeType;
+import org.apache.uima.ducc.orchestrator.monitor.Xmon.LifeStatus;
+import org.apache.uima.ducc.transport.DuccTransportConfiguration;
+import org.apache.uima.ducc.transport.event.CancelJobDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelJobReplyDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelReservationDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelReservationReplyDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelServiceDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelServiceReplyDuccEvent;
+import org.apache.uima.ducc.transport.event.OrchestratorAbbreviatedStateDuccEvent;
+import org.apache.uima.ducc.transport.event.OrchestratorStateDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitJobDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitJobReplyDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitReservationDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitReservationReplyDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitServiceDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitServiceReplyDuccEvent;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+
+@Configuration
+@Import({DuccTransportConfiguration.class,CommonConfiguration.class})
+
+public class OrchestratorConfiguration {
+ // Springframework magic to inject instance of {@link CommonConfiguration}
+ @Autowired CommonConfiguration common;
+ // Springframework magic to inject instance of {@link DuccTransportConfiguration}
+ @Autowired DuccTransportConfiguration orchestratorTransport;
+
+ private DuccLogger duccLogger = DuccLoggerComponents.getOrLogger(OrchestratorConfiguration.class.getName());
+
+ /**
+ * Creates Camel router that will handle incoming request messages. Each message will
+ * be unmarshalled using xstream and delegated to provided {@code OrchestratorEventListener}.
+ *
+ * @param endpoint - endpoint where the job manager expects to receive messages
+ * @param delegate - {@code OrchestratorEventListener} instance to delegate incoming messages
+ * @return
+ */
+
+ public RouteBuilder routeBuilderForEndpoint(final String endpoint, final OrchestratorEventListener delegate) {
+
+ return new RouteBuilder() {
+
+ public void configure() {
+ Xmon xmStart = new Xmon(LifeStatus.Start, ExchangeType.Receive);
+ Xmon xmEnded = new Xmon(LifeStatus.Ended, ExchangeType.Receive);
+ Xmon xmError = new Xmon(LifeStatus.Error, ExchangeType.Receive);
+ onException(Exception.class).handled(true).process(xmError);
+ from(endpoint)
+ .process(xmStart)
+ .bean(delegate)
+ .process(xmEnded)
+ ;
+ }
+ };
+ }
+
+ /**
+ * Creates Camel router that will handle incoming request messages. Each message will
+ * be unmarshalled using xstream and delegated to provided {@code OrchestratorEventListener}.
+ *
+ * @param endpoint - endpoint where the job manager expects to receive messages
+ * @param delegate - {@code OrchestratorEventListener} instance to delegate incoming messages
+ * @return
+ */
+ /*
+ public RouteBuilder routeBuilderForReplyEndpoint(final String endpoint, final OrchestratorEventListener delegate) {
+
+ return new RouteBuilder() {
+ public void configure() {
+ from(endpoint)
+ .unmarshal().xstream()
+ .process(new TransportProcessor()) // intermediate processing before delegating to event listener
+ .bean(delegate)
+ .process(new OrchestratorReplyProcessor()) // inject reply object
+ .marshal().xstream()
+ ;
+ }
+ };
+ }
+ */
+ /**
+  * Creates the route builder for the HTTP request endpoint
+  * {@code jetty://http://0.0.0.0:<common.duccORHttpPort>/or}.
+  * The request is unmarshalled with xstream, handed to the delegate, replaced
+  * by a reply object, and finally marshalled into the HTTP response.
+  *
+  * @param context  - Camel context on which the "jetty" component is registered
+  * @param delegate - listener that receives the unmarshalled request
+  * @return the route builder; routes are defined when Camel calls {@code configure()}
+  * @throws Exception declared by the signature; nothing in this method body throws it directly
+  */
+ private RouteBuilder routeBuilder(final CamelContext context, final OrchestratorEventListener delegate) throws Exception {
+     return new RouteBuilder() {
+         public void configure() {
+             JettyHttpComponent jetty = new JettyHttpComponent();
+             Xmon receiveStart = new Xmon(LifeStatus.Start, ExchangeType.Receive);
+             Xmon replyEnded = new Xmon(LifeStatus.Ended, ExchangeType.Reply);
+
+             // Last step of the route: turns the reply object found in the In
+             // message into the HTTP response carried by the Out message.
+             Processor httpResponder = new Processor() {
+                 public void process(Exchange exchange) throws Exception {
+                     exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
+                     exchange.getOut().setHeader("content-type", "text/xml");
+                     Object reply = exchange.getIn().getBody();
+                     if ( reply == null ) {
+                         // No reply object was produced: log it and answer with HTTP 500.
+                         duccLogger.warn("RouteBuilder.configure", null, new DuccRuntimeException("Orchestrator Has Not Provided a Reply Object."));
+                         exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 500);
+                         return;
+                     }
+                     String xml = XStreamUtils.marshall(reply);
+                     exchange.getOut().setBody(xml);
+                     // NOTE(review): String.length() counts chars, not encoded bytes -
+                     // confirm this is the intended content-length for non-ASCII replies.
+                     exchange.getOut().setHeader("content-length", xml.length());
+                 }
+             };
+
+             context.addComponent("jetty", jetty);
+             // Failures are not redelivered and are left unhandled after ErrorProcessor runs.
+             onException(Throwable.class)
+                 .maximumRedeliveries(0)
+                 .handled(false)
+                 .process(new ErrorProcessor());
+
+             from("jetty://http://0.0.0.0:"+common.duccORHttpPort+"/or")
+                 .unmarshal().xstream()
+                 .process(receiveStart)
+                 .bean(delegate)
+                 .process(new OrchestratorReplyProcessor()) // inject reply object
+                 .process(replyEnded)
+                 .process(httpResponder);
+         }
+     };
+ }
+
+ /**
+  * Camel Processor that replaces a request event in the In message with the
+  * corresponding reply event, carrying over the request's properties.
+  * Bodies of any other type are left untouched.
+  */
+ private class OrchestratorReplyProcessor implements Processor {
+
+     private OrchestratorReplyProcessor() {
+     }
+
+     public void process(Exchange exchange) throws Exception {
+         // Every check below is made against the body as it was on entry.
+         final Object incoming = exchange.getIn().getBody();
+
+         if(incoming instanceof SubmitJobDuccEvent) {
+             SubmitJobDuccEvent request = exchange.getIn().getBody(SubmitJobDuccEvent.class);
+             SubmitJobReplyDuccEvent reply = new SubmitJobReplyDuccEvent();
+             reply.setProperties(request.getProperties());
+             exchange.getIn().setBody(reply);
+         }
+
+         if(incoming instanceof CancelJobDuccEvent) {
+             CancelJobDuccEvent request = exchange.getIn().getBody(CancelJobDuccEvent.class);
+             CancelJobReplyDuccEvent reply = new CancelJobReplyDuccEvent();
+             reply.setProperties(request.getProperties());
+             exchange.getIn().setBody(reply);
+         }
+
+         if(incoming instanceof SubmitReservationDuccEvent) {
+             SubmitReservationDuccEvent request = exchange.getIn().getBody(SubmitReservationDuccEvent.class);
+             SubmitReservationReplyDuccEvent reply = new SubmitReservationReplyDuccEvent();
+             reply.setProperties(request.getProperties());
+             exchange.getIn().setBody(reply);
+         }
+
+         if(incoming instanceof CancelReservationDuccEvent) {
+             CancelReservationDuccEvent request = exchange.getIn().getBody(CancelReservationDuccEvent.class);
+             CancelReservationReplyDuccEvent reply = new CancelReservationReplyDuccEvent();
+             reply.setProperties(request.getProperties());
+             exchange.getIn().setBody(reply);
+         }
+
+         if(incoming instanceof SubmitServiceDuccEvent) {
+             SubmitServiceDuccEvent request = exchange.getIn().getBody(SubmitServiceDuccEvent.class);
+             SubmitServiceReplyDuccEvent reply = new SubmitServiceReplyDuccEvent();
+             reply.setProperties(request.getProperties());
+             exchange.getIn().setBody(reply);
+         }
+
+         if(incoming instanceof CancelServiceDuccEvent) {
+             CancelServiceDuccEvent request = exchange.getIn().getBody(CancelServiceDuccEvent.class);
+             CancelServiceReplyDuccEvent reply = new CancelServiceReplyDuccEvent();
+             reply.setProperties(request.getProperties());
+             exchange.getIn().setBody(reply);
+         }
+     }
+ }
+
+ /**
+ * Creates Camel router that will publish Orchestrator state at regular intervals.
+ *
+ * @param targetEndpointToReceiveOrchestratorStateUpdate - endpoint where to publish JM state
+ * @param statePublishRate - how often to publish state
+ * @return
+ * @throws Exception
+ */
+ private RouteBuilder routeBuilderForOrchestratorStatePost(final Orchestrator orchestrator, final String targetEndpointToReceiveOrchestratorStateUpdate, final int statePublishRate) throws Exception {
+ final OrchestratorStateProcessor orchestratorp = // an object responsible for generating the state
+ new OrchestratorStateProcessor(orchestrator);
+
+ return new RouteBuilder() {
+ public void configure() {
+ //Xmon xmStart = new Xmon(LifeStatus.Start, ExchangeType.Send, OrchestratorStateDuccEvent.class);
+ //Xmon xmEnded = new Xmon(LifeStatus.Ended, ExchangeType.Send, OrchestratorStateDuccEvent.class);
+ //Xmon xmError = new Xmon(LifeStatus.Error, ExchangeType.Send, OrchestratorStateDuccEvent.class);
+ //onException(Exception.class).handled(true).process(xmError);
+ String route = "StatePost";
+ ORTracer ort1 = new ORTracer(route+":error");
+ ORTracer ort2 = new ORTracer(route+":begin");
+ ORTracer ort3 = new ORTracer(route+":done:orchestratorp");
+ ORTracer ort4 = new ORTracer(route+":done:to");
+
+ final Predicate blastFilter = new DuccBlastGuardPredicate(duccLogger);
+
+ onException(Exception.class).handled(true).process(ort1);
+ from("timer:orchestratorStateDumpTimer?fixedRate=true&period=" + statePublishRate)
+ // This route uses a filter to prevent sudden bursts of messages which
+ // may flood DUCC daemons causing chaos. The filter disposes any event
+ // that appears in a window of 1 sec or less.
+ .filter(blastFilter)
+ //.process(xmStart)
+ .process(ort2)
+ .process(orchestratorp)
+ //.process(xmEnded)
+ .process(ort3)
+ .to(targetEndpointToReceiveOrchestratorStateUpdate)
+ .process(ort4)
+ ;
+ }
+ };
+ }
+
+ /**
+  * Camel Processor that fetches the Orchestrator's state and places it in
+  * the body of the exchange's In message.
+  */
+ private class OrchestratorStateProcessor implements Processor {
+     private final Orchestrator orchestrator;
+
+     private OrchestratorStateProcessor(Orchestrator orchestrator) {
+         this.orchestrator = orchestrator;
+     }
+
+     public void process(Exchange exchange) throws Exception {
+         // Ask the Orchestrator for its state, then make it the message body.
+         OrchestratorStateDuccEvent stateEvent = orchestrator.getState();
+         exchange.getIn().setBody(stateEvent);
+     }
+ }
+
+
+ /**
+ * Creates Camel router that will publish Orchestrator abbreviated state at regular intervals.
+ *
+ * @param targetEndpointToReceiveOrchestratorAbbreviatedStateUpdate - endpoint where to publish state
+ * @param statePublishRate - how often to publish state
+ * @return
+ * @throws Exception
+ */
+ private RouteBuilder routeBuilderForOrchestratorAbbreviatedStatePost(final Orchestrator orchestrator, final String targetEndpointToReceiveOrchestratorAbbreviatedStateUpdate, final int statePublishRate) throws Exception {
+ final OrchestratorAbbreviatedStateProcessor orchestratorp = // an object responsible for generating the state
+ new OrchestratorAbbreviatedStateProcessor(orchestrator);
+
+ return new RouteBuilder() {
+ public void configure() {
+ //Xmon xmStart = new Xmon(LifeStatus.Start, ExchangeType.Send, OrchestratorAbbreviatedStateDuccEvent.class);
+ //Xmon xmEnded = new Xmon(LifeStatus.Ended, ExchangeType.Send, OrchestratorAbbreviatedStateDuccEvent.class);
+ //Xmon xmError = new Xmon(LifeStatus.Error, ExchangeType.Send, OrchestratorAbbreviatedStateDuccEvent.class);
+ //onException(Exception.class).handled(true).process(xmError);
+ String route = "AbbreviatedStatePost";
+ ORTracer ort1 = new ORTracer(route+":error");
+ ORTracer ort2 = new ORTracer(route+":begin");
+ ORTracer ort3 = new ORTracer(route+":done:orchestratorp");
+ ORTracer ort4 = new ORTracer(route+":done:to");
+
+ final Predicate blastFilter = new DuccBlastGuardPredicate(duccLogger);
+
+ onException(Exception.class).handled(true).process(ort1);
+ from("timer:orchestratorAbbreviatedStateDumpTimer?fixedRate=true&period=" + statePublishRate)
+ // This route uses a filter to prevent sudden bursts of messages which
+ // may flood DUCC daemons causing chaos. The filter disposes any event
+ // that appears in a window of 1 sec or less.
+ .filter(blastFilter)
+ //.process(xmStart)
+ .process(ort2)
+ .process(orchestratorp)
+ //.process(xmEnded)
+ .process(ort3)
+ .to(targetEndpointToReceiveOrchestratorAbbreviatedStateUpdate)
+ .process(ort4)
+ ;
+ }
+ };
+ }
+
+ /**
+  * Camel Processor that fetches the Orchestrator's abbreviated state and
+  * places it in the body of the exchange's In message.
+  */
+ private class OrchestratorAbbreviatedStateProcessor implements Processor {
+     private final Orchestrator orchestrator;
+
+     private OrchestratorAbbreviatedStateProcessor(Orchestrator orchestrator) {
+         this.orchestrator = orchestrator;
+     }
+
+     public void process(Exchange exchange) throws Exception {
+         // Ask the Orchestrator for its abbreviated state, then make it the message body.
+         OrchestratorAbbreviatedStateDuccEvent abbreviatedStateEvent = orchestrator.getAbbreviatedState();
+         exchange.getIn().setBody(abbreviatedStateEvent);
+     }
+ }
+
+ /**
+  * Instantiate a listener to which Camel will route a body of the incoming message.
+  * The listener should provide a method for each object class it expects to receive.
+  * Camel uses introspection to analyze given listener and find a match based on
+  * what is in the incoming message.
+  *
+  * @param orchestrator - component the new listener is constructed with
+  * @return a new {@code OrchestratorEventListener}
+  */
+ public OrchestratorEventListener orchestratorDelegateListener(OrchestratorComponent orchestrator) {
+     return new OrchestratorEventListener(orchestrator);
+ }
+
+ @Bean
+ public OrchestratorComponent orchestrator() throws Exception {
+ OrchestratorCommonArea.initialize(common);
+ OrchestratorComponent orchestrator = new OrchestratorComponent(common.camelContext());
+ // Instantiate JobManagerEventListener delegate listener. This listener will receive
+ // incoming messages.
+ OrchestratorEventListener delegateListener = this.orchestratorDelegateListener(orchestrator);
+ // Inject a dispatcher into the listener in case it needs to send
+ // a message to another component
+ delegateListener.setDuccEventDispatcher(orchestratorTransport.duccEventDispatcher(common.pmRequestEndpoint, orchestrator.getContext()));
+// orchestrator.getContext().addRoutes(this.routeBuilderForReplyEndpoint(common.orchestratorRequestEndpoint, delegateListener));
+ orchestrator.getContext().addRoutes(this.routeBuilder(orchestrator.getContext(), delegateListener));
+ orchestrator.getContext().addRoutes(this.routeBuilderForEndpoint(common.rmStateUpdateEndpoint, delegateListener));
+ orchestrator.getContext().addRoutes(this.routeBuilderForEndpoint(common.smStateUpdateEndpoint, delegateListener));
+ orchestrator.getContext().addRoutes(this.routeBuilderForEndpoint(common.jdStateUpdateEndpoint,delegateListener));
+ orchestrator.getContext().addRoutes(this.routeBuilderForEndpoint(common.nodeInventoryEndpoint,delegateListener));
+ orchestrator.getContext().addRoutes(this.routeBuilderForOrchestratorStatePost(orchestrator, common.orchestratorStateUpdateEndpoint, Integer.parseInt(common.orchestratorStatePublishRate)));
+ orchestrator.getContext().addRoutes(this.routeBuilderForOrchestratorAbbreviatedStatePost(orchestrator, common.orchestratorAbbreviatedStateUpdateEndpoint, Integer.parseInt(common.orchestratorAbbreviatedStatePublishRate)));
+ return orchestrator;
+ }
+ public class ErrorProcessor implements Processor {
+
+ public void process(Exchange exchange) throws Exception {
+ // the caused by exception is stored in a property on the exchange
+ Throwable caused = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
+ exchange.getOut().setBody(caused); //XStreamUtils.marshall(caused));
+ caused.printStackTrace();
+ }
+ }
+ public class ServiceRequestHandler {
+ public void handleRequest(@Body SubmitJobDuccEvent jobSubmit) throws Exception {
+ // public void handleRequest(@Body ErrorProcessor jobSubmit) throws Exception {
+ System.out.println("ServiceRequestHandler Received Request of type: "+jobSubmit.getClass().getName());
+ synchronized(this) {
+ this.wait(2000);
+ }
+ }
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/config/OrchestratorConfiguration.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/JdStateEventLogger.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/JdStateEventLogger.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/JdStateEventLogger.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/JdStateEventLogger.java Wed Jan 2 19:37:55 2013
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.event;
+
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.orchestrator.OrchestratorCommonArea;
+import org.apache.uima.ducc.transport.event.JdStateDuccEvent;
+
+
+ /**
+  * Logging hook for {@code JdStateDuccEvent}. At present it only writes the
+  * "enter" and "exit" trace messages; the event itself is not inspected.
+  */
+ public class JdStateEventLogger {
+
+     private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(JdStateEventLogger.class.getName());
+
+     private static final OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+     private static final Messages messages = orchestratorCommonArea.getSystemMessages();
+
+     /**
+      * Traces entry to and exit from this method.
+      *
+      * @param jdStateDuccEvent - the received event (currently unused)
+      */
+     public static void receiver(JdStateDuccEvent jdStateDuccEvent) {
+         final String location = "receiver";
+         logger.trace(location, null, messages.fetch("enter"));
+         logger.trace(location, null, messages.fetch("exit"));
+     }
+ }
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/JdStateEventLogger.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/NodeInventoryEventLogger.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/NodeInventoryEventLogger.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/NodeInventoryEventLogger.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/NodeInventoryEventLogger.java Wed Jan 2 19:37:55 2013
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.event;
+
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.orchestrator.OrchestratorCommonArea;
+import org.apache.uima.ducc.transport.event.NodeInventoryUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+
+
+ /**
+  * Logging hook for {@code NodeInventoryUpdateDuccEvent}: writes one debug line
+  * per process reported in the event.
+  */
+ public class NodeInventoryEventLogger {
+
+     private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(NodeInventoryEventLogger.class.getName());
+
+     private static final OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+     private static final Messages messages = orchestratorCommonArea.getSystemMessages();
+
+     /**
+      * Logs the number of processes in the event and, for each process, its
+      * process state, resource state, node identity and PID. The job id for each
+      * process is looked up through the common area's process accounting.
+      *
+      * @param nodeInventoryUpdateDuccEvent - the received inventory update
+      */
+     public static void receiver(NodeInventoryUpdateDuccEvent nodeInventoryUpdateDuccEvent) {
+         final String location = "receiver";
+         logger.trace(location, null, messages.fetch("enter"));
+         HashMap<DuccId, IDuccProcess> inventory = nodeInventoryUpdateDuccEvent.getProcesses();
+         if(inventory == null) {
+             // Nothing to report for this event.
+             logger.trace(location, null, messages.fetch("exit"));
+             return;
+         }
+         logger.debug(location, null, inventory.size());
+         for(DuccId processId : inventory.keySet()) {
+             DuccId jobId = orchestratorCommonArea.getProcessAccounting().getJobId(processId);
+             IDuccProcess process = inventory.get(processId);
+             // Concatenating with "" renders null values as the text "null".
+             String processState = ""+process.getProcessState();
+             String nodeIdentity = ""+process.getNodeIdentity();
+             String pid = ""+process.getPID();
+             String resourceState = ""+process.getResourceState();
+             logger.debug(location, jobId, processId, processState+" "+resourceState+" "+nodeIdentity+" "+pid);
+         }
+         logger.trace(location, null, messages.fetch("exit"));
+     }
+ }
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/NodeInventoryEventLogger.java
------------------------------------------------------------------------------
svn:eol-style = native