You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/04/12 18:46:55 UTC

svn commit: r1467346 - in /uima/sandbox/uima-ducc/trunk: src/main/admin/ uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/ uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/...

Author: cwiklik
Date: Fri Apr 12 16:46:54 2013
New Revision: 1467346

URL: http://svn.apache.org/r1467346
Log:
UIMA-2804 Support for CGroups

Added:
    uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_get_process_swap_usage.sh
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/
    uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessSwapSpaceUsage.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/ProcessMemoryAssignment.java
Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ProcessLifecycleController.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxNodeMetricsProcessor.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-pm/src/main/java/org/apache/uima/ducc/pm/ProcessManagerComponent.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccJobDeployment.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccProcess.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccJobDeployment.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccProcess.java

Added: uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_get_process_swap_usage.sh
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_get_process_swap_usage.sh?rev=1467346&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_get_process_swap_usage.sh (added)
+++ uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_get_process_swap_usage.sh Fri Apr 12 16:46:54 2013
@@ -0,0 +1,30 @@
+#!/bin/bash
+# -----------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# -----------------------------------------------------------------------
+
+# Sums up the swap use of the process whose PID is given as the first
+# argument, as reported by the "Swap:" lines of /proc/<PID>/smaps.
+#
+# Only lines that start with "Swap:" are counted; newer kernels also emit
+# "SwapPss:" lines, which a plain "grep Swap" would count a second time.
+# If the smaps file cannot be read (e.g. the process is gone), 0 is printed
+# rather than an empty line, so callers always receive a number.
+
+grep '^Swap:' "/proc/$1/smaps" 2>/dev/null | awk '{ sum += $2 } END { print sum + 0 }'
+

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java Fri Apr 12 16:46:54 2013
@@ -18,9 +18,11 @@
 */
 package org.apache.uima.ducc.agent;
 
+import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileReader;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.net.InetAddress;
@@ -42,9 +44,11 @@ import org.apache.camel.builder.RouteBui
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.uima.ducc.agent.config.AgentConfiguration;
 import org.apache.uima.ducc.agent.event.ProcessLifecycleObserver;
+import org.apache.uima.ducc.agent.launcher.CGroupsManager;
 import org.apache.uima.ducc.agent.launcher.Launcher;
 import org.apache.uima.ducc.agent.launcher.ManagedProcess;
 import org.apache.uima.ducc.agent.metrics.collectors.NodeUsersCollector;
+import org.apache.uima.ducc.common.Node;
 import org.apache.uima.ducc.common.NodeIdentity;
 import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties;
 import org.apache.uima.ducc.common.component.AbstractDuccComponent;
@@ -74,6 +78,7 @@ import org.apache.uima.ducc.transport.ev
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
 import org.apache.uima.ducc.transport.event.common.IResourceState.ResourceState;
 import org.apache.uima.ducc.transport.event.common.ITimeWindow;
+import org.apache.uima.ducc.transport.event.common.ProcessMemoryAssignment;
 import org.apache.uima.ducc.transport.event.common.TimeWindow;
 
 
@@ -124,6 +129,15 @@ public class NodeAgent extends AbstractD
   private RogueProcessReaper rogueProcessReaper = 
           new RogueProcessReaper(logger, 5, 10);
   
+  public volatile  boolean useCgroups = false;
+  
+  public CGroupsManager cgroupsManager = null;
+  
+  public Node node = null;
+  
+  public volatile boolean excludeAPs = false;
+
+  public int shareQuantum;
   /**
    * Ctor used exclusively for black-box testing of this class.
    */
@@ -133,6 +147,7 @@ public class NodeAgent extends AbstractD
   public NodeAgent(NodeIdentity ni) {
     this();
     this.nodeIdentity = ni;
+    
   }
 
   /**
@@ -152,6 +167,66 @@ public class NodeAgent extends AbstractD
     this.launcher = launcher;
     this.configurationFactory = factory;
     this.commonProcessDispatcher = factory.getCommonProcessDispatcher(context);
+    
+    if ( System.getProperty("ducc.rm.share.quantum") != null && System.getProperty("ducc.rm.share.quantum").trim().length() > 0 ) {
+	    shareQuantum = Integer.parseInt(System.getProperty("ducc.rm.share.quantum").trim());
+	}
+    /* Enable CGROUPS */ 
+    String cgroups;
+    boolean excludeNodeFromCGroups=false;
+    if ( ( cgroups = System.getProperty("ducc.agent.launcher.cgroups.enable")) != null ) {
+    	if ( cgroups.equalsIgnoreCase("true")) {
+    		// Load exclusion file. Some nodes may be excluded from cgroups
+    		String exclusionFile; 
+    		
+    		// get the name of the exclusion file from ducc.properties
+    		if ( ( exclusionFile = System.getProperty("ducc.agent.exclusion.file")) != null ) {
+    			//	Parse node exclusion file and determine if cgroups and AP deployment
+    			//  is allowed on this node
+    			NodeExclusionParser exclusionParser = new NodeExclusionParser();
+    			exclusionParser.parse(exclusionFile);
+    			excludeNodeFromCGroups = exclusionParser.cgroupsExcluded();
+    			excludeAPs = exclusionParser.apExcluded();
+    			
+    			System.out.println("excludeNodeFromCGroups="+excludeNodeFromCGroups+" excludeAPs="+excludeAPs);
+    		 } else {
+    			 System.out.println("Running with No exclusion File");
+    		 }
+    		// node not in the exclusion list for cgroups
+    		if ( !excludeNodeFromCGroups ) {
+    			// get the top level cgroup folder from ducc.properties. If not defined, use /cgroup/ducc as default
+    			String cgroupsBaseDir = 
+        				System.getProperty("ducc.agent.launcher.cgroups.basedir");
+        		if ( cgroupsBaseDir == null ) {
+        			cgroupsBaseDir = "/cgroup/ducc";
+        		}
+        		// get the cgroup subsystems. If not defined, default to the memory subsystem
+        		String cgroupsSubsystems = 
+        				System.getProperty("ducc.agent.launcher.cgroups.subsystems");
+        		if ( cgroupsSubsystems == null ) {
+        			cgroupsSubsystems = "memory";
+        		}
+        		cgroupsManager = new CGroupsManager(cgroupsBaseDir, cgroupsSubsystems, logger);
+        		// check if cgroups base directory exists in the filesystem which means that cgroups 
+        		// and cgroups convenience package are installed and the daemon is up and running.
+        		if ( cgroupsManager.cgroupExists(cgroupsBaseDir) ) {
+            		useCgroups = true;
+            		logger.info("nodeAgent", null,
+                            "------- Agent Running with CGroups Enabled");
+            		
+        		} else {
+        			logger.info("nodeAgent", null,
+                            "------- CGroups Not Installed on this Machine");
+        		}
+    		}
+    		
+    	}
+    } else {
+    	logger.info("nodeAgent", null,
+                "------- CGroups Not Enabled on this Machine");
+    }
+    logger.info("nodeAgent", null,"CGroup Support="+useCgroups+" excludeNodeFromCGroups="+excludeNodeFromCGroups+" excludeAPs="+excludeAPs);
+
     String useSpawn = System.getProperty("ducc.agent.launcher.use.ducc_spawn");
     if (useSpawn != null && useSpawn.toLowerCase().equals("true")) {
       runWithDuccLing = true;
@@ -184,6 +259,42 @@ public class NodeAgent extends AbstractD
 
   }
  
+  /**
+   * Stores the Node description consulted by getNodeTotalNumberOfShares().
+   */
+  public void setNodeInfo( Node node ) {
+	  this.node = node;
+  }
+  
+  /**
+   * @return the Node description passed to setNodeInfo(), or null if it was never set
+   */
+  public Node getNodeInfo() {
+	  return node;
+  }
+  
+  /**
+   * Computes how many shares of size ducc.rm.share.quantum fit into this node's
+   * total memory, rounding up. Returns 1 when that property is unset, blank or
+   * not positive. When it is set, setNodeInfo() must have been called first.
+   *
+   * NOTE(review): assumes getMemTotal() and the quantum use the same unit -
+   * confirm against the node metrics collector.
+   */
+  public int getNodeTotalNumberOfShares() {
+	  int shares = 1;
+	  String quantumProperty = System.getProperty("ducc.rm.share.quantum");
+	  if ( quantumProperty != null && quantumProperty.trim().length() > 0 ) {
+		  long quantum = Long.parseLong(quantumProperty.trim());
+		  // guard against a zero or negative quantum (division by zero)
+		  if ( quantum > 0 ) {
+			  long memTotal = (long) getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal();
+			  // ceiling division in long arithmetic, so memTotal is not truncated to int first
+			  shares = (int) ((memTotal + quantum - 1) / quantum);
+		  }
+	  }
+	  return shares;
+	}
   public void start(DuccService service) throws Exception {
 		super.start(service, null);
 		String name = nodeIdentity.getName();
@@ -326,7 +420,7 @@ public class NodeAgent extends AbstractD
    * @param workDuccId - job id
    */
   public void reconcileProcessStateAndTakeAction(ProcessLifecycleController lifecycleController,
-          IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info, long processMemoryAssignment, DuccId workDuccId) {
+          IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info, ProcessMemoryAssignment processMemoryAssignment, DuccId workDuccId) {
     String methodName = "reconcileProcessStateAndTakeAction";
     try {
       inventorySemaphore.acquire();
@@ -395,7 +489,7 @@ public class NodeAgent extends AbstractD
                 + process.getDuccId() + " is already in agent's inventory.");
         return;
       }
-      startProcess(process, commandLine, info, workDuccId,0);
+      startProcess(process, commandLine, info, workDuccId, new ProcessMemoryAssignment());
     } catch (InterruptedException e) {
       logger.error(methodName, null, e);
     } finally {
@@ -451,7 +545,7 @@ public class NodeAgent extends AbstractD
    * 
    */
   public void startProcess(IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info,
-          DuccId workDuccId, long processMemoryAssignment) {
+          DuccId workDuccId, ProcessMemoryAssignment processMemoryAssignment) {
     String methodName = "startProcess";
 
     try {
@@ -772,7 +866,7 @@ public class NodeAgent extends AbstractD
    *          - fully defined command line that will be used to exec the process.
    */
   private void deployProcess(IDuccProcess process, ICommandLine commandLine,
-          IDuccStandardInfo info, DuccId workDuccId, long processMemoryAssignment) {
+          IDuccStandardInfo info, DuccId workDuccId, ProcessMemoryAssignment processMemoryAssignment) {
     String methodName = "deployProcess";
     synchronized (monitor) {
       boolean deployProcess = true;
@@ -878,7 +972,7 @@ public class NodeAgent extends AbstractD
                 + " Not in Agent's inventory. Adding to the inventory with state=Stopped");
         process.setProcessState(ProcessState.Stopped);
         inventory.put(process.getDuccId(), process);
-        deployedProcesses.add(new ManagedProcess(process, null, this, logger,0));
+        deployedProcesses.add(new ManagedProcess(process, null, this, logger, new ProcessMemoryAssignment()));
       }
     }
   }
@@ -1356,4 +1450,68 @@ public class NodeAgent extends AbstractD
       e.printStackTrace();
     }
   }
+
+	/**
+	 * Parses the agent's node exclusion file to find out whether cgroups and/or
+	 * AP deployment are disabled for this node.
+	 *
+	 * Each line has the form {@code <node>=<exclusion>[,<exclusion>...]} where an
+	 * exclusion is {@code cgroup} or {@code ap}. The node part is compared, with any
+	 * domain suffix removed, against this agent's node name (also without domain);
+	 * only the first matching line is used.
+	 */
+	private class NodeExclusionParser {
+		private boolean excludeNodeFromCGroups = false;
+		private boolean excludeAP = false;
+
+		/**
+		 * Reads the exclusion file. A missing file leaves both exclusions off.
+		 *
+		 * @param exclFile - path of the exclusion file
+		 * @throws Exception - if the file exists but cannot be read
+		 */
+		public void parse(String exclFile) throws Exception {
+			File exclusionFile = new File(exclFile);
+			if ( !exclusionFile.exists() ) {
+				return;
+			}
+			String nodeName = shortName(getIdentity().getName());
+			BufferedReader br = new BufferedReader(new FileReader(exclusionFile));
+			try {
+				String line;
+				while ((line = br.readLine()) != null) {
+					int separator = line.indexOf('=');
+					// Skip malformed lines and entries for other nodes. Whole names are compared
+					// so that e.g. node "node1" does not pick up the entry for "node10".
+					if ( separator < 0 || !shortName(line.substring(0, separator).trim()).equals(nodeName) ) {
+						continue;
+					}
+					for ( String exclusion : line.substring(separator + 1).split(",") ) {
+						if ( exclusion.trim().equals("cgroup") ) {
+							excludeNodeFromCGroups = true;
+						} else if ( exclusion.trim().equals("ap") ) {
+							excludeAP = true;
+						}
+					}
+					break;
+				}
+			} finally {
+				br.close();  // release the file handle even when reading fails
+			}
+		}
+
+		/** Returns the host name up to (not including) the first '.', or the whole name. */
+		private String shortName(String hostName) {
+			int dot = hostName.indexOf('.');
+			return dot > -1 ? hostName.substring(0, dot) : hostName;
+		}
+
+		public boolean apExcluded() {
+			return excludeAP;
+		}
+
+		public boolean cgroupsExcluded() {
+			return excludeNodeFromCGroups;
+		}
+	}
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ProcessLifecycleController.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ProcessLifecycleController.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ProcessLifecycleController.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ProcessLifecycleController.java Fri Apr 12 16:46:54 2013
@@ -22,9 +22,10 @@ import org.apache.uima.ducc.common.utils
 import org.apache.uima.ducc.transport.cmdline.ICommandLine;
 import org.apache.uima.ducc.transport.event.common.IDuccProcess;
 import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
+import org.apache.uima.ducc.transport.event.common.ProcessMemoryAssignment;
 
 
 public interface ProcessLifecycleController {
-	public void startProcess(IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info, DuccId workDuccId, long shareMemorySize);
+	public void startProcess(IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info, DuccId workDuccId, ProcessMemoryAssignment pma);
 	public void stopProcess( IDuccProcess process );
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java Fri Apr 12 16:46:54 2013
@@ -48,6 +48,9 @@ import org.springframework.beans.factory
 public class AgentEventListener implements DuccEventDelegateListener {
 	DuccLogger logger = DuccLogger.getLogger(this.getClass(), Agent.COMPONENT_NAME);
 	ProcessLifecycleController lifecycleController = null;
+	// On startup of the Agent we may need to do cleanup of cgroups.
+	// This cleanup will happen once right after processing of the first OR publication.
+	private boolean cleanupPhase = true;  
 	
 	private NodeAgent agent;
 	public AgentEventListener(NodeAgent agent, ProcessLifecycleController lifecycleController) {
@@ -98,39 +101,46 @@ public class AgentEventListener implemen
 //	  }
 	  
 		try {
-		  //  print JP report targeted for this node
-		  reportIncomingStateForThisNode(duccEvent);
-		  
-		  List<DuccUserReservation> reservations = 
-		           duccEvent.getUserReservations();
-		   agent.setReservations(reservations);
-			//	Stop any process that is in this Agent's inventory but not associated with any
-			//  of the jobs we just received
-			agent.takeDownProcessWithNoJob(agent,duccEvent.getJobList());
-			//	iterate over all jobs and reconcile those processes that are assigned to this agent. First,
-			//  look at the job's JD process and than JPs.
-			for( IDuccJobDeployment jobDeployment : duccEvent.getJobList()) {
-				//	check if this node is a target for this job's JD 
-				if ( isTargetNodeForProcess(jobDeployment.getJdProcess()) ) {
-					// agent will check the state of JD process and either start, stop, or take no action
-					ICommandLine jdCommandLine = jobDeployment.getJdCmdLine();
-					if(jdCommandLine != null) {
-						agent.reconcileProcessStateAndTakeAction(lifecycleController, jobDeployment.getJdProcess(), jobDeployment.getJdCmdLine(), 
-							jobDeployment.getStandardInfo(), jobDeployment.getProcessMemoryAssignment(), jobDeployment.getJobId());
-					}
-					else {
-						logger.error("onDuccJobsStateEvent", null, "job is service");
-					}
-				} 
-				// check JPs
-				for( IDuccProcess process : jobDeployment.getJpProcessList() ) {
-					if ( isTargetNodeForProcess(process) ) {
-	          // agent will check the state of JP process and either start, stop, or take no action 
-						agent.reconcileProcessStateAndTakeAction(lifecycleController, process, jobDeployment.getJpCmdLine(), 
+
+		  synchronized( this ) {
+			  //  print JP report targeted for this node
+			  reportIncomingStateForThisNode(duccEvent);
+
+			  List<DuccUserReservation> reservations = 
+			           duccEvent.getUserReservations();
+			  if ( cleanupPhase ) {   // true on Agent startup
+				  // cleanup reservation cgroups
+			  }
+			   agent.setReservations(reservations);
+				//	Stop any process that is in this Agent's inventory but not associated with any
+				//  of the jobs we just received
+				agent.takeDownProcessWithNoJob(agent,duccEvent.getJobList());
+				//	iterate over all jobs and reconcile those processes that are assigned to this agent. First,
+				//  look at the job's JD process and than JPs.
+				for( IDuccJobDeployment jobDeployment : duccEvent.getJobList()) {
+					//	check if this node is a target for this job's JD 
+					if ( isTargetNodeForProcess(jobDeployment.getJdProcess()) ) {
+						// agent will check the state of JD process and either start, stop, or take no action
+						ICommandLine jdCommandLine = jobDeployment.getJdCmdLine();
+						if(jdCommandLine != null) {
+							agent.reconcileProcessStateAndTakeAction(lifecycleController, jobDeployment.getJdProcess(), jobDeployment.getJdCmdLine(), 
 								jobDeployment.getStandardInfo(), jobDeployment.getProcessMemoryAssignment(), jobDeployment.getJobId());
+						}
+						else {
+							logger.error("onDuccJobsStateEvent", null, "job is service");
+						}
+					} 
+					// check JPs
+					for( IDuccProcess process : jobDeployment.getJpProcessList() ) {
+						if ( isTargetNodeForProcess(process) ) {
+		          // agent will check the state of JP process and either start, stop, or take no action 
+							agent.reconcileProcessStateAndTakeAction(lifecycleController, process, jobDeployment.getJpCmdLine(), 
+									jobDeployment.getStandardInfo(), jobDeployment.getProcessMemoryAssignment(), jobDeployment.getJobId());
+						}
 					}
 				}
-			}
+			  
+		  }
 		} catch( Exception e ) {
 			logger.error("onDuccJobsStateEvent", null, e);
 		}

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java?rev=1467346&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java Fri Apr 12 16:46:54 2013
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.agent.launcher;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
+
+/**
+ * Manages cgroup containers on a node.
+ * 
+ * Supported operations:
+ *   - cgcreate - creates a cgroup container
+ *   - cgset    - sets the max memory limit of an existing container
+ *   - rmdir    - removes a container (cgroups are exposed as a virtual file system)
+ *
+ * Ids of containers created or discovered by this instance are kept in an
+ * in-memory set. The class does no synchronization of its own.
+ */
+public class CGroupsManager {
+	private DuccLogger agentLogger = null;
+	
+	private Set<String> containerIds = new LinkedHashSet<String>();
+	private String cgroupBaseDir = "";
+	private String cgroupSubsystems = "";  // comma separated list of subsystems eg. memory,cpu
+
+	/**
+	 * Manual test driver: lists existing containers, creates a container, sets its
+	 * memory limit, waits one minute and then removes the container again.
+	 *
+	 * @param args - containerId, maxMemoryLimit (bytes), userId
+	 */
+	public static void main(String[] args) {
+		if ( args.length < 3 ) {
+			System.out.println("Usage: CGroupsManager <containerId> <maxMemoryLimitInBytes> <userId>");
+			return;
+		}
+		try {
+			CGroupsManager cgMgr = new CGroupsManager("/cgroup/ducc", "memory", null);
+			System.out.println("Cgroups Installed:"+cgMgr.cgroupExists("/cgroup/ducc"));
+			Set<String> containers = cgMgr.collectExistingContainers();
+			for ( String containerId : containers ) {
+				System.out.println("Existing CGroup Container ID:"+containerId);
+			}
+			cgMgr.createContainer(args[0], args[2], true);
+			cgMgr.setContainerMaxMemoryLimit(args[0], args[2], true, Long.parseLong(args[1]));
+			synchronized( cgMgr ) {
+				cgMgr.wait(60000);
+			}
+			cgMgr.destroyContainer(args[0]);
+		} catch( Exception e) {
+			e.printStackTrace();
+		}
+	}
+
+	/**
+	 * @param cgroupBaseDir - file system location of the ducc cgroup (e.g. /cgroup/ducc)
+	 * @param cgroupSubsystems - comma separated cgroup subsystems passed to cgcreate (e.g. memory)
+	 * @param agentLogger - logger to use; may be null, in which case messages go to stdout
+	 */
+	public CGroupsManager(String cgroupBaseDir, String cgroupSubsystems, DuccLogger agentLogger ) {
+		this.cgroupBaseDir = cgroupBaseDir;
+		this.cgroupSubsystems = cgroupSubsystems;
+		this.agentLogger = agentLogger;
+	}
+
+	/**
+	 * Creates cgroup container with a given id and owner.
+	 * 
+	 * NOTE(review): the container path is always "ducc/<containerId>" relative to
+	 * the subsystem mount, independent of cgroupBaseDir - confirm the two agree.
+	 * 
+	 * @param containerId - new cgroup container id
+	 * @param userId - owner of the cgroup container
+	 * @param useDuccSpawn - use duccling to run 'cgcreate' command
+	 * 
+	 * @return - true on success, false otherwise
+	 * 
+	 * @throws Exception
+	 */
+	public boolean createContainer(String containerId, String userId, boolean useDuccSpawn ) throws Exception {
+		try {
+			String [] command = new String[] {"/usr/bin/cgcreate","-g", cgroupSubsystems+":ducc/"+containerId};
+			if ( launchCommand(command, useDuccSpawn, userId, containerId) == 0 ) {
+				containerIds.add(containerId);
+				return true;
+			}
+			return false;
+		} catch ( Exception e ) {
+			logError("createContainer", "Unable to create cgroup container:"+containerId, e);
+			return false;
+		}
+	}
+
+	/**
+	 * Sets the max memory use for an existing cgroup container. 
+	 * 
+	 * @param containerId - existing container id for which limit will be set
+	 * @param userId - container owner
+	 * @param useDuccSpawn - run 'cgset' command as a user
+	 * @param containerMaxSize - max memory limit in bytes (written to memory.limit_in_bytes)
+	 * 
+	 * @return - true on success, false otherwise
+	 * 
+	 * @throws Exception
+	 */
+	public boolean setContainerMaxMemoryLimit( String containerId, String userId, boolean useDuccSpawn, long containerMaxSize) throws Exception {
+		try {
+			String [] command = new String[] {"/usr/bin/cgset","-r", "memory.limit_in_bytes="+containerMaxSize, "ducc/"+containerId};
+			return launchCommand(command, useDuccSpawn, userId, containerId) == 0;
+		} catch ( Exception e ) {
+			logError("setContainerMaxMemoryLimit", "Unable to set memory limit of cgroup container:"+containerId, e);
+			return false;
+		}
+	}
+
+	/**
+	 * Removes cgroup container with a given id. Cgroups are implemented as
+	 * a virtual file system. All that is needed here is just rmdir. 
+	 * 
+	 * @param containerId - cgroup to remove
+	 * @return - true on success or when the container does not exist, false otherwise
+	 * 
+	 * @throws Exception
+	 */
+	public boolean destroyContainer(String containerId) throws Exception {
+		try {
+			String containerPath = cgroupBaseDir+"/"+containerId;
+			if ( !cgroupExists(containerPath) ) {
+				return true; // nothing to do, cgroup does not exist
+			}
+			String [] command = new String[] {"/bin/rmdir", containerPath};
+			if ( launchCommand(command, false, "ducc", containerId) == 0 ) {
+				containerIds.remove(containerId);
+				return true;
+			}
+			return false;
+		} catch ( Exception e ) {
+			logError("destroyContainer", "Unable to remove cgroup container:"+containerId, e);
+			return false;
+		}
+	}
+
+	/**
+	 * Runs an external command, optionally through ducc_ling so that it executes as
+	 * the given user, echoes its merged stdout/stderr to stdout and waits for it.
+	 * 
+	 * @return - the command's exit code, or -1 if it could not be launched
+	 */
+	private int launchCommand(String[] command, boolean useDuccSpawn, String userId, String containerId) throws Exception {
+		String[] commandLine = null;
+		BufferedReader reader = null;
+		try {
+			//	Use ducc_ling (c code) as a launcher for the actual process. The ducc_ling
+			//  allows the process to run as a specified user in order to write out logs in
+			//  user's space as opposed to ducc space.
+			String c_launcher_path = 
+					Utils.resolvePlaceholderIfExists(
+							System.getProperty("ducc.agent.launcher.ducc_spawn_path"),System.getProperties());
+
+			if ( useDuccSpawn && c_launcher_path != null ) {
+				commandLine = new String[4+command.length];
+				commandLine[0] = c_launcher_path;
+				commandLine[1] = "-u";
+				commandLine[2] = userId;
+				commandLine[3] = "--";
+				System.arraycopy(command, 0, commandLine, 4, command.length);
+			} else {
+				commandLine = command;
+			}
+			ProcessBuilder processLauncher = new ProcessBuilder();
+			processLauncher.command(commandLine);
+			// Merge stderr into stdout so the loop below drains both streams. Without this
+			// the child's error output is lost and a full stderr pipe can block the child.
+			processLauncher.redirectErrorStream(true);
+			
+			java.lang.Process process = processLauncher.start();
+			
+			reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+			String line;
+			while ((line = reader.readLine()) != null) {
+				System.out.println(">>>>"+line);
+			}
+			return process.waitFor();
+		} catch( InterruptedException e) {
+			Thread.currentThread().interrupt();  // keep the interrupt visible to the caller
+			logError("launchCommand", "Unable to Launch Command:"+join(commandLine), e);
+		} catch( Exception e) {
+			logError("launchCommand", "Unable to Launch Command:"+join(commandLine), e);
+		} finally {
+			if ( reader != null ) {
+				try {
+					reader.close();
+				} catch( Exception ignored ) {
+					// best effort close; nothing useful can be done if it fails
+				}
+			}
+		}
+		return -1;  // failure
+	}
+
+	/** Joins the command line parts, each followed by a space (empty string for null). */
+	private static String join(String[] parts) {
+		StringBuilder sb = new StringBuilder();
+		if ( parts != null ) {
+			for ( String part : parts ) {
+				sb.append(part).append(" ");
+			}
+		}
+		return sb.toString();
+	}
+
+	/** Logs through the agent logger when available, otherwise to stdout. */
+	private void logError(String methodName, String message, Exception e) {
+		if ( agentLogger != null ) {
+			agentLogger.error(methodName, null, message, e);
+		} else {
+			System.out.println("CGroupsManager."+methodName+"()- "+message);
+			e.printStackTrace();
+		}
+	}
+
+	/**
+	 * Return a Set of existing cgroup Ids found in the filesystem identified
+	 * by 'cgroupBaseDir'. The returned set also contains ids of containers
+	 * previously created by this instance.
+	 * 
+	 * @return - set of cgroup ids
+	 * 
+	 * @throws Exception
+	 */
+	public Set<String> collectExistingContainers() throws Exception {
+		// listFiles() returns null when the directory does not exist or cannot be read
+		File[] existingCGroups = new File(cgroupBaseDir).listFiles();
+		if ( existingCGroups != null ) {
+			for (File cgroup : existingCGroups ) {
+				if ( cgroup.isDirectory() ) {
+					containerIds.add(cgroup.getName());
+				}
+			}
+		}
+		return containerIds;
+	}
+
+	/** @return - the file system location of the ducc cgroup */
+	public String getDuccCGroupBaseDir() {
+		return cgroupBaseDir;
+	}
+
+	/** @return - the comma separated cgroup subsystems */
+	public String getSubsystems() {
+		return cgroupSubsystems;
+	}
+
+	/**
+	 * @param cgroup - file system path of a cgroup
+	 * @return - true if the path exists
+	 */
+	public boolean cgroupExists(String cgroup) throws Exception {
+		return new File(cgroup).exists();
+	}
+}

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java Fri Apr 12 16:46:54 2013
@@ -40,10 +40,10 @@ import org.apache.uima.ducc.transport.cm
 import org.apache.uima.ducc.transport.cmdline.JavaCommandLine;
 import org.apache.uima.ducc.transport.event.ProcessStopDuccEvent;
 import org.apache.uima.ducc.transport.event.common.IDuccProcess;
-import org.apache.uima.ducc.transport.event.common.ITimeWindow;
-import org.apache.uima.ducc.transport.event.common.TimeWindow;
 import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+import org.apache.uima.ducc.transport.event.common.ITimeWindow;
+import org.apache.uima.ducc.transport.event.common.TimeWindow;
 
 
 public class DuccCommandExecutor extends CommandExecutor {
@@ -73,24 +73,182 @@ public class DuccCommandExecutor extends
 		// default 
 		return false;
 	}
-
+	/**
+	 * Decides whether the given process should create (own) its cgroup container on this node.
+	 * Job drivers (process type Pop) never own a cgroup, so false is returned for them.
+	 * For any other process, true is returned only when no directory named after the
+	 * process' cgroup id exists yet under the DUCC cgroup base directory.
+	 *
+	 * @param duccProcess process to check
+	 * @return true if the caller should create the cgroup container for this process
+	 * @throws Exception if the existence check fails
+	 */
+	private boolean cgroupOwner(IDuccProcess duccProcess ) throws Exception {
+		if ( duccProcess.getProcessType().equals(ProcessType.Pop) ) {
+			// JD: JDs run in a reservation whose container is created for the reservation
+			return false;
+		}
+		String cgroupPath = agent.cgroupsManager.getDuccCGroupBaseDir()+"/"+duccProcess.getCGroup().getId();
+		return !agent.cgroupsManager.cgroupExists(cgroupPath);
+	}
+    /**
+     * Creates a cgroup container and applies the process' cgroup max memory limit to it.
+     *
+     * @param duccProcess process whose cgroup max memory limit is applied to the container
+     * @param containerId name of the container to create
+     * @param owner owner passed to the cgroups manager (the process owner)
+     * @return true only if the container was created AND its memory limit was set
+     * @throws Exception propagated from the cgroups manager
+     */
+    private boolean createCGroupContainer(IDuccProcess duccProcess, String containerId, String owner ) throws Exception {
+        boolean created = agent.cgroupsManager.createContainer( containerId, owner, useDuccSpawn());
+        if ( !created ) {
+            return false;
+        }
+        long maxMemoryLimit = duccProcess.getCGroup().getMaxMemoryLimit();
+        return agent.cgroupsManager.setContainerMaxMemoryLimit(containerId, owner, useDuccSpawn(), maxMemoryLimit);
+    }
+    /**
+     * Returns true if the process is a job driver; JDs are launched with process type Pop.
+     */
+    private boolean isJD(IDuccProcess duccProcess ) {
+        ProcessType type = duccProcess.getProcessType();
+        return type.equals(ProcessType.Pop);
+    }
+
+    /**
+     * Computes the cgroup container name of the managed process. Services use the bare
+     * cgroup id; all other process types use "<work duccId friendly>.<cgroup id>".
+     */
+    private String getContainerId() {
+        ManagedProcess process = (ManagedProcess) managedProcess;
+        IDuccProcess duccProcess = process.getDuccProcess();
+        if ( !duccProcess.getProcessType().equals(ProcessType.Service) ) {
+            return process.getWorkDuccId().getFriendly()+"."+duccProcess.getCGroup().getId();
+        }
+        return String.valueOf(duccProcess.getCGroup().getId());
+    }
 	public Process exec(ICommandLine cmdLine, Map<String, String> processEnv)
 			throws Exception {
 		String methodName = "exec";
 		try {
 			String[] cmd = getDeployableCommandLine(cmdLine,processEnv);			
 			if ( isKillCommand(cmdLine) ) {
-        logger.info(methodName, null, "Killing process");
+				logger.info(methodName, null, "Killing process");
 				stopProcess(cmdLine, cmd);
 			} else {
-				startProcess(cmdLine, cmd, processEnv);
+				IDuccProcess duccProcess = ((ManagedProcess) managedProcess).getDuccProcess();
+  			    
+				
+				// Calculate how much swap space the process is allowed to use. The calculation is based on
+				// the percentage of real memory the process is assigned. The process is entitled the
+				// same percentage of the swap.
+				// Normalize node's total memory as it is expressed in KB. The calculation below is based on bytes.
+				double percentOfTotal =  ((double)duccProcess.getCGroup().getMaxMemoryLimit())/
+						(agent.getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal()*1024); // need bytes
+
+				
+				//  substract 1Gig from total swap on this node to accommodate OS needs for swapping. The 
+				//  getSwapTotal() returns swap space in KBs so normalize 1Gig
+				long adjustedTotalSwapAvailable =
+						agent.getNodeInfo().getNodeMetrics().getNodeMemory().getSwapTotal() - 1048576;
+				// calculate the portion (in bytes) of swap this process is entitled to
+				long maxProcessSwapUsage = 
+						 (long) (adjustedTotalSwapAvailable*percentOfTotal)*1024;
+				 // assigned how much swap this process is entitled to. If it exceeds this number the Agent
+				 // will kill the process.
+				 ((ManagedProcess) managedProcess).setMaxSwapThreshold(maxProcessSwapUsage);
+  			    logger.info(methodName, null, "---Process DuccId:"+duccProcess.getDuccId()+
+	  			    		" CGroup.getMaxMemoryLimit():"+((duccProcess.getCGroup().getMaxMemoryLimit()/1024)/1024)+" MBs"+
+	  			    		" Node Memory Total:"+(agent.getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal()/1024)+" MBs"+
+	  			    		" Percentage Of Real Memory:"+percentOfTotal+
+	  			    		" Adjusted Total Swap Available On Node:"+adjustedTotalSwapAvailable/1024+" MBs"+
+	  			    		" Process Entitled To Max:"+(maxProcessSwapUsage/1024)/1024+" MBs of Swap"
+	  			    		);
+
+  			    //logger.info(methodName, null, "The Process With ID:"+duccProcess.getDuccId()+" is Entitled to the Max "+( (maxProcessSwapUsage/1024)/1024)+" Megs of Swap Space");
+				     
+				// if configured to use cgroups and the process is the cgroup owner, create a cgroup
+				// using Process DuccId as a name. Additional processes may be injected into the
+				// cgroup by declaring cgroup owner id.
+				if ( agent.useCgroups ) {
+					  
+					
+					// cgroup container id
+					//long containerId;
+					//	JDs are of type Pop (Plain Old Process). JDs run in a reservation. The cgroup container
+					//  is created for the reservation and we co-locate as many JDs as we can fit in it.
+					//String containerId = ((ManagedProcess) managedProcess).getWorkDuccId()+"."+duccProcess.getCGroup().getId().getFriendly();
+					String containerId = getContainerId();
+					logger.info(methodName, null, "Creating CGroup with ID:"+containerId);					
+					if ( !agent.cgroupsManager.cgroupExists(agent.cgroupsManager.getDuccCGroupBaseDir()+"/"+containerId) ) {
+						
+						// create cgroup container for JDs
+						try {
+							if ( createCGroupContainer(duccProcess, containerId, ((ManagedProcess)super.managedProcess).getOwner()) ) {
+								logger.info(methodName, null, "Created CGroup with ID:"+containerId+" With Memory Limit="+((ManagedProcess)super.managedProcess).getDuccProcess().getCGroup().getMaxMemoryLimit()+" Bytes");
+							} else {
+								logger.info(methodName, null, "Failed To Create CGroup with ID:"+containerId);
+							}
+						} catch( Exception e) {
+							logger.error(methodName, null, e);
+							
+						}
+					} else {
+						logger.info(methodName, null, "CGroup Exists with ID:"+containerId);					
+
+					}
+/*					
+					if (  isJD(duccProcess) ) {
+						//	For JDs the container is the reservation id
+						containerId = duccProcess.getCGroup().getId().getFriendly();
+						//	check if we need to create a cgroup for JDs. First JD deployment will force creation 
+						//  of cgroup container
+						if ( !agent.cgroupsManager.cgroupExists(agent.cgroupsManager.getDuccCGroupBaseDir()+"/"+duccProcess.getCGroup().getId().getFriendly()) ) {
+							// create cgroup container for JDs
+							try {
+								if ( createCGroupContainer(duccProcess, containerId, "ducc") ) {
+									logger.info(methodName, null, "Created CGroup with ID:"+containerId);
+								} else {
+									logger.info(methodName, null, "Failed To Create CGroup with ID:"+containerId);
+								}
+							} catch( Exception e) {
+								logger.error(methodName, null, e);
+								
+							}
+						}
+ 					} else 	if ( cgroupOwner(duccProcess)) {  
+ 						
+ 						containerId = duccProcess.getCGroup().getId().getFriendly();
+ 						//  create cgroup container for JP/AP
+ 						createCGroupContainer(duccProcess, containerId, ((ManagedProcess)super.managedProcess).getOwner());
+						     
+//						     // NEED TO START DAEMON THREAD TO CHECK SWAP USAGE OF THIS PROCESS 
+//						}
+
+						
+					} else {
+						containerId = duccProcess.getCGroup().getId().getFriendly();
+					}
+	*/
+				String[] cgroupCmd = new String[cmd.length+3];
+					cgroupCmd[0] = "/usr/bin/cgexec";
+					cgroupCmd[1] = "-g";
+					cgroupCmd[2] = agent.cgroupsManager.getSubsystems()+":ducc/"+containerId;
+					int inx = 3;
+					for ( String cmdPart : cmd ) {
+						cgroupCmd[inx++] = cmdPart;
+					}
+					startProcess(cmdLine, cgroupCmd, processEnv);
+				} else {
+					// dont use CGroups 
+					startProcess(cmdLine, cmd, processEnv);
+				}
+
 			}
 			return managedProcess;
 		} catch (Exception e) {
-		  if ( ((ManagedProcess)super.managedProcess).getDuccProcess() != null ) {
-  	    DuccId duccId = ((ManagedProcess)super.managedProcess).getDuccId();
-	      logger.error(methodName, duccId, ((ManagedProcess)super.managedProcess).getDuccProcess().getDuccId(), e, new Object[]{});
-		  }
+			if ( ((ManagedProcess)super.managedProcess).getDuccProcess() != null ) {
+				DuccId duccId = ((ManagedProcess)super.managedProcess).getDuccId();
+				logger.error(methodName, duccId, ((ManagedProcess)super.managedProcess).getDuccProcess().getDuccId(), e, new Object[]{});
+			}
 			throw e;
 		} 
 	}
@@ -160,6 +318,7 @@ public class DuccCommandExecutor extends
 	          } else {
 	            logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),"------------ Agent Dispatched STOP Request to Process with PID:"+((ManagedProcess) managedProcess).getDuccProcess().getPID()+" Process State: "+((ManagedProcess) managedProcess).getDuccProcess().getProcessState()+" .Process Not In Running State");
 	          }
+
 	        } catch (TimeoutException tex) { // on time out kill the process
 	          if ( ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Running)) {
 	            logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),"------------ Agent Timed-out Waiting for Process with PID:"+((ManagedProcess) managedProcess).getDuccProcess().getPID()+" to Stop. Process State:"+((ManagedProcess) managedProcess).getDuccProcess().getProcessState()+" .Process did not stop in alloted time of "+maxTimeToWaitForProcessToStop+" millis");
@@ -178,47 +337,47 @@ public class DuccCommandExecutor extends
 	
 	private void startProcess(ICommandLine cmdLine,String[] cmd, Map<String, String> processEnv) throws Exception {
 		String methodName = "startProcess";
-		
+
 		ITimeWindow twr = new TimeWindow();
-			String millis;
-			millis = TimeStamp.getCurrentMillis();
-			
-			((ManagedProcess) managedProcess).getDuccProcess().setTimeWindowRun(twr);
-			twr.setStart(millis);
-			ProcessBuilder pb = new ProcessBuilder(cmd);
-
-			if ( ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Pop) ||
- 	         ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Service)   ) {
-				ITimeWindow twi = new TimeWindow();
-				((ManagedProcess) managedProcess).getDuccProcess().setTimeWindowInit(twi);
-				twi.setStart(millis);
-				twi.setEnd(millis);
-			}
-			Map<String, String> env = pb.environment();
-			// enrich Process environment
-			env.putAll(processEnv);
-			if( cmdLine instanceof ACommandLine ) {
-				// enrich Process environment with one from a given command line
-				env.putAll(((ACommandLine)cmdLine).getEnvironment());
-			}
-			if ( logger.isTrace()) {
-	      // <dump>
-	      Iterator<String> iterator = env.keySet().iterator();
-	      while(iterator.hasNext()) {
-	        String key = iterator.next();
-	        String value = env.get(key);
-	        String message = "key:"+key+" "+"value:"+value;
-	        logger.trace(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), message);
-	      }
-			}
-			try {
-				doExec(pb, cmd, isKillCommand(cmdLine));
-			} catch(Exception e) {
-				throw e;
-			} finally {
-				millis = TimeStamp.getCurrentMillis();
-				twr.setEnd(millis);
+		String millis = TimeStamp.getCurrentMillis();
+		IDuccProcess duccProcess = ((ManagedProcess) managedProcess).getDuccProcess();
+		// the run time window starts now and is closed once doExec returns or throws
+		duccProcess.setTimeWindowRun(twr);
+		twr.setStart(millis);
+		ProcessBuilder pb = new ProcessBuilder(cmd);
+
+		// JDs (Pop) and Services get an init time window that starts and ends now
+		ProcessType processType = duccProcess.getProcessType();
+		if ( processType.equals(ProcessType.Pop) || processType.equals(ProcessType.Service) ) {
+			ITimeWindow twi = new TimeWindow();
+			duccProcess.setTimeWindowInit(twi);
+			twi.setStart(millis);
+			twi.setEnd(millis);
+		}
+		// The launch environment is the inherited one plus processEnv, plus (for an
+		// ACommandLine) the command line's own environment, which wins on key conflicts.
+		Map<String, String> env = pb.environment();
+		env.putAll(processEnv);
+		if( cmdLine instanceof ACommandLine ) {
+			env.putAll(((ACommandLine)cmdLine).getEnvironment());
+		}
+		if ( logger.isTrace()) {
+			// <dump>
+			for ( Map.Entry<String, String> entry : env.entrySet() ) {
+				String message = "key:"+entry.getKey()+" "+"value:"+entry.getValue();
+				logger.trace(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), message);
 			}
+		}
+		try {
+			doExec(pb, cmd, isKillCommand(cmdLine));
+		} finally {
+			// close the run time window whether or not doExec threw
+			twr.setEnd(TimeStamp.getCurrentMillis());
+		}
 	}
 	private void doExec(ProcessBuilder pb, String[] cmd, boolean isKillCmd) throws Exception {
 		String methodName = "doExec";
@@ -240,7 +399,14 @@ public class DuccCommandExecutor extends
 			exitCode = process.waitFor();
 			if ( !isKillCommand(cmdLine) ) {
 				logger.info(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), ">>>>>>>>>>>>> Process with PID:"+((ManagedProcess)super.managedProcess).getDuccProcess().getPID()+" Terminated");
+				//	 Process is dead, determine if the cgroup container should be destroyed as well.
+				if ( agent.useCgroups ) { 
+					String containerId = getContainerId();
+					agent.cgroupsManager.destroyContainer(containerId);
+					logger.info(methodName, null, "Removed CGroup Container with ID:"+containerId);
+				}
 			}
+			
 		} catch( NullPointerException ex) {
 			((ManagedProcess)super.managedProcess).getDuccProcess().setProcessState(ProcessState.Failed);
 			StringBuffer sb = new StringBuffer();

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java Fri Apr 12 16:46:54 2013
@@ -38,6 +38,7 @@ import org.apache.uima.ducc.transport.ev
 import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
 import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess;
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+import org.apache.uima.ducc.transport.event.common.ProcessMemoryAssignment;
 
 
 
@@ -137,18 +138,24 @@ public class ManagedProcess implements P
 	
 	private transient DuccLogger logger;
 	
-	private long processMemoryAssignment;
+	/**
+	 * Maximum swap space, in bytes, this process may use. The process metrics collector
+	 * kills the process when its measured swap usage exceeds this value.
+	 */
+	private long maxSwapThreshold;
+
+	// Memory assigned to this process; its max memory (including the fudge factor) is
+	// compared against the process' resident memory by the process metrics collector.
+	private ProcessMemoryAssignment processMemoryAssignment;
 	
 	public ManagedProcess(IDuccProcess process, ICommandLine commandLine) {
-		this(process, commandLine, null, null,0);
+		this(process, commandLine, null, null, new ProcessMemoryAssignment());
 	}

 	public ManagedProcess(IDuccProcess process, ICommandLine commandLine,boolean agentProcess) {
-		this(process, commandLine, null, null,0);
+		this(process, commandLine, null, null,new ProcessMemoryAssignment());
 		this.agentProcess = agentProcess;
 	}
 
-	public ManagedProcess(IDuccProcess process, ICommandLine commandLine,ProcessLifecycleObserver observer, DuccLogger logger, long processMemoryAssignment) {
+	public ManagedProcess(IDuccProcess process, ICommandLine commandLine,ProcessLifecycleObserver observer, DuccLogger logger, ProcessMemoryAssignment processMemoryAssignment) {
 		this.commandLine = commandLine;
 		this.duccProcess = process;
 		this.observer = observer;
@@ -603,7 +610,7 @@ public class ManagedProcess implements P
 		this.future = future;
 	}
 
-  public long getProcessMemoryAssignment() {
+  /**
+   * Returns the memory assignment of this process.
+   */
+  public ProcessMemoryAssignment getProcessMemoryAssignment() {
     return processMemoryAssignment;
   }
 
@@ -614,4 +621,12 @@ public class ManagedProcess implements P
   public void setSocketEndpoint(String socketEndpoint) {
     this.socketEndpoint = socketEndpoint;
   }
+
+  /**
+   * Returns the maximum swap space, in bytes, this process may use before the agent kills it.
+   */
+  public long getMaxSwapThreshold() {
+    return this.maxSwapThreshold;
+  }
+
+  /**
+   * Sets the maximum swap space, in bytes, this process may use before the agent kills it.
+   *
+   * @param maxSwapThreshold swap limit in bytes
+   */
+  public void setMaxSwapThreshold(long maxSwapThreshold) {
+    this.maxSwapThreshold = maxSwapThreshold;
+  }
 }

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java?rev=1467346&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java Fri Apr 12 16:46:54 2013
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.io.RandomAccessFile;
+import java.util.concurrent.Callable;
+
+import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessMemoryPageLoadUsage;
+import org.apache.uima.ducc.common.agent.metrics.swap.ProcessMemoryPageLoadUsage;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+
+/**
+ * Collects the major page fault count of a process by parsing the stat file handle supplied
+ * by the caller (presumably {@code /proc/<pid>/stat} - confirm with the caller).
+ */
+public class ProcessMajorFaultCollector extends AbstractMetricCollector implements
+		Callable<ProcessMemoryPageLoadUsage> {
+
+	private final DuccLogger logger;
+	private final String pid;
+
+	/**
+	 * @param logger logger used to report collection failures; may be null
+	 * @param pid id of the process whose stat file is parsed (used in log messages)
+	 * @param fileHandle open handle to the process stat file
+	 * @param howMany passed through to AbstractMetricCollector
+	 * @param offset passed through to AbstractMetricCollector
+	 */
+	public ProcessMajorFaultCollector(DuccLogger logger, String pid,
+			RandomAccessFile fileHandle, int howMany, int offset) {
+		super(fileHandle, howMany, offset);
+		this.logger = logger;
+		this.pid = pid;
+	}
+
+	/**
+	 * Parses the stat file and wraps the parsed fields.
+	 *
+	 * @return page-load usage exposing the major fault count
+	 * @throws Exception if the file cannot be parsed; it is logged and rethrown
+	 */
+	public ProcessMemoryPageLoadUsage call() throws Exception {
+		try {
+			super.parseMetricFile();
+			return new DuccProcessMemoryPageLoadUsage(super.metricFileContents,
+			     super.metricFieldOffsets, super.metricFieldLengths);
+		} catch (Exception e) {
+			// report through the agent log instead of stderr; the caller still sees the failure
+			if ( logger != null ) {
+				logger.error("call", null, "Failed To Collect Major Faults For Process With PID:"+pid);
+				logger.error("call", null, e);
+			}
+			throw e;
+		}
+	}
+}

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java?rev=1467346&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java Fri Apr 12 16:46:54 2013
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.io.RandomAccessFile;
+import java.util.concurrent.Callable;
+
+import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessMemoryPageLoadUsage;
+import org.apache.uima.ducc.common.agent.metrics.swap.ProcessMemoryPageLoadUsage;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+
+/**
+ * Parses the process file handle supplied by the caller and returns the parsed fields as a
+ * {@link ProcessMemoryPageLoadUsage}.
+ *
+ * NOTE(review): despite its name, this collector currently yields page-load (major fault)
+ * data exactly like ProcessMajorFaultCollector, not swap usage. Per-process swap usage is
+ * obtained via DuccProcessSwapSpaceUsage (a script) instead. Confirm whether this class is
+ * still needed or should parse a swap-specific file.
+ */
+public class ProcessSwapUsageCollector extends AbstractMetricCollector implements
+		Callable<ProcessMemoryPageLoadUsage> {
+
+	private final DuccLogger logger;
+	private final String pid;
+
+	/**
+	 * @param logger logger used to report collection failures; may be null
+	 * @param pid id of the process whose file is parsed (used in log messages)
+	 * @param fileHandle open handle to the file to parse
+	 * @param howMany passed through to AbstractMetricCollector
+	 * @param offset passed through to AbstractMetricCollector
+	 */
+	public ProcessSwapUsageCollector(DuccLogger logger, String pid,
+			RandomAccessFile fileHandle, int howMany, int offset) {
+		super(fileHandle, howMany, offset);
+		this.logger = logger;
+		this.pid = pid;
+	}
+
+	/**
+	 * Parses the file and wraps the parsed fields.
+	 *
+	 * @return the parsed usage
+	 * @throws Exception if the file cannot be parsed; it is logged and rethrown
+	 */
+	public ProcessMemoryPageLoadUsage call() throws Exception {
+		try {
+			super.parseMetricFile();
+			return new DuccProcessMemoryPageLoadUsage(super.metricFileContents,
+			     super.metricFieldOffsets, super.metricFieldLengths);
+		} catch (Exception e) {
+			// report through the agent log instead of stderr; the caller still sees the failure
+			if ( logger != null ) {
+				logger.error("call", null, "Failed To Collect Usage For Process With PID:"+pid);
+				logger.error("call", null, e);
+			}
+			throw e;
+		}
+	}
+}

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxNodeMetricsProcessor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxNodeMetricsProcessor.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxNodeMetricsProcessor.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxNodeMetricsProcessor.java Fri Apr 12 16:46:54 2013
@@ -118,6 +118,11 @@ public class LinuxNodeMetricsProcessor e
               cpuFuture.get(), nuiFuture.get());
       
 			Node node = new DuccNode(agent.getIdentity(), nodeMetrics);
+			// Make the agent aware how much memory is available on the node. Do this once.
+			if ( agent.getNodeInfo() == null ) {
+				agent.setNodeInfo(node);
+			}
+						
 			((DuccNode)node).duccLingExists(agent.duccLingExists());
 			((DuccNode)node).runWithDuccLing(agent.runWithDuccLing());
 			logger.info(methodName, null, "... Agent "+node.getNodeIdentity().getName()+" Posting Memory:"

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java Fri Apr 12 16:46:54 2013
@@ -29,11 +29,15 @@ import org.apache.uima.ducc.agent.NodeAg
 import org.apache.uima.ducc.agent.launcher.ManagedProcess;
 import org.apache.uima.ducc.agent.metrics.collectors.DuccGarbageStatsCollector;
 import org.apache.uima.ducc.agent.metrics.collectors.ProcessCpuUsageCollector;
+import org.apache.uima.ducc.agent.metrics.collectors.ProcessMajorFaultCollector;
 import org.apache.uima.ducc.agent.metrics.collectors.ProcessResidentMemoryCollector;
 import org.apache.uima.ducc.common.agent.metrics.cpu.ProcessCpuUsage;
 import org.apache.uima.ducc.common.agent.metrics.memory.ProcessResidentMemory;
+import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessSwapSpaceUsage;
+import org.apache.uima.ducc.common.agent.metrics.swap.ProcessMemoryPageLoadUsage;
 import org.apache.uima.ducc.common.node.metrics.ProcessGarbageCollectionStats;
 import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
 import org.apache.uima.ducc.transport.event.common.IDuccProcess;
 import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess;
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
@@ -52,7 +56,7 @@ implements ProcessMetricsProcessor {
 	private ManagedProcess managedProcess;
 	private NodeAgent agent;
   private int fudgeFactor = 5;  // default is 5%
-  
+  	private int logCounter=0;
 	public LinuxProcessMetricsProcessor(DuccLogger logger, IDuccProcess process, NodeAgent agent,String statmFilePath, String nodeStatFilePath, String processStatFilePath, ManagedProcess managedProcess) throws FileNotFoundException{
 		this.logger = logger;
 		statmFile = new RandomAccessFile(statmFilePath, "r");
@@ -99,32 +103,67 @@ implements ProcessMetricsProcessor {
 			
 			ProcessCpuUsageCollector processCpuUsageCollector =
 					new ProcessCpuUsageCollector(logger, process.getPID(), processStatFile,42,0);
+			
 			Future<ProcessCpuUsage> processCpuUsage = pool.submit(processCpuUsageCollector);
 			
+			ProcessMajorFaultCollector processMajorFaultUsageCollector =
+					new ProcessMajorFaultCollector(logger, process.getPID(), processStatFile,42,0);
+			
+			Future<ProcessMemoryPageLoadUsage> processMajorFaultUsage = pool.submit(processMajorFaultUsageCollector);
+			String DUCC_HOME = Utils.findDuccHome();
+			//	executes script DUCC_HOME/admin/ducc_get_process_swap_usage.sh which sums up swap used by a process
+			DuccProcessSwapSpaceUsage processSwapSpaceUsage = 
+					new DuccProcessSwapSpaceUsage(process.getPID(),DUCC_HOME+"/admin/ducc_get_process_swap_usage.sh", logger);
+			
 			logger.trace("process", null, "----------- PID:"+process.getPID()+" Cumulative CPU Time (jiffies):"+processCpuUsage.get().getTotalJiffies()); 
 			//	Publish cumulative CPU usage
 			process.setCpuTime(processCpuUsage.get().getTotalJiffies());
-			// if the fudgeFactor is negative, don't check if the process exceeded its 
-			// memory assignment.
+			long majorFaults = processMajorFaultUsage.get().getMajorFaults();
+			// collects process Major faults (swap in memory)
+			process.setMajorFaults(majorFaults);
+			//	Current Process Swap Usage in bytes
+			long processSwapUsage = processSwapSpaceUsage.getSwapUsage()*1024;
+			//	collects swap usage from /proc/<PID>/smaps file via a script DUCC_HOME/admin/collect_process_swap_usage.sh
+			process.setSwapUsage(processSwapUsage);
+			if ( (logCounter % 100 ) == 0 ) {
+			   logger.info("process", null, "----------- PID:"+process.getPID()+" Major Faults:"+majorFaults+" Process Swap Usage:"+processSwapUsage); 
+			}
+			logCounter++;
 			
-			if ( fudgeFactor > -1 && managedProcess.getProcessMemoryAssignment() > 0 ) {
-			  // RSS is in terms of pages(blocks) which size is system dependent. Default 4096 bytes
-        long rss = (prm.get().get()*(blockSize/1024))/1024;  // normalize RSS into MB
-        logger.trace("process", null, "*** Process with PID:"+managedProcess.getPid()+ " Assigned Memory (MB): "+ managedProcess.getProcessMemoryAssignment()+" MBs. Current RSS (MB):"+rss);
-        //  check if process resident memory exceeds its memory assignment calculate in the PM
-        if ( rss > managedProcess.getProcessMemoryAssignment() ) {
-          logger.error("process", null, "\n\n********************************************************\n\tProcess with PID:"+managedProcess.getPid()+ " Exceeded its max memory assignment (including a fudge factor) of "+ managedProcess.getProcessMemoryAssignment()+" MBs. This Process Resident Memory Size: "+rss+" MBs .Killing process ...\n********************************************************\n\n" );
-         try {
-           managedProcess.kill();  // mark it for death
-           process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededShareSize.toString());
-           agent.stopProcess(process); 
-         } catch( Exception ee) {
-           logger.error("process", null,ee);           
-         }
-         return;
-        }
-			} 
-	    //  Publish resident memory
+			if (processSwapUsage > 0 && processSwapUsage > managedProcess.getMaxSwapThreshold()) {
+				logger.error("process", null, "\n\n********************************************************\n\tProcess with PID:"+managedProcess.getPid()+ " Exceeded its max swap usage assignment  of "+ managedProcess.getMaxSwapThreshold()+" MBs. This Process Swap Usage is: "+processSwapUsage+" MBs .Killing process ...\n********************************************************\n\n" );
+				try {
+					managedProcess.kill();  // mark it for death
+					process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededSwapThreshold.toString());
+					agent.stopProcess(process); 
+				} catch( Exception ee) {
+					logger.error("process", null,ee);           
+				}
+				return;
+			} else {
+				// if the fudgeFactor is negative, don't check if the process exceeded its 
+				// memory assignment.
+
+				if ( fudgeFactor > -1 && managedProcess.getProcessMemoryAssignment().getMaxMemoryWithFudge() > 0 ) {
+					// RSS is in terms of pages(blocks) which size is system dependent. Default 4096 bytes
+					long rss = (prm.get().get()*(blockSize/1024))/1024;  // normalize RSS into MB
+					logger.trace("process", null, "*** Process with PID:"+managedProcess.getPid()+ " Assigned Memory (MB): "+ managedProcess.getProcessMemoryAssignment()+" MBs. Current RSS (MB):"+rss);
+					//  check if process resident memory exceeds its memory assignment calculate in the PM
+					if ( rss > managedProcess.getProcessMemoryAssignment().getMaxMemoryWithFudge() ) {
+						logger.error("process", null, "\n\n********************************************************\n\tProcess with PID:"+managedProcess.getPid()+ " Exceeded its max memory assignment (including a fudge factor) of "+ managedProcess.getProcessMemoryAssignment()+" MBs. This Process Resident Memory Size: "+rss+" MBs .Killing process ...\n********************************************************\n\n" );
+						try {
+							managedProcess.kill();  // mark it for death
+							process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededShareSize.toString());
+							agent.stopProcess(process); 
+						} catch( Exception ee) {
+							logger.error("process", null,ee);           
+						}
+						return;
+					}
+				} 
+
+			}
+			    //  Publish resident memory
 	    process.setResidentMemory((prm.get().get()*blockSize));
 	    ProcessGarbageCollectionStats gcStats = 
 	          gcStatsCollector.collect();

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java?rev=1467346&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java Fri Apr 12 16:46:54 2013
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.common.agent.metrics.swap;
+
+import org.apache.uima.ducc.common.node.metrics.ByteBufferParser;
+
+/**
+ * Exposes the major fault count of a process, read from a raw
+ * metrics buffer through {@link ByteBufferParser}.
+ */
+public class DuccProcessMemoryPageLoadUsage extends ByteBufferParser implements
+		ProcessMemoryPageLoadUsage {
+	private static final long serialVersionUID = 1L;
+	/**
+	 * Index of the major fault field, passed to getFieldAsLong.
+	 * NOTE(review): in /proc/[pid]/stat "majflt" is field 12 counting
+	 * from 1 -- confirm this matches the numbering ByteBufferParser uses.
+	 */
+	public static final int MAJORFAULTSFLD=12;
+	
+	/**
+	 * All arguments are passed unchanged to {@link ByteBufferParser}.
+	 *
+	 * @param memInfoBuffer raw bytes to parse
+	 * @param memInfoFieldOffsets field offsets, as expected by ByteBufferParser
+	 * @param memInfoFieldLengths field lengths, as expected by ByteBufferParser
+	 */
+	public DuccProcessMemoryPageLoadUsage(byte[] memInfoBuffer,
+			int[] memInfoFieldOffsets, int[] memInfoFieldLengths) {
+		super(memInfoBuffer, memInfoFieldOffsets, memInfoFieldLengths);
+	}
+
+	/**
+	 * @return field {@link #MAJORFAULTSFLD} of the buffer, parsed as a long
+	 */
+	public long getMajorFaults() {
+		return super.getFieldAsLong(MAJORFAULTSFLD);
+	}
+}

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java?rev=1467346&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java Fri Apr 12 16:46:54 2013
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.common.agent.metrics.swap;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+
+/**
+ * Reports the swap usage of a process by running an external script with
+ * the process id as its only argument and parsing the number it prints.
+ */
+public class DuccProcessSwapSpaceUsage implements ProcessSwapSpaceUsage {
+	String pid=null;
+	String execScript=null;
+	DuccLogger logger=null;
+	
+	/**
+	 * @param pid        process id, passed to the script as its only argument
+	 * @param execScript path of the script to run
+	 * @param logger     receives error reports (used without a null check)
+	 */
+	public DuccProcessSwapSpaceUsage( String pid, String execScript, DuccLogger logger) {
+		this.pid = pid;
+		this.execScript = execScript;
+		this.logger = logger;
+	}
+
+	/**
+	 * Runs {@code execScript pid} and returns the last numeric line of its
+	 * combined stdout/stderr. Blank lines are skipped; other non-numeric
+	 * lines are logged and ignored.
+	 *
+	 * @return the parsed value, or 0 if pid or execScript is null, the script
+	 *         cannot be run, or it prints no numeric line
+	 */
+	public long getSwapUsage() {
+		long swapusage=0;
+		if ( pid == null || execScript == null ) {
+			return swapusage;
+		}
+		Process swapCollectorProcess = null;
+		BufferedReader reader = null;
+		try {
+			ProcessBuilder pb = new ProcessBuilder(execScript, pid);
+			pb.redirectErrorStream(true);
+			swapCollectorProcess = pb.start();
+			// The script is given no input: close its stdin pipe explicitly.
+			swapCollectorProcess.getOutputStream().close();
+			reader = new BufferedReader(new InputStreamReader(swapCollectorProcess.getInputStream()));
+			String line;
+			// Read to end-of-stream; a blank line must not end the read early and
+			// hide a value printed after it.
+			while ((line = reader.readLine()) != null) {
+				String value = line.trim();
+				if ( value.length() == 0 ) {
+					continue;
+				}
+				try {
+					swapusage = Long.parseLong(value);
+				} catch( NumberFormatException e) {
+					logger.error("getSwapUsage", null, line);
+				}
+			}
+		} catch( Exception e) {
+			logger.error("getSwapUsage", null, e);
+		} finally {
+			if ( reader != null ) {
+				try {
+					reader.close();
+				} catch( Exception e) {
+					logger.error("getSwapUsage", null, e);
+				}
+			}
+			if ( swapCollectorProcess != null ) {
+				// Make sure the child does not outlive this call.
+				swapCollectorProcess.destroy();
+			}
+		}
+		return swapusage;
+	}
+}

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java?rev=1467346&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java Fri Apr 12 16:46:54 2013
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.common.agent.metrics.swap;
+
+/**
+ * Source of the major fault count of a process.
+ */
+public interface ProcessMemoryPageLoadUsage {
+	/**
+	 * @return the number of major faults for the process
+	 */
+	long getMajorFaults();
+}

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessSwapSpaceUsage.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessSwapSpaceUsage.java?rev=1467346&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessSwapSpaceUsage.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessSwapSpaceUsage.java Fri Apr 12 16:46:54 2013
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.common.agent.metrics.swap;
+
+/**
+ * Source of the swap usage of a process.
+ */
+public interface ProcessSwapSpaceUsage {
+	/**
+	 * @return the swap usage of the process; units are defined by the implementation
+	 */
+	long getSwapUsage();
+}

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-pm/src/main/java/org/apache/uima/ducc/pm/ProcessManagerComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-pm/src/main/java/org/apache/uima/ducc/pm/ProcessManagerComponent.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-pm/src/main/java/org/apache/uima/ducc/pm/ProcessManagerComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-pm/src/main/java/org/apache/uima/ducc/pm/ProcessManagerComponent.java Fri Apr 12 16:46:54 2013
@@ -36,6 +36,7 @@ import org.apache.uima.ducc.transport.di
 import org.apache.uima.ducc.transport.event.DuccEvent;
 import org.apache.uima.ducc.transport.event.DuccJobsStateEvent;
 import org.apache.uima.ducc.transport.event.PmStateDuccEvent;
+import org.apache.uima.ducc.transport.event.common.CGroup;
 import org.apache.uima.ducc.transport.event.common.DuccJobDeployment;
 import org.apache.uima.ducc.transport.event.common.DuccUimaDeploymentDescriptor;
 import org.apache.uima.ducc.transport.event.common.DuccUserReservation;
@@ -47,6 +48,7 @@ import org.apache.uima.ducc.transport.ev
 import org.apache.uima.ducc.transport.event.common.IDuccReservationMap;
 import org.apache.uima.ducc.transport.event.common.IDuccWork;
 import org.apache.uima.ducc.transport.event.common.IDuccUnits.MemoryUnits;
+import org.apache.uima.ducc.transport.event.common.ProcessMemoryAssignment;
 
 /**
  * The ProcessManager's main role is to receive Orchestrator updates, trim received state and
@@ -94,26 +96,48 @@ implements ProcessManager {
 	
 	/* New Code */
 	
+	/**
+	 * Converts a memory request to whole GB, rounding up so requests below 1 GB
+	 * or not a multiple of 1 GB are not truncated (500 MB gives 1 GB, not 0).
+	 * @param processMemoryAssignment the requested amount, as a decimal string
+	 * @param units units of {@code processMemoryAssignment}
+	 * @return the request in GB; units other than KB, MB and TB are returned as is
+	 * @throws NumberFormatException if the request is not a valid long
+	 */
+	private long normalizeMemory(String processMemoryAssignment, MemoryUnits units) {
+	    //  Get user defined memory assignment for the JP
+	    long requested = Long.parseLong(processMemoryAssignment);
+	    // Stay in long: casting to int before dividing/multiplying could overflow
+	    if ( units.equals(MemoryUnits.KB ) ) {
+	      return ceilDiv(requested, 1024L * 1024L);
+	    } else if ( units.equals(MemoryUnits.MB ) ) {
+	      return ceilDiv(requested, 1024L);
+	    } else if ( units.equals(MemoryUnits.TB ) ) {
+	      return requested * 1024L;
+	    }
+	    return requested;  // GB is already normalized
+	}
+	/**
+	 * Number of shares ({@code shareQuantum} GB each) needed to hold the given
+	 * memory, rounded up to a whole share.
+	 * @param normalizedProcessMemoryRequirements memory in GB
+	 */
+	private int getShares(long normalizedProcessMemoryRequirements ) {
+	    return (int) ceilDiv(normalizedProcessMemoryRequirements, shareQuantum);
+	}
+	/** Integer division rounding a positive remainder up (ceiling for values >= 0). */
+	private static long ceilDiv(long value, long divisor) {
+	    long quotient = value / divisor;
+	    return (value % divisor) > 0 ? quotient + 1 : quotient;
+	}
+	/** Returns the share-aligned memory size in MB; no fudge factor is added here. */
 	private long calculateProcessMemoryAssignment( String processMemoryAssignment, MemoryUnits units) {
-    //  Get user defined memory assignment for the JP
-    long normalizedProcessMemoryRequirements =
-            Long.parseLong(processMemoryAssignment);
-    // Normalize memory requirements for JPs into Gigs 
-    if ( units.equals(MemoryUnits.KB ) ) {
-      normalizedProcessMemoryRequirements = (int)normalizedProcessMemoryRequirements/(1024*1024);
-    } else if ( units.equals(MemoryUnits.MB ) ) {
-      normalizedProcessMemoryRequirements = (int)normalizedProcessMemoryRequirements/1024;
-    } else if ( units.equals(MemoryUnits.GB ) ) {
-      //  already normalized
-    } else if ( units.equals(MemoryUnits.TB ) ) {
-      normalizedProcessMemoryRequirements = (int)normalizedProcessMemoryRequirements*1024;
-    }
+    //  Get user defined memory assignment for the JP, in GB
+    long normalizedProcessMemoryRequirements = normalizeMemory(processMemoryAssignment,units);
-    int shares = (int)normalizedProcessMemoryRequirements/shareQuantum;  // get number of shares
-    if ( (normalizedProcessMemoryRequirements % shareQuantum) > 0 ) shares++; // ciel
+    int shares = getShares(normalizedProcessMemoryRequirements);
     // normalize to get process memory in terms of Megs
-    long processAdjustedMemorySize = shares * shareQuantum * 1024;  
-    //  add fudge factor (5% default)  to adjusted memory computed above 
-    processAdjustedMemorySize += (processAdjustedMemorySize * ((double)fudgeFactor/100));        
+    // widen before multiplying so the product cannot overflow int
+    long processAdjustedMemorySize = (long) shares * shareQuantum * 1024;
     return processAdjustedMemorySize;
 	}
 
@@ -132,30 +156,48 @@ implements ProcessManager {
 	        DuccWorkJob dcj = (DuccWorkJob)entry.getValue();
 	        //  Create process list for each job
 	        List<IDuccProcess> jobProcessList = new ArrayList<IDuccProcess>();
+	        
+	        long normalizedProcessMemoryRequirements = normalizeMemory(dcj.getSchedulingInfo().getShareMemorySize(),dcj.getSchedulingInfo().getShareMemoryUnits());
+	        int shares = getShares(normalizedProcessMemoryRequirements);
+	        long processAdjustedMemorySize = shares * shareQuantum * 1024;  
+	        ProcessMemoryAssignment pma = new ProcessMemoryAssignment();
+	        pma.setShares(shares);
+	        pma.setNormalizedMemoryInMBs(processAdjustedMemorySize);
+	        
+	    
 	        //  Copy job processes 
 	        for( Entry<DuccId,IDuccProcess> jpProcess : dcj.getProcessMap().getMap().entrySet()) {
 	          jobProcessList.add(jpProcess.getValue());
 	        }
-
+	        
+	        
+	        
 	        if ( dcj.getUimaDeployableConfiguration() instanceof DuccUimaDeploymentDescriptor ) {
 	          //  Add deployment UIMA AS deployment descriptor path
 	          ((JavaCommandLine)dcj.getCommandLine()).
 	            addArgument(((DuccUimaDeploymentDescriptor)dcj.getUimaDeployableConfiguration()).getDeploymentDescriptorPath());
 	        }
+	        
+	        
 	        //  Calculate Process memory allocation including a fudge factor (if one is defined). The
 	        //  returned value is in terms of Megs.
-	        long processAdjustedMemorySize = 
-	                calculateProcessMemoryAssignment(dcj.getSchedulingInfo().getShareMemorySize(), 
-	                        dcj.getSchedulingInfo().getShareMemoryUnits());
-
+//	        long processAdjustedMemorySize = 
+//	                calculateProcessMemoryAssignment(dcj.getSchedulingInfo().getShareMemorySize(), 
+//	                        dcj.getSchedulingInfo().getShareMemoryUnits());
+	        //  add fudge factor (5% default)  to adjust memory computed above 
+	        processAdjustedMemorySize += (processAdjustedMemorySize * ((double)fudgeFactor/100));
+	        pma.setMaxMemoryWithFudge(processAdjustedMemorySize);
+	        
 	        logger.info(methodName,dcj.getDuccId(),"--------------- User Requested Memory For Process:"+dcj.getSchedulingInfo().getShareMemorySize()+dcj.getSchedulingInfo().getShareMemoryUnits()+" PM Calculated Memory Assignment of:"+processAdjustedMemorySize);
 	        
 	        ICommandLine driverCmdLine = null;
 	        IDuccProcess driverProcess = null;
 	        switch(dcj.getDuccType()) {
 	        case Job:
+		       
 	          driverCmdLine = dcj.getDriver().getCommandLine();
 	          driverProcess = dcj.getDriver().getProcessMap().entrySet().iterator().next().getValue();
+	          
 	          break;
 	        case Service:
 	          //logger.info(methodName,null,"!!!!!!!!!!!!! GOT SERVICE");
@@ -164,11 +206,13 @@ implements ProcessManager {
 	        default:
 	          
 	        }
+	        
 	        jobDeploymentList.add( new DuccJobDeployment(dcj.getDuccId(), driverCmdLine,
 	                           dcj.getCommandLine(), 
 	                           dcj.getStandardInfo(),
 	                           driverProcess,
-	                           processAdjustedMemorySize,
+	                           pma,
+	                           //processAdjustedMemorySize,
 	                           jobProcessList ));
 	      } else if (entry.getValue() instanceof DuccWorkReservation ) {
 	        String userId = ((DuccWorkReservation) entry.getValue()).getStandardInfo().getUser();

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccJobDeployment.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccJobDeployment.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccJobDeployment.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccJobDeployment.java Fri Apr 12 16:46:54 2013
@@ -15,7 +15,7 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
-*/
+ */
 package org.apache.uima.ducc.transport.event.common;
 
 import java.util.ArrayList;
@@ -26,35 +26,48 @@ import org.apache.uima.ducc.transport.cm
 import org.apache.uima.ducc.transport.cmdline.JavaCommandLine;
 import org.apache.uima.ducc.transport.cmdline.NonJavaCommandLine;
 
-
 public class DuccJobDeployment implements IDuccJobDeployment {
 	private static final long serialVersionUID = 1L;
 	private DuccId jobId;
-	//	at most two command lines can be accommodated
-  private ICommandLine[] jdclArray = new JavaCommandLine[1];
-  private ICommandLine[] pclArray;// = new JavaCommandLine[2];
+	// One command line each: job driver (jdclArray) and job processes (pclArray)
+	private ICommandLine[] jdclArray = new JavaCommandLine[1];
+	private ICommandLine[] pclArray;  // element type is chosen in the constructor
 
-  private IDuccStandardInfo stdInfo;
+	private IDuccStandardInfo stdInfo;
 	private List<IDuccProcess> jobProcesses = new ArrayList<IDuccProcess>();
-	private long processMemoryAssignment;
+	// Memory assignment for the job processes, as supplied to the constructor
+	private ProcessMemoryAssignment pma;
 	
-	public DuccJobDeployment( DuccId jobId, ICommandLine jdCmdLine, ICommandLine jpCmdLine,
-			IDuccStandardInfo stdInfo, IDuccProcess jdProcess, long processMemoryAssignment, List<IDuccProcess> jps ) {
+	/**
+	 * @param jobId     id of the job
+	 * @param jdCmdLine command line of the job driver
+	 * @param jpCmdLine command line of the job processes
+	 * @param stdInfo   standard info of the job
+	 * @param jdProcess job driver process; kept first in the process list
+	 * @param pma       memory assignment for the job processes
+	 * @param jps       job processes, stored after {@code jdProcess}
+	 */
+	public DuccJobDeployment(DuccId jobId, ICommandLine jdCmdLine,
+			ICommandLine jpCmdLine, IDuccStandardInfo stdInfo,
+			IDuccProcess jdProcess, ProcessMemoryAssignment pma,
+			List<IDuccProcess> jps) {
 		this.jobId = jobId;
-//    this.jdclArray = new JavaCommandLine[2];
 
-		if ( jpCmdLine instanceof JavaCommandLine ) {
-      this.pclArray = new JavaCommandLine[1];
-    } else {
-      this.pclArray = new NonJavaCommandLine[1];
-    }
+		// The array's runtime element type must accept jpCmdLine, otherwise
+		// the store into pclArray[0] below throws ArrayStoreException.
+		if (jpCmdLine instanceof JavaCommandLine) {
+			this.pclArray = new JavaCommandLine[1];
+		} else {
+			this.pclArray = new NonJavaCommandLine[1];
+		}
 		this.jdclArray[0] = jdCmdLine;
 		this.pclArray[0] = jpCmdLine;
 		this.stdInfo = stdInfo;
 		this.jobProcesses.add(jdProcess);
 		this.jobProcesses.addAll(jps);
-		this.processMemoryAssignment = processMemoryAssignment;
+		this.pma = pma;
 	}
+
 	public ICommandLine getJdCmdLine() {
 		return this.jdclArray[0];
 	}
@@ -74,10 +87,13 @@ public class DuccJobDeployment implement
 	public List<IDuccProcess> getJpProcessList() {
 		return this.jobProcesses.subList(1, this.jobProcesses.size());
 	}
+
 	public DuccId getJobId() {
 		return jobId;
 	}
-  public long getProcessMemoryAssignment() {
-    return processMemoryAssignment;
-  }
+
+	/** @return the memory assignment supplied to the constructor */
+	public ProcessMemoryAssignment getProcessMemoryAssignment() {
+		return pma;
+	}
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccProcess.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccProcess.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccProcess.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccProcess.java Fri Apr 12 16:46:54 2013
@@ -60,6 +60,8 @@ public class DuccProcess implements IDuc
 	private boolean initialized = false;
 	private int exitCode;
 	private CGroup cgroup;
+	private long majorFaults;
+	private long swapUsage;
 	
 	public DuccProcess(DuccId duccId, NodeIdentity nodeIdentity) {
 		setDuccId(duccId);
@@ -517,4 +519,32 @@ public class DuccProcess implements IDuc
 		this.node = node;
 	}
 
+	/**
+	 * @param faultCount major fault count to store for this process
+	 */
+	public void setMajorFaults(long faultCount) {
+		majorFaults = faultCount;
+	}
+
+	/**
+	 * @return the value last stored by {@link #setMajorFaults(long)}
+	 */
+	public long getMajorFaults() {
+		return this.majorFaults;
+	}
+
+	/**
+	 * @param susage swap usage to store for this process
+	 */
+	public void setSwapUsage(long susage) {
+		swapUsage = susage;
+	}
+
+	/**
+	 * @return the value last stored by {@link #setSwapUsage(long)}
+	 */
+	public long getSwapUsage() {
+		return this.swapUsage;
+	}
+
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccJobDeployment.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccJobDeployment.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccJobDeployment.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccJobDeployment.java Fri Apr 12 16:46:54 2013
@@ -62,6 +62,6 @@ public interface IDuccJobDeployment exte
-	 * Returns memory size assigned by user to this process
-	 * @return
+	 * Returns the memory assignment that applies to this job's processes.
+	 * @return shares and memory sizes (MB) assigned to each job process
 	 */
-  public long getProcessMemoryAssignment();
+  public ProcessMemoryAssignment getProcessMemoryAssignment();
 
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccProcess.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccProcess.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccProcess.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccProcess.java Fri Apr 12 16:46:54 2013
@@ -103,6 +103,7 @@ public interface IDuccProcess extends Se
 		Croaked,
 		Deallocated,
 		ExceededShareSize,
+		ExceededSwapThreshold,
 		FailedInitialization,
 		InitializationTimeout,
 		JPHasNoActiveJob, 
@@ -119,4 +120,14 @@ public interface IDuccProcess extends Se
 	public void setCGroup( CGroup cgroup);
 	public CGroup getCGroup();
 	
+	/** Stores the major fault count of this process. */
+	public void setMajorFaults(long faultCount);
+	/** @return the major fault count of this process */
+	public long getMajorFaults();
+	
+	/** Stores the swap usage of this process. */
+	public void setSwapUsage(long susage);
+	/** @return the swap usage of this process */
+	public long getSwapUsage();
+	
 }

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/ProcessMemoryAssignment.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/ProcessMemoryAssignment.java?rev=1467346&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/ProcessMemoryAssignment.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/ProcessMemoryAssignment.java Fri Apr 12 16:46:54 2013
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.common;
+
+import java.io.Serializable;
+
+/**
+ * Memory assigned to a process: a share count, the share-aligned size in MB,
+ * and the limit in MB once a fudge factor has been added.
+ */
+public class ProcessMemoryAssignment implements Serializable{
+	private static final long serialVersionUID = 1L;
+	/** Number of shares. */
+	int shares;
+	/** Share-aligned memory size in MB. */
+	long normalizedMemoryInMBs;
+	/** Memory limit in MB, including a fudge factor. */
+	long maxMemoryWithFudge;
+	public int getShares() {
+		return shares;
+	}
+	public void setShares(int shares) {
+		this.shares = shares;
+	}
+	public long getNormalizedMemoryInMBs() {
+		return normalizedMemoryInMBs;
+	}
+	public void setNormalizedMemoryInMBs(long normalizedMemoryInMBs) {
+		this.normalizedMemoryInMBs = normalizedMemoryInMBs;
+	}
+	public long getMaxMemoryWithFudge() {
+		return maxMemoryWithFudge;
+	}
+	public void setMaxMemoryWithFudge(long maxMemoryWithFudge) {
+		this.maxMemoryWithFudge = maxMemoryWithFudge;
+	}
+	/**
+	 * Lists all three values, so string concatenation (e.g. in log messages)
+	 * shows the numbers instead of the class name and hash code.
+	 */
+	@Override
+	public String toString() {
+		return "ProcessMemoryAssignment[shares=" + shares
+				+ ", normalizedMemoryInMBs=" + normalizedMemoryInMBs
+				+ ", maxMemoryWithFudge=" + maxMemoryWithFudge + "]";
+	}
+}