Posted to commits@uima.apache.org by cw...@apache.org on 2019/08/02 17:27:49 UTC

svn commit: r1864247 [10/10] - in /uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent: ./ config/ deploy/ deploy/uima/ event/ exceptions/ launcher/ metrics/collectors/ monitor/ processors/

Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java Fri Aug  2 17:27:48 2019
@@ -37,222 +37,227 @@ import org.apache.uima.ducc.transport.ev
  * 
  */
 public class DefaultNodeInventoryProcessor implements NodeInventoryProcessor {
-	DuccLogger logger = new DuccLogger(this.getClass(), "AGENT");
-	boolean inventoryChanged = true;
-	private NodeAgent agent;
-	private Map<DuccId, IDuccProcess> previousInventory;
-	private int forceInventoryUpdateMaxThreshold = 0;
-	private long counter = 0;
-
-	public DefaultNodeInventoryProcessor(NodeAgent agent,
-			String inventoryPublishRateSkipCount) {
-		this.agent = agent;
-		try {
-			forceInventoryUpdateMaxThreshold = Integer
-					.parseInt(inventoryPublishRateSkipCount);
-		} catch (Exception e) {
-		}
-		// Dont allow 0
-		if (forceInventoryUpdateMaxThreshold == 0) {
-			forceInventoryUpdateMaxThreshold = 1;
-		}
-	}
-
-	/**
-	 * Get a copy of agent {@code Process} inventory
-	 */
-	public Map<DuccId, IDuccProcess> getInventory() {
-		return agent.getInventoryCopy();
-	}
-
-	public void dispatchInventoryUpdate(DuccEventDispatcher dispatcher, String targetEndpoint, Map<DuccId, IDuccProcess> inventory) throws Exception {
-		NodeInventoryUpdateDuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(inventory,agent.getLastORSequence(), agent.getIdentity());
-		dispatcher.dispatch(targetEndpoint, duccEvent);
-		logger.info("dispatchInventoryUpdate", null, "Agent dispatched inventory update event to endpoint:"+targetEndpoint);
-	}
-
-	/**
-	 * 
-	 */
-	public void process(Exchange outgoingMessage) throws Exception {
-		String methodName = "process";
-		// Get a deep copy of agent's inventory
-		Map<DuccId, IDuccProcess> inventory = getInventory();
-		// Determine if the inventory changed since the last publishing was done
-		// First check if the inventory expanded or shrunk. If the same in size,
-		// compare process states and PID. If either of the two changed for any
-		// of the processes trigger immediate publish. If no changes found,
-		// publish
-		// according to skip counter
-		// (ducc.agent.node.inventory.publish.rate.skip)
-		// configured in ducc.properties.
-		if (previousInventory != null) {
-			if (agent.getEventListener().forceInvotoryUpdate()) {
-				inventoryChanged = true;
-				agent.getEventListener().resetForceInventoryUpdateFlag();
-			}
-			if (inventory.size() != previousInventory.size()) {
-				inventoryChanged = true;
-			} else {
-				// Inventory maps are equal in size, check if all processes in
-				// the current
-				// inventory exist in the previous inventory snapshot. If not,
-				// it means that
-				// that perhaps a new process was added and one was removed. In
-				// this case,
-				// force the publish, since there was a change.
-				for (Map.Entry<DuccId, IDuccProcess> currentProcess : inventory.entrySet()) {
-					// Check if a process in the current inventory exists in a
-					// previous
-					// inventory snapshot
-					if (previousInventory.containsKey(currentProcess.getKey())) {
-						IDuccProcess previousProcess = previousInventory.get(currentProcess.getKey());
-						// check if either PID or process state has changed
-						if (currentProcess.getValue().getPID() != null && previousProcess.getPID() == null) {
-							inventoryChanged = true;
-							break;
-						} else if (!currentProcess.getValue().getProcessState()
-								.equals(previousProcess.getProcessState())) {
-							inventoryChanged = true;
-							break;
-						} else {
-							List<IUimaPipelineAEComponent> breakdown = currentProcess.getValue()
-									.getUimaPipelineComponents();
-							if (breakdown != null && breakdown.size() > 0) {
-								List<IUimaPipelineAEComponent> previousBreakdown = previousProcess
-										.getUimaPipelineComponents();
-								if (previousBreakdown == null || previousBreakdown.size() == 0
-										|| breakdown.size() != previousBreakdown.size()) {
-									inventoryChanged = true;
-								} else {
-									for (IUimaPipelineAEComponent uimaAeState : breakdown) {
-										boolean found = false;
-										for (IUimaPipelineAEComponent previousUimaAeState : previousBreakdown) {
-											if (uimaAeState.getAeName().equals(previousUimaAeState.getAeName())) {
-												found = true;
-												if (!uimaAeState.getAeState().equals(previousUimaAeState.getAeState())
-														|| uimaAeState.getInitializationTime() != previousUimaAeState
-																.getInitializationTime()) {
-													inventoryChanged = true;
-													break;
-												}
-											}
-										}
-										if (!found) {
-											inventoryChanged = true;
-										}
-
-										if (inventoryChanged) {
-											break;
-										}
-
-									}
-								}
-
-							}
-						}
-					} else {
-						// New inventory contains a process not in the previous
-						// snapshot
-						inventoryChanged = true;
-						break;
-					}
-				}
-			}
-		}
-
-		// Get this inventory snapshot
-		previousInventory = inventory;
-		// Broadcast inventory if there is a change or configured number of
-		// epochs
-		// passed since the last broadcast. This is configured in
-		// ducc.properties with
-		// property ducc.agent.node.inventory.publish.rate.skip
-		try {
-			if (inventory.size() > 0 && (inventoryChanged || // if there is
-																// inventory
-																// change,
-																// publish
-					forceInventoryUpdateMaxThreshold == 0 || // skip rate in
-																// ducc.properties
-																// is zero,
-																// publish
-					(counter > 0 && (counter % forceInventoryUpdateMaxThreshold) == 0))) { // if
-																							// reached
-																							// skip
-																							// rate,
-																							// publish
-
-				StringBuffer sb = new StringBuffer("Node Inventory (" + inventory.size() + ")");
-				for (Map.Entry<DuccId, IDuccProcess> p : inventory.entrySet()) {
-					/*
-					 * long endInitLong = 0; String endInit = ""; ITimeWindow wInit =
-					 * p.getValue().getTimeWindowInit(); if(wInit != null) { endInit =
-					 * wInit.getEnd(); endInitLong = wInit.getEndLong(); } long startRunLong = 0;
-					 * String startRun = ""; ITimeWindow wRun = p.getValue().getTimeWindowRun();
-					 * if(wRun != null) { startRun = wRun.getStart(); startRunLong =
-					 * wRun.getStartLong(); } if(endInitLong > startRunLong) {
-					 * logger.warn(methodName, null,
-					 * "endInit:"+endInitLong+" "+"startRun:"+startRunLong); }
-					 */
-					if (p.getValue().getUimaPipelineComponents() == null) {
-						p.getValue().setUimaPipelineComponents(new ArrayList<IUimaPipelineAEComponent>());
-					}
-					if (!p.getValue().getProcessState().equals(ProcessState.Initializing)) {
-						p.getValue().getUimaPipelineComponents().clear();
-					}
-					int pipelineInitStats = (p.getValue().getUimaPipelineComponents() == null) ? 0
-							: p.getValue().getUimaPipelineComponents().size();
-					StringBuffer gcInfo = new StringBuffer();
-					if (p.getValue().getGarbageCollectionStats() != null) {
-						gcInfo.append(" GC Total=")
-								.append(p.getValue().getGarbageCollectionStats().getCollectionCount())
-								.append(" GC Time=")
-								.append(p.getValue().getGarbageCollectionStats().getCollectionTime()).append(" ");
-
-					}
-					sb.append("\n\t[Process Type=").append(p.getValue().getProcessType()).append(" DUCC ID=")
-							.append(p.getValue().getDuccId()).append(" PID=").append(p.getValue().getPID())
-							.append(" State=").append(p.getValue().getProcessState()).append(" Resident Memory=")
-							.append(p.getValue().getResidentMemory()).append(gcInfo.toString())
-							.append(" Init Stats List Size:" + pipelineInitStats)
-							.append(" Reason: " + p.getValue().getReasonForStoppingProcess()).append("] ");
-					if (p.getValue().getProcessState().equals(ProcessState.Stopped)
-							|| p.getValue().getProcessState().equals(ProcessState.Failed)
-							|| p.getValue().getProcessState().equals(ProcessState.Killed)) {
-						sb.append(" Reason:" + p.getValue().getReasonForStoppingProcess());
-						sb.append(" Extended Reason:" + p.getValue().getExtendedReasonForStoppingProcess());
-					}
-
-					if (!p.getValue().getProcessState().equals(ProcessState.Running)
-							&& !p.getValue().getProcessState().equals(ProcessState.Initializing)) {
-						sb.append(" Exit Code=" + p.getValue().getProcessExitCode());
-					}
-
-				}
-				logger.info(methodName, null,
-						"Agent " + agent.getIdentity().getCanonicalName() + " Posting Inventory:" + sb.toString());
-				outgoingMessage.getIn().setBody(
-						new NodeInventoryUpdateDuccEvent(inventory, agent.getLastORSequence(), agent.getIdentity()));
-
-			} else {
-				// Add null to the body of the message. A filter
-				// defined in the Camel route (AgentConfiguration.java)
-				// has a predicate to check for null body and throws
-				// away such a message.
-				outgoingMessage.getIn().setBody(null);
-			}
-		} catch (Exception e) {
-			logger.error(methodName, null, e);
-		} finally {
-			if (inventoryChanged) {
-				counter = 0;
-			} else {
-				counter++;
-			}
-			inventoryChanged = false;
-		}
+  DuccLogger logger = new DuccLogger(this.getClass(), "AGENT");
 
-	}
+  boolean inventoryChanged = true;
+
+  private NodeAgent agent;
+
+  private Map<DuccId, IDuccProcess> previousInventory;
+
+  private int forceInventoryUpdateMaxThreshold = 0;
+
+  private long counter = 0;
+
+  public DefaultNodeInventoryProcessor(NodeAgent agent, String inventoryPublishRateSkipCount) {
+    this.agent = agent;
+    try {
+      forceInventoryUpdateMaxThreshold = Integer.parseInt(inventoryPublishRateSkipCount);
+    } catch (Exception e) {
+    }
+    // Dont allow 0
+    if (forceInventoryUpdateMaxThreshold == 0) {
+      forceInventoryUpdateMaxThreshold = 1;
+    }
+  }
+
+  /**
+   * Get a copy of agent {@code Process} inventory
+   */
+  public Map<DuccId, IDuccProcess> getInventory() {
+    return agent.getInventoryCopy();
+  }
+
+  public void dispatchInventoryUpdate(DuccEventDispatcher dispatcher, String targetEndpoint,
+          Map<DuccId, IDuccProcess> inventory) throws Exception {
+    NodeInventoryUpdateDuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(inventory,
+            agent.getLastORSequence(), agent.getIdentity());
+    dispatcher.dispatch(targetEndpoint, duccEvent);
+    logger.info("dispatchInventoryUpdate", null,
+            "Agent dispatched inventory update event to endpoint:" + targetEndpoint);
+  }
+
+  /**
+   * 
+   */
+  public void process(Exchange outgoingMessage) throws Exception {
+    String methodName = "process";
+    // Get a deep copy of agent's inventory
+    Map<DuccId, IDuccProcess> inventory = getInventory();
+    // Determine if the inventory changed since the last publishing was done
+    // First check if the inventory expanded or shrunk. If the same in size,
+    // compare process states and PID. If either of the two changed for any
+    // of the processes trigger immediate publish. If no changes found,
+    // publish
+    // according to skip counter
+    // (ducc.agent.node.inventory.publish.rate.skip)
+    // configured in ducc.properties.
+    if (previousInventory != null) {
+      if (agent.getEventListener().forceInvotoryUpdate()) {
+        inventoryChanged = true;
+        agent.getEventListener().resetForceInventoryUpdateFlag();
+      }
+      if (inventory.size() != previousInventory.size()) {
+        inventoryChanged = true;
+      } else {
+        // Inventory maps are equal in size, check if all processes in
+        // the current
+        // inventory exist in the previous inventory snapshot. If not,
+        // it means that
+        // that perhaps a new process was added and one was removed. In
+        // this case,
+        // force the publish, since there was a change.
+        for (Map.Entry<DuccId, IDuccProcess> currentProcess : inventory.entrySet()) {
+          // Check if a process in the current inventory exists in a
+          // previous
+          // inventory snapshot
+          if (previousInventory.containsKey(currentProcess.getKey())) {
+            IDuccProcess previousProcess = previousInventory.get(currentProcess.getKey());
+            // check if either PID or process state has changed
+            if (currentProcess.getValue().getPID() != null && previousProcess.getPID() == null) {
+              inventoryChanged = true;
+              break;
+            } else if (!currentProcess.getValue().getProcessState()
+                    .equals(previousProcess.getProcessState())) {
+              inventoryChanged = true;
+              break;
+            } else {
+              List<IUimaPipelineAEComponent> breakdown = currentProcess.getValue()
+                      .getUimaPipelineComponents();
+              if (breakdown != null && breakdown.size() > 0) {
+                List<IUimaPipelineAEComponent> previousBreakdown = previousProcess
+                        .getUimaPipelineComponents();
+                if (previousBreakdown == null || previousBreakdown.size() == 0
+                        || breakdown.size() != previousBreakdown.size()) {
+                  inventoryChanged = true;
+                } else {
+                  for (IUimaPipelineAEComponent uimaAeState : breakdown) {
+                    boolean found = false;
+                    for (IUimaPipelineAEComponent previousUimaAeState : previousBreakdown) {
+                      if (uimaAeState.getAeName().equals(previousUimaAeState.getAeName())) {
+                        found = true;
+                        if (!uimaAeState.getAeState().equals(previousUimaAeState.getAeState())
+                                || uimaAeState.getInitializationTime() != previousUimaAeState
+                                        .getInitializationTime()) {
+                          inventoryChanged = true;
+                          break;
+                        }
+                      }
+                    }
+                    if (!found) {
+                      inventoryChanged = true;
+                    }
+
+                    if (inventoryChanged) {
+                      break;
+                    }
+
+                  }
+                }
+
+              }
+            }
+          } else {
+            // New inventory contains a process not in the previous
+            // snapshot
+            inventoryChanged = true;
+            break;
+          }
+        }
+      }
+    }
+
+    // Get this inventory snapshot
+    previousInventory = inventory;
+    // Broadcast inventory if there is a change or configured number of
+    // epochs
+    // passed since the last broadcast. This is configured in
+    // ducc.properties with
+    // property ducc.agent.node.inventory.publish.rate.skip
+    try {
+      if (inventory.size() > 0 && (inventoryChanged || // if there is
+      // inventory
+      // change,
+      // publish
+              forceInventoryUpdateMaxThreshold == 0 || // skip rate in
+              // ducc.properties
+              // is zero,
+              // publish
+              (counter > 0 && (counter % forceInventoryUpdateMaxThreshold) == 0))) { // if
+        // reached
+        // skip
+        // rate,
+        // publish
+
+        StringBuffer sb = new StringBuffer("Node Inventory (" + inventory.size() + ")");
+        for (Map.Entry<DuccId, IDuccProcess> p : inventory.entrySet()) {
+          /*
+           * long endInitLong = 0; String endInit = ""; ITimeWindow wInit =
+           * p.getValue().getTimeWindowInit(); if(wInit != null) { endInit = wInit.getEnd();
+           * endInitLong = wInit.getEndLong(); } long startRunLong = 0; String startRun = "";
+           * ITimeWindow wRun = p.getValue().getTimeWindowRun(); if(wRun != null) { startRun =
+           * wRun.getStart(); startRunLong = wRun.getStartLong(); } if(endInitLong > startRunLong) {
+           * logger.warn(methodName, null, "endInit:"+endInitLong+" "+"startRun:"+startRunLong); }
+           */
+          if (p.getValue().getUimaPipelineComponents() == null) {
+            p.getValue().setUimaPipelineComponents(new ArrayList<IUimaPipelineAEComponent>());
+          }
+          if (!p.getValue().getProcessState().equals(ProcessState.Initializing)) {
+            p.getValue().getUimaPipelineComponents().clear();
+          }
+          int pipelineInitStats = (p.getValue().getUimaPipelineComponents() == null) ? 0
+                  : p.getValue().getUimaPipelineComponents().size();
+          StringBuffer gcInfo = new StringBuffer();
+          if (p.getValue().getGarbageCollectionStats() != null) {
+            gcInfo.append(" GC Total=")
+                    .append(p.getValue().getGarbageCollectionStats().getCollectionCount())
+                    .append(" GC Time=")
+                    .append(p.getValue().getGarbageCollectionStats().getCollectionTime())
+                    .append(" ");
+
+          }
+          sb.append("\n\t[Process Type=").append(p.getValue().getProcessType()).append(" DUCC ID=")
+                  .append(p.getValue().getDuccId()).append(" PID=").append(p.getValue().getPID())
+                  .append(" State=").append(p.getValue().getProcessState())
+                  .append(" Resident Memory=").append(p.getValue().getResidentMemory())
+                  .append(gcInfo.toString()).append(" Init Stats List Size:" + pipelineInitStats)
+                  .append(" Reason: " + p.getValue().getReasonForStoppingProcess()).append("] ");
+          if (p.getValue().getProcessState().equals(ProcessState.Stopped)
+                  || p.getValue().getProcessState().equals(ProcessState.Failed)
+                  || p.getValue().getProcessState().equals(ProcessState.Killed)) {
+            sb.append(" Reason:" + p.getValue().getReasonForStoppingProcess());
+            sb.append(" Extended Reason:" + p.getValue().getExtendedReasonForStoppingProcess());
+          }
+
+          if (!p.getValue().getProcessState().equals(ProcessState.Running)
+                  && !p.getValue().getProcessState().equals(ProcessState.Initializing)) {
+            sb.append(" Exit Code=" + p.getValue().getProcessExitCode());
+          }
+
+        }
+        logger.info(methodName, null, "Agent " + agent.getIdentity().getCanonicalName()
+                + " Posting Inventory:" + sb.toString());
+        outgoingMessage.getIn().setBody(new NodeInventoryUpdateDuccEvent(inventory,
+                agent.getLastORSequence(), agent.getIdentity()));
+
+      } else {
+        // Add null to the body of the message. A filter
+        // defined in the Camel route (AgentConfiguration.java)
+        // has a predicate to check for null body and throws
+        // away such a message.
+        outgoingMessage.getIn().setBody(null);
+      }
+    } catch (Exception e) {
+      logger.error(methodName, null, e);
+    } finally {
+      if (inventoryChanged) {
+        counter = 0;
+      } else {
+        counter++;
+      }
+      inventoryChanged = false;
+    }
+
+  }
 
 }

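A minimal sketch of the publish-or-skip gate used above, with hypothetical names (not part of this commit): the inventory is broadcast when it has changed, or when the epoch counter reaches the skip threshold read from ducc.agent.node.inventory.publish.rate.skip; an unparsable or zero threshold falls back to 1, as in the constructor.

public class InventoryPublishGate {
  private final int skipThreshold; // forced to >= 1, mirroring the constructor above
  private long counter = 0;

  public InventoryPublishGate(String skipCountProperty) {
    int t = 1;
    try {
      t = Integer.parseInt(skipCountProperty);
    } catch (Exception e) {
      // fall back to publishing every epoch
    }
    this.skipThreshold = (t == 0) ? 1 : t;
  }

  /** Returns true if the inventory should be broadcast this epoch. */
  public boolean shouldPublish(boolean inventoryChanged, int inventorySize) {
    boolean publish = inventorySize > 0
            && (inventoryChanged || (counter > 0 && counter % skipThreshold == 0));
    if (inventoryChanged) {
      counter = 0;   // a change-driven publish restarts the skip count
    } else {
      counter++;     // otherwise count epochs toward the skip threshold
    }
    return publish;
  }
}

With a skip threshold of 5, an unchanged inventory is still republished roughly every fifth epoch.
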
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeMetricsProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeMetricsProcessor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeMetricsProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeMetricsProcessor.java Fri Aug  2 17:27:48 2019
@@ -40,68 +40,67 @@ import org.apache.uima.ducc.common.node.
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.transport.event.NodeMetricsUpdateDuccEvent;
 
+public class DefaultNodeMetricsProcessor extends BaseProcessor implements NodeMetricsProcessor {
+  private NodeAgent agent;
 
-public class DefaultNodeMetricsProcessor extends BaseProcessor implements
-		NodeMetricsProcessor {
-	private  NodeAgent agent;
-	
-	private ExecutorService pool = Executors.newFixedThreadPool(1);
-	
-	DuccLogger logger = DuccLogger.getLogger(this.getClass(), Agent.COMPONENT_NAME);
-	
-	/*
-	public DefaultNodeMetricsProcessor(final NodeAgent agent) throws Exception {
-		this.agent = agent;
-	}
-	*/
-	public void setAgent(NodeAgent agent ) {
-		this.agent = agent;
-	}
-	public void process(Exchange exchange) throws Exception {
-	  String methodName = "process";
-	  try {
-
-	    DefaultNodeMemoryCollector collector = new DefaultNodeMemoryCollector();
-	    Future<NodeMemory> nmiFuture = pool.submit(collector);
-
-	    DefaultNodeLoadAverageCollector loadAvgCollector = 
-	            new DefaultNodeLoadAverageCollector();
-	    Future<NodeLoadAverage> loadFuture = pool.submit(loadAvgCollector);
-
-	    NodeCpuCollector cpuCollector = new NodeCpuCollector();
-//	    Future<NodeCpuInfo> cpuFuture = pool.submit(cpuCollector);
-
-	    NodeCpuInfo cpuInfo = new NodeCpuInfo(agent.numProcessors, cpuCollector.call().getCurrentLoad());
-	    
-	    NodeUsersCollector nodeUsersCollector = new NodeUsersCollector(agent, logger);
-	    Future<TreeMap<String,NodeUsersInfo>> nuiFuture = pool.submit(nodeUsersCollector);
-		boolean cpuReportingEnabled = agent.cgroupsManager.isCpuReportingEnabled();
-
-	    NodeMetrics nodeMetrics = 
-	            new NodeMetrics(agent.getIdentity(), nmiFuture.get(), loadFuture.get(), 
-	                    cpuInfo, nuiFuture.get(), cpuReportingEnabled);
-	    if ( agent.isStopping()) {
-	    	nodeMetrics.disableNode();  // sends Unavailable status to clients (RM,WS)
-	    	logger.info(methodName, null,">>>>>>>>>>>>>>>>> Agent publishing State="+nodeMetrics.getNodeStatus()+" in Outgoing NodeMetrics");
-	    }
-	    //Node node = new DuccNode(new NodeIdentity(), nodeMetrics);
-	    // jrc 2011-07-30 I think this needs to be agent.getIdentity(), not create a new identity.
-	    Node node = new DuccNode(agent.getIdentity(), nodeMetrics, agent.useCgroups);
-		
-	    // Make the agent aware how much memory is available on the node. Do this once.
-		if ( agent.getNodeInfo() == null ) {
-			agent.setNodeInfo(node);
-		}
-	    logger.info(methodName, null, "... Agent "+node.getNodeIdentity().getCanonicalName()+" Posting Users:"+
-	            node.getNodeMetrics().getNodeUsersMap().size());
-	    
-	    NodeMetricsUpdateDuccEvent event = new NodeMetricsUpdateDuccEvent(node,agent.getInventoryRef().size());
-	    exchange.getIn().setBody(event, NodeMetricsUpdateDuccEvent.class);
-
-	  } catch( Exception e) {
-	    e.printStackTrace();
-	  }
+  private ExecutorService pool = Executors.newFixedThreadPool(1);
 
-	}
+  DuccLogger logger = DuccLogger.getLogger(this.getClass(), Agent.COMPONENT_NAME);
+
+  /*
+   * public DefaultNodeMetricsProcessor(final NodeAgent agent) throws Exception { this.agent =
+   * agent; }
+   */
+  public void setAgent(NodeAgent agent) {
+    this.agent = agent;
+  }
+
+  public void process(Exchange exchange) throws Exception {
+    String methodName = "process";
+    try {
+
+      DefaultNodeMemoryCollector collector = new DefaultNodeMemoryCollector();
+      Future<NodeMemory> nmiFuture = pool.submit(collector);
+
+      DefaultNodeLoadAverageCollector loadAvgCollector = new DefaultNodeLoadAverageCollector();
+      Future<NodeLoadAverage> loadFuture = pool.submit(loadAvgCollector);
+
+      NodeCpuCollector cpuCollector = new NodeCpuCollector();
+      // Future<NodeCpuInfo> cpuFuture = pool.submit(cpuCollector);
+
+      NodeCpuInfo cpuInfo = new NodeCpuInfo(agent.numProcessors,
+              cpuCollector.call().getCurrentLoad());
+
+      NodeUsersCollector nodeUsersCollector = new NodeUsersCollector(agent, logger);
+      Future<TreeMap<String, NodeUsersInfo>> nuiFuture = pool.submit(nodeUsersCollector);
+      boolean cpuReportingEnabled = agent.cgroupsManager.isCpuReportingEnabled();
+
+      NodeMetrics nodeMetrics = new NodeMetrics(agent.getIdentity(), nmiFuture.get(),
+              loadFuture.get(), cpuInfo, nuiFuture.get(), cpuReportingEnabled);
+      if (agent.isStopping()) {
+        nodeMetrics.disableNode(); // sends Unavailable status to clients (RM,WS)
+        logger.info(methodName, null, ">>>>>>>>>>>>>>>>> Agent publishing State="
+                + nodeMetrics.getNodeStatus() + " in Outgoing NodeMetrics");
+      }
+      // Node node = new DuccNode(new NodeIdentity(), nodeMetrics);
+      // jrc 2011-07-30 I think this needs to be agent.getIdentity(), not create a new identity.
+      Node node = new DuccNode(agent.getIdentity(), nodeMetrics, agent.useCgroups);
+
+      // Make the agent aware how much memory is available on the node. Do this once.
+      if (agent.getNodeInfo() == null) {
+        agent.setNodeInfo(node);
+      }
+      logger.info(methodName, null, "... Agent " + node.getNodeIdentity().getCanonicalName()
+              + " Posting Users:" + node.getNodeMetrics().getNodeUsersMap().size());
+
+      NodeMetricsUpdateDuccEvent event = new NodeMetricsUpdateDuccEvent(node,
+              agent.getInventoryRef().size());
+      exchange.getIn().setBody(event, NodeMetricsUpdateDuccEvent.class);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+  }
 
 }

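The metrics processors in this commit fan work out to collector Callables submitted to an ExecutorService and then join on the Futures while assembling the payload. A compact sketch of that pattern with stand-in collectors (the DUCC collector classes are assumed, not reproduced here):

import java.lang.management.ManagementFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MetricsSnapshotSketch {
  private final ExecutorService pool = Executors.newFixedThreadPool(1);

  public String snapshot() throws Exception {
    // stand-ins for DefaultNodeMemoryCollector / DefaultNodeLoadAverageCollector
    Callable<Long> memoryCollector = () -> Runtime.getRuntime().freeMemory();
    Callable<Double> loadCollector = () -> ManagementFactory.getOperatingSystemMXBean()
            .getSystemLoadAverage();

    Future<Long> memFuture = pool.submit(memoryCollector);
    Future<Double> loadFuture = pool.submit(loadCollector);

    // get() blocks until each collector finishes, as with nmiFuture.get() above
    return "freeMem=" + memFuture.get() + " loadAvg=" + loadFuture.get();
  }
}
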
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultProcessMetricsProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultProcessMetricsProcessor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultProcessMetricsProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultProcessMetricsProcessor.java Fri Aug  2 17:27:48 2019
@@ -22,21 +22,22 @@ import org.apache.camel.Exchange;
 import org.apache.uima.ducc.agent.NodeAgent;
 import org.apache.uima.ducc.transport.event.common.IDuccProcess;
 
-
 public class DefaultProcessMetricsProcessor implements ProcessMetricsProcessor {
-   // private NodeAgent agent;
-   // private IDuccProcess process;
-    
-	public DefaultProcessMetricsProcessor( IDuccProcess process, NodeAgent agent) {
-	//	this.agent = agent;
-	//	this.process = process;
-	}
-	public void process(Exchange arg0) throws Exception {
-	}
-	@Override
-	public void stop() {
-		// TODO Auto-generated method stub
-		
-	}
+  // private NodeAgent agent;
+  // private IDuccProcess process;
+
+  public DefaultProcessMetricsProcessor(IDuccProcess process, NodeAgent agent) {
+    // this.agent = agent;
+    // this.process = process;
+  }
+
+  public void process(Exchange arg0) throws Exception {
+  }
+
+  @Override
+  public void stop() {
+    // TODO Auto-generated method stub
+
+  }
 
 }

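Each of these classes implements Camel's Processor and hands its result to the route by setting the Exchange body; DefaultNodeInventoryProcessor depends on a route-level filter (in AgentConfiguration.java) to discard messages whose body was set to null. A sketch of that arrangement, with hypothetical endpoint URIs and an inline stand-in processor:

import org.apache.camel.builder.RouteBuilder;

public class InventoryRouteSketch extends RouteBuilder {
  @Override
  public void configure() {
    from("timer:inventory?period=60000")                 // hypothetical trigger endpoint
            .process(exchange -> {
              Object update = null; // the real processor builds an update, or leaves null to skip
              exchange.getIn().setBody(update);
            })
            .filter(body().isNotNull())                  // null-body messages are dropped here
            .to("activemq:topic:ducc.node.inventory");   // hypothetical target endpoint
  }
}
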
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxNodeMetricsProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxNodeMetricsProcessor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxNodeMetricsProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxNodeMetricsProcessor.java Fri Aug  2 17:27:48 2019
@@ -41,160 +41,171 @@ import org.apache.uima.ducc.common.node.
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.transport.event.NodeMetricsUpdateDuccEvent;
 
+public class LinuxNodeMetricsProcessor extends BaseProcessor implements NodeMetricsProcessor {
+  DuccLogger logger = DuccLogger.getLogger(this.getClass(), Agent.COMPONENT_NAME);
 
-public class LinuxNodeMetricsProcessor extends BaseProcessor implements
-		NodeMetricsProcessor {
-	DuccLogger logger = DuccLogger.getLogger(this.getClass(), Agent.COMPONENT_NAME);
-    public static String[] MeminfoTargetFields = new String[] {"MemTotal:","MemFree:","SwapTotal:","SwapFree:"};
-        
-	private NodeAgent agent;
-    private String osname;
-    private String osversion;
-    private String osarch;
-	private final ExecutorService pool;
-	private RandomAccessFile memInfoFile;
-	private RandomAccessFile loadAvgFile;
-	//private Node node;
-	private int swapThreshold = 0;
-//	public LinuxNodeMetricsProcessor(NodeAgent agent, String memInfoFilePath,
-//			String loadAvgFilePath) throws FileNotFoundException {
-	public LinuxNodeMetricsProcessor() {
-		super();
-//		this.agent = agent;
-		pool = Executors.newCachedThreadPool();
-		// open files and keep them open until stop() is called
-//		memInfoFile = new RandomAccessFile(memInfoFilePath, "r");
-//		loadAvgFile = new RandomAccessFile(loadAvgFilePath, "r");
-		//node = new DuccNode(agent.getIdentity(), null);
-
-		osname = System.getProperty("os.name");
-		osversion = System.getProperty("os.version");
-		osarch = System.getProperty("os.arch");
-
-		if ( System.getProperty("ducc.node.min.swap.threshold") != null ) {
-	    try {
-	      swapThreshold = Integer.valueOf(System.getProperty("ducc.node.min.swap.threshold"));
-	      logger.info("ctor", null, "Ducc Node Min Swap Threshold:"+swapThreshold);
-	    } catch( Exception e) {
-	    }
-		}
-	}
-	public void setAgent(NodeAgent agent) {
-		this.agent = agent;
-	}
-	public void initMemInfo(String memInfoFilePath) throws Exception {
-		this.memInfoFile = new RandomAccessFile(memInfoFilePath, "r");
-		
-	}
-    public void initLoadAvg(String loadAvgFilePath) throws Exception {
-    	this.loadAvgFile = new RandomAccessFile(loadAvgFilePath, "r");
+  public static String[] MeminfoTargetFields = new String[] { "MemTotal:", "MemFree:", "SwapTotal:",
+      "SwapFree:" };
+
+  private NodeAgent agent;
+
+  private String osname;
+
+  private String osversion;
+
+  private String osarch;
+
+  private final ExecutorService pool;
+
+  private RandomAccessFile memInfoFile;
+
+  private RandomAccessFile loadAvgFile;
+
+  // private Node node;
+  private int swapThreshold = 0;
+
+  // public LinuxNodeMetricsProcessor(NodeAgent agent, String memInfoFilePath,
+  // String loadAvgFilePath) throws FileNotFoundException {
+  public LinuxNodeMetricsProcessor() {
+    super();
+    // this.agent = agent;
+    pool = Executors.newCachedThreadPool();
+    // open files and keep them open until stop() is called
+    // memInfoFile = new RandomAccessFile(memInfoFilePath, "r");
+    // loadAvgFile = new RandomAccessFile(loadAvgFilePath, "r");
+    // node = new DuccNode(agent.getIdentity(), null);
+
+    osname = System.getProperty("os.name");
+    osversion = System.getProperty("os.version");
+    osarch = System.getProperty("os.arch");
+
+    if (System.getProperty("ducc.node.min.swap.threshold") != null) {
+      try {
+        swapThreshold = Integer.valueOf(System.getProperty("ducc.node.min.swap.threshold"));
+        logger.info("ctor", null, "Ducc Node Min Swap Threshold:" + swapThreshold);
+      } catch (Exception e) {
+      }
     }
+  }
 
-	public void stop() {
-		try {
-			if (memInfoFile != null) {
-				memInfoFile.close();
-			}
-			if (loadAvgFile != null) {
-				loadAvgFile.close();
-			}
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-	}
-
-	/**
-	 * Collects node's metrics and dumps it to a JMS topic. Currently collects
-	 * memory utilization from /proc/meminfo and average load from
-	 * /proc/loadavg. This method is called from NodeAgentStatsGenerator at
-	 * fixed intervals.
-	 * 
-	 */
+  public void setAgent(NodeAgent agent) {
+    this.agent = agent;
+  }
+
+  public void initMemInfo(String memInfoFilePath) throws Exception {
+    this.memInfoFile = new RandomAccessFile(memInfoFilePath, "r");
+
+  }
+
+  public void initLoadAvg(String loadAvgFilePath) throws Exception {
+    this.loadAvgFile = new RandomAccessFile(loadAvgFilePath, "r");
+  }
+
+  public void stop() {
+    try {
+      if (memInfoFile != null) {
+        memInfoFile.close();
+      }
+      if (loadAvgFile != null) {
+        loadAvgFile.close();
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Collects node's metrics and dumps it to a JMS topic. Currently collects memory utilization from
+   * /proc/meminfo and average load from /proc/loadavg. This method is called from
+   * NodeAgentStatsGenerator at fixed intervals.
+   * 
+   */
   public void process(Exchange e) {
-		String methodName = "process";
-		try {
-			// every 10th node metrics publication log the status of CGroups
-			if ( ( NodeAgent.logCounter.incrementAndGet() % 10 ) == 0 ) {
-				if ( agent.useCgroups ) {
-				    logger.info(methodName, null, "\t****\n\t**** Agent CGroups status: enabled");
-					
-				} else {
-				    logger.info(methodName, null, "\t****\n\t**** Agent CGroups status: disabled. Reason:"+NodeAgent.cgroupFailureReason);
-					
-				}
-			}
-
-			NodeMemInfoCollector memCollector = new NodeMemInfoCollector(MeminfoTargetFields);
-			Future<NodeMemory> nmiFuture = pool.submit(memCollector);
-
-			NodeLoadAverageCollector loadAvgCollector = new NodeLoadAverageCollector();
-
-			Future<NodeLoadAverage> loadFuture = pool.submit(loadAvgCollector);
-			NodeCpuCollector cpuCollector = new NodeCpuCollector();
-//			Future<NodeCpuInfo> cpuFuture = pool.submit(cpuCollector);
-			NodeCpuInfo cpuInfo = new NodeCpuInfo(agent.numProcessors, String.valueOf(cpuCollector.call()));
-		    
-			e.getIn().setHeader("node", agent.getIdentity().getCanonicalName());
-			NodeMemory memInfo = nmiFuture.get();
-			TreeMap<String, NodeUsersInfo> users = null;
-			// begin collecting user processes and activate rogue process detector
-			// only after the agent receives the first Ducc state publication.
-			if ( agent.receivedDuccState ) {
-			    NodeUsersCollector nodeUsersCollector = new NodeUsersCollector(agent, logger);
-			    
-			    logger.debug(methodName, null, "... Agent Collecting User Processes");
-			    
-			    Future<TreeMap<String,NodeUsersInfo>> nuiFuture = 
-			            pool.submit(nodeUsersCollector);
-			    users = nuiFuture.get();
-			} else {
-				users = new TreeMap<String, NodeUsersInfo>();
-			}
-			NodeLoadAverage lav = loadFuture.get();
-			boolean cpuReportingEnabled = false;
-			if ( agent.cgroupsManager != null ) {
-				cpuReportingEnabled = agent.cgroupsManager.isCpuReportingEnabled();
-			}
-            NodeMetrics nodeMetrics = new NodeMetrics(agent.getIdentity(), memInfo, lav,
-              cpuInfo, users, cpuReportingEnabled);
-    	    if ( agent.isStopping()) {
-    	    	nodeMetrics.disableNode();  // sends Unavailable status to clients (RM,WS)
-    	    }
-
-			Node node = new DuccNode(agent.getIdentity(), nodeMetrics, agent.useCgroups);
-			// Make the agent aware how much memory is available on the node. Do this once.
-			if ( agent.getNodeInfo() == null ) {
-				agent.setNodeInfo(node);
-			}
-						
-			((DuccNode)node).duccLingExists(agent.duccLingExists());
-			((DuccNode)node).runWithDuccLing(agent.runWithDuccLing());
-			logger.info(methodName, null, "... Agent "+node.getNodeIdentity().getCanonicalName()+
-                                        " OS Name:" + osname +
-                                        " OS Version:" + osversion +
-                                        " OS Arch:" + osarch +
-					" CPU Count:" + cpuInfo.getAvailableProcessors() +
-					" CPU Load Average:" +lav.getLoadAvg1() +
-					" Posting Memory (KB):"
-					+ node.getNodeMetrics().getNodeMemory().getMemTotal()+
-					" Memory Free (KB):"+node.getNodeMetrics().getNodeMemory().getMemFree()+
-					" Swap Total (KB):"+node.getNodeMetrics().getNodeMemory().getSwapTotal()+
-					" Swap Free (KB):"+node.getNodeMetrics().getNodeMemory().getSwapFree()+
-					" Low Swap Threshold Defined in ducc.properties (KB):"+swapThreshold +
-					" CPU Reporting Enabled:"+cpuReportingEnabled +
-					" Node Status:"+nodeMetrics.getNodeStatus()) ;
-			
-			logger.trace(methodName, null, "... Agent "+node.getNodeIdentity().getCanonicalName()+" Posting Users:"+
-					node.getNodeMetrics().getNodeUsersMap().size());
-			// Check if swap free is less than defined minimum threshold (check ducc.properties) 
-			if ( swapThreshold > 0 && ( node.getNodeMetrics().getNodeMemory().getSwapFree() < swapThreshold)) {
-			  agent.killProcessDueToLowSwapSpace(swapThreshold);
-			}
-			NodeMetricsUpdateDuccEvent updateEvent = new NodeMetricsUpdateDuccEvent(node,agent.getInventoryRef().size());
-			e.getIn().setBody(updateEvent, NodeMetricsUpdateDuccEvent.class);
-
-		} catch (Exception ex) {
-			logger.error(methodName, null, ex, new Object[] { "Agent" });
-		}
-	}
+    String methodName = "process";
+    try {
+      // every 10th node metrics publication log the status of CGroups
+      if ((NodeAgent.logCounter.incrementAndGet() % 10) == 0) {
+        if (agent.useCgroups) {
+          logger.info(methodName, null, "\t****\n\t**** Agent CGroups status: enabled");
+
+        } else {
+          logger.info(methodName, null, "\t****\n\t**** Agent CGroups status: disabled. Reason:"
+                  + NodeAgent.cgroupFailureReason);
+
+        }
+      }
+
+      NodeMemInfoCollector memCollector = new NodeMemInfoCollector(MeminfoTargetFields);
+      Future<NodeMemory> nmiFuture = pool.submit(memCollector);
+
+      NodeLoadAverageCollector loadAvgCollector = new NodeLoadAverageCollector();
+
+      Future<NodeLoadAverage> loadFuture = pool.submit(loadAvgCollector);
+      NodeCpuCollector cpuCollector = new NodeCpuCollector();
+      // Future<NodeCpuInfo> cpuFuture = pool.submit(cpuCollector);
+      NodeCpuInfo cpuInfo = new NodeCpuInfo(agent.numProcessors,
+              String.valueOf(cpuCollector.call()));
+
+      e.getIn().setHeader("node", agent.getIdentity().getCanonicalName());
+      NodeMemory memInfo = nmiFuture.get();
+      TreeMap<String, NodeUsersInfo> users = null;
+      // begin collecting user processes and activate rogue process detector
+      // only after the agent receives the first Ducc state publication.
+      if (agent.receivedDuccState) {
+        NodeUsersCollector nodeUsersCollector = new NodeUsersCollector(agent, logger);
+
+        logger.debug(methodName, null, "... Agent Collecting User Processes");
+
+        Future<TreeMap<String, NodeUsersInfo>> nuiFuture = pool.submit(nodeUsersCollector);
+        users = nuiFuture.get();
+      } else {
+        users = new TreeMap<String, NodeUsersInfo>();
+      }
+      NodeLoadAverage lav = loadFuture.get();
+      boolean cpuReportingEnabled = false;
+      if (agent.cgroupsManager != null) {
+        cpuReportingEnabled = agent.cgroupsManager.isCpuReportingEnabled();
+      }
+      NodeMetrics nodeMetrics = new NodeMetrics(agent.getIdentity(), memInfo, lav, cpuInfo, users,
+              cpuReportingEnabled);
+      if (agent.isStopping()) {
+        nodeMetrics.disableNode(); // sends Unavailable status to clients (RM,WS)
+      }
+
+      Node node = new DuccNode(agent.getIdentity(), nodeMetrics, agent.useCgroups);
+      // Make the agent aware how much memory is available on the node. Do this once.
+      if (agent.getNodeInfo() == null) {
+        agent.setNodeInfo(node);
+      }
+
+      ((DuccNode) node).duccLingExists(agent.duccLingExists());
+      ((DuccNode) node).runWithDuccLing(agent.runWithDuccLing());
+      logger.info(methodName, null,
+              "... Agent " + node.getNodeIdentity().getCanonicalName() + " OS Name:" + osname
+                      + " OS Version:" + osversion + " OS Arch:" + osarch + " CPU Count:"
+                      + cpuInfo.getAvailableProcessors() + " CPU Load Average:" + lav.getLoadAvg1()
+                      + " Posting Memory (KB):"
+                      + node.getNodeMetrics().getNodeMemory().getMemTotal() + " Memory Free (KB):"
+                      + node.getNodeMetrics().getNodeMemory().getMemFree() + " Swap Total (KB):"
+                      + node.getNodeMetrics().getNodeMemory().getSwapTotal() + " Swap Free (KB):"
+                      + node.getNodeMetrics().getNodeMemory().getSwapFree()
+                      + " Low Swap Threshold Defined in ducc.properties (KB):" + swapThreshold
+                      + " CPU Reporting Enabled:" + cpuReportingEnabled + " Node Status:"
+                      + nodeMetrics.getNodeStatus());
+
+      logger.trace(methodName, null, "... Agent " + node.getNodeIdentity().getCanonicalName()
+              + " Posting Users:" + node.getNodeMetrics().getNodeUsersMap().size());
+      // Check if swap free is less than defined minimum threshold (check ducc.properties)
+      if (swapThreshold > 0
+              && (node.getNodeMetrics().getNodeMemory().getSwapFree() < swapThreshold)) {
+        agent.killProcessDueToLowSwapSpace(swapThreshold);
+      }
+      NodeMetricsUpdateDuccEvent updateEvent = new NodeMetricsUpdateDuccEvent(node,
+              agent.getInventoryRef().size());
+      e.getIn().setBody(updateEvent, NodeMetricsUpdateDuccEvent.class);
+
+    } catch (Exception ex) {
+      logger.error(methodName, null, ex, new Object[] { "Agent" });
+    }
+  }
 }

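LinuxNodeMetricsProcessor reads the four memory fields above from /proc/meminfo and compares SwapFree against ducc.node.min.swap.threshold before killing a process for low swap. A rough sketch of that parsing and check, assuming the usual Linux /proc/meminfo layout (class name hypothetical):

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;

public class MeminfoSketch {
  static final String[] TARGET_FIELDS = { "MemTotal:", "MemFree:", "SwapTotal:", "SwapFree:" };

  public static Map<String, Long> read() throws Exception {
    Map<String, Long> values = new HashMap<String, Long>();
    try (BufferedReader r = new BufferedReader(new FileReader("/proc/meminfo"))) {
      String line;
      while ((line = r.readLine()) != null) {
        for (String field : TARGET_FIELDS) {
          if (line.startsWith(field)) {
            // lines look like "SwapFree:  1234567 kB"
            values.put(field, Long.parseLong(line.split("\\s+")[1])); // value in KB
          }
        }
      }
    }
    return values;
  }

  public static void main(String[] args) throws Exception {
    long swapThreshold = Long.getLong("ducc.node.min.swap.threshold", 0L);
    if (swapThreshold > 0 && read().get("SwapFree:") < swapThreshold) {
      System.out.println("SwapFree below threshold; the agent would kill a process here");
    }
  }
}
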
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java Fri Aug  2 17:27:48 2019
@@ -43,164 +43,159 @@ import org.apache.uima.ducc.transport.ev
 import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
 
-public class LinuxProcessMetricsProcessor extends BaseProcessor implements
-		ProcessMetricsProcessor {
+public class LinuxProcessMetricsProcessor extends BaseProcessor implements ProcessMetricsProcessor {
+
+  private long previousCPUReadingInNanos = 0;
+
+  private long previousSnapshotTime = 0;
+
+  private final ExecutorService pool;
+
+  private IDuccProcess process;
+
+  private DuccGarbageStatsCollector gcStatsCollector;
+
+  private int blockSize = 4096; // default, OS specific
+
+  private DuccLogger logger;
+
+  private ManagedProcess managedProcess;
+
+  private NodeAgent agent;
+
+  private int fudgeFactor = 5; // default is 5%
+
+  private volatile boolean closed = true;
+
+  private long percentCPU = 0;
+
+  public LinuxProcessMetricsProcessor(DuccLogger logger, IDuccProcess process, NodeAgent agent,
+          ManagedProcess managedProcess) throws FileNotFoundException {
+    this.logger = logger;
+    this.managedProcess = managedProcess;
+    this.agent = agent;
+    pool = Executors.newCachedThreadPool();
+    this.process = process;
+    gcStatsCollector = new DuccGarbageStatsCollector(logger, process);
+
+    // keep a refernce to this so that we can call close() when the process
+    // terminates. We need to
+    // close fds to stat and statm files
+    managedProcess.setMetricsProcessor(this);
+
+    blockSize = agent.getOSPageSize();
+
+    if (System.getProperty("ducc.agent.share.size.fudge.factor") != null) {
+      try {
+        fudgeFactor = Integer.parseInt(System.getProperty("ducc.agent.share.size.fudge.factor"));
+      } catch (NumberFormatException e) {
+        e.printStackTrace();
+      }
+    }
+    closed = false;
+  }
+
+  public void stop() {
+    try {
+      if (pool != null) {
+        pool.shutdown();
+      }
+    } catch (Exception e) {
+      logger.error("LinuxProcessMetricsProcessor.stop()", null, e);
+
+    }
+  }
+
+  public void close() {
+    closed = true;
+    try {
+      this.stop();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  private boolean collectStats(ProcessState state) {
+    if (process.getProcessState().equals(ProcessState.Stopped)
+            || process.getProcessState().equals(ProcessState.Killed)
+            || process.getProcessState().equals(ProcessState.Failed)
+            || process.getProcessState().equals(ProcessState.Stopping)) {
+      return false; // dont collect stats
+    }
+    return true;
+  }
+
+  private long getSwapUsage() throws Exception {
+    long swapUsage = -1;
+    if (agent.useCgroups) {
+
+      String containerId = agent.cgroupsManager.getContainerId(managedProcess);
+
+      ProcessSwapUsageCollector processSwapCollector = new ProcessSwapUsageCollector(logger,
+              agent.cgroupsManager, containerId);
+      logger.debug("LinuxProcessMetricsProcessor.getSwapUsage", null,
+              "Fetching Swap Usage PID:" + process.getPID());
+      Future<ProcessSwapSpaceUsage> processFaults = pool.submit(processSwapCollector);
+      swapUsage = processFaults.get().getSwapUsage();
+      logger.debug("LinuxProcessMetricsProcessor.getSwapUsage", null,
+              " Process Swap Usage:" + swapUsage);
+    }
+    return swapUsage;
+  }
+
+  private long getFaults() throws Exception {
+    long faults = -1;
+    if (agent.useCgroups) {
+      String containerId = agent.cgroupsManager.getContainerId(managedProcess);
+
+      ProcessMajorFaultCollector processFaultsCollector = new ProcessMajorFaultCollector(logger,
+              agent.cgroupsManager, containerId);
+      logger.debug("LinuxProcessMetricsProcessor.getFaults", null,
+              "Fetching Page Faults PID:" + process.getPID());
+      Future<ProcessMemoryPageLoadUsage> processFaults = pool.submit(processFaultsCollector);
+      faults = processFaults.get().getMajorFaults();
+      logger.debug("LinuxProcessMetricsProcessor.getFaults", null,
+              " Process Faults (pgpgin):" + faults);
+    }
+    return faults;
+  }
+
+  private long getRss() throws Exception {
+    long rss = -1;
+    if (agent.useCgroups) {
+      String containerId = agent.cgroupsManager.getContainerId(managedProcess);
+
+      ProcessResidentMemoryCollector processRSSCollector = new ProcessResidentMemoryCollector(
+              logger, agent.cgroupsManager, containerId);
+      logger.debug("LinuxProcessMetricsProcessor.getRss", null,
+              "Fetching RSS Usage for PID:" + process.getPID());
+      Future<ProcessResidentMemory> processRss = pool.submit(processRSSCollector);
+      rss = processRss.get().get();
+      logger.debug("LinuxProcessMetricsProcessor.getRss", null, " Process RSS:" + rss);
+    }
+    return rss;
+  }
+
+  private long getCpuUsage() throws Exception {
+    long cpuUsage = -1;
+    if (agent.useCgroups) {
+      String containerId = agent.cgroupsManager.getContainerId(managedProcess);
+
+      Future<ProcessCpuUsage> processCpuUsage = null;
+      ProcessCpuUsageCollector processCpuUsageCollector = new ProcessCpuUsageCollector(logger,
+              agent.cgroupsManager, containerId);
+      logger.debug("LinuxProcessMetricsProcessor.getCpuUsage", null,
+              "Fetching CPU Usage for PID:" + process.getPID());
+      processCpuUsage = pool.submit(processCpuUsageCollector);
+      long cpuUsageInNanos = processCpuUsage.get().getCpuUsage();
+      // cpuUsage comes from cpuacct.usage and is in nanos
+      cpuUsage = cpuUsageInNanos;// / 1000000 ; // normalize into millis
+      logger.debug("LinuxProcessMetricsProcessor.getCpuUsage", null, "CPU USAGE:" + cpuUsageInNanos
+              + " CLOCK RATE:" + agent.cpuClockRate + " Total CPU USAGE:" + cpuUsage);
+    }
+    return cpuUsage;
+  }
 
-	private long previousCPUReadingInNanos = 0;
-	
-	private long previousSnapshotTime = 0;
-
-	private final ExecutorService pool;
-
-	private IDuccProcess process;
-
-	private DuccGarbageStatsCollector gcStatsCollector;
-
-	private int blockSize = 4096; // default, OS specific
-
-	private DuccLogger logger;
-
-	private ManagedProcess managedProcess;
-
-	private NodeAgent agent;
-
-	private int fudgeFactor = 5; // default is 5%
-
-	private volatile boolean closed = true;
-
-
-	private long percentCPU = 0;
-	
-	
-	public LinuxProcessMetricsProcessor(DuccLogger logger,
-			IDuccProcess process, NodeAgent agent, ManagedProcess managedProcess) throws FileNotFoundException {
-		this.logger = logger;
-		this.managedProcess = managedProcess;
-		this.agent = agent;
-		pool = Executors.newCachedThreadPool();
-		this.process = process;
-		gcStatsCollector = new DuccGarbageStatsCollector(logger, process);
-
-		// keep a refernce to this so that we can call close() when the process
-		// terminates. We need to
-		// close fds to stat and statm files
-		managedProcess.setMetricsProcessor(this);
-
-		blockSize = agent.getOSPageSize();
-
-		if (System.getProperty("ducc.agent.share.size.fudge.factor") != null) {
-			try {
-				fudgeFactor = Integer.parseInt(System
-						.getProperty("ducc.agent.share.size.fudge.factor"));
-			} catch (NumberFormatException e) {
-				e.printStackTrace();
-			}
-		}
-		closed = false;
-	}
-
-	public void stop() {
-		try {
-			if (pool != null) {
-				pool.shutdown();
-			}
-		} catch (Exception e) {
-			logger.error("LinuxProcessMetricsProcessor.stop()", null, e);
-
-		}
-	}
-
-	public void close() {
-		closed = true;
-		try {
-			this.stop();
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-	}
-
-	private boolean collectStats(ProcessState state) {
-		if (process.getProcessState().equals(ProcessState.Stopped)
-				|| process.getProcessState().equals(ProcessState.Killed)
-				|| process.getProcessState().equals(ProcessState.Failed)
-				|| process.getProcessState().equals(ProcessState.Stopping)) {
-			return false; // dont collect stats
-		}
-		return true;
-	}
-
-	private long getSwapUsage() throws Exception {
-		long swapUsage = -1;
-		if (agent.useCgroups) {
-
-			String containerId = agent.cgroupsManager
-					.getContainerId(managedProcess);
-
-			ProcessSwapUsageCollector processSwapCollector = new ProcessSwapUsageCollector(
-					logger, agent.cgroupsManager, containerId);
-			logger.debug("LinuxProcessMetricsProcessor.getSwapUsage", null,
-					"Fetching Swap Usage PID:" + process.getPID());
-			Future<ProcessSwapSpaceUsage> processFaults = pool
-					.submit(processSwapCollector);
-			swapUsage = processFaults.get().getSwapUsage();
-			logger.debug("LinuxProcessMetricsProcessor.getSwapUsage", null,
-					" Process Swap Usage:" + swapUsage);
-		}
-		return swapUsage;
-	}
-	
-	private long getFaults() throws Exception {
-		long faults = -1;
-		if (agent.useCgroups) {
-			String containerId = agent.cgroupsManager.getContainerId(managedProcess);
-
-			ProcessMajorFaultCollector processFaultsCollector = 
-					new ProcessMajorFaultCollector(logger, agent.cgroupsManager, containerId);
-	        logger.debug("LinuxProcessMetricsProcessor.getFaults",null,"Fetching Page Faults PID:"+process.getPID());
-	        Future<ProcessMemoryPageLoadUsage> processFaults = pool.submit(processFaultsCollector);
-		    faults = processFaults.get().getMajorFaults();
-			logger.debug(
-					"LinuxProcessMetricsProcessor.getFaults",null," Process Faults (pgpgin):"+faults);
-		}
-		return faults;
-	}
-	private long getRss() throws Exception {
-		long rss = -1;
-		if (agent.useCgroups) {
-			String containerId = agent.cgroupsManager.getContainerId(managedProcess);
-
-			ProcessResidentMemoryCollector processRSSCollector = 
-					new ProcessResidentMemoryCollector(logger, agent.cgroupsManager, containerId);
-	        logger.debug("LinuxProcessMetricsProcessor.getRss",null,"Fetching RSS Usage for PID:"+process.getPID());
-	        Future<ProcessResidentMemory> processRss = pool.submit(processRSSCollector);
-		    rss = processRss.get().get();
-			logger.debug(
-					"LinuxProcessMetricsProcessor.getRss",null," Process RSS:"+rss);
-		}
-		return rss;
-	}
-	
-	private long getCpuUsage() throws Exception {
-		long cpuUsage=-1;
-		if (agent.useCgroups) {
-			String containerId = agent.cgroupsManager.getContainerId(managedProcess);
-
-			Future<ProcessCpuUsage> processCpuUsage = null;
-			ProcessCpuUsageCollector processCpuUsageCollector = 
-					new ProcessCpuUsageCollector(logger, agent.cgroupsManager, containerId);
-	        logger.debug("LinuxProcessMetricsProcessor.getCpuUsage",null,"Fetching CPU Usage for PID:"+process.getPID());
-			processCpuUsage = pool
-					.submit(processCpuUsageCollector);
-			long cpuUsageInNanos = processCpuUsage.get().getCpuUsage();
-			// cpuUsage comes from cpuacct.usage and is in nanos
-			cpuUsage =  cpuUsageInNanos;// / 1000000 ;  // normalize into millis
-			logger.debug(
-					"LinuxProcessMetricsProcessor.getCpuUsage",null,
-					"CPU USAGE:"+cpuUsageInNanos+ " CLOCK RATE:"+agent.cpuClockRate+" Total CPU USAGE:"+cpuUsage);
-		}
-		return cpuUsage;
-	}
-	
   private long getCpuTime(long totalCpuUsageInNanos) throws Exception {
     long cp = -1;
     if (agent.useCgroups) {
@@ -215,9 +210,9 @@ public class LinuxProcessMetricsProcesso
         if (process.getTimeWindowRun() != null) {
           timeRunning += process.getTimeWindowRun().getElapsedMillis();
         }
-        long totalCpuUsageInMillis = totalCpuUsageInNanos/1000000;
+        long totalCpuUsageInMillis = totalCpuUsageInNanos / 1000000;
         // normalize time in running state into seconds
-        percentCPU = Math.round(100*( (totalCpuUsageInMillis*1.0)/ (timeRunning*1.0)));
+        percentCPU = Math.round(100 * ((totalCpuUsageInMillis * 1.0) / (timeRunning * 1.0)));
 
         cp = percentCPU;
 
@@ -229,229 +224,192 @@ public class LinuxProcessMetricsProcesso
     return cp;
   }
 
-	private long getCurrentCpu(long totalCpuUsageInNanos ) {
-		long currentCpu=-1;
-		// publish current CPU usage by computing a delta from the last time
-		// CPU data was fetched.
-		if ( agent.useCgroups) {
-		  //long totalCpuUsageInMillis = totalCpuUsageInNanos/1000000;
-			long millisCPU =  (totalCpuUsageInNanos - previousCPUReadingInNanos)/1000000 ;
-			long millisRun = System.currentTimeMillis() - previousSnapshotTime ;
-			currentCpu = Math.round(100*( (millisCPU*1.0)/(millisRun*1.0)) );
-			previousCPUReadingInNanos = totalCpuUsageInNanos;
-			previousSnapshotTime = System.currentTimeMillis();
-		} 
-		
-		return currentCpu;
-	}
-	
-
-	private void killProcsIfExceedingMemoryThreshold() throws Exception {
-		if ( !agent.useCgroups ) {
-			return;
-		}
-		if (process.getSwapUsage() > 0
-				&& process.getSwapUsage() > managedProcess
-						.getMaxSwapThreshold()) {
-		} else {
-			String containerId = agent.cgroupsManager
-					.getContainerId(managedProcess);
-
-			String[] cgroupPids = agent.cgroupsManager
-			        .getPidsInCgroup(containerId);
-            logger.debug("LinuxProcessMetricsProcessor.process",null,"Container ID:"+containerId+" cgroup pids "+cgroupPids.length);
-
-			// Use Memory Guard only if cgroups are disabled and fudge
-			// factor > -1
-
-			if ( fudgeFactor > -1
-				 && managedProcess.getProcessMemoryAssignment()
-							.getMaxMemoryWithFudge() > 0) {
-
-				long rss = (process.getResidentMemory() / 1024) / 1024; // normalize RSS into MB
-
-				logger.trace(
-						"process",
-						null,
-						"*** Process with PID:"
-								+ managedProcess.getPid()
-								+ " Assigned Memory (MB): "
-								+ managedProcess
-										.getProcessMemoryAssignment()
-								+ " MBs. Current RSS (MB):" + rss);
-				// check if process resident memory exceeds its memory
-				// assignment calculate in the PM
-				if (rss > managedProcess.getProcessMemoryAssignment()
-						.getMaxMemoryWithFudge()) {
-					logger.error(
-							"process",
-							null,
-							"\n\n********************************************************\n\tProcess with PID:"
-									+ managedProcess.getPid()
-									+ " Exceeded its max memory assignment (including a fudge factor) of "
-									+ managedProcess
-											.getProcessMemoryAssignment()
-											.getMaxMemoryWithFudge()
-									+ " MBs. This Process Resident Memory Size: "
-									+ rss
-									+ " MBs .Killing process ...\n********************************************************\n\n");
-					try {
-						managedProcess.kill(); // mark it for death
-						process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededShareSize
-								.toString());
-						agent.stopProcess(process);
-
-						if (agent.useCgroups) {
-							for (String pid : cgroupPids) {
-								// skip the main process that was just
-								// killed above. Only kill
-								// its child processes.
-								if (pid.equals(managedProcess
-										.getDuccProcess().getPID())) {
-									continue;
-								}
-								killChildProcess(pid, "-15");
-							}
-						}
-					} catch (Exception ee) {
-						if (!collectStats(process.getProcessState())) {
-							return;
-						}
-						logger.error("process", null, ee);
-					}
-					return;
-				}
-			}
-
-		}
-
-	}
-
-	private ProcessGarbageCollectionStats getGCStats() throws Exception {
-//		if (!process.getProcessType().equals(ProcessType.Pop)) {
-		if ( process.getProcessJmxUrl() != null
-				&& process.getProcessJmxUrl().trim().length() > 0 ) {
-			logger.debug("LinuxProcessMetricsProcessor.getGCStats",	null, "Collecting GC Stats");
-			ProcessGarbageCollectionStats gcStats = gcStatsCollector
-					.collect();
-		   return gcStats;
-		}
-		return new ProcessGarbageCollectionStats();
-	}
-	public boolean processIsActive() {
-		return process.getProcessState().equals(ProcessState.Starting)
-               ||
-               process.getProcessState().equals(ProcessState.Started)
-               ||
-			   process.getProcessState().equals(ProcessState.Initializing)
-			   || 
-			   process.getProcessState().equals(ProcessState.Running);
-	}
-	public void process(Exchange e) {
-		// if process is stopping or already dead dont collect metrics. The
-		// Camel route has just been stopped.
-		if (closed || !processIsActive()) {
- 		    logger.info("LinuxProcessMetricsProcessor.process",	null,"Process with PID:"+process.getPID() +" not in Running or Initializing state. Terminating Process Metrics Collector");	
- 		    // Spin a thread to terminate Camel route which called this processor.
- 		    // This thread will stop the route since the process for which metrics
- 		    // collection was started is no longer running. This is a defensive
- 		    // measure in cases when a route is not stopped as part of process 
- 		    // deallocation. 
- 		    Thread t = new Thread( new Runnable() {
- 		    	public void run() {
- 		    		agent.onProcessExit(process);
- 		    	}
- 		    });
- 		    t.start();
- 		    return;
-		}
-		try {
-			
-			process.setSwapUsage(getSwapUsage());
-			process.setMajorFaults(getFaults());
-
-			long rssInBytes = getRss();
-			process.setResidentMemory(rssInBytes);
-
-			long totalCpuUsageInNanos = getCpuUsage();
-
-			// set CPU time in terms of %
-			process.setCpuTime(getCpuTime(totalCpuUsageInNanos));
-
-			process.setCurrentCPU(getCurrentCpu(totalCpuUsageInNanos));
-
-			ProcessGarbageCollectionStats gcStats = getGCStats();
-			process.setGarbageCollectionStats(gcStats);
-			logger.info(
-					"process",
-					null,
-					"----------- PID:" + process.getPID() + " RSS:" 
-							+ ((rssInBytes > -1) ? (rssInBytes / (1024 * 1024))+ " MB" : "-1")
-							+ " Total CPU Time (%):" + process.getCpuTime()
-							+ " Delta CPU Time (%):" + process.getCurrentCPU()
-							+ " Major Faults:" + process.getMajorFaults()
-							+ " Process Swap Usage:" + process.getSwapUsage()
-							+ " Max Swap Usage Allowed:"
-							+ managedProcess.getMaxSwapThreshold()
-							+ " Total GC Collection Count :"
-							+ gcStats.getCollectionCount()
-							+ " Total GC Collection Time :"
-							+ gcStats.getCollectionTime());
-
-			killProcsIfExceedingMemoryThreshold();
-
-		} catch (Exception exc) {
-			if (!collectStats(process.getProcessState())) {
-				return;
-			}
-			logger.error("LinuxProcessMetricsProcessor.process", null, exc);
-		}
-	}
-
-	private void killChildProcess(final String pid, final String signal) {
-		// spawn a thread that will do kill -15, wait for 1 minute and kill the
-		// process
-		// hard if it is still alive
-		(new Thread() {
-			public void run() {
-				String c_launcher_path = Utils
-						.resolvePlaceholderIfExists(
-								System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
-								System.getProperties());
-				try {
-					String[] killCmd = null;
-					String useSpawn = System
-							.getProperty("ducc.agent.launcher.use.ducc_spawn");
-					if (useSpawn != null
-							&& useSpawn.toLowerCase().equals("true")) {
-						killCmd = new String[] {
-								c_launcher_path,
-								"-u",
-								((ManagedProcess) managedProcess).getOwner(),
-								"--",
-								"/bin/kill",
-								signal,
-								((ManagedProcess) managedProcess)
-										.getDuccProcess().getPID() };
-					} else {
-						killCmd = new String[] {
-								"/bin/kill",
-								"-15",
-								((ManagedProcess) managedProcess)
-										.getDuccProcess().getPID() };
-					}
-					ProcessBuilder pb = new ProcessBuilder(killCmd);
-					Process p = pb.start();
-					p.wait(1000 * 60); // wait for 1 minute and whack the
-										// process if still alive
-					p.destroy();
-				} catch (Exception e) {
-					logger.error("killChildProcess",
-							managedProcess.getWorkDuccId(), e);
-				}
-			}
-		}).start();
+  private long getCurrentCpu(long totalCpuUsageInNanos) {
+    long currentCpu = -1;
+    // publish current CPU usage by computing a delta from the last time
+    // CPU data was fetched.
+    if (agent.useCgroups) {
+      // long totalCpuUsageInMillis = totalCpuUsageInNanos/1000000;
+      long millisCPU = (totalCpuUsageInNanos - previousCPUReadingInNanos) / 1000000;
+      long millisRun = System.currentTimeMillis() - previousSnapshotTime;
+      currentCpu = Math.round(100 * ((millisCPU * 1.0) / (millisRun * 1.0)));
+      previousCPUReadingInNanos = totalCpuUsageInNanos;
+      previousSnapshotTime = System.currentTimeMillis();
+    }
+
+    return currentCpu;
+  }
+
+  private void killProcsIfExceedingMemoryThreshold() throws Exception {
+    if (!agent.useCgroups) {
+      return;
+    }
+    if (process.getSwapUsage() > 0
+            && process.getSwapUsage() > managedProcess.getMaxSwapThreshold()) {
+      // swap usage already exceeds the allowed threshold; skip the
+      // resident-memory guard below
+    } else {
+      String containerId = agent.cgroupsManager.getContainerId(managedProcess);
+
+      String[] cgroupPids = agent.cgroupsManager.getPidsInCgroup(containerId);
+      logger.debug("LinuxProcessMetricsProcessor.process", null,
+              "Container ID:" + containerId + " cgroup pids " + cgroupPids.length);
+
+      // Apply the memory guard only when a fudge factor > -1 is configured
+      // and the process has a max memory (with fudge) assignment
+
+      if (fudgeFactor > -1
+              && managedProcess.getProcessMemoryAssignment().getMaxMemoryWithFudge() > 0) {
+
+        long rss = (process.getResidentMemory() / 1024) / 1024; // normalize RSS into MB
+
+        logger.trace("process", null,
+                "*** Process with PID:" + managedProcess.getPid() + " Assigned Memory (MB): "
+                        + managedProcess.getProcessMemoryAssignment() + " MBs. Current RSS (MB):"
+                        + rss);
+        // check if the process resident memory exceeds the memory
+        // assignment calculated by the PM
+        if (rss > managedProcess.getProcessMemoryAssignment().getMaxMemoryWithFudge()) {
+          logger.error("process", null,
+                  "\n\n********************************************************\n\tProcess with PID:"
+                          + managedProcess.getPid()
+                          + " Exceeded its max memory assignment (including a fudge factor) of "
+                          + managedProcess.getProcessMemoryAssignment().getMaxMemoryWithFudge()
+                          + " MBs. This Process Resident Memory Size: " + rss
+                          + " MBs. Killing process ...\n********************************************************\n\n");
+          try {
+            managedProcess.kill(); // mark it for death
+            process.setReasonForStoppingProcess(
+                    ReasonForStoppingProcess.ExceededShareSize.toString());
+            agent.stopProcess(process);
+
+            if (agent.useCgroups) {
+              for (String pid : cgroupPids) {
+                // skip the main process that was just
+                // killed above. Only kill
+                // its child processes.
+                if (pid.equals(managedProcess.getDuccProcess().getPID())) {
+                  continue;
+                }
+                killChildProcess(pid, "-15");
+              }
+            }
+          } catch (Exception ee) {
+            if (!collectStats(process.getProcessState())) {
+              return;
+            }
+            logger.error("process", null, ee);
+          }
+          return;
+        }
+      }
+
+    }
+
+  }
+
+  private ProcessGarbageCollectionStats getGCStats() throws Exception {
+    // if (!process.getProcessType().equals(ProcessType.Pop)) {
+    if (process.getProcessJmxUrl() != null && process.getProcessJmxUrl().trim().length() > 0) {
+      logger.debug("LinuxProcessMetricsProcessor.getGCStats", null, "Collecting GC Stats");
+      ProcessGarbageCollectionStats gcStats = gcStatsCollector.collect();
+      return gcStats;
+    }
+    return new ProcessGarbageCollectionStats();
+  }
+
+  public boolean processIsActive() {
+    return process.getProcessState().equals(ProcessState.Starting)
+            || process.getProcessState().equals(ProcessState.Started)
+            || process.getProcessState().equals(ProcessState.Initializing)
+            || process.getProcessState().equals(ProcessState.Running);
+  }
+
+  public void process(Exchange e) {
+    // if the process is stopping or already dead, don't collect metrics; the
+    // Camel route has just been stopped.
+    if (closed || !processIsActive()) {
+      logger.info("LinuxProcessMetricsProcessor.process", null, "Process with PID:"
+              + process.getPID()
+              + " not in Running or Initializing state. Terminating Process Metrics Collector");
+      // Spin a thread to terminate the Camel route that called this processor.
+      // The thread stops the route since the process for which metrics
+      // collection was started is no longer running. This is a defensive
+      // measure for cases where a route is not stopped as part of process
+      // deallocation.
+      Thread t = new Thread(new Runnable() {
+        public void run() {
+          agent.onProcessExit(process);
+        }
+      });
+      t.start();
+      return;
+    }
+    try {
+
+      process.setSwapUsage(getSwapUsage());
+      process.setMajorFaults(getFaults());
+
+      long rssInBytes = getRss();
+      process.setResidentMemory(rssInBytes);
+
+      long totalCpuUsageInNanos = getCpuUsage();
+
+      // set CPU time in terms of %
+      process.setCpuTime(getCpuTime(totalCpuUsageInNanos));
+
+      process.setCurrentCPU(getCurrentCpu(totalCpuUsageInNanos));
+
+      ProcessGarbageCollectionStats gcStats = getGCStats();
+      process.setGarbageCollectionStats(gcStats);
+      logger.info("process", null,
+              "----------- PID:" + process.getPID() + " RSS:"
+                      + ((rssInBytes > -1) ? (rssInBytes / (1024 * 1024)) + " MB" : "-1")
+                      + " Total CPU Time (%):" + process.getCpuTime() + " Delta CPU Time (%):"
+                      + process.getCurrentCPU() + " Major Faults:" + process.getMajorFaults()
+                      + " Process Swap Usage:" + process.getSwapUsage() + " Max Swap Usage Allowed:"
+                      + managedProcess.getMaxSwapThreshold() + " Total GC Collection Count :"
+                      + gcStats.getCollectionCount() + " Total GC Collection Time :"
+                      + gcStats.getCollectionTime());
+
+      killProcsIfExceedingMemoryThreshold();
+
+    } catch (Exception exc) {
+      if (!collectStats(process.getProcessState())) {
+        return;
+      }
+      logger.error("LinuxProcessMetricsProcessor.process", null, exc);
+    }
+  }
+
+  private void killChildProcess(final String pid, final String signal) {
+    // spawn a thread that sends the requested signal (normally -15), waits
+    // for one minute, and then kills the process hard if it is still alive
+    (new Thread() {
+      public void run() {
+        String c_launcher_path = Utils.resolvePlaceholderIfExists(
+                System.getProperty("ducc.agent.launcher.ducc_spawn_path"), System.getProperties());
+        try {
+          String[] killCmd = null;
+          String useSpawn = System.getProperty("ducc.agent.launcher.use.ducc_spawn");
+          if (useSpawn != null && useSpawn.toLowerCase().equals("true")) {
+            killCmd = new String[] { c_launcher_path, "-u",
+                ((ManagedProcess) managedProcess).getOwner(), "--", "/bin/kill", signal,
+                ((ManagedProcess) managedProcess).getDuccProcess().getPID() };
+          } else {
+            killCmd = new String[] { "/bin/kill", signal,
+                ((ManagedProcess) managedProcess).getDuccProcess().getPID() };
+          }
+          ProcessBuilder pb = new ProcessBuilder(killCmd);
+          Process p = pb.start();
+          // give the process one minute to exit on its own, then whack it
+          // if it is still alive
+          Thread.sleep(1000 * 60);
+          p.destroy();
+        } catch (Exception e) {
+          logger.error("killChildProcess", managedProcess.getWorkDuccId(), e);
+        }
+      }
+    }).start();
 
-	}
+  }
 
 }
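
For reference, the two CPU figures logged above ("Total CPU Time (%)" and
"Delta CPU Time (%)") reduce to the arithmetic below. This is a minimal
standalone sketch; the class and method names are illustrative and not part
of the DUCC sources. Note that cpuacct.usage reports nanoseconds aggregated
across all cores, so either percentage can legitimately exceed 100 on a
multi-core host.

    public class CpuAccountingSketch {
      private long previousCpuNanos;   // last cpuacct.usage reading
      private long previousSnapshotMs; // wall-clock time of that reading

      // Average CPU% over the process lifetime: normalize nanoseconds to
      // milliseconds, then express as a percentage of elapsed run time.
      public long averageCpuPercent(long totalCpuNanos, long elapsedRunMs) {
        long totalCpuMs = totalCpuNanos / 1000000;
        return Math.round(100 * ((totalCpuMs * 1.0) / (elapsedRunMs * 1.0)));
      }

      // CPU% since the previous snapshot (the delta figure in the log line).
      public long deltaCpuPercent(long totalCpuNanos) {
        long cpuMs = (totalCpuNanos - previousCpuNanos) / 1000000;
        long wallMs = System.currentTimeMillis() - previousSnapshotMs;
        previousCpuNanos = totalCpuNanos;
        previousSnapshotMs = System.currentTimeMillis();
        if (wallMs <= 0) {
          return 0; // avoid division by zero on back-to-back snapshots
        }
        return Math.round(100 * ((cpuMs * 1.0) / (wallMs * 1.0)));
      }
    }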

Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/NodeInventoryProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/NodeInventoryProcessor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/NodeInventoryProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/NodeInventoryProcessor.java Fri Aug  2 17:27:48 2019
@@ -24,7 +24,6 @@ import org.apache.camel.Processor;
 import org.apache.uima.ducc.common.utils.id.DuccId;
 import org.apache.uima.ducc.transport.event.common.IDuccProcess;
 
-
 public interface NodeInventoryProcessor extends Processor {
-	public Map<DuccId, IDuccProcess> getInventory();
+  public Map<DuccId, IDuccProcess> getInventory();
 }

Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/NodeMetricsProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/NodeMetricsProcessor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/NodeMetricsProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/NodeMetricsProcessor.java Fri Aug  2 17:27:48 2019
@@ -22,5 +22,5 @@ import org.apache.camel.Processor;
 import org.apache.uima.ducc.agent.NodeAgent;
 
 public interface NodeMetricsProcessor extends Processor {
-	public void setAgent(NodeAgent agent);
+  public void setAgent(NodeAgent agent);
 }

Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/ProcessLifecycleProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/ProcessLifecycleProcessor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/ProcessLifecycleProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/ProcessLifecycleProcessor.java Fri Aug  2 17:27:48 2019
@@ -21,25 +21,32 @@ package org.apache.uima.ducc.agent.proce
 import org.apache.camel.Exchange;
 
 public class ProcessLifecycleProcessor extends BaseProcessor {
-  
-  //private static final String windowsCommandSeparator = "&";
-  //private static final String nixCommandSeparator = " ; ";
-	public ProcessLifecycleProcessor() {
-		super();
-	}
-	/**
-	 * Method called by Camel if a message is of type:
 
-	 * <ul> StartProcessEvent</ul>
-	 * <ul> StopProcessEvent</ul>
-	 * <ul> PurgeProcessEvent</ul>
-	 * 
-	 * This method checks if a message contains this host name (or IP) as a target. If this agent
-	 * is not a target, the message is ignored.
-	 * 
-	 */
-	public void process(final Exchange e) throws Exception {
-	  System.out.println("... Agent Received Request");
-	}
+  // private static final String windowsCommandSeparator = "&";
+  // private static final String nixCommandSeparator = " ; ";
+  public ProcessLifecycleProcessor() {
+    super();
+  }
+
+  /**
+   * Method called by Camel if a message is of type:
+   * 
+   * <ul>
+   * <li>StartProcessEvent</li>
+   * <li>StopProcessEvent</li>
+   * <li>PurgeProcessEvent</li>
+   * </ul>
+   * 
+   * This method checks if a message contains this host name (or IP) as a target. If this agent is
+   * not a target, the message is ignored.
+   * 
+   */
+  public void process(final Exchange e) throws Exception {
+    System.out.println("... Agent Received Request");
+  }
 
 }
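
As context for the process(Exchange) stub above, a processor of this kind is
attached to a Camel route and invoked once per inbound message. The sketch
below shows typical wiring; the endpoint URI and class name are assumptions
for illustration only, not the actual DUCC agent configuration.

    import org.apache.camel.builder.RouteBuilder;

    // Assumes the same package as ProcessLifecycleProcessor.
    public class LifecycleRouteSketch extends RouteBuilder {
      @Override
      public void configure() {
        from("activemq:topic:example.agent.endpoint")  // hypothetical endpoint
            .process(new ProcessLifecycleProcessor()); // called per message
      }
    }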

Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/ProcessMetricsProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/ProcessMetricsProcessor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/ProcessMetricsProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/ProcessMetricsProcessor.java Fri Aug  2 17:27:48 2019
@@ -21,5 +21,5 @@ package org.apache.uima.ducc.agent.proce
 import org.apache.camel.Processor;
 
 public interface ProcessMetricsProcessor extends Processor {
-	public void stop();
+  public void stop();
 }