You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@uima.apache.org by de...@apache.org on 2016/10/14 19:19:31 UTC

svn commit: r1764951 - in /uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator: OrchestratorCommonArea.java ProcessAccounting.java ProcessToJobMap.java StateManager.java

Author: degenaro
Date: Fri Oct 14 19:19:31 2016
New Revision: 1764951

URL: http://svn.apache.org/viewvc?rev=1764951&view=rev
Log:
UIMA-5060 DUCC Orchestrator (OR) "warm" restart issues

- Fix leak of job driver (JD) process entries in the OR's processId-to-jobId map
- Move the process-to-job map into its own class, ProcessToJobMap
- Refactor ProcessAccounting for simplicity and clarity

Added:
    uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessToJobMap.java   (with props)
Modified:
    uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java
    uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java
    uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java

Modified: uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java?rev=1764951&r1=1764950&r2=1764951&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java Fri Oct 14 19:19:31 2016
@@ -183,7 +183,6 @@ public class OrchestratorCommonArea {
 	
 	// **********
 	
-	@SuppressWarnings("unchecked")
 	public Checkpointable getCheckpointable() {
 		String methodName = "getCheckpointable";
 		DuccWorkMap ckptWorkMap;
@@ -192,7 +191,7 @@ public class OrchestratorCommonArea {
 		synchronized(this) {
 			ts.using();
 			ckptWorkMap = (DuccWorkMap)SerializationUtils.clone(workMap);
-			ckptProcessToJobMap = (ConcurrentHashMap<DuccId,DuccId>)SerializationUtils.clone(processAccounting.getProcessToJobMap());
+			ckptProcessToJobMap = ProcessToJobMap.getInstance().getMap();
 		}
 		ts.ended();
 		return new Checkpointable(ckptWorkMap,ckptProcessToJobMap);
@@ -204,14 +203,14 @@ public class OrchestratorCommonArea {
 		synchronized(this) {
 			ts.using();
 			workMap = checkpointable.getWorkMap();
-			processAccounting = new ProcessAccounting(checkpointable.getProcessToJobMap());
+			ProcessToJobMap.getInstance().putMap(checkpointable.getProcessToJobMap());
 		}
 		ts.ended();
 	}
 	
 	// **********
 	
-	private ProcessAccounting processAccounting;
+	private ProcessAccounting processAccounting = null;
 	
 	public ProcessAccounting getProcessAccounting() {
 		return processAccounting;

Modified: uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java?rev=1764951&r1=1764950&r2=1764951&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java Fri Oct 14 19:19:31 2016
@@ -21,7 +21,6 @@ package org.apache.uima.ducc.orchestrato
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.uima.ducc.common.internationalization.Messages;
 import org.apache.uima.ducc.common.node.metrics.ProcessGarbageCollectionStats;
@@ -34,6 +33,7 @@ import org.apache.uima.ducc.orchestrator
 import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
 import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
 import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+import org.apache.uima.ducc.transport.event.common.DuccWorkPopDriver;
 import org.apache.uima.ducc.transport.event.common.IDuccProcess;
 import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
 import org.apache.uima.ducc.transport.event.common.IDuccSchedulingInfo;
@@ -55,30 +55,17 @@ public class ProcessAccounting {
 	
 	private OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
 	private Messages messages = orchestratorCommonArea.getSystemMessages();
-	private DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
-	
-	private ConcurrentHashMap<DuccId,DuccId> processToJobMap = new ConcurrentHashMap<DuccId,DuccId>();
 	
+	private ProcessToJobMap processToJobMap = ProcessToJobMap.getInstance();
 	private StateJobAccounting stateJobAccounting = StateJobAccounting.getInstance();
 	
 	public ProcessAccounting() {
 	}
 	
-	public ProcessAccounting(ConcurrentHashMap<DuccId,DuccId> processToJobMap) {
-		setProcessToJobMap(processToJobMap);
-	}
-	
-	public ConcurrentHashMap<DuccId,DuccId> getProcessToJobMap() {
-		return this.processToJobMap;
-	}
-	
-	private void setProcessToJobMap(ConcurrentHashMap<DuccId,DuccId> processToJobMap) {
-		this.processToJobMap = processToJobMap;
-	}
-	
 	public DuccId getJobId(DuccId processId) {
 		String methodName = "getJobId";
 		DuccId retVal;
+		DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
 		TrackSync ts = TrackSync.await(workMap, this.getClass(), methodName);
 		synchronized(workMap) {
 			ts.using();
@@ -91,6 +78,7 @@ public class ProcessAccounting {
 	public int processCount() {
 		String methodName = "processCount";
 		int retVal;
+		DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
 		TrackSync ts = TrackSync.await(workMap, this.getClass(), methodName);
 		synchronized(workMap) {
 			ts.using();
@@ -104,6 +92,7 @@ public class ProcessAccounting {
 		String methodName = "addProcess";
 		logger.trace(methodName, null, messages.fetch("enter"));
 		boolean retVal = false;
+		DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
 		TrackSync ts = TrackSync.await(workMap, this.getClass(), methodName);
 		ts.using();
 		DuccId key = processToJobMap.put(processId, jobId);
@@ -123,6 +112,7 @@ public class ProcessAccounting {
 		String methodName = "removeProcess";
 		logger.trace(methodName, null, messages.fetch("enter"));
 		boolean retVal = false;
+		DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
 		TrackSync ts = TrackSync.await(workMap, this.getClass(), methodName);
 		synchronized(workMap) {
 			ts.using();
@@ -656,10 +646,37 @@ public class ProcessAccounting {
 		}
 		logger.trace(methodName, job.getDuccId(), messages.fetch("exit"));
 	}
+
+	private IDuccProcess getProcess(IDuccWorkExecutable dw, DuccId processId) {
+		IDuccProcess process = null;
+		if(dw != null) {
+			if(processId != null) {
+				IDuccProcessMap map = null;
+				map = dw.getProcessMap();
+				if(map != null) {
+					process = map.get(processId);
+					if(process == null) {
+						if(dw instanceof IDuccWorkJob) {
+							IDuccWorkJob job = (IDuccWorkJob) dw;
+							DuccWorkPopDriver driver = job.getDriver();
+							if(driver != null) {
+								map = driver.getProcessMap();
+								if(map != null) {
+									process = map.get(processId);
+								}
+							}
+						}
+					}
+				}
+			}
+		}
+		return process;
+	}
 	
 	public void setStatus(IDuccProcess inventoryProcess) {
 		String methodName = "setStatus";
 		logger.trace(methodName, null, messages.fetch("enter"));
+		DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
 		try {
 			DuccId processId = inventoryProcess.getDuccId();
 			logger.debug(methodName, null, processId, messages.fetchLabel("node")+inventoryProcess.getNodeIdentity().getName()+" "+messages.fetchLabel("PID")+inventoryProcess.getPID());
@@ -667,59 +684,58 @@ public class ProcessAccounting {
 			synchronized(workMap) {
 				ts.using();
 				if(processToJobMap.containsKey(processId)) {
+					logger.trace(methodName, null, processId, "key found");
 					DuccId jobId = getJobId(processId);
-					IDuccWork duccWork = WorkMapHelper.findDuccWork(workMap, jobId, this, methodName);
-					if(duccWork != null) {
-						if(duccWork instanceof IDuccWorkExecutable) {
-							IDuccWorkExecutable duccWorkExecutable = (IDuccWorkExecutable) duccWork;
+					logger.trace(methodName, jobId, processId, "jobId from process map");
+					IDuccWork dw = WorkMapHelper.findDuccWork(workMap, jobId, this, methodName);
+					if(dw != null) {
+						logger.trace(methodName, dw.getDuccId(), processId, "entity found in work map");
+						if(dw instanceof IDuccWorkExecutable) {
 							IDuccWorkJob job = null;
-							if(duccWork instanceof IDuccWorkJob) { 
-								job = (IDuccWorkJob)duccWork;
-							}
-							IDuccProcessMap processMap = duccWorkExecutable.getProcessMap();
-							IDuccProcess process = processMap.get(processId);
-							if(process == null) {
-								if(job != null) { 
-									process = job.getDriver().getProcessMap().get(processId);
-									OrchestratorHelper.jdDeallocate(job, inventoryProcess);
-								}
-							}
-							if(process != null) {
-								if(process.isComplete()) {
-									logger.trace(methodName, jobId, process.getDuccId(), "finalized");
+							if(dw instanceof IDuccWorkJob) { 
+								job = (IDuccWorkJob)dw;
+								IDuccProcess process = getProcess(job, processId);
+								if(process != null) {
+									logger.trace(methodName, job.getDuccId(), processId, "process found");
+									if(process.isComplete()) {
+										logger.trace(methodName, jobId, process.getDuccId(), "finalized");
+									}
+									else {
+										logger.trace(methodName, jobId, process.getDuccId(), "active");
+										// PID
+										copyInventoryPID(job, inventoryProcess, process);
+										// Scheduler State
+										setResourceStateAndReason(job, inventoryProcess, process);
+										// Process State
+										copyInventoryProcessState(job, inventoryProcess, process);
+										// Process Reason
+										copyReasonForStoppingProcess(job, inventoryProcess, process);
+										// Process Exit code
+										copyProcessExitCode(job, inventoryProcess, process);
+										// Process Init & Run times
+										updateProcessTime(job, inventoryProcess, process);
+										// Process Initialization State
+										updateProcessInitilization(job, inventoryProcess, process);
+										// Process Pipeline Components State
+										copyUimaPipelineComponentsState(job, inventoryProcess, process);
+										// Process Swap Usage
+										copyInventorySwapUsage(job, inventoryProcess, process);
+										// Process Major Faults
+										copyInventoryMajorFaults(job, inventoryProcess, process);
+										// Process Rss
+										copyInventoryRss(job, inventoryProcess, process);
+										// Process GC Stats
+										copyInventoryGCStats(job, inventoryProcess, process);
+										// Process CPU Time
+										copyInventoryCpuTime(job, inventoryProcess, process);
+									}
 								}
 								else {
-									logger.trace(methodName, jobId, process.getDuccId(), "active");
-									// PID
-									copyInventoryPID(job, inventoryProcess, process);
-									// Scheduler State
-									setResourceStateAndReason(job, inventoryProcess, process);
-									// Process State
-									copyInventoryProcessState(job, inventoryProcess, process);
-									// Process Reason
-									copyReasonForStoppingProcess(job, inventoryProcess, process);
-									// Process Exit code
-									copyProcessExitCode(job, inventoryProcess, process);
-									// Process Init & Run times
-									updateProcessTime(job, inventoryProcess, process);
-									// Process Initialization State
-									updateProcessInitilization(job, inventoryProcess, process);
-									// Process Pipeline Components State
-									copyUimaPipelineComponentsState(job, inventoryProcess, process);
-									// Process Swap Usage
-									copyInventorySwapUsage(job, inventoryProcess, process);
-									// Process Major Faults
-									copyInventoryMajorFaults(job, inventoryProcess, process);
-									// Process Rss
-									copyInventoryRss(job, inventoryProcess, process);
-									// Process GC Stats
-									copyInventoryGCStats(job, inventoryProcess, process);
-									// Process CPU Time
-									copyInventoryCpuTime(job, inventoryProcess, process);
+									logger.warn(methodName, dw.getDuccId(), processId, messages.fetch("process not found in job's process table"));
 								}
 							}
 							else {
-								logger.warn(methodName, jobId, processId, messages.fetch("process not found job's process table"));
+								logger.warn(methodName, dw.getDuccId(), processId, "entity is not job");
 							}
 						}
 						else {
@@ -727,7 +743,7 @@ public class ProcessAccounting {
 						}
 					}
 					else {
-						logger.warn(methodName, jobId, processId, messages.fetch("job ID not found"));
+						logger.warn(methodName, jobId, processId, messages.fetch("ID not found"));
 					}
 				}
 				else {

Added: uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessToJobMap.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessToJobMap.java?rev=1764951&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessToJobMap.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessToJobMap.java Fri Oct 14 19:19:31 2016
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+
+/**
+ * Keep a map of processes-to-jobs to minimize searching job process
+ * maps to discover which job a particular process belongs to.
+ * <p>
+ * A single shared instance is obtained via {@link #getInstance()}.
+ * Entries are held in a {@link ConcurrentHashMap}, so individual
+ * operations are thread-safe; null keys and values are not permitted.
+ */
+
+public class ProcessToJobMap {
+
+	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(ProcessToJobMap.class.getName());
+	private static final DuccId jobid = null;
+	
+	private static final ProcessToJobMap instance = new ProcessToJobMap();
+	
+	/**
+	 * @return the single shared instance
+	 */
+	public static ProcessToJobMap getInstance() {
+		return instance;
+	}
+	
+	private final ConcurrentHashMap<DuccId,DuccId> processToJobMap = new ConcurrentHashMap<DuccId,DuccId>();
+
+	/**
+	 * @return a snapshot copy of the current processId-to-jobId entries;
+	 *         later changes to this map are not reflected in the copy
+	 */
+	public ConcurrentHashMap<DuccId,DuccId> getMap() {
+		return new ConcurrentHashMap<DuccId,DuccId>(processToJobMap);
+	}
+	
+	/**
+	 * Add every entry of the given map (e.g. one restored from a checkpoint)
+	 * via {@link #put}. Existing entries not present in the given map are kept.
+	 *
+	 * @param map entries to add; ignored when null
+	 */
+	public void putMap(ConcurrentHashMap<DuccId,DuccId> map) {
+		String location = "putMap";
+		if(map != null) {
+			logger.debug(location, jobid, map.size());
+			for(Entry<DuccId, DuccId> entry : map.entrySet()) {
+				this.put(entry.getKey(),entry.getValue());
+			}
+		}
+	}
+	
+	/**
+	 * @param key a process id
+	 * @return true if the process is currently mapped to a job
+	 */
+	public boolean containsKey(DuccId key) {
+		return processToJobMap.containsKey(key);
+	}
+	
+	/**
+	 * Map a process to its owning job.
+	 *
+	 * @return the job id previously mapped to processId, or null if none
+	 */
+	public DuccId put(DuccId processId, DuccId jobId) {
+		String location = "put";
+		DuccId retVal = processToJobMap.put(processId, jobId);
+		logger.debug(location, jobId, processId, "size="+processToJobMap.size());
+		return retVal;
+	}
+	
+	/**
+	 * Remove a process from the map.
+	 *
+	 * @return the job id the process was mapped to, or null if it was not mapped
+	 */
+	public DuccId remove(DuccId processId) {
+		String location = "remove";
+		// A single atomic remove: the returned value is exactly the job id that
+		// was unmapped, so no separate get() (which could race with a concurrent
+		// put/remove and log the wrong job id) is needed.
+		DuccId retVal = processToJobMap.remove(processId);
+		logger.debug(location, retVal, processId, "size="+processToJobMap.size());
+		return retVal;
+	}
+	
+	/**
+	 * @param key a process id
+	 * @return the job id the process is mapped to, or null if not mapped
+	 */
+	public DuccId get(DuccId key) {
+		String location = "get";
+		DuccId retVal = processToJobMap.get(key);
+		logger.debug(location, retVal, key, "size="+processToJobMap.size());
+		return retVal;
+	}
+	
+	/**
+	 * @return the number of processes currently mapped
+	 */
+	public int size() {
+		return processToJobMap.size();
+	}
+}

Propchange: uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessToJobMap.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessToJobMap.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java?rev=1764951&r1=1764950&r2=1764951&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java Fri Oct 14 19:19:31 2016
@@ -84,6 +84,7 @@ import org.apache.uima.ducc.transport.ev
 
 public class StateManager {
 	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(StateManager.class.getName());
+	private static final DuccId jobid = null;
 	
 	private static StateManager stateManager = new StateManager();
 	
@@ -238,8 +239,29 @@ public class StateManager {
 		return retVal;
 	}
 	
+	private void dumper() {
+		String location = "dumper";
+		try {
+			DuccWorkMap dwMap = orchestratorCommonArea.getWorkMap();
+			for(DuccId duccId : dwMap.keySet()) {
+				IDuccWork dw = dwMap.findDuccWork(duccId);
+				if(dw != null) {
+					logger.trace(location, duccId, "dw: "+dw.getDuccType());
+				}
+			}
+			ConcurrentHashMap<DuccId, DuccId> p2jMap = ProcessToJobMap.getInstance().getMap();
+			for(Entry<DuccId, DuccId> entry : p2jMap.entrySet()) {
+				logger.trace(location, jobid, "p:"+entry.getKey()+" "+"j:"+entry.getValue());
+			}
+		}
+		catch(Exception e) {
+			logger.error(location, jobid, e);
+		}
+	}
+	
 	public int prune(DuccWorkMap workMap) {
 		String methodName = "prune";
+		dumper();
 		int changes = 0;
 		logger.trace(methodName, null, messages.fetch("enter"));
 		long t0 = System.currentTimeMillis();
@@ -268,15 +290,32 @@ public class StateManager {
 						WorkMapHelper.removeDuccWork(workMap, duccWorkJob, this, methodName);
 						logger.info(methodName, duccId, messages.fetch("removed job"));
 						changes ++;
-						IDuccProcessMap processMap = duccWorkJob.getProcessMap();
-						Iterator<DuccId> processMapIterator = processMap.keySet().iterator();
-						while(processMapIterator.hasNext()) {
-							DuccId processDuccId = processMapIterator.next();
-							orchestratorCommonArea.getProcessAccounting().removeProcess(processDuccId);
-							logger.info(methodName, duccId, messages.fetch("removed process")+" "+processDuccId.toString());
-							changes ++;
+						IDuccProcessMap processMap = null;
+						DuccWorkPopDriver driver = duccWorkJob.getDriver();
+						if(driver != null) {
+							processMap = driver.getProcessMap();
+						}
+						if(processMap != null) {
+							Iterator<DuccId> processMapIterator = processMap.keySet().iterator();
+							while(processMapIterator.hasNext()) {
+								DuccId processDuccId = processMapIterator.next();
+								orchestratorCommonArea.getProcessAccounting().removeProcess(processDuccId);
+								logger.info(methodName, duccId, messages.fetch("removed driver process")+" "+processDuccId.toString());
+								changes ++;
+							}
+							logger.info(methodName, duccId, messages.fetch("processes driver inactive"));
+						}
+						processMap = duccWorkJob.getProcessMap();
+						if(processMap != null) {
+							Iterator<DuccId> processMapIterator = processMap.keySet().iterator();
+							while(processMapIterator.hasNext()) {
+								DuccId processDuccId = processMapIterator.next();
+								orchestratorCommonArea.getProcessAccounting().removeProcess(processDuccId);
+								logger.info(methodName, duccId, messages.fetch("removed process")+" "+processDuccId.toString());
+								changes ++;
+							}
+							logger.info(methodName, duccId, messages.fetch("processes inactive"));
 						}
-						logger.info(methodName, duccId, messages.fetch("processes inactive"));
 					}
 					else {
 						logger.debug(methodName, duccId, messages.fetch("processes active"));
@@ -302,7 +341,7 @@ public class StateManager {
 		if(elapsed > Constants.SYNC_LIMIT) {
 			logger.debug(methodName, null, "elapsed msecs: "+elapsed);
 		}
-		logger.debug(methodName, null, "processToWorkMap.size()="+orchestratorCommonArea.getProcessAccounting().processCount());
+		logger.debug(methodName, null, "processToWorkMap.size="+orchestratorCommonArea.getProcessAccounting().processCount());
 		if(changes > 0) {
 			OrchestratorCheckpoint.getInstance().saveState();
 		}