You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2015/09/02 21:21:30 UTC

svn commit: r1700877 - /uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java

Author: cwiklik
Date: Wed Sep  2 19:21:29 2015
New Revision: 1700877

URL: http://svn.apache.org/r1700877
Log:
UIMA-4585 - fixes JP CPU percentage when running with cgroups and child processes

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java?rev=1700877&r1=1700876&r2=1700877&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java Wed Sep  2 19:21:29 2015
@@ -18,7 +18,6 @@
  */
 package org.apache.uima.ducc.agent.processors;
 
-
 import java.io.BufferedReader;
 import java.io.FileNotFoundException;
 import java.io.InputStream;
@@ -47,460 +46,570 @@ import org.apache.uima.ducc.transport.ev
 import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
 
-public class LinuxProcessMetricsProcessor extends BaseProcessor implements ProcessMetricsProcessor {
-  private RandomAccessFile statmFile;
-
-  // private RandomAccessFile nodeStatFile;
-  private RandomAccessFile processStatFile;
+public class LinuxProcessMetricsProcessor extends BaseProcessor implements
+		ProcessMetricsProcessor {
+	private RandomAccessFile statmFile;
+
+	// private RandomAccessFile nodeStatFile;
+	private RandomAccessFile processStatFile;
+
+	private long totalCpuInitUsage = 0;
+
+	private boolean initializing = true;
+
+	private final ExecutorService pool;
+
+	private IDuccProcess process;
+
+	private DuccGarbageStatsCollector gcStatsCollector;
+
+	private int blockSize = 4096; // default, OS specific
+
+	private DuccLogger logger;
+
+	private ManagedProcess managedProcess;
+
+	private NodeAgent agent;
+
+	private int fudgeFactor = 5; // default is 5%
+
+	private volatile boolean closed = true;
+
+	private long clockAtStartOfRun = 0;
+
+	private long percentCPU = 0;
+
+	public LinuxProcessMetricsProcessor(DuccLogger logger,
+			IDuccProcess process, NodeAgent agent, String statmFilePath,
+			String nodeStatFilePath, String processStatFilePath,
+			ManagedProcess managedProcess) throws FileNotFoundException { // thrown when the statm or stat file cannot be opened
+		this.logger = logger;
+		statmFile = new RandomAccessFile(statmFilePath, "r"); // read-only handle; closed again in close()
+		// nodeStatFile = new RandomAccessFile(nodeStatFilePath, "r");
+		processStatFile = new RandomAccessFile(processStatFilePath, "r"); // read-only handle; closed again in close()
+		this.managedProcess = managedProcess;
+		this.agent = agent;
+		pool = Executors.newCachedThreadPool(); // executes the collector tasks submitted from process()
+		this.process = process;
+		gcStatsCollector = new DuccGarbageStatsCollector(logger, process);
+
+		// register this processor with the managed process so that close() can
+		// be called when the process terminates; close() releases the file
+		// descriptors held on the stat and statm files
+		managedProcess.setMetricsProcessor(this);
+
+		blockSize = agent.getOSPageSize(); // overrides the 4096 default declared on the field
+
+		if (System.getProperty("ducc.agent.share.size.fudge.factor") != null) { // optional override of the default fudge factor
+			try {
+				fudgeFactor = Integer.parseInt(System
+						.getProperty("ducc.agent.share.size.fudge.factor"));
+			} catch (NumberFormatException e) {
+				e.printStackTrace(); // non-numeric property value: fudgeFactor keeps its default
+			}
+		}
+		closed = false; // from here on process() no longer returns early on the closed check
+	}
+
+	public void stop() { // shuts down the thread pool used by the metric collectors
+		try {
+			if (pool != null) {
+				pool.shutdown(); // orderly shutdown: no new tasks accepted, already submitted tasks still run
+			}
+		} catch (Exception e) {
+			logger.error("LinuxProcessMetricsProcessor.stop()", null, e); // log and continue; stop() never propagates a failure
+
+		}
+	}
+
+	public void close() { // releases the statm/stat file handles and stops the thread pool
+		closed = true; // process() checks this flag first and returns without collecting
+		try {
+			if (statmFile != null && statmFile.getFD().valid()) { // skip the close when the descriptor is no longer valid
+				statmFile.close();
+			}
+			if (processStatFile != null && processStatFile.getFD().valid()) { // same guard for the stat file
+				processStatFile.close();
+			}
+			this.stop(); // NOTE(review): not reached when one of the close() calls above throws
+		} catch (Exception e) {
+			e.printStackTrace(); // NOTE(review): written to stderr, whereas stop() reports through the DuccLogger
+		}
+	}
+
+	private boolean collectStats(ProcessState state) { // NOTE(review): 'state' is never read; the check uses process.getProcessState()
+		if (process.getProcessState().equals(ProcessState.Stopped)
+				|| process.getProcessState().equals(ProcessState.Killed)
+				|| process.getProcessState().equals(ProcessState.Failed)
+				|| process.getProcessState().equals(ProcessState.Stopping)) {
+			return false; // process is stopping or already gone: don't collect stats
+		}
+		return true; // any other state: metrics may be collected
+	}
+
+	public void process(Exchange e) {
+		if (closed) { // files closed
+			return;
+		}
+		// if process is stopping or already dead dont collect metrics. The
+		// Camel
+		// route has just been stopped.
+		if (!collectStats(process.getProcessState())) {
+			return;
+		}
+		if (process.getProcessState().equals(ProcessState.Initializing)
+				|| process.getProcessState().equals(ProcessState.Running))
+			try {
+
+				// executes script
+				// DUCC_HOME/admin/ducc_get_process_swap_usage.sh which sums up
+				// swap used by
+				// a process
+				long totalSwapUsage = 0;
+				long totalFaults = 0;
+				long totalCpuUsage = 0;
+				long totalRss = 0;
+				int currentCpuUsage = 0;
+				Future<ProcessMemoryPageLoadUsage> processMajorFaultUsage = null;
+				Future<ProcessCpuUsage> processCpuUsage = null;
+				String[] cgroupPids = new String[0];
+				try {
+					String swapUsageScript = System
+							.getProperty("ducc.agent.swap.usage.script");
+
+					if (agent.useCgroups) {
+						String containerId = agent.cgroupsManager
+								.getContainerId(managedProcess);
+						cgroupPids = agent.cgroupsManager
+								.getPidsInCgroup(containerId);
+						for (String pid : cgroupPids) {
+							// the swap usage script is defined in
+							// ducc.properties
+							if (swapUsageScript != null) {
+								DuccProcessSwapSpaceUsage processSwapSpaceUsage = new DuccProcessSwapSpaceUsage(
+										pid, managedProcess.getOwner(),
+										swapUsageScript, logger);
+								totalSwapUsage += processSwapSpaceUsage
+										.getSwapUsage();
+							}
+
+							ProcessMajorFaultCollector processMajorFaultUsageCollector = new ProcessMajorFaultCollector(
+									logger, pid);
+							// if process is stopping or already dead dont
+							// collect metrics. The Camel
+							// route has just been stopped.
+							if (!collectStats(process.getProcessState())) {
+								return;
+							}
+
+							processMajorFaultUsage = pool
+									.submit(processMajorFaultUsageCollector);
+							totalFaults += processMajorFaultUsage.get()
+									.getMajorFaults();
+							RandomAccessFile raf = null;
+							try {
+								raf = new RandomAccessFile("/proc/" + pid + "/stat", "r");
+								ProcessCpuUsageCollector processCpuUsageCollector = new ProcessCpuUsageCollector(
+										logger, pid, raf, 42, 0);
+
+								// if process is stopping or already dead dont
+								// collect metrics. The Camel
+								// route has just been stopped.
+								if (!collectStats(process.getProcessState())) {
+									return;
+								}
+
+								processCpuUsage = pool
+										.submit(processCpuUsageCollector);
+								totalCpuUsage += (processCpuUsage.get()
+										.getTotalJiffies() / agent.cpuClockRate);
+								
+							} catch( Exception ee) {
+								if ( raf != null ) {
+									raf.close();
+								}
+							}
+
+							currentCpuUsage += collectProcessCurrentCPU(pid);
+
+							RandomAccessFile rStatmFile = null;
+							try {
+								rStatmFile = new RandomAccessFile("/proc/"
+										+ pid + "/statm", "r");
+							} catch (FileNotFoundException fnfe) {
+								logger.info(
+										"LinuxProcessMetricsProcessor.process",
+										null,
+										"Statm File:"
+												+ "/proc/"
+												+ pid
+												+ "/statm *Not Found*. Process must have already exited");
+								return;
+							}
+							ProcessResidentMemoryCollector collector = new ProcessResidentMemoryCollector(
+									rStatmFile, 2, 0);
+							// if process is stopping or already dead dont
+							// collect metrics. The Camel
+							// route has just been stopped.
+							if (!collectStats(process.getProcessState())) {
+								return;
+							}
+
+							Future<ProcessResidentMemory> prm = pool
+									.submit(collector);
+
+							totalRss += prm.get().get();
+
+							rStatmFile.close();
+						}
+					} else {
+						if (swapUsageScript != null) {
+							DuccProcessSwapSpaceUsage processSwapSpaceUsage = new DuccProcessSwapSpaceUsage(
+									process.getPID(),
+									managedProcess.getOwner(), swapUsageScript,
+									logger);
+							totalSwapUsage = processSwapSpaceUsage
+									.getSwapUsage();
+						}
+
+						ProcessMajorFaultCollector processMajorFaultUsageCollector = new ProcessMajorFaultCollector(
+								logger, process.getPID());
+
+						// if process is stopping or already dead dont collect
+						// metrics. The Camel
+						// route has just been stopped.
+						if (!collectStats(process.getProcessState())) {
+							return;
+						}
+						processMajorFaultUsage = pool
+								.submit(processMajorFaultUsageCollector);
+						totalFaults = processMajorFaultUsage.get()
+								.getMajorFaults();
+
+						ProcessCpuUsageCollector processCpuUsageCollector = new ProcessCpuUsageCollector(
+								logger, process.getPID(), processStatFile, 42,
+								0);
+
+						// if process is stopping or already dead dont collect
+						// metrics. The Camel
+						// route has just been stopped.
+						if (!collectStats(process.getProcessState())) {
+							return;
+						}
+						processCpuUsage = pool.submit(processCpuUsageCollector);
+						totalCpuUsage = processCpuUsage.get().getTotalJiffies()
+								/ agent.cpuClockRate;
+
+						currentCpuUsage = collectProcessCurrentCPU(process
+								.getPID());
+
+						ProcessResidentMemoryCollector collector = new ProcessResidentMemoryCollector(
+								statmFile, 2, 0);
+						// if process is stopping or already dead dont collect
+						// metrics. The Camel
+						// route has just been stopped.
+						if (!collectStats(process.getProcessState())) {
+							return;
+						}
+
+						Future<ProcessResidentMemory> prm = pool
+								.submit(collector);
+
+						totalRss = prm.get().get();
+					}
+
+				} catch (Exception exc) {
+					logger.error("LinuxProcessMetricsProcessor.process", null,
+							exc);
+				}
+
+				// report cpu utilization while the process is running
+				if (managedProcess.getDuccProcess().getProcessState()
+						.equals(ProcessState.Running)) {
+					if (agent.cpuClockRate > 0) {
+						// if the process just change state from Initializing to
+						// Running ...
+						if (initializing) {
+							initializing = false;
+							// cache how much cpu was used up during
+							// initialization of the process
+							totalCpuInitUsage = totalCpuUsage;
+							// capture time when process state changed to
+							// Running
+							clockAtStartOfRun = System.currentTimeMillis();
+						}
+						// normalize time in running state into seconds
+						long timeSinceRunningInSeconds = (System
+								.currentTimeMillis() - clockAtStartOfRun) / 1000;
+						if (timeSinceRunningInSeconds > 0) { // prevent division
+																// by zero
+							// normalize cpu % usage to report in seconds. Also
+							// subtract how much cpu was
+							// used during initialization
+							percentCPU = 100
+									* (totalCpuUsage - totalCpuInitUsage)
+									/ timeSinceRunningInSeconds;
+						}
+
+						// Publish cumulative CPU usage
+						process.setCpuTime(percentCPU);
+					} else {
+						process.setCpuTime(0);
+						logger.info(
+								"process",
+								null,
+								"Agent is unable to determine Node's clock rate. Defaulting CPU Time to 0 For Process with PID:"
+										+ process.getPID());
+					}
+
+				} else if (managedProcess.getDuccProcess().getProcessState()
+						.equals(ProcessState.Initializing)) {
+					// report 0 for CPU while the process is initializing
+					process.setCpuTime(0);
+				} else {
+					// if process is not dead, report the last known percentCPU
+					process.setCpuTime(percentCPU);
+				}
+				if (percentCPU > 0) {
+					process.setCurrentCPU(currentCpuUsage);
+
+					logger.info(
+							"process",
+							null,
+							"----------- PID:" + process.getPID()
+									+ " Average CPU Time:" + percentCPU
+									+ "% Current CPU Time:"
+									+ process.getCurrentCPU());
+				}
+				// long majorFaults =
+				// processMajorFaultUsage.get().getMajorFaults();
+				// collects process Major faults (swap in memory)
+				process.setMajorFaults(totalFaults);
+				// Current Process Swap Usage in bytes
+				long st = System.currentTimeMillis();
+				long processSwapUsage = totalSwapUsage * 1024;
+				// collects swap usage from /proc/<PID>/smaps file via a script
+				// DUCC_HOME/admin/collect_process_swap_usage.sh
+				process.setSwapUsage(processSwapUsage);
+				logger.info(
+						"process",
+						null,
+						"----------- PID:" + process.getPID()
+								+ " Major Faults:" + totalFaults
+								+ " Process Swap Usage:" + processSwapUsage
+								+ " Max Swap Usage Allowed:"
+								+ managedProcess.getMaxSwapThreshold()
+								+ " Time to Collect Swap Usage:"
+								+ (System.currentTimeMillis() - st));
+				if (processSwapUsage > 0
+						&& processSwapUsage > managedProcess
+								.getMaxSwapThreshold()) {
+					/*
+					 * // Disable code that kill a process if it exceeds its
+					 * swap allocation. Per JIRA // UIMA-3320, agent will
+					 * monitor node-wide swap usage and will kill processes that
+					 * // use most of the swap. logger.error( "process", null,
+					 * "\n\n********************************************************\n\tProcess with PID:"
+					 * + managedProcess.getPid() +
+					 * " Exceeded its Max Swap Usage Threshold of " +
+					 * (managedProcess.getMaxSwapThreshold() / 1024) / 1024 +
+					 * " MBs. The Current Swap Usage is: " + (processSwapUsage /
+					 * 1024) / 1024 +
+					 * " MBs .Killing process ...\n********************************************************\n\n"
+					 * ); try { managedProcess.kill(); // mark it for death
+					 * process
+					 * .setReasonForStoppingProcess(ReasonForStoppingProcess
+					 * .ExceededSwapThreshold .toString());
+					 * agent.stopProcess(process);
+					 * 
+					 * if ( agent.useCgroups ) { for( String pid : cgroupPids )
+					 * { // skip the main process that was just killed above.
+					 * Only kill // its child processes. if (
+					 * pid.equals(managedProcess.getDuccProcess().getPID())) {
+					 * continue; } killChildProcess(pid,"-15"); } }
+					 * 
+					 * } catch (Exception ee) { logger.error("process", null,
+					 * ee); } return;
+					 */
+				} else {
+					// Use Memory Guard only if cgroups are disabled and fudge
+					// factor > -1
+
+					if (!agent.useCgroups
+							&& fudgeFactor > -1
+							&& managedProcess.getProcessMemoryAssignment()
+									.getMaxMemoryWithFudge() > 0) {
+						// RSS is in terms of pages(blocks) which size is system
+						// dependent. Default 4096 bytes
+						long rss = (totalRss * (blockSize / 1024)) / 1024; // normalize
+																			// RSS
+																			// into
+																			// MB
+						logger.trace(
+								"process",
+								null,
+								"*** Process with PID:"
+										+ managedProcess.getPid()
+										+ " Assigned Memory (MB): "
+										+ managedProcess
+												.getProcessMemoryAssignment()
+										+ " MBs. Current RSS (MB):" + rss);
+						// check if process resident memory exceeds its memory
+						// assignment calculate in the PM
+						if (rss > managedProcess.getProcessMemoryAssignment()
+								.getMaxMemoryWithFudge()) {
+							logger.error(
+									"process",
+									null,
+									"\n\n********************************************************\n\tProcess with PID:"
+											+ managedProcess.getPid()
+											+ " Exceeded its max memory assignment (including a fudge factor) of "
+											+ managedProcess
+													.getProcessMemoryAssignment()
+													.getMaxMemoryWithFudge()
+											+ " MBs. This Process Resident Memory Size: "
+											+ rss
+											+ " MBs .Killing process ...\n********************************************************\n\n");
+							try {
+								managedProcess.kill(); // mark it for death
+								process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededShareSize
+										.toString());
+								agent.stopProcess(process);
+
+								if (agent.useCgroups) {
+									for (String pid : cgroupPids) {
+										// skip the main process that was just
+										// killed above. Only kill
+										// its child processes.
+										if (pid.equals(managedProcess
+												.getDuccProcess().getPID())) {
+											continue;
+										}
+										killChildProcess(pid, "-15");
+									}
+								}
+							} catch (Exception ee) {
+								logger.error("process", null, ee);
+							}
+							return;
+						}
+					}
+
+				}
+				// Publish resident memory
+				process.setResidentMemory((totalRss * blockSize));
+				// dont collect GC metrics for POPs. May not be java or may not
+				// be a jmx enabled java process
+				if (!process.getProcessType().equals(ProcessType.Pop)) {
+					ProcessGarbageCollectionStats gcStats = gcStatsCollector
+							.collect();
+					process.setGarbageCollectionStats(gcStats);
+					logger.info(
+							"process",
+							null,
+							"PID:" + process.getPID()
+									+ " Total GC Collection Count :"
+									+ gcStats.getCollectionCount()
+									+ " Total GC Collection Time :"
+									+ gcStats.getCollectionTime());
+				}
+
+			} catch (Exception ex) {
+				logger.error("process", null, e);
+				ex.printStackTrace();
+			}
 
-  private long totalCpuInitUsage = 0;
-  
-  private boolean initializing = true;
-  
-  private final ExecutorService pool;
-
-  private IDuccProcess process;
-
-  private DuccGarbageStatsCollector gcStatsCollector;
-
-  private int blockSize = 4096; // default, OS specific
-
-  private DuccLogger logger;
-
-  private ManagedProcess managedProcess;
-
-  private NodeAgent agent;
-
-  private int fudgeFactor = 5; // default is 5%
-
-  private volatile boolean closed = true;
-  
-  private long clockAtStartOfRun=0;
-  
-  private long percentCPU=0;
-  
-    public LinuxProcessMetricsProcessor(DuccLogger logger, IDuccProcess process, NodeAgent agent,
-          String statmFilePath, String nodeStatFilePath, String processStatFilePath,
-          ManagedProcess managedProcess) throws FileNotFoundException {
-    this.logger = logger;
-    statmFile = new RandomAccessFile(statmFilePath, "r");
-    // nodeStatFile = new RandomAccessFile(nodeStatFilePath, "r");
-    processStatFile = new RandomAccessFile(processStatFilePath, "r");
-    this.managedProcess = managedProcess;
-    this.agent = agent;
-    pool = Executors.newCachedThreadPool();
-    this.process = process;
-    gcStatsCollector = new DuccGarbageStatsCollector(logger, process);
-
-    // keep a refernce to this so that we can call close() when the process terminates. We need to 
-    // close fds to stat and statm files
-    managedProcess.setMetricsProcessor(this);
-    
-    blockSize = agent.getOSPageSize();
-    
-    if (System.getProperty("ducc.agent.share.size.fudge.factor") != null) {
-      try {
-        fudgeFactor = Integer.parseInt(System.getProperty("ducc.agent.share.size.fudge.factor"));
-      } catch (NumberFormatException e) {
-        e.printStackTrace();
-      }
-    }
-    closed = false;
-  }
-  public void stop() {
-	  try {
-		  if ( pool != null ) {
-			  pool.shutdown();
-		  }
-	  } catch( Exception e) {
-          logger.error("LinuxProcessMetricsProcessor.stop()", null, e);
-
-	  }
-  }
-  public void close() {
-	  closed = true;
-    try {
-      if (statmFile != null && statmFile.getFD().valid()) {
-        statmFile.close();
-      }
-      if ( processStatFile != null && processStatFile.getFD().valid()) {
-    	  processStatFile.close();
-      }
-      this.stop();
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-  }
-
-  private boolean collectStats(ProcessState state) {
-	    if (process.getProcessState().equals(ProcessState.Stopped)
-	            || process.getProcessState().equals(ProcessState.Killed)
-	            || process.getProcessState().equals(ProcessState.Failed)
-	            || process.getProcessState().equals(ProcessState.Stopping)  ) {
-	    	return false;  // dont collect stats
-	    }
-	  return true;
-  }
-  public void process(Exchange e) {
-	  if ( closed ) { // files closed 
-		  return;
-	  }
-	  // if process is stopping or already dead dont collect metrics. The Camel
-	  // route has just been stopped.
-	  if ( !collectStats(process.getProcessState() ) ) {
-		  return;
-	  }
-	  if (process.getProcessState().equals(ProcessState.Initializing)
-            || process.getProcessState().equals(ProcessState.Running))
-      try {
-
-        // executes script DUCC_HOME/admin/ducc_get_process_swap_usage.sh which sums up swap used by
-        // a process
-        long totalSwapUsage = 0;
-        long totalFaults = 0;
-        long totalCpuUsage = 0;
-        long totalRss = 0;
-        int currentCpuUsage =0;
-        Future<ProcessMemoryPageLoadUsage> processMajorFaultUsage = null;
-        Future<ProcessCpuUsage> processCpuUsage = null;
-        String[] cgroupPids = new String[0];
-        try {
-          String swapUsageScript = System.getProperty("ducc.agent.swap.usage.script");
-          
-          if ( agent.useCgroups ) {
-        	String containerId = agent.cgroupsManager.getContainerId(managedProcess);
-            cgroupPids = 
-                agent.cgroupsManager.getPidsInCgroup(containerId);
-            for( String pid : cgroupPids ) {
-            	// the swap usage script is defined in ducc.properties
-                if ( swapUsageScript != null ) {
-                	DuccProcessSwapSpaceUsage processSwapSpaceUsage = new DuccProcessSwapSpaceUsage(
-                            pid, managedProcess.getOwner(), swapUsageScript, logger);
-                    totalSwapUsage += processSwapSpaceUsage.getSwapUsage();
-                }
-                
-                ProcessMajorFaultCollector processMajorFaultUsageCollector = 
-                		new ProcessMajorFaultCollector(logger, pid);
-          	  // if process is stopping or already dead dont collect metrics. The Camel
-          	  // route has just been stopped.
-          	  if ( !collectStats(process.getProcessState() ) ) {
-          		  return;
-          	  }
-
-                processMajorFaultUsage = pool
-                        .submit(processMajorFaultUsageCollector);
-                totalFaults += processMajorFaultUsage.get().getMajorFaults();
-                
-                ProcessCpuUsageCollector processCpuUsageCollector = new ProcessCpuUsageCollector(logger,
-                        pid, processStatFile, 42, 0);
-
-          	  // if process is stopping or already dead dont collect metrics. The Camel
-          	  // route has just been stopped.
-          	  if ( !collectStats(process.getProcessState() ) ) {
-          		  return;
-          	  }
-
-                processCpuUsage = pool.submit(processCpuUsageCollector);
-                totalCpuUsage += (processCpuUsage.get().getTotalJiffies()/ agent.cpuClockRate);
-                
-                currentCpuUsage += collectProcessCurrentCPU(pid);
-                
-                RandomAccessFile rStatmFile =
-                        new RandomAccessFile("/proc/" + pid + "/statm", "r");
-                ProcessResidentMemoryCollector collector = new ProcessResidentMemoryCollector(rStatmFile, 2,
-                        0);
-          	  // if process is stopping or already dead dont collect metrics. The Camel
-          	  // route has just been stopped.
-          	  if ( !collectStats(process.getProcessState() ) ) {
-          		  return;
-          	  }
-
-                Future<ProcessResidentMemory> prm = pool.submit(collector);
-               
-                totalRss += prm.get().get();
-                
-                rStatmFile.close();
-            }
-          } else {
-              if ( swapUsageScript != null ) {
-            	  DuccProcessSwapSpaceUsage processSwapSpaceUsage = new DuccProcessSwapSpaceUsage(
-            	             process.getPID(), managedProcess.getOwner(), swapUsageScript, logger);
-            	  totalSwapUsage = processSwapSpaceUsage.getSwapUsage();
-              }
-
-             ProcessMajorFaultCollector processMajorFaultUsageCollector = 
-            		 new ProcessMajorFaultCollector(logger, process.getPID());
-
-       	  // if process is stopping or already dead dont collect metrics. The Camel
-       	  // route has just been stopped.
-       	  if ( !collectStats(process.getProcessState() ) ) {
-       		  return;
-       	  }
-             processMajorFaultUsage = pool
-                     .submit(processMajorFaultUsageCollector);
-             totalFaults = processMajorFaultUsage.get().getMajorFaults();
-             
-             ProcessCpuUsageCollector processCpuUsageCollector = new ProcessCpuUsageCollector(logger,
-                     process.getPID(), processStatFile, 42, 0);
-
-       	  // if process is stopping or already dead dont collect metrics. The Camel
-       	  // route has just been stopped.
-       	  if ( !collectStats(process.getProcessState() ) ) {
-       		  return;
-       	  }
-             processCpuUsage = pool.submit(processCpuUsageCollector);
-             totalCpuUsage = processCpuUsage.get().getTotalJiffies()/ agent.cpuClockRate;
-             
-             currentCpuUsage = collectProcessCurrentCPU(process.getPID());
-             
-             ProcessResidentMemoryCollector collector = new ProcessResidentMemoryCollector(statmFile, 2,
-                     0);
-       	  // if process is stopping or already dead dont collect metrics. The Camel
-       	  // route has just been stopped.
-       	  if ( !collectStats(process.getProcessState() ) ) {
-       		  return;
-       	  }
-
-             Future<ProcessResidentMemory> prm = pool.submit(collector);
-             
-             totalRss = prm.get().get();
-          }
-          
-        } catch( Exception exc) {
-          logger.error("LinuxProcessMetricsProcessor.process", null, exc);
-        }
-          
-
-        // report cpu utilization while the process is running
-        if ( managedProcess.getDuccProcess().getProcessState().equals(ProcessState.Running)) {
-          if (agent.cpuClockRate > 0) {
-        	  // if the process just change state from Initializing to Running ...
-            if ( initializing ) {
-              initializing = false;
-              // cache how much cpu was used up during initialization of the process
-              totalCpuInitUsage = totalCpuUsage;
-              // capture time when process state changed to Running
-              clockAtStartOfRun = System.currentTimeMillis();
-            }
-            // normalize time in running state into seconds
-            long timeSinceRunningInSeconds = (System.currentTimeMillis() - clockAtStartOfRun)/1000;
-            if ( timeSinceRunningInSeconds > 0) { // prevent division by zero
-                //  normalize cpu % usage to report in seconds. Also subtract how much cpu was
-                //  used during initialization
-                percentCPU = 100* ( totalCpuUsage - totalCpuInitUsage )/timeSinceRunningInSeconds;
-            }
-            
-            // Publish cumulative CPU usage
-            process.setCpuTime(percentCPU);
-          } else {
-            process.setCpuTime(0);
-            logger.info("process", null,
-                    "Agent is unable to determine Node's clock rate. Defaulting CPU Time to 0 For Process with PID:"
-                            + process.getPID());
-          }
-          
-        } else if ( managedProcess.getDuccProcess().getProcessState().equals(ProcessState.Initializing)) {
-            //   report 0 for CPU while the process is initializing
-            process.setCpuTime(0);
-        }
-        else {
-          //   if process is not dead, report the last known percentCPU
-          process.setCpuTime(percentCPU);
-        }
-        if ( percentCPU > 0 ) {
-        	process.setCurrentCPU(currentCpuUsage);
-        	 
-            logger.info("process", null, "----------- PID:" + process.getPID()
-                    + " Average CPU Time:" + percentCPU+"% Current CPU Time:"+process.getCurrentCPU());
-        }
-       // long majorFaults = processMajorFaultUsage.get().getMajorFaults();
-        // collects process Major faults (swap in memory)
-        process.setMajorFaults(totalFaults);
-        // Current Process Swap Usage in bytes
-        long st = System.currentTimeMillis();
-        long processSwapUsage = totalSwapUsage * 1024;
-        // collects swap usage from /proc/<PID>/smaps file via a script
-        // DUCC_HOME/admin/collect_process_swap_usage.sh
-        process.setSwapUsage(processSwapUsage);
-        logger.info("process", null, "----------- PID:" + process.getPID() + " Major Faults:"
-                + totalFaults + " Process Swap Usage:" + processSwapUsage
-                + " Max Swap Usage Allowed:" + managedProcess.getMaxSwapThreshold()
-                + " Time to Collect Swap Usage:" + (System.currentTimeMillis() - st));
-        if (processSwapUsage > 0 && processSwapUsage > managedProcess.getMaxSwapThreshold()) {
-        /*
-            // Disable code that kill a process if it exceeds its swap allocation. Per JIRA 
-            // UIMA-3320, agent will monitor node-wide swap usage and will kill processes that
-            // use most of the swap.
-        	logger.error(
-                  "process",
-                  null,
-                  "\n\n********************************************************\n\tProcess with PID:"
-                          + managedProcess.getPid()
-                          + " Exceeded its Max Swap Usage Threshold of "
-                          + (managedProcess.getMaxSwapThreshold() / 1024)
-                          / 1024
-                          + " MBs. The Current Swap Usage is: "
-                          + (processSwapUsage / 1024)
-                          / 1024
-                          + " MBs .Killing process ...\n********************************************************\n\n");
-          try {
-            managedProcess.kill(); // mark it for death
-            process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededSwapThreshold
-                    .toString());
-            agent.stopProcess(process);
-            
-            if ( agent.useCgroups ) {
-              for( String pid : cgroupPids ) {
-                //  skip the main process that was just killed above. Only kill
-                //  its child processes.
-                if ( pid.equals(managedProcess.getDuccProcess().getPID())) {
-                  continue;
-                }
-                killChildProcess(pid,"-15");
-              }
-            } 
-            
-          } catch (Exception ee) {
-            logger.error("process", null, ee);
-          }
-          return;
-          */
-        } else {
-          // Use Memory Guard only if cgroups are disabled and fudge factor > -1
-
-          if ( !agent.useCgroups && fudgeFactor > -1
-                  && managedProcess.getProcessMemoryAssignment().getMaxMemoryWithFudge() > 0) {
-            // RSS is in terms of pages(blocks) which size is system dependent. Default 4096 bytes
-            long rss = (totalRss * (blockSize / 1024)) / 1024; // normalize RSS into MB
-            logger.trace("process", null, "*** Process with PID:" + managedProcess.getPid()
-                    + " Assigned Memory (MB): " + managedProcess.getProcessMemoryAssignment()
-                    + " MBs. Current RSS (MB):" + rss);
-            // check if process resident memory exceeds its memory assignment calculate in the PM
-            if (rss > managedProcess.getProcessMemoryAssignment().getMaxMemoryWithFudge()) {
-              logger.error(
-                      "process",
-                      null,
-                      "\n\n********************************************************\n\tProcess with PID:"
-                              + managedProcess.getPid()
-                              + " Exceeded its max memory assignment (including a fudge factor) of "
-                              + managedProcess.getProcessMemoryAssignment().getMaxMemoryWithFudge()
-                              + " MBs. This Process Resident Memory Size: "
-                              + rss
-                              + " MBs .Killing process ...\n********************************************************\n\n");
-              try {
-                managedProcess.kill(); // mark it for death
-                process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededShareSize
-                        .toString());
-                agent.stopProcess(process);
-                
-                if ( agent.useCgroups ) {
-                  for( String pid : cgroupPids ) {
-                    //  skip the main process that was just killed above. Only kill
-                    //  its child processes.
-                    if ( pid.equals(managedProcess.getDuccProcess().getPID())) {
-                      continue;
-                    }
-                    killChildProcess(pid,"-15");
-                  }
-                } 
-              } catch (Exception ee) {
-                logger.error("process", null, ee);
-              }
-              return;
-            }
-          }
-
-        }
-        // Publish resident memory
-        process.setResidentMemory((totalRss * blockSize));
-        //	dont collect GC metrics for POPs. May not be java or may not be a jmx enabled java process
-        if ( !process.getProcessType().equals(ProcessType.Pop)) {
-            ProcessGarbageCollectionStats gcStats = gcStatsCollector.collect();
-            process.setGarbageCollectionStats(gcStats);
-            logger.info(
-                    "process",
-                    null,
-                    "PID:" + process.getPID() + " Total GC Collection Count :"
-                            + gcStats.getCollectionCount() + " Total GC Collection Time :"
-                            + gcStats.getCollectionTime());
-        }
-
-      } catch (Exception ex) {
-        logger.error("process", null, e);
-        ex.printStackTrace();
-      }
+	}
 
-  }
-  private int  collectProcessCurrentCPU(String pid) throws Exception {
+	private int collectProcessCurrentCPU(String pid) throws Exception {
 		InputStream stream = null;
-	    BufferedReader reader = null;
-	    String cpuTime = "0";
-	    ProcessBuilder pb;
-        int cpuint = 0;
-
-	     if ( process != null && 
-	          (process.getProcessState().equals(ProcessState.Running) ||
-  	          (process.getProcessState().equals(ProcessState.Initializing) ) ) ) {
-		     // run top in batch mode and filter just the CPU
-		     pb = new ProcessBuilder("/bin/sh", "-c", "top -b -n 1 -p "+pid+" | tail -n 2 | head -n 1 | awk '{print $9}'");
-
-		      pb.redirectErrorStream(true);
-		      Process proc = pb.start();
-		      proc.waitFor();
-		      //  spawn ps command and scrape the output
-		      stream = proc.getInputStream();
-		      reader = new BufferedReader(new InputStreamReader(stream));
-		      String line;
-		      String regex = "\\s+";
-		      // read the next line from ps output
-		      while ((line = reader.readLine()) != null) {
-		          String tokens[] = line.split(regex);
-		          if ( tokens.length > 0 ) {
-		        	  logger.info("collectProcessCurrentCPU",null, line+" == CPUTIME:"+tokens[0]);
-		        	  cpuTime = tokens[0];
-		          }
-		      }	
-		      if ( cpuTime.indexOf(".") > -1) {
-		    	  cpuTime = cpuTime.substring(0, cpuTime.indexOf("."));
-		      }
-		      stream.close();
-		      try {
-		    	 cpuint = Integer.valueOf(cpuTime);
-		      } catch( NumberFormatException e) {
-		    	  // ignore, return 0
-		      }
-
-	     }
-	     return cpuint;
-  }
-  private void killChildProcess(final String pid, final String signal) {
-    // spawn a thread that will do kill -15, wait for 1 minute and kill the process
-    // hard if it is still alive
-    (new Thread() {
-      public void run() {
-        String c_launcher_path = 
-                Utils.resolvePlaceholderIfExists(
-                        System.getProperty("ducc.agent.launcher.ducc_spawn_path"),System.getProperties());
-        try {
-          String[] killCmd=null;
-          String useSpawn = System.getProperty("ducc.agent.launcher.use.ducc_spawn");
-          if ( useSpawn != null && useSpawn.toLowerCase().equals("true")) {
-              killCmd = new String[] { c_launcher_path,
-                      "-u", ((ManagedProcess)managedProcess).getOwner(), "--","/bin/kill",signal,((ManagedProcess) managedProcess).getDuccProcess().getPID() };  
-          } else {
-              killCmd = new String[] { "/bin/kill","-15",((ManagedProcess) managedProcess).getDuccProcess().getPID() };  
-          }
-          ProcessBuilder pb = new ProcessBuilder(killCmd);
-          Process p = pb.start();
-          p.wait(1000 * 60);  // wait for 1 minute and whack the process if still alive
-          p.destroy();
-        } catch( Exception e) {
-          logger.error("killChildProcess", managedProcess.getWorkDuccId(), e);
-        }
-      }
-     }).start();
-     
-    
-  }
-  
+		BufferedReader reader = null;
+		String cpuTime = "0";
+		int cpuint = 0;
+		// Sample only a Running or Initializing process; otherwise report 0.
+		if (process != null
+				&& (process.getProcessState().equals(ProcessState.Running) || (process
+						.getProcessState().equals(ProcessState.Initializing)))) {
+			// run top in batch mode and filter just the CPU column
+			ProcessBuilder pb = new ProcessBuilder("/bin/sh", "-c", "top -b -n 1 -p " + pid
+					+ " | tail -n 2 | head -n 1 | awk '{print $9}'");
+			pb.redirectErrorStream(true);
+			Process proc = pb.start();
+			stream = proc.getInputStream();
+			reader = new BufferedReader(new InputStreamReader(stream));
+			try {
+				String line;
+				// Drain output before waitFor() so the command never blocks on a full pipe.
+				while ((line = reader.readLine()) != null) {
+					String[] tokens = line.split("\\s+");
+					if (tokens.length > 0) {
+						// the last line read supplies the reported value
+						logger.info("collectProcessCurrentCPU", null, line
+								+ " == CPUTIME:" + tokens[0]);
+						cpuTime = tokens[0];
+					}
+				}
+				proc.waitFor();
+			} finally {
+				proc.destroy(); // runs on failure too, so no pipe is left open
+				reader.close();
+			}
+			// keep only the integer part of a decimal value such as 12.5
+			if (cpuTime.indexOf(".") > -1) {
+				cpuTime = cpuTime.substring(0, cpuTime.indexOf("."));
+			}
+			try {
+				cpuint = Integer.parseInt(cpuTime);
+			} catch (NumberFormatException e) {
+				// output was not a number: ignore, return 0
+			}
+		}
+		return cpuint;
+	}
+
+	/**
+	 * Sends {@code signal} to the process identified by {@code pid} from a
+	 * separate thread, so the caller never blocks on the kill command. When
+	 * the ducc.agent.launcher.use.ducc_spawn system property is "true", the
+	 * command is launched through ducc_spawn as the managed process owner.
+	 *
+	 * @param pid    id of the process to signal
+	 * @param signal option handed to /bin/kill, e.g. "-15"
+	 */
+	private void killChildProcess(final String pid, final String signal) {
+		(new Thread() {
+			public void run() {
+				try {
+					// both launch modes deliver the requested signal to pid
+					String[] killCmd;
+					String useSpawn = System
+							.getProperty("ducc.agent.launcher.use.ducc_spawn");
+					if ("true".equalsIgnoreCase(useSpawn)) {
+						String c_launcher_path = Utils.resolvePlaceholderIfExists(
+								System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
+								System.getProperties());
+						killCmd = new String[] { c_launcher_path, "-u",
+								((ManagedProcess) managedProcess).getOwner(),
+								"--", "/bin/kill", signal, pid };
+					} else {
+						killCmd = new String[] { "/bin/kill", signal, pid };
+					}
+					Process p = new ProcessBuilder(killCmd).start();
+					// waitFor(), not wait(): Process.wait() is Object.wait() and
+					// throws IllegalMonitorStateException when the monitor is
+					// not held. waitFor() blocks until the command has exited.
+					p.waitFor();
+					p.destroy(); // release the pipes of the finished command
+				} catch (InterruptedException e) {
+					Thread.currentThread().interrupt(); // keep interrupt status
+					logger.error("killChildProcess",
+							managedProcess.getWorkDuccId(), e);
+				} catch (Exception e) {
+					logger.error("killChildProcess",
+							managedProcess.getWorkDuccId(), e);
+				}
+			}
+		}).start();
+
+	}
+
 }