You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2015/04/02 23:47:58 UTC

svn commit: r1670986 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator: OrchestratorCommonArea.java StateManager.java

Author: degenaro
Date: Thu Apr  2 21:47:57 2015
New Revision: 1670986

URL: http://svn.apache.org/r1670986
Log:
UIMA-4323 DUCC Orchestrator (OR) leaks storage slowly

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java?rev=1670986&r1=1670985&r2=1670986&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java Thu Apr  2 21:47:57 2015
@@ -39,7 +39,6 @@ import org.apache.uima.ducc.orchestrator
 import org.apache.uima.ducc.orchestrator.utilities.TrackSync;
 import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
 import org.apache.uima.ducc.transport.event.common.history.HistoryPersistenceManager;
-import org.apache.uima.ducc.transport.event.jd.IDriverStatusReport;
 
 
 public class OrchestratorCommonArea {
@@ -128,7 +127,6 @@ public class OrchestratorCommonArea {
 		initSeqNo();
 		setDuccIdFactory(new DuccIdFactory(propertiesFileManager,constSeqNo));
 		workMap = new DuccWorkMap();
-		driverStatusReportMap = new ConcurrentHashMap<DuccId,IDriverStatusReport>();
 		processAccounting = new ProcessAccounting();
 		OrchestratorCheckpoint.getInstance().switchOnOff(commonConfiguration.orchestratorCheckpoint);
 		OrchestratorCheckpoint.getInstance().restoreState();
@@ -223,18 +221,6 @@ public class OrchestratorCommonArea {
 	}
 	
 	// **********
-	
-	private ConcurrentHashMap<DuccId,IDriverStatusReport> driverStatusReportMap = null;
-	
-	public ConcurrentHashMap<DuccId,IDriverStatusReport> getDriverStatusReportMap() {
-		return driverStatusReportMap;
-	}
-	
-	public void setDriverStatusReportMap(ConcurrentHashMap<DuccId,IDriverStatusReport> driverStatusReportMap) {
-		this.driverStatusReportMap = driverStatusReportMap;
-	}
-	
-	// **********
 	
 	private Messages systemMessages= Messages.getInstance();
 	private Messages userMessages= Messages.getInstance();

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java?rev=1670986&r1=1670985&r2=1670986&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java Thu Apr  2 21:47:57 2015
@@ -106,7 +106,6 @@ public class StateManager {
 	private OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
 	private Messages messages = orchestratorCommonArea.getSystemMessages();
 	private DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
-	private ConcurrentHashMap<DuccId,IDriverStatusReport> driverStatusReportMap = orchestratorCommonArea.getDriverStatusReportMap();
 	private StateJobAccounting stateJobAccounting = StateJobAccounting.getInstance();
 	
 	HistoryPersistenceManager hpm = orchestratorCommonArea.getHistoryPersistencemanager();
@@ -262,7 +261,6 @@ public class StateManager {
 					}
 					if(duccWorkJob.isCompleted() && allProcessesTerminated(duccWorkJob) && isSaved(duccWorkJob) && isAgedOut(duccWorkJob)) {
 						WorkMapHelper.removeDuccWork(workMap, duccWorkJob, this, methodName);
-						driverStatusReportMap.remove(duccId);
 						logger.info(methodName, duccId, messages.fetch("removed job"));
 						changes ++;
 						IDuccProcessMap processMap = duccWorkJob.getProcessMap();
@@ -605,7 +603,9 @@ public class StateManager {
 					break;
 				case Completing:
 				default:
-					driverStatusReportMap.put(duccId, jdStatusReport);
+					duccWorkJob.setWiTotal(jdStatusReport.getWorkItemsTotal());
+					duccWorkJob.setWiDone(jdStatusReport.getWorkItemsProcessingCompleted());
+					duccWorkJob.setWiError(jdStatusReport.getWorkItemsProcessingError());
 					break;
 				}
 				//
@@ -745,20 +745,16 @@ public class StateManager {
 		logger.trace(methodName, null, messages.fetch("exit"));
 	}
 	
-	public boolean isExcessCapacity(DuccWorkJob job, IDriverStatusReport jdStatusReport) {
+	public boolean isExcessCapacity(DuccWorkJob job) {
 		String methodName = "isExcessCapacity";
 		boolean retVal = false;
-		if(jdStatusReport != null) {
+		long total = job.getWiTotal();
+		long done = job.getWiDone();
+		long error = job.getWiError();
+		long sum = total + done + error;
+		if(sum > 0) {
 			long capacity = job.getWorkItemCapacity();
-			long total = jdStatusReport.getWorkItemsTotal();
-			long done = jdStatusReport.getWorkItemsProcessingCompleted();
-			long error = jdStatusReport.getWorkItemsProcessingError();
 			long todo = total - (done + error);
-			if(jdStatusReport instanceof IDriverStatusReportV1) {
-				IDriverStatusReportV1 jdStatusReportV1 = (IDriverStatusReportV1) jdStatusReport;
-				long lost = jdStatusReportV1.getWorkItemsLost();
-				todo = todo - lost;
-			}
 			long tps = job.getSchedulingInfo().getIntThreadsPerShare();
 			long numShares = 0;
 			if(todo%tps > 0) {
@@ -803,7 +799,7 @@ public class StateManager {
 			if(isDeallocatable(jdStatusReport)) {
 				IDuccProcessMap processMap = job.getProcessMap();
 				Iterator<DuccId> iterator = processMap.keySet().iterator();
-				boolean excessCapacity = isExcessCapacity(job, jdStatusReport);
+				boolean excessCapacity = isExcessCapacity(job);
 				int count = 0;
 				while(iterator.hasNext() && excessCapacity) {
 					count++;
@@ -817,7 +813,7 @@ public class StateManager {
 							process.setProcessDeallocationType(ProcessDeallocationType.Voluntary);
 							logger.info(methodName, job.getDuccId(), process.getDuccId(), "deallocated");
 							retVal = true;
-							excessCapacity = isExcessCapacity(job, jdStatusReport);
+							excessCapacity = isExcessCapacity(job);
 						}
 						else {
 							logger.debug(methodName, job.getDuccId(), process.getDuccId(), "operating");
@@ -1249,7 +1245,7 @@ public class StateManager {
 						break;
 					default:
 						// allocation unnecessary if job has excess capacity
-						if(isExcessCapacity(duccWorkJob,driverStatusReportMap.get(duccId))) {
+						if(isExcessCapacity(duccWorkJob)) {
 							OrUtil.setResourceState(duccWorkJob, process, ResourceState.Deallocated);
 							process.setProcessDeallocationType(ProcessDeallocationType.Voluntary);
 							process.advanceProcessState(ProcessState.Stopped);