You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:37:56 UTC

svn commit: r1427956 [4/4] - in /uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator: main/ main/java/ main/java/org/ main/java/org/apache/ main/java/org/apache/uima/ main/java/org/apache/uima/ducc/ main/java/org/apache/uima/ducc/orchestrator/ main/jav...

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/OrchestratorEventListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/OrchestratorEventListener.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/OrchestratorEventListener.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/OrchestratorEventListener.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.event;
+
+import org.apache.camel.Body;
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.orchestrator.Orchestrator;
+import org.apache.uima.ducc.orchestrator.OrchestratorCommonArea;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.CancelJobDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelReservationDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelServiceDuccEvent;
+import org.apache.uima.ducc.transport.event.JdStateDuccEvent;
+import org.apache.uima.ducc.transport.event.NodeInventoryUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.RmStateDuccEvent;
+import org.apache.uima.ducc.transport.event.SmStateDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitJobDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitReservationDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitServiceDuccEvent;
+import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener;
+
+
+public class OrchestratorEventListener implements DuccEventDelegateListener {
+	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(OrchestratorEventListener.class.getName());
+
+	private OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+	private Messages messages = orchestratorCommonArea.getSystemMessages();
+	
+	private Orchestrator orchestrator;
+	
+	public OrchestratorEventListener(Orchestrator orchestrator) {
+		this.orchestrator = orchestrator;
+	}
+	public void setDuccEventDispatcher( DuccEventDispatcher eventDispatcher ) {
+	}
+	public void setEndpoint( String endpoint ) {
+	}
+	public void onSubmitJobEvent(@Body SubmitJobDuccEvent duccEvent) throws Exception {
+		String methodName = "onSubmitJobEvent";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		try {
+			orchestrator.startJob(duccEvent);
+		}
+		catch(Throwable t) {
+			logger.error(methodName, null, t);
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+	}
+	public void onCancelJobEvent(@Body CancelJobDuccEvent duccEvent) throws Exception {
+		String methodName = "onCancelJobEvent";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		try {
+			orchestrator.stopJob(duccEvent);
+		}
+		catch(Throwable t) {
+			logger.error(methodName, null, t);
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+	}
+	public void onSubmitReservationEvent(@Body SubmitReservationDuccEvent duccEvent) throws Exception {
+		String methodName = "onSubmitReservationEvent";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		try {
+			orchestrator.startReservation(duccEvent);
+		}
+		catch(Throwable t) {
+			logger.error(methodName, null, t);
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+	}
+	public void onCancelReservationEvent(@Body CancelReservationDuccEvent duccEvent) throws Exception {
+		String methodName = "onCancelReservationEvent";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		try {
+			orchestrator.stopReservation(duccEvent);
+		}
+		catch(Throwable t) {
+			logger.error(methodName, null, t);
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+	}
+	public void onSubmitServiceEvent(@Body SubmitServiceDuccEvent duccEvent) throws Exception {
+		String methodName = "onSubmitServiceEvent";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		try {
+			orchestrator.startService(duccEvent);
+		}
+		catch(Throwable t) {
+			logger.error(methodName, null, t);
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+	}
+	public void onCancelServiceEvent(@Body CancelServiceDuccEvent duccEvent) throws Exception {
+		String methodName = "onCancelServiceEvent";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		try {
+			orchestrator.stopService(duccEvent);
+		}
+		catch(Throwable t) {
+			logger.error(methodName, null, t);
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+	}
+	public void onSmStateUpdateEvent(@Body SmStateDuccEvent duccEvent) throws Exception {
+		String methodName = "onSmStateUpdateEvent";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		try {
+			orchestrator.reconcileSmState(duccEvent);
+		}
+		catch(Throwable t) {
+			logger.error(methodName, null, t);
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+	}
+	public void onRmStateUpdateEvent(@Body RmStateDuccEvent duccEvent) throws Exception {
+		String methodName = "onRmStateUpdateEvent";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		try {
+			RMStateEventLogger.receiver(duccEvent);
+			orchestrator.reconcileRmState(duccEvent);
+		}
+		catch(Throwable t) {
+			logger.error(methodName, null, t);
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+	}
+	public void onJdStateUpdateEvent(@Body JdStateDuccEvent duccEvent) throws Exception {
+		String methodName = "onJdStateUpdateEvent";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		try {
+			JdStateEventLogger.receiver(duccEvent);
+			orchestrator.reconcileJdState(duccEvent);
+		}
+		catch(Throwable t) {
+			logger.error(methodName, null, t);
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+	}
+	public void onNodeInventoryUpdateDuccEvent(@Body NodeInventoryUpdateDuccEvent duccEvent) throws Exception {
+		String methodName = "onNodeInventoryUpdateDuccEvent";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		try {
+			NodeInventoryEventLogger.receiver(duccEvent);
+			orchestrator.reconcileNodeInventory(duccEvent);
+		}
+		catch(Throwable t) {
+			logger.error(methodName, null, t);
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/OrchestratorEventListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/RMStateEventLogger.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/RMStateEventLogger.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/RMStateEventLogger.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/RMStateEventLogger.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.event;
+
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.orchestrator.OrchestratorCommonArea;
+import org.apache.uima.ducc.transport.event.RmStateDuccEvent;
+
+
+/**
+ * Logging hook for RM state events.
+ * <p>
+ * At present it only emits the "enter" and "exit" trace records; the event
+ * itself is not inspected.
+ */
+public class RMStateEventLogger {
+
+	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(RMStateEventLogger.class.getName());
+
+	private static final OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+	private static final Messages messages = orchestratorCommonArea.getSystemMessages();
+
+	/**
+	 * Traces receipt of an RM state event.
+	 *
+	 * @param rmStateDuccEvent the received event; not read by this method
+	 */
+	public static void receiver(RmStateDuccEvent rmStateDuccEvent) {
+		final String location = "receiver";
+		logger.trace(location, null, messages.fetch("enter"));
+		logger.trace(location, null, messages.fetch("exit"));
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/RMStateEventLogger.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/HealthMonitor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/HealthMonitor.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/HealthMonitor.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/HealthMonitor.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.maintenance;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.orchestrator.Constants;
+import org.apache.uima.ducc.orchestrator.OrchestratorCheckpoint;
+import org.apache.uima.ducc.orchestrator.OrchestratorCommonArea;
+import org.apache.uima.ducc.orchestrator.StateManager;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+import org.apache.uima.ducc.transport.event.common.DuccWorkPopDriver;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.IRationale;
+import org.apache.uima.ducc.transport.event.common.Rationale;
+import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.JobCompletionType;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+import org.apache.uima.ducc.transport.event.common.IResourceState.ProcessDeallocationType;
+import org.apache.uima.ducc.transport.event.common.IResourceState.ResourceState;
+
+
+/**
+ * Examines the jobs and services in the orchestrator's work map and terminates
+ * or tidies up those found to be unhealthy.
+ * <p>
+ * {@link #ajudicate()} is the only public operation. Each private check
+ * returns {@code true} when it changed state that should be checkpointed; a
+ * pass that sees at least one such change saves the orchestrator state through
+ * {@link OrchestratorCheckpoint}.
+ */
+public class HealthMonitor {
+	
+	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(HealthMonitor.class.getName());
+	
+	private static final HealthMonitor healthMonitor = new HealthMonitor();
+	
+	/**
+	 * @return the shared instance created when this class is initialized
+	 */
+	public static HealthMonitor getInstance() {
+		return healthMonitor;
+	}
+	
+	private final OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+	private final Messages messages = orchestratorCommonArea.getSystemMessages();
+	private final DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
+
+	/**
+	 * Terminates a not-yet-initialized job whose process initialization
+	 * failure count has reached its limit.
+	 *
+	 * @param job the job (or service) to examine
+	 * @return {@code true} if the job was terminated and a checkpoint is needed
+	 */
+	private boolean isCancelJobExcessiveInitializationFailures(IDuccWorkJob job) {
+		String methodName = "isCancelJobExcessiveInitializationFailures";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		boolean ckpt = false;
+		if(!job.isInitialized()) {
+			long count = job.getProcessInitFailureCount();
+			long limit = job.getProcessInitFailureLimit();
+			if(count >= limit) {
+				IRationale rationale = new Rationale("health monitor detected job initialization failures limit reached:"+limit);
+				StateManager.getInstance().jobTerminate(job, JobCompletionType.ProcessInitializationFailure, rationale, ProcessDeallocationType.JobCanceled);
+				logger.info(methodName, job.getDuccId(), JobCompletionType.ProcessInitializationFailure);
+				ckpt = true;
+			}
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return ckpt;
+	}
+	
+	/**
+	 * Terminates a job that has accumulated process failures.
+	 * <p>
+	 * Nothing happens while the failure count is zero. Otherwise an
+	 * initialized job is terminated once the count reaches its limit, and a
+	 * job that is not yet initialized is terminated on the first failure.
+	 *
+	 * @param job the job (or service) to examine
+	 * @return {@code true} if the job was terminated and a checkpoint is needed
+	 */
+	private boolean isCancelJobExcessiveProcessFailures(IDuccWorkJob job) {
+		String methodName = "isCancelJobExcessiveProcessFailures";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		boolean ckpt = false;
+		long processFailureCount = job.getProcessFailureCount();
+		if(processFailureCount > 0) {
+			long limit = job.getProcessFailureLimit();
+			if(job.isInitialized()) {
+				if(processFailureCount >= limit) {
+					IRationale rationale = new Rationale("health monitor detected job process failures limit reached:"+limit);
+					StateManager.getInstance().jobTerminate(job, JobCompletionType.ProcessFailure, rationale, ProcessDeallocationType.JobCanceled);
+					logger.info(methodName, job.getDuccId(), JobCompletionType.ProcessFailure);
+					ckpt = true;
+				}
+			}
+			else {
+				IRationale rationale = new Rationale("health monitor detected job process failure during initialization of first process");
+				StateManager.getInstance().jobTerminate(job, JobCompletionType.ProcessInitializationFailure, rationale, ProcessDeallocationType.JobCanceled);
+				logger.info(methodName, job.getDuccId(), JobCompletionType.ProcessInitializationFailure);
+				ckpt = true;
+			}
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return ckpt;
+	}
+	
+	/**
+	 * Terminates an unfinished job when one of its driver processes is already
+	 * complete, and keeps the "work items dispatched" value at "0" when
+	 * nothing can be running.
+	 * <p>
+	 * For an unfinished job the dispatched value is reset whenever the job has
+	 * no alive processes; that reset alone does not request a checkpoint. For
+	 * a finished job the value is reset only if it is not already "0", and
+	 * that reset does request a checkpoint.
+	 *
+	 * @param job the job to examine
+	 * @return {@code true} if a checkpoint is needed
+	 */
+	private boolean isCancelJobDriverProcessFailed(IDuccWorkJob job) {
+		String methodName = "isCancelJobDriverProcessFailed";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		boolean ckpt = false;
+		if(!job.isFinished()) {
+			DuccWorkPopDriver driver = job.getDriver();
+			IDuccProcessMap processMap = driver.getProcessMap();
+			if(processMap != null) {
+				Collection<IDuccProcess> processCollection = processMap.values();
+				for(IDuccProcess process : processCollection) {
+					if(process.isComplete()) {
+						IRationale rationale = new Rationale("health monitor detected job driver failed unexpectedly");
+						StateManager.getInstance().jobTerminate(job, JobCompletionType.DriverProcessFailed, rationale, ProcessDeallocationType.JobCanceled);
+						logger.info(methodName, job.getDuccId(), JobCompletionType.DriverProcessFailed);
+						ckpt = true;
+						// one completed driver process is enough to terminate the job
+						break;
+					}
+				}
+			}
+			if(job.getProcessMap().getAliveProcessCount() == 0) {
+				job.getSchedulingInfo().setWorkItemsDispatched("0");
+			}
+		}
+		else {
+			if(!job.getSchedulingInfo().getWorkItemsDispatched().equals("0")) {
+				job.getSchedulingInfo().setWorkItemsDispatched("0");
+				logger.info(methodName, job.getDuccId(), "dispatched set to 0");
+				ckpt = true;
+			}
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return ckpt;
+	}
+	
+	/**
+	 * Cleans up the driver processes of a finished job.
+	 * <p>
+	 * A driver process that is not yet deallocated is marked deallocated with
+	 * type {@code JobCompleted}. A deallocated process that is not complete is
+	 * advanced to {@code Stopped} when {@link NodeAccounting} reports its node
+	 * as not alive.
+	 *
+	 * @param job the job to examine
+	 * @return {@code true} if any process was changed and a checkpoint is needed
+	 */
+	private boolean isDriverCompleted(IDuccWorkJob job) {
+		String methodName = "isDriverCompleted";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		boolean ckpt = false;
+		if(job.isFinished()) {
+			DuccWorkPopDriver driver = job.getDriver();
+			IDuccProcessMap processMap = driver.getProcessMap();
+			if(processMap != null) {
+				Collection<IDuccProcess> processCollection = processMap.values();
+				for(IDuccProcess process : processCollection) {
+					if(!process.isDeallocated()) {
+						process.setResourceState(ResourceState.Deallocated);
+						process.setProcessDeallocationType(ProcessDeallocationType.JobCompleted);
+						logger.info(methodName, job.getDuccId(), process.getDuccId(), ProcessDeallocationType.JobCompleted);
+						ckpt = true;
+					}
+					else if(!process.isComplete()) {
+						String nodeName = process.getNodeIdentity().getName();
+						if(!NodeAccounting.getInstance().isAlive(nodeName)) {
+							process.advanceProcessState(ProcessState.Stopped);
+							logger.info(methodName, job.getDuccId(), process.getDuccId(), ProcessState.Stopped);
+							ckpt = true;
+						}
+					}
+				}
+			}
+		}
+		logger.trace(methodName, null, messages.fetch("exit"));
+		return ckpt;
+	}
+	
+	/**
+	 * Applies the optional process-initialization-failure cap of a job or
+	 * service: once the number of initialization failures exceeds a positive
+	 * cap, the maximum shares value is set to -1, indicating to RM that
+	 * process expansion should stop.
+	 *
+	 * @param work the job or service to examine
+	 */
+	private void enforceInitFailureCap(IDuccWorkJob work) {
+		long cap = work.getProcessInitFailureCap();
+		// a cap of zero or less means none was specified
+		if(cap > 0) {
+			long initFails = work.getProcessInitFailureCount();
+			if(initFails > cap) {
+				work.getSchedulingInfo().setLongSharesMax(-1);
+			}
+		}
+	}
+	
+	/**
+	 * Writes a debug record when a pass took longer than
+	 * {@code Constants.SYNC_LIMIT} milliseconds.
+	 *
+	 * @param methodName name of the pass, used as the log location
+	 * @param startMillis {@link System#currentTimeMillis()} value taken when the pass began
+	 */
+	private void reportElapsed(String methodName, long startMillis) {
+		long elapsed = System.currentTimeMillis() - startMillis;
+		if(elapsed > Constants.SYNC_LIMIT) {
+			logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+		}
+	}
+	
+	/**
+	 * Runs the job checks over every job key in the work map and saves a
+	 * checkpoint if any check asked for one. A failure while examining one job
+	 * is logged against that job's id and does not stop the pass.
+	 */
+	private void ajudicateJobs() {
+		String methodName = "ajudicateJobs";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		long t0 = System.currentTimeMillis();
+		try {
+			Set<DuccId> jobKeySet = workMap.getJobKeySet();
+			boolean ckpt = false;
+			for(DuccId jobId : jobKeySet) {
+				try {
+					IDuccWorkJob job = (IDuccWorkJob) workMap.findDuccWork(jobId);
+					if(isDriverCompleted(job)) {
+						ckpt = true;
+					}
+					// the driver check is skipped when the process-failure check already terminated the job
+					if(isCancelJobExcessiveProcessFailures(job)) {
+						ckpt = true;
+					}
+					else if(isCancelJobDriverProcessFailed(job)) {
+						ckpt = true;
+					}
+					enforceInitFailureCap(job);
+				}
+				catch(Exception e) {
+					// include the job id so the failure can be attributed
+					logger.error(methodName, jobId, e);
+				}
+			}
+			if(ckpt) {
+				OrchestratorCheckpoint.getInstance().saveState();
+			}
+		}
+		catch(Throwable t) {
+			logger.error(methodName, null, t);
+		}
+		reportElapsed(methodName, t0);
+		logger.trace(methodName, null, messages.fetch("exit"));
+	}
+	
+	/**
+	 * Runs the service checks over every service key in the work map and saves
+	 * a checkpoint if any check asked for one. A failure while examining one
+	 * service is logged against that service's id and does not stop the pass.
+	 */
+	private void ajudicateServices() {
+		String methodName = "ajudicateServices";
+		logger.trace(methodName, null, messages.fetch("enter"));
+		long t0 = System.currentTimeMillis();
+		try {
+			Set<DuccId> serviceKeySet = workMap.getServiceKeySet();
+			boolean ckpt = false;
+			for(DuccId serviceId : serviceKeySet) {
+				try {
+					IDuccWorkJob service = (IDuccWorkJob) workMap.findDuccWork(serviceId);
+					// the initialization check is skipped when the process-failure check already terminated the service
+					if(isCancelJobExcessiveProcessFailures(service)) {
+						ckpt = true;
+					}
+					else if(isCancelJobExcessiveInitializationFailures(service)) {
+						ckpt = true;
+					}
+					enforceInitFailureCap(service);
+				}
+				catch(Exception e) {
+					// include the service id so the failure can be attributed
+					logger.error(methodName, serviceId, e);
+				}
+			}
+			if(ckpt) {
+				OrchestratorCheckpoint.getInstance().saveState();
+			}
+		}
+		catch(Throwable t) {
+			logger.error(methodName, null, t);
+		}
+		reportElapsed(methodName, t0);
+		logger.trace(methodName, null, messages.fetch("exit"));
+	}
+	
+	/**
+	 * Runs one full health pass: jobs first, then services.
+	 */
+	public void ajudicate() {
+		ajudicateJobs();
+		ajudicateServices();
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/HealthMonitor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/MaintenanceThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/MaintenanceThread.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/MaintenanceThread.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/MaintenanceThread.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.maintenance;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.orchestrator.OrchestratorCommonArea;
+import org.apache.uima.ducc.orchestrator.StateManager;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+
+
+public class MaintenanceThread extends Thread {
+	
+	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(MaintenanceThread.class.getName());
+	
+	private static MaintenanceThread instance = new MaintenanceThread();
+	
+	public static MaintenanceThread getInstance() {
+		return instance;
+	}
+	
+	private static DuccId jobid = null;
+	
+	private OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+	private DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
+	
+	private StateManager stateManager = StateManager.getInstance();
+	private HealthMonitor healthMonitor = HealthMonitor.getInstance();
+	private MqReaper mqReaper = MqReaper.getInstance();
+	
+	private long minMillis = 1000;
+	private long wakeUpMillis = 2*60*1000;
+	
+	private long sleepTime = wakeUpMillis;
+	private long lastTime = System.currentTimeMillis();;
+	
+	private boolean die = false;
+	
+	private MaintenanceThread() {
+		initialize();
+	}
+	
+	private void initialize() {
+		String location = "initialize";
+		String maintenance_rate = DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_orchestrator_maintenance_rate);
+		if(maintenance_rate != null) {
+			try {
+				long rate = Long.parseLong(maintenance_rate);
+				if(rate < minMillis) {
+					logger.error(location, jobid, DuccPropertiesResolver.ducc_orchestrator_maintenance_rate+" < minimum of "+minMillis);
+				}
+				else {
+					wakeUpMillis = rate;
+					sleepTime = wakeUpMillis;
+				}
+			}
+			catch(Throwable t) {
+				logger.error(location, jobid, t);
+			}
+		}
+		logger.info(location, jobid, "rate:"+wakeUpMillis);
+	}
+	
+	private boolean isTime() {
+		boolean retVal = true;
+		long currTime = System.currentTimeMillis();
+		long diffTime = currTime - lastTime;
+		if(diffTime < wakeUpMillis) {
+			retVal = false;
+			sleepTime = diffTime;
+		}
+		else {
+			lastTime = currTime;
+			sleepTime = wakeUpMillis;
+		}
+		return retVal;
+	}
+	
+	public void run() {
+		String location = "run";
+		logger.trace(location, jobid, "enter");
+		while(!die) {
+			try {
+				if(isTime()) {
+					stateManager.prune(workMap);
+					healthMonitor.ajudicate();
+					mqReaper.removeUnusedJdQueues(workMap);
+				}
+			}
+			catch(Throwable t) {
+				logger.error(location, jobid, t);
+			}
+			try {
+				Thread.sleep(sleepTime);
+			}
+			catch(Throwable t) {
+				logger.error(location, jobid, t);
+			}
+		}
+		logger.trace(location, jobid, "exit");
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/MaintenanceThread.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/MqReaper.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/MqReaper.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/MqReaper.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/MqReaper.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.maintenance;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.Iterator;
+
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+
+
+/**
+ * Removes job driver (JD) queues from the ActiveMQ broker when no job in the
+ * work map corresponds to them any more.
+ * 
+ * The broker is reached over JMX. The connection is established lazily and is
+ * re-established on the next use after any broker call has failed.
+ */
+public class MqReaper {
+	
+	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(MqReaper.class.getName());
+	
+	private static MqReaper mqReaper = new MqReaper();
+	
+	/**
+	 * @return the shared MqReaper instance
+	 */
+	public static MqReaper getInstance() {
+		return mqReaper;
+	}
+	
+	private String ducc_broker_name = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_broker_name);
+	private String ducc_broker_jmx_port = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_broker_jmx_port);
+	private String ducc_jd_queue_prefix = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_jd_queue_prefix);
+	
+	// Both are derived from the values above and are therefore built in init(),
+	// after defaults have been substituted for missing properties.
+	private String ducc_broker_url;
+	private String objectName;
+	
+	private JMXServiceURL url;
+	private JMXConnector jmxc;
+	private MBeanServerConnection conn;
+	private ObjectName activeMQ;
+	private BrokerViewMBean mbean;
+	
+	private boolean mqConnected = false;
+	
+	public MqReaper() {
+		init();
+	}
+	
+	/**
+	 * Apply defaults for missing properties, derive the JMX service URL and the
+	 * broker object name from the resulting values, and log the configuration.
+	 */
+	private void init() {
+		String location = "init";
+		if(ducc_broker_name == null) {
+			ducc_broker_name = "localhost";
+		}
+		if(ducc_broker_jmx_port == null) {
+			ducc_broker_jmx_port = "1099";
+		}
+		if(ducc_jd_queue_prefix == null) {
+			// the default belongs to the prefix itself, not to the JMX port
+			ducc_jd_queue_prefix = "ducc.jd.queue.";
+		}
+		ducc_broker_url = "service:jmx:rmi:///jndi/rmi://"+ducc_broker_name+":"+ducc_broker_jmx_port+"/jmxrmi";
+		objectName = "org.apache.activemq:BrokerName="+ducc_broker_name+",Type=Broker";
+		logger.info(location,null,DuccPropertiesResolver.ducc_broker_name+":"+ducc_broker_name);
+		logger.info(location,null,DuccPropertiesResolver.ducc_broker_jmx_port+":"+ducc_broker_jmx_port);
+		logger.info(location,null,DuccPropertiesResolver.ducc_broker_url+":"+ducc_broker_url);
+		logger.info(location,null,DuccPropertiesResolver.ducc_jd_queue_prefix+":"+ducc_jd_queue_prefix);
+		logger.info(location,null,"objectName"+":"+objectName);
+	}
+	
+	/**
+	 * Close the connector left over from an earlier connection attempt, if any,
+	 * so that reconnecting does not leak it. Failures to close are logged at
+	 * trace level only, since the connection is being discarded anyway.
+	 */
+	private void mqDisconnect() {
+		String location = "mqDisconnect";
+		if(jmxc != null) {
+			try {
+				jmxc.close();
+			}
+			catch(Throwable t) {
+				logger.trace(location, null, t);
+			}
+			jmxc = null;
+		}
+		conn = null;
+		mbean = null;
+	}
+	
+	/**
+	 * Connect to the broker's MBean server unless already connected.
+	 * 
+	 * @return true if a usable broker proxy is available, false if connecting failed
+	 *         (the failure is logged)
+	 */
+	private boolean mqConnect() {
+		String location = "mqConnect";
+		if(mqConnected) {
+			return true;
+		}
+		// Drop whatever a previous (failed or broken) connection left behind.
+		mqDisconnect();
+		try {
+			url = new JMXServiceURL(ducc_broker_url);
+			jmxc = JMXConnectorFactory.connect(url);
+			conn = jmxc.getMBeanServerConnection();
+			activeMQ = new ObjectName(objectName);
+			mbean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(conn, activeMQ, BrokerViewMBean.class, true);
+			mqConnected = true;
+		}
+		catch(Exception e) {
+			logger.error(location, null, e);
+		}
+		return mqConnected;
+	}
+	
+	/** @return true only if both strings are non-null and equal */
+	private boolean isEqual(String a, String b) {
+		return a != null && b != null && a.equals(b);
+	}
+	
+	/** @return true only if both strings are non-null and a starts with b */
+	private boolean isStartsWith(String a, String b) {
+		return a != null && b != null && a.startsWith(b);
+	}
+	
+	/**
+	 * List the broker queues whose destination starts with the JD queue prefix.
+	 * 
+	 * @return the matching destination names; empty if the broker cannot be reached.
+	 *         If the broker call fails part way, the names collected so far are
+	 *         returned and the connection is marked for re-establishment.
+	 */
+	public ArrayList<String> getJdQueues() {
+		String location = "getJdQueues";
+		ArrayList<String> jdQueues = new ArrayList<String>();
+		if(!mqConnect()) {
+			return jdQueues;
+		}
+		try {
+			ObjectName[] queues = mbean.getQueues();
+			for( ObjectName queue : queues ) {
+				Hashtable<String, String> propertyTable = queue.getKeyPropertyList();
+				if(propertyTable == null) {
+					logger.trace(location, null, "propertyTable:"+propertyTable);
+					continue;
+				}
+				String type = propertyTable.get("Type");
+				String destination = propertyTable.get("Destination");
+				if(!isEqual(type, "Queue")) {
+					logger.trace(location, null, "type:"+type+" "+"destination:"+destination);
+				}
+				else if(isStartsWith(destination, ducc_jd_queue_prefix)) {
+					logger.trace(location, null, "consider:"+destination);
+					jdQueues.add(destination);
+				}
+				else {
+					logger.trace(location, null, "skip:"+destination);
+				}
+			}
+		}
+		catch(Throwable t) {
+			// Report the failure visibly and force a reconnect on the next call;
+			// otherwise a dead connection would be reused forever.
+			logger.error(location, null, t);
+			mqConnected = false;
+		}
+		return jdQueues;
+	}
+	
+	/**
+	 * Remove every JD queue on the broker that does not correspond to a job in
+	 * the given work map.
+	 * 
+	 * @param workMap supplies the ids of the jobs whose queues must be kept
+	 */
+	public void removeUnusedJdQueues(DuccWorkMap workMap) {
+		String location = "removeUnusedJdQueues";
+		try {
+			// Start from all JD queues and strike out those still owned by a job.
+			ArrayList<String> queues = getJdQueues();
+			Iterator<DuccId> iterator = workMap.getJobKeySet().iterator();
+			while( iterator.hasNext() ) {
+				DuccId jobId = iterator.next();
+				String jqKeep = ducc_jd_queue_prefix+jobId.getFriendly();
+				if(queues.remove(jqKeep)) {
+					logger.debug(location, null, "queue keep:"+jqKeep);
+				}
+				else {
+					logger.trace(location, null, "queue not found:"+jqKeep);
+				}
+			}
+			// Whatever is left has no owning job.
+			for( String jqDiscard : queues ) {
+				logger.info(location, null, "queue discard:"+jqDiscard);
+				try {
+					mbean.removeQueue(jqDiscard); 
+				}
+				catch(Throwable t) {
+					logger.error(location, null, t);
+					mqConnected = false;
+				}
+			}
+		}
+		catch(Throwable t) {
+			logger.error(location, null, t);
+		}
+	}
+	
+	/**
+	 * Stand-alone entry point: runs one reaping pass against an empty work map.
+	 */
+	public static void main(String[] args) {
+		MqReaper mqr = MqReaper.getInstance();
+		DuccWorkMap workMap = new DuccWorkMap();
+		mqr.removeUnusedJdQueues(workMap);
+	}
+	
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/MqReaper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/NodeAccounting.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/NodeAccounting.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/NodeAccounting.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/NodeAccounting.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.maintenance;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+
+
+/**
+ * Tracks the time of the most recent heartbeat seen per node name and decides
+ * from it whether a node is still considered alive.
+ * 
+ * A node is reported down once more than
+ * (inventory rate x inventory skip, if positive) x heartbeatMissingTolerance
+ * milliseconds have passed since its last recorded heartbeat.
+ */
+public class NodeAccounting {
+	
+	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(NodeAccounting.class.getName());
+	private static final DuccId jobid = null;
+	
+	private static NodeAccounting instance = new NodeAccounting();
+	
+	/**
+	 * @return the shared NodeAccounting instance
+	 */
+	public static NodeAccounting getInstance() {
+		return instance;
+	}
+	
+	// node name -> time (millis) of the last recorded heartbeat
+	private ConcurrentHashMap<String, Long> timeMap = new ConcurrentHashMap<String, Long>();
+	
+	// last successfully read property values, also used as fallbacks
+	private long inventoryRate = 30 * 1000;
+	private long inventorySkip = 0;
+	
+	private long heartbeatMissingTolerance = 3;
+	
+	// set once the corresponding property problem has been reported, to log it only once
+	private boolean inventoryRateMessage = false;
+	private boolean inventorySkipMessage = false;
+	
+	/**
+	 * Read the node inventory publish rate property.
+	 * 
+	 * @return the configured rate; the last known rate if the property is missing
+	 *         or cannot be parsed (the problem is logged once)
+	 */
+	private long getRate() {
+		String methodName = "getRate";
+		long retVal = inventoryRate;
+		try {
+			String property_inventory_rate = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_agent_node_inventory_publish_rate);
+			if(property_inventory_rate == null) {
+				property_inventory_rate = ""+inventoryRate;
+			}
+			long property_value = Long.parseLong(property_inventory_rate.trim());
+			if(property_value != inventoryRate) {
+				inventoryRate = property_value;
+				logger.info(methodName, jobid, "rate:"+inventoryRate);
+			}
+			retVal = property_value;
+		}
+		catch(Throwable t) {
+			if(!inventoryRateMessage) {
+				inventoryRateMessage = true;
+				logger.warn(methodName, jobid, t);
+			}
+		}
+		return retVal;
+	}
+	
+	/**
+	 * Read the node inventory publish skip property.
+	 * 
+	 * @return the configured skip count; the last known value if the property is
+	 *         missing or cannot be parsed (the problem is logged once)
+	 */
+	private long getSkip() {
+		String methodName = "getSkip";
+		long retVal = inventorySkip;
+		try {
+			String property_inventory_skip = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_agent_node_inventory_publish_rate_skip);
+			if(property_inventory_skip == null) {
+				property_inventory_skip = ""+inventorySkip;
+			}
+			long property_value = Long.parseLong(property_inventory_skip.trim());
+			if(property_value != inventorySkip) {
+				inventorySkip = property_value;
+				logger.info(methodName, jobid, "skip:"+inventorySkip);
+			}
+			retVal = property_value;
+		}
+		catch(Throwable t) {
+			if(!inventorySkipMessage) {
+				inventorySkipMessage = true;
+				logger.warn(methodName, jobid, t);
+			}
+		}
+		return retVal;
+	}
+	
+	/**
+	 * @return the number of milliseconds without a heartbeat after which a node
+	 *         is considered missing
+	 */
+	private long getNodeMissingTime() {
+		String methodName = "getNodeMissingTime";
+		long retVal = inventoryRate * heartbeatMissingTolerance;
+		try {
+			long rate = getRate();
+			long skip = getSkip();
+			if(skip > 0) {
+				rate = rate * skip;
+			}
+			retVal = rate *  heartbeatMissingTolerance;
+		}
+		catch(Throwable t) {
+			logger.error(methodName, jobid, t);
+		}
+		return retVal;
+	}
+	
+	/**
+	 * Record a heartbeat for the node of the first process found in the map;
+	 * the remaining entries are not examined.
+	 * 
+	 * @param processMap the processes to take the node identity from
+	 */
+	public void heartbeat(HashMap<DuccId,IDuccProcess> processMap) {
+		String location = "heartbeat";
+		try {
+			Iterator<IDuccProcess> iterator = processMap.values().iterator();
+			if(iterator.hasNext()) {
+				IDuccProcess process = iterator.next();
+				NodeIdentity nodeIdentity = process.getNodeIdentity();
+				String nodeName = nodeIdentity.getName();
+				heartbeat(nodeName);
+			}
+		}
+		catch(Throwable t) {
+			// log the cause itself; an empty message makes the failure undiagnosable
+			logger.error(location, jobid, t);
+		}
+	}
+	
+	/**
+	 * Record a heartbeat for the named node at the current time.
+	 * 
+	 * @param nodeName the node that reported in; null is ignored
+	 */
+	public void heartbeat(String nodeName) {
+		String location = "heartbeat";
+		record(nodeName);
+		logger.debug(location, jobid, nodeName);
+	}
+	
+	/** Store the current time as the latest heartbeat of the given node (no-op for null). */
+	private void record(String nodeName) {
+		if(nodeName != null) {
+			timeMap.put(nodeName, Long.valueOf(System.currentTimeMillis()));
+		}
+	}
+	
+	/**
+	 * Determine whether the named node is still considered alive. A node that has
+	 * never been seen is treated as having reported in just now.
+	 * 
+	 * @param nodeName the node to check
+	 * @return false if the time since the node's last heartbeat exceeds the missing
+	 *         time; true otherwise, including when the check itself fails
+	 */
+	public boolean isAlive(String nodeName) {
+		String location = "isAlive";
+		boolean retVal = true;
+		try {
+			long currentTime = System.currentTimeMillis();
+			// First sighting counts as a heartbeat received now.
+			Long previous = timeMap.putIfAbsent(nodeName, Long.valueOf(currentTime));
+			long heartbeatTime = (previous == null) ? currentTime : previous.longValue();
+			long elapsed = currentTime - heartbeatTime;
+			if( elapsed > getNodeMissingTime() ) {
+				retVal = false;
+				logger.info(location, jobid, "down:"+nodeName+" elapsed:"+elapsed);
+			}
+		}
+		catch(Throwable t) {
+			// keep the node name in the log, and also report what actually went wrong
+			logger.error(location, jobid, nodeName);
+			logger.error(location, jobid, t);
+		}
+		return retVal;
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/NodeAccounting.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/monitor/Xmon.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/monitor/Xmon.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/monitor/Xmon.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/monitor/Xmon.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.monitor;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.CancelJobDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelReservationDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelServiceDuccEvent;
+import org.apache.uima.ducc.transport.event.DuccEvent;
+import org.apache.uima.ducc.transport.event.JdStateDuccEvent;
+import org.apache.uima.ducc.transport.event.NodeInventoryUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.OrchestratorAbbreviatedStateDuccEvent;
+import org.apache.uima.ducc.transport.event.OrchestratorStateDuccEvent;
+import org.apache.uima.ducc.transport.event.RmStateDuccEvent;
+import org.apache.uima.ducc.transport.event.SmStateDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitJobDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitReservationDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitServiceDuccEvent;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.jd.DriverStatusReport;
+import org.apache.uima.ducc.transport.event.jd.IDriverState.DriverState;
+import org.apache.uima.ducc.transport.event.rm.IRmJobState;
+import org.apache.uima.ducc.transport.event.sm.ServiceMap;
+
+
+public class Xmon implements Processor {
+
+	private static final DuccLogger logger = DuccLogger.getLogger(Xmon.class, "XM");
+	private static final DuccId duccId = null;
+	
+	public static enum LifeStatus { Start, Ended, Error, Timer };
+	public static enum ExchangeType { Send, Receive, Reply };
+	
+	private static enum LogType { Debug, Info, Warn, Error };
+	
+	private static LogType defaultLogType = LogType.Info;
+	
+	private static AtomicLong sequence = new AtomicLong(-1);
+	
+	private LifeStatus lifeStatus;
+	private ExchangeType exchangeType;
+	@SuppressWarnings("rawtypes")
+	private Class xclass;
+	
+	private static long ReasonableElapsedMillis = 10 * 1000;
+	
+	private static String keySequence  = "ducc.exchange.monitor.sequencer";
+	private static String keyStartMillis  = "ducc.exchange.monitor.start.millis";
+	
+	private void putSequence(Exchange exchange) {
+		String key = keySequence;
+		String value = String.format("%06d", sequence.incrementAndGet());
+		exchange.setProperty(key, value);
+	}
+	
+	private String getSequence(Exchange exchange) {
+		String key = keySequence;
+		String value = (String)exchange.getProperty(key);
+		return value;
+	}
+
+	private void putStartMillis(Exchange exchange) {
+		String key = keyStartMillis;
+		Long value = new Long(System.currentTimeMillis());
+		exchange.setProperty(key, value);
+	}
+	
+	private Long getStartMillis(Exchange exchange) {
+		String key = keyStartMillis;
+		Long value = (Long)exchange.getProperty(key);
+		if(value == null) {
+			value = new Long(System.currentTimeMillis());
+		}
+		return value;
+	}
+
+	public Xmon(LifeStatus lifeStatus, ExchangeType exchangeType) {
+		setLifeStatus(lifeStatus);
+		setExchangeType(exchangeType);
+	}
+	
+	public Xmon(LifeStatus lifeStatus, ExchangeType exchangeType, @SuppressWarnings("rawtypes") Class xclass) {
+		setLifeStatus(lifeStatus);
+		setExchangeType(exchangeType);
+		setClass(xclass);
+	}
+	
+
+	private LifeStatus getLifeStatus() {
+		return this.lifeStatus;
+	}
+	
+	private void setLifeStatus(LifeStatus lifeStatus) {
+		this.lifeStatus = lifeStatus;
+	}
+	
+	private void setExchangeType(ExchangeType exchangeType) {
+		this.exchangeType = exchangeType;
+	}
+	
+	private void setClass(@SuppressWarnings("rawtypes") Class xclass) {
+		this.xclass = xclass;
+	}
+	
+	private void timex(Exchange exchange, String exchId, String event) {
+		switch(lifeStatus) {
+		case Start:
+			break;
+		default:
+			long t0 = getStartMillis(exchange);
+			long t1 = System.currentTimeMillis();
+			long elapsed = t1-t0;
+			if(elapsed > ReasonableElapsedMillis) {
+				String details = "elapsed:"+elapsed;
+				LifeStatus save = getLifeStatus();
+				setLifeStatus(LifeStatus.Timer);
+				log(exchange, LogType.Warn, exchId, event, details);
+				setLifeStatus(save);
+			}
+			break;
+		}
+	}
+	
+	
+	public void process(Exchange exchange) throws Exception {
+		String location = "process";
+		try {
+			switch(lifeStatus) {
+			case Start:
+				putSequence(exchange);
+				putStartMillis(exchange);
+				break;
+			}
+			switch(exchangeType) {
+			case Receive:
+				processReceive(exchange);
+				break;
+			case Send:
+				processSend(exchange);
+				break;
+			case Reply:
+				processReply(exchange);
+				break;
+			}
+		}
+		catch(Exception e) {
+			logger.error(location, duccId, e);
+		}
+	};
+	
+	private void processReceive(Exchange exchange) throws Exception {
+		Object body = exchange.getIn().getBody();
+		if(body instanceof SubmitJobDuccEvent) {
+			processReceive(exchange, (SubmitJobDuccEvent)body);
+		}
+		else if(body instanceof CancelJobDuccEvent) {
+			processReceive(exchange, (CancelJobDuccEvent)body);
+		}
+		else if(body instanceof SubmitReservationDuccEvent) {
+			processReceive(exchange, (SubmitReservationDuccEvent)body);
+		}
+		else if(body instanceof CancelReservationDuccEvent) {
+			processReceive(exchange, (CancelReservationDuccEvent)body);
+		}
+		else if(body instanceof SubmitServiceDuccEvent) {
+			processReceive(exchange, (SubmitServiceDuccEvent)body);
+		}
+		else if(body instanceof CancelServiceDuccEvent) {
+			processReceive(exchange, (CancelServiceDuccEvent)body);
+		}
+		else if(body instanceof SmStateDuccEvent) {
+			processReceive(exchange, (SmStateDuccEvent)body);
+		}
+		else if(body instanceof RmStateDuccEvent) {
+			processReceive(exchange, (RmStateDuccEvent)body);
+		}
+		else if(body instanceof JdStateDuccEvent) {
+			processReceive(exchange, (JdStateDuccEvent)body);
+		}
+		else if(body instanceof NodeInventoryUpdateDuccEvent) {
+			processReceive(exchange, (NodeInventoryUpdateDuccEvent)body);
+		}
+		else {
+			processUnexpected(exchange, (DuccEvent)body);
+		}
+	}
+	
+	private void processReceive(Exchange exchange, SubmitJobDuccEvent duccEvent) {
+		String exchId = exchange.getExchangeId();
+		String event = duccEvent.getClass().getSimpleName();
+		log(exchange, defaultLogType, exchId, event);
+		timex(exchange, exchId, event);
+	}
+	
+	private void processReceive(Exchange exchange, CancelJobDuccEvent duccEvent) {
+		String exchId = exchange.getExchangeId();
+		String event = duccEvent.getClass().getSimpleName();
+		log(exchange, defaultLogType, exchId, event);
+		timex(exchange, exchId, event);
+	}
+	
+	private void processReceive(Exchange exchange, SubmitReservationDuccEvent duccEvent) {
+		String exchId = exchange.getExchangeId();
+		String event = duccEvent.getClass().getSimpleName();
+		log(exchange, defaultLogType, exchId, event);
+		timex(exchange, exchId, event);
+	}
+	
+	private void processReceive(Exchange exchange, CancelReservationDuccEvent duccEvent) {
+		String exchId = exchange.getExchangeId();
+		String event = duccEvent.getClass().getSimpleName();
+		log(exchange, defaultLogType, exchId, event);
+		timex(exchange, exchId, event);
+	}
+	
+	private void processReceive(Exchange exchange, SubmitServiceDuccEvent duccEvent) {
+		String exchId = exchange.getExchangeId();
+		String event = duccEvent.getClass().getSimpleName();
+		log(exchange, defaultLogType, exchId, event);
+		timex(exchange, exchId, event);
+	}
+	
+	private void processReceive(Exchange exchange, CancelServiceDuccEvent duccEvent) {
+		String exchId = exchange.getExchangeId();
+		String event = duccEvent.getClass().getSimpleName();
+		log(exchange, defaultLogType, exchId, event);
+		timex(exchange, exchId, event);
+	}
+	
+	private void processReceive(Exchange exchange, SmStateDuccEvent duccEvent) {
+		String exchId = exchange.getExchangeId();
+		String event = duccEvent.getClass().getSimpleName();
+		int count = 0;
+		ServiceMap map = duccEvent.getServiceMap();
+		if(map != null) {
+			count = map.size();
+		}
+		String details = "count:"+count;
+		log(exchange, defaultLogType, exchId, event, details);
+		timex(exchange, exchId, event);
+	}
+	
+	private void processReceive(Exchange exchange, RmStateDuccEvent duccEvent) {
+		String exchId = exchange.getExchangeId();
+		String event = duccEvent.getClass().getSimpleName();
+		Map<DuccId, IRmJobState> map = duccEvent.getJobState();
+		if(map != null) {
+			Iterator<Entry<DuccId, IRmJobState>> iterator = map.entrySet().iterator();
+			int countJ = 0;
+			int countR = 0;
+			int countS = 0;
+			int countO = 0;
+			while(iterator.hasNext()) {
+				Entry<DuccId, IRmJobState> entry = iterator.next();
+				IRmJobState resource = entry.getValue();
+				switch(resource.getDuccType()) {
+				case Job:
+					countJ++;
+					break;
+				case Reservation:
+					countR++;
+					break;
+				case Service:
+					countS++;
+					break;
+				default:
+					countO++;
+					break;
+				}
+			}
+			String details = "jobs:"+countJ+" "+"reservations:"+countR+" "+"services:"+countS+" "+"other:"+countO;
+			log(exchange, defaultLogType, exchId, event, details);
+			timex(exchange, exchId, event);
+		}
+		else {
+			log(exchange, LogType.Warn, exchId, event, "null map?");
+			timex(exchange, exchId, event);
+		}
+	}
+	
+	private void processReceive(Exchange exchange, JdStateDuccEvent duccEvent) {
+		String exchId = exchange.getExchangeId();
+		String event = duccEvent.getClass().getSimpleName();
+		DriverStatusReport dsr = duccEvent.getState();
+		if(dsr == null) {
+			String details = "dsr:"+dsr;
+			log(exchange, defaultLogType, exchId, event, details);
+			timex(exchange, exchId, event);
+		}
+		else {
+			String id = dsr.getId();
+			DriverState driverState = dsr.getDriverState();
+			if(driverState != null) {
+				String state = dsr.getDriverState().toString();
+				int threadCount = dsr.getThreadCount();
+				int wiFetch = dsr.getWorkItemsFetched();
+				int wiCompleted = dsr.getWorkItemsProcessingCompleted();
+				int wiError = dsr.getWorkItemsProcessingError();
+				String details = "id:"+id+" "+"state:"+state+" "+"threads:"+threadCount+" "+"wi.fecth:"+wiFetch+" "+"wi.completed:"+wiCompleted+" "+"wi.error:"+wiError;
+				log(exchange, defaultLogType, exchId, event, details);
+				timex(exchange, exchId, event);
+			}
+			else {
+				String state = null;
+				String details = "id:"+id+" "+"state:"+state;
+				log(exchange, LogType.Warn, exchId, event, details);
+				timex(exchange, exchId, event);
+			}
+		}
+	}
+	
+	private void processReceive(Exchange exchange, NodeInventoryUpdateDuccEvent duccEvent) {
+		String exchId = exchange.getExchangeId();
+		String event = duccEvent.getClass().getSimpleName();
+		HashMap<DuccId, IDuccProcess> map = duccEvent.getProcesses();
+		if(map != null) {
+			Iterator<Entry<DuccId, IDuccProcess>> iterator = map.entrySet().iterator();
+			if(iterator.hasNext()) {
+				Entry<DuccId, IDuccProcess> entry = iterator.next();
+				IDuccProcess jp = entry.getValue();
+				NodeIdentity nodeIdentity = jp.getNodeIdentity();
+				String ip = nodeIdentity.getIp();
+				String name = nodeIdentity.getName();
+				String details = "ip:"+ip+" "+"name:"+name+" "+"jp-count:"+map.size();
+				log(exchange, defaultLogType, exchId, event, details);
+				timex(exchange, exchId, event);
+			}
+			else {
+				log(exchange, LogType.Warn, exchId, event, "empty map?");
+				timex(exchange, exchId, event);
+			}
+		}
+		else {
+			log(exchange, LogType.Warn, exchId, event, "null map?");
+			timex(exchange, exchId, event);
+		}
+	}
+	
+	private void processSend(Exchange exchange) throws Exception {
+		String exchId = null;
+		String event = xclass.getSimpleName();
+		String details = null;
+		if(exchange != null) {
+			if(exchange.getIn() != null) {
+				Object body = exchange.getIn().getBody();
+				if(body != null) {
+					if(body instanceof OrchestratorStateDuccEvent) {
+						OrchestratorStateDuccEvent duccEvent = (OrchestratorStateDuccEvent)body;
+						int countJ = duccEvent.getWorkMap().getJobCount();
+						int countR = duccEvent.getWorkMap().getReservationCount();
+						int countS = duccEvent.getWorkMap().getServiceCount();
+						details = "jobs:"+countJ+" "+"reservations:"+countR+" "+"services:"+countS;
+					}
+					if(body instanceof OrchestratorAbbreviatedStateDuccEvent) {
+						OrchestratorAbbreviatedStateDuccEvent duccEvent = (OrchestratorAbbreviatedStateDuccEvent)body;
+						int countJ = duccEvent.getWorkMap().getJobCount();
+						int countR = duccEvent.getWorkMap().getReservationCount();
+						int countS = duccEvent.getWorkMap().getServiceCount();
+						details = "jobs:"+countJ+" "+"reservations:"+countR+" "+"services:"+countS;
+					}
+				}
+			}
+		}
+		log(exchange, defaultLogType, exchId, event, details);
+		timex(exchange, exchId, event);
+	}
+	
+	private void processReply(Exchange exchange) throws Exception {
+		String exchId = null;
+		Object body = exchange.getIn().getBody();
+		String event = body.getClass().getSimpleName();
+		log(exchange, defaultLogType, exchId, event);
+		timex(exchange, exchId, event);
+	}
+	
+	private void processUnexpected(Exchange exchange, DuccEvent duccEvent) {
+		String exchId = null;
+		String event = null;
+		if(duccEvent != null) {
+			event = duccEvent.getClass().getSimpleName();
+		}
+		log(exchange, LogType.Warn, exchId, event);
+		timex(exchange, exchId, event);
+	}
+	
+	private void log(Exchange exchange, LogType logType, String exchId, String event) {
+		log(exchange, defaultLogType, exchId, event, null);
+	}
+	
+	private boolean trSequence = true;
+	private boolean trLifeStatus = true;
+	private boolean trExchId = false;
+	private boolean trEvent = true;
+	private boolean trDetails = true;
+	
+	private String log( Exchange exchange, LogType logType, String exchId, String event, String details) {
+		String location = "log";
+		StringBuffer msgBuf = new StringBuffer();
+		if(trSequence) {
+			msgBuf.append("seq:"+getSequence(exchange)+" ");
+		}
+		if(trLifeStatus) {
+			msgBuf.append("life:"+lifeStatus+" ");
+		}
+		if(trExchId) {
+			msgBuf.append("exchId:"+exchId+" ");
+		}
+		if(trEvent) {
+			msgBuf.append("event:"+event+" ");
+		}
+		if(trDetails) {
+			if(details != null) {
+				msgBuf.append(details+" ");
+			}
+		}
+		String message = msgBuf.toString().trim();
+		switch(logType ) {
+			case Warn:
+				logger.warn(location, duccId, message);
+				break;
+			case Error:
+				logger.error(location, duccId, message);
+				break;
+			case Info:
+				logger.info(location, duccId, message);
+				break;
+			case Debug:
+				logger.debug(location, duccId, message);
+				break;
+		}
+		return message;
+	}
+	
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/monitor/Xmon.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/Checkpointable.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/Checkpointable.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/Checkpointable.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/Checkpointable.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.utilities;
+
+import java.io.Serializable;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+
+/**
+ * Serializable holder that bundles a {@link DuccWorkMap} with a
+ * DuccId-to-DuccId map so that both can be handled as one object.
+ * The references are stored and returned exactly as given; no copies
+ * are made.
+ * 
+ * No serialVersionUID is declared (the "serial" warning is suppressed),
+ * so Java derives a default one from the class definition.
+ * 
+ * NOTE(review): the name suggests this is the unit written to and read
+ * from an orchestrator checkpoint - confirm against the callers.
+ */
+@SuppressWarnings("serial")
+public class Checkpointable implements Serializable {
+	// state captured by the constructor and exposed unchanged by the getters
+	private DuccWorkMap workMap;
+	private ConcurrentHashMap<DuccId,DuccId> processToJobMap;
+	/**
+	 * Store the two maps by reference.
+	 * 
+	 * @param workMap the work map to hold
+	 * @param processToJobMap the DuccId-to-DuccId map to hold; presumably
+	 *        keyed by process id with the owning job id as value - verify
+	 *        against the callers
+	 */
+	public Checkpointable(DuccWorkMap workMap, ConcurrentHashMap<DuccId,DuccId> processToJobMap) {
+		this.workMap = workMap;
+		this.processToJobMap = processToJobMap;
+	}
+	/**
+	 * @return the work map passed to the constructor (same instance)
+	 */
+	public DuccWorkMap getWorkMap() {
+		return workMap;
+	}
+	/**
+	 * @return the DuccId-to-DuccId map passed to the constructor (same instance)
+	 */
+	public ConcurrentHashMap<DuccId,DuccId> getProcessToJobMap() {
+		return processToJobMap;
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/Checkpointable.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/ComponentHelper.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/ComponentHelper.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/ComponentHelper.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/ComponentHelper.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.utilities;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.IOHelper;
+import org.apache.uima.ducc.orchestrator.OrchestratorCommonArea;
+
+
+/**
+ * Start-up helper that enforces a single running instance of a component
+ * by means of a lock file.
+ */
+public class ComponentHelper {
+	
+	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(ComponentHelper.class.getName());
+	private static OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+	private static Messages messages = orchestratorCommonArea.getSystemMessages();
+	
+	/**
+	 * Abort component (at start-up) if already running, indicated by the existence of
+	 * file {@code <componentName>.lock} in {@code <directory>}.
+	 * <p>
+	 * When the lock file exists, its first line (the host name written by the
+	 * instance that created it) is logged and the JVM is terminated with exit
+	 * status -1. The JVM is terminated even if the lock file cannot be read.
+	 * <p>
+	 * Otherwise the lock file is created containing the local host name and is
+	 * registered for deletion on JVM exit. An I/O failure while creating the
+	 * lock file is logged and the method returns normally.
+	 * 
+	 * @param directory directory holding the lock file; it is concatenated
+	 *        directly with the file name, so it must end with a path separator
+	 * @param componentName base name of the lock file
+	 */
+	public static void oneInstance(String directory, String componentName) {
+		String methodName = "oneInstance";
+		try {
+			IOHelper.mkdirs(directory);
+			String filename = directory+componentName+".lock";
+			File file = new File(filename);
+			if(file.exists()) {
+				logger.error(methodName, null, messages.fetchLabel("found file")+filename);
+				String hostname = null;
+				// The read is isolated so that a failure to read the lock file
+				// cannot skip the exit below: the existence of the file alone
+				// means another instance is running.
+				try {
+					BufferedReader in = new BufferedReader(new FileReader(file));
+					try {
+						hostname = in.readLine();
+					}
+					finally {
+						in.close();
+					}
+				}
+				catch(IOException e) {
+					logger.error(methodName, null, e);
+				}
+				logger.error(methodName, null, messages.fetchLabel("already running on host")+hostname);
+				System.exit(-1);
+			}
+			file.deleteOnExit();
+			InetAddress addr = InetAddress.getLocalHost();
+			String hostname = addr.getHostName();
+			BufferedWriter out = new BufferedWriter(new FileWriter(file));
+			try {
+				out.write(hostname+"\n");
+			}
+			finally {
+				// release the file handle even when the write fails
+				out.close();
+			}
+		} catch (IOException e) {
+			logger.error(methodName, null, e);
+		}
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/ComponentHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/MemorySpecification.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/MemorySpecification.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/MemorySpecification.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/MemorySpecification.java Wed Jan  2 19:37:55 2013
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.utilities;
+
+import org.apache.uima.ducc.transport.event.common.IDuccUnits;
+
+/**
+ * Splits a memory specification such as "15GB" into its numeric size and
+ * an optional unit suffix. All whitespace is removed before parsing and
+ * the recognized suffixes are KB, MB, GB and TB (upper case only).
+ */
+public class MemorySpecification {
+
+	private String msize = null;
+	private String units = null;
+	private IDuccUnits.MemoryUnits memUnits = null;
+	
+	/**
+	 * @param memorySpecification text to parse; when null every getter returns null
+	 */
+	public MemorySpecification(String memorySpecification) {
+		init(memorySpecification);
+	}
+	
+	/**
+	 * Parse the specification into size, units and memory units.
+	 */
+	private void init(String memorySpecification) {
+		if(memorySpecification == null) {
+			return;
+		}
+		msize = memorySpecification.replaceAll("\\s","");
+		if(takeSuffix("KB")) {
+			memUnits = IDuccUnits.MemoryUnits.KB;
+		}
+		else if(takeSuffix("MB")) {
+			memUnits = IDuccUnits.MemoryUnits.MB;
+		}
+		else if(takeSuffix("GB")) {
+			memUnits = IDuccUnits.MemoryUnits.GB;
+		}
+		else if(takeSuffix("TB")) {
+			memUnits = IDuccUnits.MemoryUnits.TB;
+		}
+		// the units found above are kept even when the size turns out invalid
+		if(!isInteger(msize)) {
+			msize = null;
+		}
+	}
+	
+	/**
+	 * If the pending size text ends with the given suffix, record the suffix
+	 * as the units and cut it off the size text.
+	 * 
+	 * @return true when the suffix was present
+	 */
+	private boolean takeSuffix(String suffix) {
+		if(!msize.endsWith(suffix)) {
+			return false;
+		}
+		units = suffix;
+		msize = msize.substring(0, msize.length()-suffix.length());
+		return true;
+	}
+	
+	/**
+	 * @return true when Integer.parseInt accepts the text
+	 */
+	private static boolean isInteger(String text) {
+		try {
+			Integer.parseInt(text);
+			return true;
+		}
+		catch(NumberFormatException e) {
+			return false;
+		}
+	}
+	
+	/**
+	 * @return the size text without whitespace and unit suffix, or null when
+	 *         the specification was null or the size is not parseable as an int
+	 */
+	public String getSize() {
+		return msize;
+	}
+	
+	/**
+	 * @return "KB", "MB", "GB" or "TB", or null when no such suffix was given
+	 */
+	public String getUnits() {
+		return units;
+	}
+	
+	/**
+	 * @return the memory units matching the suffix, or null when no suffix was given
+	 */
+	public IDuccUnits.MemoryUnits getMemoryUnits() {
+		return memUnits;
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/MemorySpecification.java
------------------------------------------------------------------------------
    svn:eol-style = native