You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2014/04/23 20:02:11 UTC

svn commit: r1589473 - /uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java

Author: degenaro
Date: Wed Apr 23 18:02:11 2014
New Revision: 1589473

URL: http://svn.apache.org/r1589473
Log:
UIMA-3360 DUCC orchestrator (OR) must report "investment" in work items for each JP so RM can make intelligent preemption decisions

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java?rev=1589473&r1=1589472&r2=1589473&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java Wed Apr 23 18:02:11 2014
@@ -24,12 +24,14 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.uima.ducc.common.Node;
 import org.apache.uima.ducc.common.NodeIdentity;
 import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.jd.files.workitem.RemoteLocation;
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
 import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
@@ -322,6 +324,31 @@ public class StateManager {
 		}
 	}
 	
+	/** Copies per-process work item investment (operating millis) from the JD status report onto the job's processes. */
+	private void copyInvestmentReport(DuccWorkJob job, DriverStatusReport jdStatusReport) {
+		String methodName = "copyInvestmentReport";
+		try {
+			ConcurrentHashMap<RemoteLocation, Long> omMap = jdStatusReport.getOperatingMillisMap();
+			for(Entry<DuccId, IDuccProcess> entry : job.getProcessMap().entrySet()) {
+				IDuccProcess process = entry.getValue();
+				// The report is keyed by (node IP, pid), so rebuild that key for this process.
+				String nodeIP = process.getNode().getNodeIdentity().getIp();
+				String pid = process.getPID();
+				// Single lookup: containsKey() followed by get() is not atomic on a concurrent map,
+				// so an entry removed in between would yield null and fail on unboxing.
+				Long reported = omMap.get(new RemoteLocation(nodeIP, pid));
+				// Processes absent from the report are recorded with zero investment.
+				long investment = (reported != null) ? reported.longValue() : 0;
+				process.setWiMillisInvestment(investment);
+				logger.debug(methodName, job.getDuccId(), process.getDuccId(), "investment:"+investment+" "+"node(IP): "+nodeIP+" "+"pid: "+pid);
+			}
+		}
+		catch(Throwable t) {
+			// Best effort: a failure is logged and does not propagate to the caller.
+			logger.error(methodName, job.getDuccId(), t);
+		}
+	}
+	
 	private void copyProcessWorkItemsReport(DuccWorkJob job, DriverStatusReport jdStatusReport) {
 		String methodName = "copyProcessWorkItemsReport";
 		try {
@@ -438,6 +465,7 @@ public class StateManager {
 					}
 				}
 				//
+				copyInvestmentReport(duccWorkJob, jdStatusReport);
 				copyProcessWorkItemsReport(duccWorkJob, jdStatusReport);
 				copyDriverWorkItemsReport(duccWorkJob, jdStatusReport);
 				//