You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2016/10/14 19:19:31 UTC
svn commit: r1764951 - in
/uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator:
OrchestratorCommonArea.java ProcessAccounting.java ProcessToJobMap.java
StateManager.java
Author: degenaro
Date: Fri Oct 14 19:19:31 2016
New Revision: 1764951
URL: http://svn.apache.org/viewvc?rev=1764951&view=rev
Log:
UIMA-5060 DUCC Orchestrator (OR) "warm" restart issues
- Fix leak of JD entries in OR's map of processId-to-jobId
- Make ProcessToJobMap its own class
- Code refactoring for simplification and clarity in ProcessAccounting
Added:
uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessToJobMap.java (with props)
Modified:
uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java
uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java
uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java
Modified: uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java?rev=1764951&r1=1764950&r2=1764951&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java Fri Oct 14 19:19:31 2016
@@ -183,7 +183,6 @@ public class OrchestratorCommonArea {
// **********
- @SuppressWarnings("unchecked")
public Checkpointable getCheckpointable() {
String methodName = "getCheckpointable";
DuccWorkMap ckptWorkMap;
@@ -192,7 +191,7 @@ public class OrchestratorCommonArea {
synchronized(this) {
ts.using();
ckptWorkMap = (DuccWorkMap)SerializationUtils.clone(workMap);
- ckptProcessToJobMap = (ConcurrentHashMap<DuccId,DuccId>)SerializationUtils.clone(processAccounting.getProcessToJobMap());
+ ckptProcessToJobMap = ProcessToJobMap.getInstance().getMap();
}
ts.ended();
return new Checkpointable(ckptWorkMap,ckptProcessToJobMap);
@@ -204,14 +203,14 @@ public class OrchestratorCommonArea {
synchronized(this) {
ts.using();
workMap = checkpointable.getWorkMap();
- processAccounting = new ProcessAccounting(checkpointable.getProcessToJobMap());
+ ProcessToJobMap.getInstance().putMap(checkpointable.getProcessToJobMap());
}
ts.ended();
}
// **********
- private ProcessAccounting processAccounting;
+ private ProcessAccounting processAccounting = null;
public ProcessAccounting getProcessAccounting() {
return processAccounting;
Modified: uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java?rev=1764951&r1=1764950&r2=1764951&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java Fri Oct 14 19:19:31 2016
@@ -21,7 +21,6 @@ package org.apache.uima.ducc.orchestrato
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.uima.ducc.common.internationalization.Messages;
import org.apache.uima.ducc.common.node.metrics.ProcessGarbageCollectionStats;
@@ -34,6 +33,7 @@ import org.apache.uima.ducc.orchestrator
import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+import org.apache.uima.ducc.transport.event.common.DuccWorkPopDriver;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
import org.apache.uima.ducc.transport.event.common.IDuccSchedulingInfo;
@@ -55,30 +55,17 @@ public class ProcessAccounting {
private OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
private Messages messages = orchestratorCommonArea.getSystemMessages();
- private DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
-
- private ConcurrentHashMap<DuccId,DuccId> processToJobMap = new ConcurrentHashMap<DuccId,DuccId>();
+ private ProcessToJobMap processToJobMap = ProcessToJobMap.getInstance();
private StateJobAccounting stateJobAccounting = StateJobAccounting.getInstance();
public ProcessAccounting() {
}
- public ProcessAccounting(ConcurrentHashMap<DuccId,DuccId> processToJobMap) {
- setProcessToJobMap(processToJobMap);
- }
-
- public ConcurrentHashMap<DuccId,DuccId> getProcessToJobMap() {
- return this.processToJobMap;
- }
-
- private void setProcessToJobMap(ConcurrentHashMap<DuccId,DuccId> processToJobMap) {
- this.processToJobMap = processToJobMap;
- }
-
public DuccId getJobId(DuccId processId) {
String methodName = "getJobId";
DuccId retVal;
+ DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
TrackSync ts = TrackSync.await(workMap, this.getClass(), methodName);
synchronized(workMap) {
ts.using();
@@ -91,6 +78,7 @@ public class ProcessAccounting {
public int processCount() {
String methodName = "processCount";
int retVal;
+ DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
TrackSync ts = TrackSync.await(workMap, this.getClass(), methodName);
synchronized(workMap) {
ts.using();
@@ -104,6 +92,7 @@ public class ProcessAccounting {
String methodName = "addProcess";
logger.trace(methodName, null, messages.fetch("enter"));
boolean retVal = false;
+ DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
TrackSync ts = TrackSync.await(workMap, this.getClass(), methodName);
ts.using();
DuccId key = processToJobMap.put(processId, jobId);
@@ -123,6 +112,7 @@ public class ProcessAccounting {
String methodName = "removeProcess";
logger.trace(methodName, null, messages.fetch("enter"));
boolean retVal = false;
+ DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
TrackSync ts = TrackSync.await(workMap, this.getClass(), methodName);
synchronized(workMap) {
ts.using();
@@ -656,10 +646,37 @@ public class ProcessAccounting {
}
logger.trace(methodName, job.getDuccId(), messages.fetch("exit"));
}
+
+ /**
+  * Find a process by id within the given work item.
+  * The work item's own process map is consulted first; if the process is
+  * not there and the work item is a job, the job driver's process map is
+  * consulted next.
+  *
+  * @param dw the work item to search, may be null
+  * @param processId the id of the process to find, may be null
+  * @return the matching process, or null when either argument is null,
+  *         the work item has no process map, or no match is found
+  */
+ private IDuccProcess getProcess(IDuccWorkExecutable dw, DuccId processId) {
+     if(dw == null || processId == null) {
+         return null;
+     }
+     IDuccProcessMap workProcesses = dw.getProcessMap();
+     if(workProcesses == null) {
+         // no process map on the work item: the driver is deliberately not consulted
+         return null;
+     }
+     IDuccProcess found = workProcesses.get(processId);
+     if(found != null || !(dw instanceof IDuccWorkJob)) {
+         return found;
+     }
+     // not among the job's own processes: fall back to the job driver's processes
+     DuccWorkPopDriver jobDriver = ((IDuccWorkJob) dw).getDriver();
+     if(jobDriver == null) {
+         return null;
+     }
+     IDuccProcessMap driverProcesses = jobDriver.getProcessMap();
+     if(driverProcesses == null) {
+         return null;
+     }
+     return driverProcesses.get(processId);
+ }
public void setStatus(IDuccProcess inventoryProcess) {
String methodName = "setStatus";
logger.trace(methodName, null, messages.fetch("enter"));
+ DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
try {
DuccId processId = inventoryProcess.getDuccId();
logger.debug(methodName, null, processId, messages.fetchLabel("node")+inventoryProcess.getNodeIdentity().getName()+" "+messages.fetchLabel("PID")+inventoryProcess.getPID());
@@ -667,59 +684,58 @@ public class ProcessAccounting {
synchronized(workMap) {
ts.using();
if(processToJobMap.containsKey(processId)) {
+ logger.trace(methodName, null, processId, "key found");
DuccId jobId = getJobId(processId);
- IDuccWork duccWork = WorkMapHelper.findDuccWork(workMap, jobId, this, methodName);
- if(duccWork != null) {
- if(duccWork instanceof IDuccWorkExecutable) {
- IDuccWorkExecutable duccWorkExecutable = (IDuccWorkExecutable) duccWork;
+ logger.trace(methodName, jobId, processId, "jobId from process map");
+ IDuccWork dw = WorkMapHelper.findDuccWork(workMap, jobId, this, methodName);
+ if(dw != null) {
+ logger.trace(methodName, dw.getDuccId(), processId, "entity found in work map");
+ if(dw instanceof IDuccWorkExecutable) {
IDuccWorkJob job = null;
- if(duccWork instanceof IDuccWorkJob) {
- job = (IDuccWorkJob)duccWork;
- }
- IDuccProcessMap processMap = duccWorkExecutable.getProcessMap();
- IDuccProcess process = processMap.get(processId);
- if(process == null) {
- if(job != null) {
- process = job.getDriver().getProcessMap().get(processId);
- OrchestratorHelper.jdDeallocate(job, inventoryProcess);
- }
- }
- if(process != null) {
- if(process.isComplete()) {
- logger.trace(methodName, jobId, process.getDuccId(), "finalized");
+ if(dw instanceof IDuccWorkJob) {
+ job = (IDuccWorkJob)dw;
+ IDuccProcess process = getProcess(job, processId);
+ if(process != null) {
+ logger.trace(methodName, job.getDuccId(), processId, "process found");
+ if(process.isComplete()) {
+ logger.trace(methodName, jobId, process.getDuccId(), "finalized");
+ }
+ else {
+ logger.trace(methodName, jobId, process.getDuccId(), "active");
+ // PID
+ copyInventoryPID(job, inventoryProcess, process);
+ // Scheduler State
+ setResourceStateAndReason(job, inventoryProcess, process);
+ // Process State
+ copyInventoryProcessState(job, inventoryProcess, process);
+ // Process Reason
+ copyReasonForStoppingProcess(job, inventoryProcess, process);
+ // Process Exit code
+ copyProcessExitCode(job, inventoryProcess, process);
+ // Process Init & Run times
+ updateProcessTime(job, inventoryProcess, process);
+ // Process Initialization State
+ updateProcessInitilization(job, inventoryProcess, process);
+ // Process Pipeline Components State
+ copyUimaPipelineComponentsState(job, inventoryProcess, process);
+ // Process Swap Usage
+ copyInventorySwapUsage(job, inventoryProcess, process);
+ // Process Major Faults
+ copyInventoryMajorFaults(job, inventoryProcess, process);
+ // Process Rss
+ copyInventoryRss(job, inventoryProcess, process);
+ // Process GC Stats
+ copyInventoryGCStats(job, inventoryProcess, process);
+ // Process CPU Time
+ copyInventoryCpuTime(job, inventoryProcess, process);
+ }
}
else {
- logger.trace(methodName, jobId, process.getDuccId(), "active");
- // PID
- copyInventoryPID(job, inventoryProcess, process);
- // Scheduler State
- setResourceStateAndReason(job, inventoryProcess, process);
- // Process State
- copyInventoryProcessState(job, inventoryProcess, process);
- // Process Reason
- copyReasonForStoppingProcess(job, inventoryProcess, process);
- // Process Exit code
- copyProcessExitCode(job, inventoryProcess, process);
- // Process Init & Run times
- updateProcessTime(job, inventoryProcess, process);
- // Process Initialization State
- updateProcessInitilization(job, inventoryProcess, process);
- // Process Pipeline Components State
- copyUimaPipelineComponentsState(job, inventoryProcess, process);
- // Process Swap Usage
- copyInventorySwapUsage(job, inventoryProcess, process);
- // Process Major Faults
- copyInventoryMajorFaults(job, inventoryProcess, process);
- // Process Rss
- copyInventoryRss(job, inventoryProcess, process);
- // Process GC Stats
- copyInventoryGCStats(job, inventoryProcess, process);
- // Process CPU Time
- copyInventoryCpuTime(job, inventoryProcess, process);
+ logger.warn(methodName, dw.getDuccId(), processId, messages.fetch("process not found in job's process table"));
}
}
else {
- logger.warn(methodName, jobId, processId, messages.fetch("process not found job's process table"));
+ logger.warn(methodName, dw.getDuccId(), processId, "entity is not job");
}
}
else {
@@ -727,7 +743,7 @@ public class ProcessAccounting {
}
}
else {
- logger.warn(methodName, jobId, processId, messages.fetch("job ID not found"));
+ logger.warn(methodName, jobId, processId, messages.fetch("ID not found"));
}
}
else {
Added: uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessToJobMap.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessToJobMap.java?rev=1764951&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessToJobMap.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessToJobMap.java Fri Oct 14 19:19:31 2016
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+
+/**
+ * Keep a map of processes-to-jobs to minimize searching job process
+ * maps to discover which job a particular process belongs to.
+ */
+
+public class ProcessToJobMap {
+
+    private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(ProcessToJobMap.class.getName());
+    // used as the job id argument for log records that are not tied to a single job
+    private static final DuccId jobid = null;
+
+    // singleton: assigned once at class initialization and never replaced
+    private static final ProcessToJobMap instance = new ProcessToJobMap();
+
+    /**
+     * @return the single shared instance of this map
+     */
+    public static ProcessToJobMap getInstance() {
+        return instance;
+    }
+
+    // processId -> jobId; the reference never changes, only the contents do
+    private final ConcurrentHashMap<DuccId,DuccId> processToJobMap = new ConcurrentHashMap<DuccId,DuccId>();
+
+    /**
+     * @return a copy of the current mappings; changes made to the returned
+     *         map are not reflected in this instance
+     */
+    public ConcurrentHashMap<DuccId,DuccId> getMap() {
+        return new ConcurrentHashMap<DuccId,DuccId>(processToJobMap);
+    }
+
+    /**
+     * Add every entry of the given map to this one. Entries already present
+     * under other keys are kept; entries under the same key are overwritten.
+     *
+     * @param map the mappings to add; a null map is ignored
+     */
+    public void putMap(ConcurrentHashMap<DuccId,DuccId> map) {
+        String location = "putMap";
+        if(map != null) {
+            logger.debug(location, jobid, map.size());
+            for(Entry<DuccId, DuccId> entry : map.entrySet()) {
+                this.put(entry.getKey(),entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * @param key the process id to look for
+     * @return true if a job is currently recorded for the given process id
+     */
+    public boolean containsKey(DuccId key) {
+        return processToJobMap.containsKey(key);
+    }
+
+    /**
+     * Record that the given process belongs to the given job.
+     *
+     * @param processId the process id (map key)
+     * @param jobId the owning job id (map value)
+     * @return the job id previously recorded for the process, or null if none
+     */
+    public DuccId put(DuccId processId, DuccId jobId) {
+        String location = "put";
+        DuccId retVal = processToJobMap.put(processId, jobId);
+        logger.debug(location, jobId, processId, "size="+processToJobMap.size());
+        return retVal;
+    }
+
+    /**
+     * Forget the given process.
+     *
+     * @param processId the process id to remove
+     * @return the job id that was recorded for the process, or null if none
+     */
+    public DuccId remove(DuccId processId) {
+        String location = "remove";
+        // A single remove() yields the mapped job id atomically; a separate
+        // get() beforehand would be a second lookup and could observe a
+        // different value than the one actually removed.
+        DuccId jobId = processToJobMap.remove(processId);
+        logger.debug(location, jobId, processId, "size="+processToJobMap.size());
+        return jobId;
+    }
+
+    /**
+     * @param key the process id to look up
+     * @return the job id recorded for the process, or null if none
+     */
+    public DuccId get(DuccId key) {
+        String location = "get";
+        DuccId retVal = processToJobMap.get(key);
+        logger.debug(location, retVal, key, "size="+processToJobMap.size());
+        return retVal;
+    }
+
+    /**
+     * @return the number of process-to-job mappings currently held
+     */
+    public int size() {
+        return processToJobMap.size();
+    }
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessToJobMap.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessToJobMap.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java?rev=1764951&r1=1764950&r2=1764951&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java Fri Oct 14 19:19:31 2016
@@ -84,6 +84,7 @@ import org.apache.uima.ducc.transport.ev
public class StateManager {
private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(StateManager.class.getName());
+ private static final DuccId jobid = null;
private static StateManager stateManager = new StateManager();
@@ -238,8 +239,29 @@ public class StateManager {
return retVal;
}
+ /**
+  * Write diagnostic trace records: one per work map entry showing its
+  * type, then one per process-to-job mapping. Any exception raised while
+  * dumping is logged as an error and not propagated to the caller.
+  */
+ private void dumper() {
+     String location = "dumper";
+     try {
+         DuccWorkMap workItems = orchestratorCommonArea.getWorkMap();
+         for(DuccId workId : workItems.keySet()) {
+             IDuccWork work = workItems.findDuccWork(workId);
+             if(work == null) {
+                 continue;
+             }
+             logger.trace(location, workId, "dw: "+work.getDuccType());
+         }
+         // getMap() hands back a copy, so this loop iterates a private snapshot
+         ConcurrentHashMap<DuccId, DuccId> snapshot = ProcessToJobMap.getInstance().getMap();
+         for(Entry<DuccId, DuccId> mapping : snapshot.entrySet()) {
+             DuccId processId = mapping.getKey();
+             DuccId owningJobId = mapping.getValue();
+             logger.trace(location, jobid, "p:"+processId+" "+"j:"+owningJobId);
+         }
+     }
+     catch(Exception e) {
+         logger.error(location, jobid, e);
+     }
+ }
+
public int prune(DuccWorkMap workMap) {
String methodName = "prune";
+ dumper();
int changes = 0;
logger.trace(methodName, null, messages.fetch("enter"));
long t0 = System.currentTimeMillis();
@@ -268,15 +290,32 @@ public class StateManager {
WorkMapHelper.removeDuccWork(workMap, duccWorkJob, this, methodName);
logger.info(methodName, duccId, messages.fetch("removed job"));
changes ++;
- IDuccProcessMap processMap = duccWorkJob.getProcessMap();
- Iterator<DuccId> processMapIterator = processMap.keySet().iterator();
- while(processMapIterator.hasNext()) {
- DuccId processDuccId = processMapIterator.next();
- orchestratorCommonArea.getProcessAccounting().removeProcess(processDuccId);
- logger.info(methodName, duccId, messages.fetch("removed process")+" "+processDuccId.toString());
- changes ++;
+ IDuccProcessMap processMap = null;
+ DuccWorkPopDriver driver = duccWorkJob.getDriver();
+ if(driver != null) {
+ processMap = driver.getProcessMap();
+ }
+ if(processMap != null) {
+ Iterator<DuccId> processMapIterator = processMap.keySet().iterator();
+ while(processMapIterator.hasNext()) {
+ DuccId processDuccId = processMapIterator.next();
+ orchestratorCommonArea.getProcessAccounting().removeProcess(processDuccId);
+ logger.info(methodName, duccId, messages.fetch("removed driver process")+" "+processDuccId.toString());
+ changes ++;
+ }
+ logger.info(methodName, duccId, messages.fetch("processes driver inactive"));
+ }
+ processMap = duccWorkJob.getProcessMap();
+ if(processMap != null) {
+ Iterator<DuccId> processMapIterator = processMap.keySet().iterator();
+ while(processMapIterator.hasNext()) {
+ DuccId processDuccId = processMapIterator.next();
+ orchestratorCommonArea.getProcessAccounting().removeProcess(processDuccId);
+ logger.info(methodName, duccId, messages.fetch("removed process")+" "+processDuccId.toString());
+ changes ++;
+ }
+ logger.info(methodName, duccId, messages.fetch("processes inactive"));
}
- logger.info(methodName, duccId, messages.fetch("processes inactive"));
}
else {
logger.debug(methodName, duccId, messages.fetch("processes active"));
@@ -302,7 +341,7 @@ public class StateManager {
if(elapsed > Constants.SYNC_LIMIT) {
logger.debug(methodName, null, "elapsed msecs: "+elapsed);
}
- logger.debug(methodName, null, "processToWorkMap.size()="+orchestratorCommonArea.getProcessAccounting().processCount());
+ logger.debug(methodName, null, "processToWorkMap.size="+orchestratorCommonArea.getProcessAccounting().processCount());
if(changes > 0) {
OrchestratorCheckpoint.getInstance().saveState();
}