You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2015/04/02 23:47:58 UTC
svn commit: r1670986 - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator:
OrchestratorCommonArea.java StateManager.java
Author: degenaro
Date: Thu Apr 2 21:47:57 2015
New Revision: 1670986
URL: http://svn.apache.org/r1670986
Log:
UIMA-4323 DUCC Orchestrator (OR) leaks storage slowly
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java?rev=1670986&r1=1670985&r2=1670986&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java Thu Apr 2 21:47:57 2015
@@ -39,7 +39,6 @@ import org.apache.uima.ducc.orchestrator
import org.apache.uima.ducc.orchestrator.utilities.TrackSync;
import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
import org.apache.uima.ducc.transport.event.common.history.HistoryPersistenceManager;
-import org.apache.uima.ducc.transport.event.jd.IDriverStatusReport;
public class OrchestratorCommonArea {
@@ -128,7 +127,6 @@ public class OrchestratorCommonArea {
initSeqNo();
setDuccIdFactory(new DuccIdFactory(propertiesFileManager,constSeqNo));
workMap = new DuccWorkMap();
- driverStatusReportMap = new ConcurrentHashMap<DuccId,IDriverStatusReport>();
processAccounting = new ProcessAccounting();
OrchestratorCheckpoint.getInstance().switchOnOff(commonConfiguration.orchestratorCheckpoint);
OrchestratorCheckpoint.getInstance().restoreState();
@@ -223,18 +221,6 @@ public class OrchestratorCommonArea {
}
// **********
-
- private ConcurrentHashMap<DuccId,IDriverStatusReport> driverStatusReportMap = null;
-
- public ConcurrentHashMap<DuccId,IDriverStatusReport> getDriverStatusReportMap() {
- return driverStatusReportMap;
- }
-
- public void setDriverStatusReportMap(ConcurrentHashMap<DuccId,IDriverStatusReport> driverStatusReportMap) {
- this.driverStatusReportMap = driverStatusReportMap;
- }
-
- // **********
private Messages systemMessages= Messages.getInstance();
private Messages userMessages= Messages.getInstance();
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java?rev=1670986&r1=1670985&r2=1670986&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java Thu Apr 2 21:47:57 2015
@@ -106,7 +106,6 @@ public class StateManager {
private OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
private Messages messages = orchestratorCommonArea.getSystemMessages();
private DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
- private ConcurrentHashMap<DuccId,IDriverStatusReport> driverStatusReportMap = orchestratorCommonArea.getDriverStatusReportMap();
private StateJobAccounting stateJobAccounting = StateJobAccounting.getInstance();
HistoryPersistenceManager hpm = orchestratorCommonArea.getHistoryPersistencemanager();
@@ -262,7 +261,6 @@ public class StateManager {
}
if(duccWorkJob.isCompleted() && allProcessesTerminated(duccWorkJob) && isSaved(duccWorkJob) && isAgedOut(duccWorkJob)) {
WorkMapHelper.removeDuccWork(workMap, duccWorkJob, this, methodName);
- driverStatusReportMap.remove(duccId);
logger.info(methodName, duccId, messages.fetch("removed job"));
changes ++;
IDuccProcessMap processMap = duccWorkJob.getProcessMap();
@@ -605,7 +603,9 @@ public class StateManager {
break;
case Completing:
default:
- driverStatusReportMap.put(duccId, jdStatusReport);
+ duccWorkJob.setWiTotal(jdStatusReport.getWorkItemsTotal());
+ duccWorkJob.setWiDone(jdStatusReport.getWorkItemsProcessingCompleted());
+ duccWorkJob.setWiError(jdStatusReport.getWorkItemsProcessingError());
break;
}
//
@@ -745,20 +745,16 @@ public class StateManager {
logger.trace(methodName, null, messages.fetch("exit"));
}
- public boolean isExcessCapacity(DuccWorkJob job, IDriverStatusReport jdStatusReport) {
+ public boolean isExcessCapacity(DuccWorkJob job) {
String methodName = "isExcessCapacity";
boolean retVal = false;
- if(jdStatusReport != null) {
+ long total = job.getWiTotal();
+ long done = job.getWiDone();
+ long error = job.getWiError(); // error count as stored by setWiError(...); was mistakenly getWiDone()
+ long sum = total + done + error;
+ if(sum > 0) {
long capacity = job.getWorkItemCapacity();
- long total = jdStatusReport.getWorkItemsTotal();
- long done = jdStatusReport.getWorkItemsProcessingCompleted();
- long error = jdStatusReport.getWorkItemsProcessingError();
long todo = total - (done + error);
- if(jdStatusReport instanceof IDriverStatusReportV1) {
- IDriverStatusReportV1 jdStatusReportV1 = (IDriverStatusReportV1) jdStatusReport;
- long lost = jdStatusReportV1.getWorkItemsLost();
- todo = todo - lost;
- }
long tps = job.getSchedulingInfo().getIntThreadsPerShare();
long numShares = 0;
if(todo%tps > 0) {
@@ -803,7 +799,7 @@ public class StateManager {
if(isDeallocatable(jdStatusReport)) {
IDuccProcessMap processMap = job.getProcessMap();
Iterator<DuccId> iterator = processMap.keySet().iterator();
- boolean excessCapacity = isExcessCapacity(job, jdStatusReport);
+ boolean excessCapacity = isExcessCapacity(job);
int count = 0;
while(iterator.hasNext() && excessCapacity) {
count++;
@@ -817,7 +813,7 @@ public class StateManager {
process.setProcessDeallocationType(ProcessDeallocationType.Voluntary);
logger.info(methodName, job.getDuccId(), process.getDuccId(), "deallocated");
retVal = true;
- excessCapacity = isExcessCapacity(job, jdStatusReport);
+ excessCapacity = isExcessCapacity(job);
}
else {
logger.debug(methodName, job.getDuccId(), process.getDuccId(), "operating");
@@ -1249,7 +1245,7 @@ public class StateManager {
break;
default:
// allocation unnecessary if job has excess capacity
- if(isExcessCapacity(duccWorkJob,driverStatusReportMap.get(duccId))) {
+ if(isExcessCapacity(duccWorkJob)) {
OrUtil.setResourceState(duccWorkJob, process, ResourceState.Deallocated);
process.setProcessDeallocationType(ProcessDeallocationType.Voluntary);
process.advanceProcessState(ProcessState.Stopped);