You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/04/12 18:46:55 UTC
svn commit: r1467346 - in /uima/sandbox/uima-ducc/trunk: src/main/admin/
uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/
uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/
uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/...
Author: cwiklik
Date: Fri Apr 12 16:46:54 2013
New Revision: 1467346
URL: http://svn.apache.org/r1467346
Log:
UIMA-2804 Support for CGroups
Added:
uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_get_process_swap_usage.sh
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessSwapSpaceUsage.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/ProcessMemoryAssignment.java
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ProcessLifecycleController.java
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxNodeMetricsProcessor.java
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
uima/sandbox/uima-ducc/trunk/uima-ducc-pm/src/main/java/org/apache/uima/ducc/pm/ProcessManagerComponent.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccJobDeployment.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccProcess.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccJobDeployment.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccProcess.java
Added: uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_get_process_swap_usage.sh
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_get_process_swap_usage.sh?rev=1467346&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_get_process_swap_usage.sh (added)
+++ uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_get_process_swap_usage.sh Fri Apr 12 16:46:54 2013
@@ -0,0 +1,28 @@
+#!/bin/bash
+# -----------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# -----------------------------------------------------------------------
+
+# Sums up swap use of a process with a given PID as reported by /proc/<PID>/smaps file
+
+for swap in `grep Swap /proc/$1/smaps 2>/dev/null|awk '{print $2}'`;
+do
+let sum=$sum+$swap;
+done
+echo $sum
+
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java Fri Apr 12 16:46:54 2013
@@ -18,9 +18,11 @@
*/
package org.apache.uima.ducc.agent;
+import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
@@ -42,9 +44,11 @@ import org.apache.camel.builder.RouteBui
import org.apache.commons.lang.SerializationUtils;
import org.apache.uima.ducc.agent.config.AgentConfiguration;
import org.apache.uima.ducc.agent.event.ProcessLifecycleObserver;
+import org.apache.uima.ducc.agent.launcher.CGroupsManager;
import org.apache.uima.ducc.agent.launcher.Launcher;
import org.apache.uima.ducc.agent.launcher.ManagedProcess;
import org.apache.uima.ducc.agent.metrics.collectors.NodeUsersCollector;
+import org.apache.uima.ducc.common.Node;
import org.apache.uima.ducc.common.NodeIdentity;
import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties;
import org.apache.uima.ducc.common.component.AbstractDuccComponent;
@@ -74,6 +78,7 @@ import org.apache.uima.ducc.transport.ev
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
import org.apache.uima.ducc.transport.event.common.IResourceState.ResourceState;
import org.apache.uima.ducc.transport.event.common.ITimeWindow;
+import org.apache.uima.ducc.transport.event.common.ProcessMemoryAssignment;
import org.apache.uima.ducc.transport.event.common.TimeWindow;
@@ -124,6 +129,15 @@ public class NodeAgent extends AbstractD
private RogueProcessReaper rogueProcessReaper =
new RogueProcessReaper(logger, 5, 10);
+ public volatile boolean useCgroups = false;
+
+ public CGroupsManager cgroupsManager = null;
+
+ public Node node = null;
+
+ public volatile boolean excludeAPs = false;
+
+ public int shareQuantum;
/**
* Ctor used exclusively for black-box testing of this class.
*/
@@ -133,6 +147,7 @@ public class NodeAgent extends AbstractD
+/** Creates an agent for node {@code ni}; all other state is initialised by the no-arg constructor. */
public NodeAgent(NodeIdentity ni) {
this();
this.nodeIdentity = ni;
+
}
/**
@@ -152,6 +167,66 @@ public class NodeAgent extends AbstractD
this.launcher = launcher;
this.configurationFactory = factory;
this.commonProcessDispatcher = factory.getCommonProcessDispatcher(context);
+
+ if ( System.getProperty("ducc.rm.share.quantum") != null && System.getProperty("ducc.rm.share.quantum").trim().length() > 0 ) {
+ shareQuantum = Integer.parseInt(System.getProperty("ducc.rm.share.quantum").trim());
+ }
+ /* Enable CGROUPS */
+ String cgroups;
+ boolean excludeNodeFromCGroups=false;
+ if ( ( cgroups = System.getProperty("ducc.agent.launcher.cgroups.enable")) != null ) {
+ if ( cgroups.equalsIgnoreCase("true")) {
+ // Load exclusion file. Some nodes may be excluded from cgroups
+ String exclusionFile;
+
+ // get the name of the exclusion file from ducc.properties
+ if ( ( exclusionFile = System.getProperty("ducc.agent.exclusion.file")) != null ) {
+ // Parse node exclusion file and determine if cgroups and AP deployment
+ // is allowed on this node
+ NodeExclusionParser exclusionParser = new NodeExclusionParser();
+ exclusionParser.parse(exclusionFile);
+ excludeNodeFromCGroups = exclusionParser.cgroupsExcluded();
+ excludeAPs = exclusionParser.apExcluded();
+
+ System.out.println("excludeNodeFromCGroups="+excludeNodeFromCGroups+" excludeAPs="+excludeAPs);
+ } else {
+ System.out.println("Running with No exclusion File");
+ }
+ // node not in the exclusion list for cgroups
+ if ( !excludeNodeFromCGroups ) {
+ // get the top level cgroup folder from ducc.properties. If not defined, use /cgroup/ducc as default
+ String cgroupsBaseDir =
+ System.getProperty("ducc.agent.launcher.cgroups.basedir");
+ if ( cgroupsBaseDir == null ) {
+ cgroupsBaseDir = "/cgroup/ducc";
+ }
+ // get the cgroup subsystems. If not defined, default to the memory subsystem
+ String cgroupsSubsystems =
+ System.getProperty("ducc.agent.launcher.cgroups.subsystems");
+ if ( cgroupsSubsystems == null ) {
+ cgroupsSubsystems = "memory";
+ }
+ cgroupsManager = new CGroupsManager(cgroupsBaseDir, cgroupsSubsystems, logger);
+ // check if cgroups base directory exists in the filesystem which means that cgroups
+ // and cgroups convenience package are installed and the daemon is up and running.
+ if ( cgroupsManager.cgroupExists(cgroupsBaseDir) ) {
+ useCgroups = true;
+ logger.info("nodeAgent", null,
+ "------- Agent Running with CGroups Enabled");
+
+ } else {
+ logger.info("nodeAgent", null,
+ "------- CGroups Not Installed on this Machine");
+ }
+ }
+
+ }
+ } else {
+ logger.info("nodeAgent", null,
+ "------- CGroups Not Enabled on this Machine");
+ }
+ logger.info("nodeAgent", null,"CGroup Support="+useCgroups+" excludeNodeFromCGroups="+excludeNodeFromCGroups+" excludeAPs="+excludeAPs);
+
String useSpawn = System.getProperty("ducc.agent.launcher.use.ducc_spawn");
if (useSpawn != null && useSpawn.toLowerCase().equals("true")) {
runWithDuccLing = true;
@@ -184,6 +259,25 @@ public class NodeAgent extends AbstractD
}
+ /** Stores this node's Node info (including its metrics); it is read back by getNodeInfo(). */
+ public void setNodeInfo( Node node ) {
+ this.node = node;
+ }
+
+ /** @return the Node info last stored via setNodeInfo, or null if it has not been set yet */
+ public Node getNodeInfo() {
+ return node;
+ }
+
+ public int getNodeTotalNumberOfShares() {
+ int shareQuantum = 0;
+ int shares = 1;
+ if ( System.getProperty("ducc.rm.share.quantum") != null && System.getProperty("ducc.rm.share.quantum").trim().length() > 0 ) {
+ shareQuantum = Integer.parseInt(System.getProperty("ducc.rm.share.quantum").trim());
+ shares = (int)getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal()/shareQuantum; // get number of shares
+ if ( (getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal() % shareQuantum) > 0 ) shares++; // ciel
+ }
+
+ return shares;
+ }
public void start(DuccService service) throws Exception {
super.start(service, null);
String name = nodeIdentity.getName();
@@ -326,7 +420,7 @@ public class NodeAgent extends AbstractD
* @param workDuccId - job id
*/
public void reconcileProcessStateAndTakeAction(ProcessLifecycleController lifecycleController,
- IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info, long processMemoryAssignment, DuccId workDuccId) {
+ IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info, ProcessMemoryAssignment processMemoryAssignment, DuccId workDuccId) {
String methodName = "reconcileProcessStateAndTakeAction";
try {
inventorySemaphore.acquire();
@@ -395,7 +489,7 @@ public class NodeAgent extends AbstractD
+ process.getDuccId() + " is already in agent's inventory.");
return;
}
- startProcess(process, commandLine, info, workDuccId,0);
+ startProcess(process, commandLine, info, workDuccId, new ProcessMemoryAssignment());
} catch (InterruptedException e) {
logger.error(methodName, null, e);
} finally {
@@ -451,7 +545,7 @@ public class NodeAgent extends AbstractD
*
*/
public void startProcess(IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info,
- DuccId workDuccId, long processMemoryAssignment) {
+ DuccId workDuccId, ProcessMemoryAssignment processMemoryAssignment) {
String methodName = "startProcess";
try {
@@ -772,7 +866,7 @@ public class NodeAgent extends AbstractD
* - fully defined command line that will be used to exec the process.
*/
private void deployProcess(IDuccProcess process, ICommandLine commandLine,
- IDuccStandardInfo info, DuccId workDuccId, long processMemoryAssignment) {
+ IDuccStandardInfo info, DuccId workDuccId, ProcessMemoryAssignment processMemoryAssignment) {
String methodName = "deployProcess";
synchronized (monitor) {
boolean deployProcess = true;
@@ -878,7 +972,7 @@ public class NodeAgent extends AbstractD
+ " Not in Agent's inventory. Adding to the inventory with state=Stopped");
process.setProcessState(ProcessState.Stopped);
inventory.put(process.getDuccId(), process);
- deployedProcesses.add(new ManagedProcess(process, null, this, logger,0));
+ deployedProcesses.add(new ManagedProcess(process, null, this, logger, new ProcessMemoryAssignment()));
}
}
}
@@ -1356,4 +1450,49 @@ public class NodeAgent extends AbstractD
e.printStackTrace();
}
}
+
+ /**
+  * Reads the node exclusion file to determine whether cgroups and/or AP deployment
+  * are disabled for this node.
+  */
+ private class NodeExclusionParser {
+     private boolean excludeNodeFromCGroups = false;
+     private boolean excludeAP = false;
+
+     /**
+      * Parses the exclusion file. Each entry has the form
+      * {@code <node>=<exclusion>[,<exclusion>...]}, where an exclusion is {@code cgroup}
+      * or {@code ap}. Node names are compared without their domain suffix, on both
+      * the file side and the local side. The first entry for this node wins. A missing
+      * file excludes nothing.
+      *
+      * @param exclFile path of the exclusion file
+      * @throws Exception if the file exists but cannot be read
+      */
+     public void parse(String exclFile) throws Exception {
+         File exclusionFile = new File(exclFile);
+         if ( !exclusionFile.exists() ) {
+             return;
+         }
+         String nodeName = shortName(getIdentity().getName());
+         BufferedReader br = new BufferedReader(new FileReader(exclusionFile));
+         try {
+             String line;
+             while ((line = br.readLine()) != null) {
+                 int eq = line.indexOf('=');
+                 if ( eq < 0 ) {
+                     continue; // not a <node>=<exclusions> entry
+                 }
+                 // Exact name match: a prefix test would let "host1" pick up "host10"'s entry.
+                 if ( !shortName(line.substring(0, eq).trim()).equals(nodeName) ) {
+                     continue;
+                 }
+                 for ( String exclusion : line.substring(eq + 1).split(",") ) {
+                     String token = exclusion.trim();
+                     if ( token.equals("cgroup") ) {
+                         excludeNodeFromCGroups = true;
+                     } else if ( token.equals("ap") ) {
+                         excludeAP = true;
+                     }
+                 }
+                 break;
+             }
+         } finally {
+             br.close(); // release the file handle even if reading fails
+         }
+     }
+
+     /** Strips any domain suffix, e.g. "host.example.com" becomes "host". */
+     private String shortName(String name) {
+         int dot = name.indexOf('.');
+         return dot > -1 ? name.substring(0, dot) : name;
+     }
+
+     /** @return true if the exclusion file disables AP deployment on this node */
+     public boolean apExcluded() {
+         return excludeAP;
+     }
+
+     /** @return true if the exclusion file disables cgroups on this node */
+     public boolean cgroupsExcluded() {
+         return excludeNodeFromCGroups;
+     }
+ }
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ProcessLifecycleController.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ProcessLifecycleController.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ProcessLifecycleController.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ProcessLifecycleController.java Fri Apr 12 16:46:54 2013
@@ -22,9 +22,10 @@ import org.apache.uima.ducc.common.utils
import org.apache.uima.ducc.transport.cmdline.ICommandLine;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
+import org.apache.uima.ducc.transport.event.common.ProcessMemoryAssignment;
+/**
+ * Starts and stops DUCC processes (JDs and JPs) on this node.
+ */
public interface ProcessLifecycleController {
- public void startProcess(IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info, DuccId workDuccId, long shareMemorySize);
+ /**
+  * Launches {@code process} using the fully defined {@code commandLine}. {@code pma} is the
+  * process's memory assignment; it replaces the former {@code long shareMemorySize} argument.
+  */
+ public void startProcess(IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info, DuccId workDuccId, ProcessMemoryAssignment pma);
+ /** Stops {@code process}. */
public void stopProcess( IDuccProcess process );
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java Fri Apr 12 16:46:54 2013
@@ -48,6 +48,9 @@ import org.springframework.beans.factory
public class AgentEventListener implements DuccEventDelegateListener {
DuccLogger logger = DuccLogger.getLogger(this.getClass(), Agent.COMPONENT_NAME);
ProcessLifecycleController lifecycleController = null;
+ // On startup of the Agent we may need to do cleanup of cgroups.
+ // This cleanup will happen once right after processing of the first OR publication.
+ private boolean cleanupPhase = true;
private NodeAgent agent;
public AgentEventListener(NodeAgent agent, ProcessLifecycleController lifecycleController) {
@@ -98,39 +101,46 @@ public class AgentEventListener implemen
// }
try {
- // print JP report targeted for this node
- reportIncomingStateForThisNode(duccEvent);
-
- List<DuccUserReservation> reservations =
- duccEvent.getUserReservations();
- agent.setReservations(reservations);
- // Stop any process that is in this Agent's inventory but not associated with any
- // of the jobs we just received
- agent.takeDownProcessWithNoJob(agent,duccEvent.getJobList());
- // iterate over all jobs and reconcile those processes that are assigned to this agent. First,
- // look at the job's JD process and than JPs.
- for( IDuccJobDeployment jobDeployment : duccEvent.getJobList()) {
- // check if this node is a target for this job's JD
- if ( isTargetNodeForProcess(jobDeployment.getJdProcess()) ) {
- // agent will check the state of JD process and either start, stop, or take no action
- ICommandLine jdCommandLine = jobDeployment.getJdCmdLine();
- if(jdCommandLine != null) {
- agent.reconcileProcessStateAndTakeAction(lifecycleController, jobDeployment.getJdProcess(), jobDeployment.getJdCmdLine(),
- jobDeployment.getStandardInfo(), jobDeployment.getProcessMemoryAssignment(), jobDeployment.getJobId());
- }
- else {
- logger.error("onDuccJobsStateEvent", null, "job is service");
- }
- }
- // check JPs
- for( IDuccProcess process : jobDeployment.getJpProcessList() ) {
- if ( isTargetNodeForProcess(process) ) {
- // agent will check the state of JP process and either start, stop, or take no action
- agent.reconcileProcessStateAndTakeAction(lifecycleController, process, jobDeployment.getJpCmdLine(),
+
+ synchronized( this ) {
+ // print JP report targeted for this node
+ reportIncomingStateForThisNode(duccEvent);
+
+ List<DuccUserReservation> reservations =
+ duccEvent.getUserReservations();
+ if ( cleanupPhase ) { // true on Agent startup
+ // cleanup reservation cgroups
+ }
+ agent.setReservations(reservations);
+ // Stop any process that is in this Agent's inventory but not associated with any
+ // of the jobs we just received
+ agent.takeDownProcessWithNoJob(agent,duccEvent.getJobList());
+ // iterate over all jobs and reconcile those processes that are assigned to this agent. First,
+ // look at the job's JD process and than JPs.
+ for( IDuccJobDeployment jobDeployment : duccEvent.getJobList()) {
+ // check if this node is a target for this job's JD
+ if ( isTargetNodeForProcess(jobDeployment.getJdProcess()) ) {
+ // agent will check the state of JD process and either start, stop, or take no action
+ ICommandLine jdCommandLine = jobDeployment.getJdCmdLine();
+ if(jdCommandLine != null) {
+ agent.reconcileProcessStateAndTakeAction(lifecycleController, jobDeployment.getJdProcess(), jobDeployment.getJdCmdLine(),
jobDeployment.getStandardInfo(), jobDeployment.getProcessMemoryAssignment(), jobDeployment.getJobId());
+ }
+ else {
+ logger.error("onDuccJobsStateEvent", null, "job is service");
+ }
+ }
+ // check JPs
+ for( IDuccProcess process : jobDeployment.getJpProcessList() ) {
+ if ( isTargetNodeForProcess(process) ) {
+ // agent will check the state of JP process and either start, stop, or take no action
+ agent.reconcileProcessStateAndTakeAction(lifecycleController, process, jobDeployment.getJpCmdLine(),
+ jobDeployment.getStandardInfo(), jobDeployment.getProcessMemoryAssignment(), jobDeployment.getJobId());
+ }
}
}
- }
+
+ }
} catch( Exception e ) {
logger.error("onDuccJobsStateEvent", null, e);
}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java?rev=1467346&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java Fri Apr 12 16:46:54 2013
@@ -0,0 +1,220 @@
+package org.apache.uima.ducc.agent.launcher;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
+
+/**
+ * Manages cgroup containers on a node.
+ *
+ * Supported operations:
+ * - cgcreate - creates a cgroup container
+ * - cgset    - sets the max memory limit of an existing container
+ * - rmdir    - removes a container (cgroups are exposed as a virtual file system)
+ *
+ * Commands may optionally run as another user through ducc_ling, whose path is taken
+ * from the {@code ducc.agent.launcher.ducc_spawn_path} system property.
+ */
+public class CGroupsManager {
+    private DuccLogger agentLogger = null;
+
+    // ids of containers created by, or discovered through, this manager (insertion ordered)
+    private Set<String> containerIds = new LinkedHashSet<String>();
+    private String cgroupBaseDir = "";
+    private String cgroupSubsystems = ""; // comma separated list of subsystems eg. memory,cpu
+
+    /**
+     * Manual smoke test: lists existing containers, creates one, caps its memory,
+     * waits a minute and removes it again.
+     *
+     * @param args containerId, max memory in bytes, owning user id
+     */
+    public static void main(String[] args) {
+        if (args.length < 3) {
+            System.out.println("Usage: CGroupsManager <containerId> <maxMemoryBytes> <userId>");
+            return;
+        }
+        try {
+            CGroupsManager cgMgr = new CGroupsManager("/cgroup/ducc", "memory", null);
+            System.out.println("Cgroups Installed:" + cgMgr.cgroupExists("/cgroup/ducc"));
+            Set<String> containers = cgMgr.collectExistingContainers();
+            for (String containerId : containers) {
+                System.out.println("Existing CGroup Container ID:" + containerId);
+            }
+            cgMgr.createContainer(args[0], args[2], true);
+            cgMgr.setContainerMaxMemoryLimit(args[0], args[2], true, Long.parseLong(args[1]));
+            synchronized (cgMgr) {
+                cgMgr.wait(60000);
+            }
+            cgMgr.destroyContainer(args[0]);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * @param cgroupBaseDir    directory holding DUCC's cgroups, e.g. /cgroup/ducc
+     * @param cgroupSubsystems comma separated subsystems passed to cgcreate, e.g. memory
+     * @param agentLogger      logger for diagnostics; may be null, in which case stdout is used
+     */
+    public CGroupsManager(String cgroupBaseDir, String cgroupSubsystems, DuccLogger agentLogger) {
+        this.cgroupBaseDir = cgroupBaseDir;
+        this.cgroupSubsystems = cgroupSubsystems;
+        this.agentLogger = agentLogger;
+    }
+
+    /**
+     * Creates a cgroup container with a given id and owner by running
+     * {@code /usr/bin/cgcreate -g <subsystems>:ducc/<containerId>}.
+     *
+     * NOTE(review): the "ducc/" group path is hard-coded, while destroyContainer() and
+     * cgroupExists() use cgroupBaseDir. They agree only if cgroupBaseDir ends in "/ducc".
+     *
+     * @param containerId  new cgroup container id
+     * @param userId       owner of the cgroup container
+     * @param useDuccSpawn run 'cgcreate' through ducc_ling as {@code userId}
+     * @return true on success, false otherwise
+     * @throws Exception declared for compatibility; failures are reported as false
+     */
+    public boolean createContainer(String containerId, String userId, boolean useDuccSpawn) throws Exception {
+        try {
+            String[] command = new String[] {"/usr/bin/cgcreate", "-g", cgroupSubsystems + ":ducc/" + containerId};
+            if (launchCommand(command, useDuccSpawn, userId, containerId) == 0) {
+                containerIds.add(containerId);
+                return true;
+            }
+            return false;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    /**
+     * Sets the max memory use of an existing cgroup container by running
+     * {@code /usr/bin/cgset -r memory.limit_in_bytes=<containerMaxSize> ducc/<containerId>}.
+     *
+     * @param containerId      existing container id for which the limit will be set
+     * @param userId           container owner
+     * @param useDuccSpawn     run 'cgset' through ducc_ling as {@code userId}
+     * @param containerMaxSize max memory limit in bytes
+     * @return true on success, false otherwise
+     * @throws Exception declared for compatibility; failures are reported as false
+     */
+    public boolean setContainerMaxMemoryLimit(String containerId, String userId, boolean useDuccSpawn, long containerMaxSize) throws Exception {
+        try {
+            String[] command = new String[] {"/usr/bin/cgset", "-r", "memory.limit_in_bytes=" + containerMaxSize, "ducc/" + containerId};
+            return launchCommand(command, useDuccSpawn, userId, containerId) == 0;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    /**
+     * Removes the cgroup container with a given id. Cgroups are a virtual file system,
+     * so a plain rmdir of {@code <cgroupBaseDir>/<containerId>} is enough.
+     *
+     * @param containerId cgroup to remove
+     * @return true on success or if the container does not exist, false otherwise
+     * @throws Exception declared for compatibility; failures are reported as false
+     */
+    public boolean destroyContainer(String containerId) throws Exception {
+        try {
+            String containerPath = cgroupBaseDir + "/" + containerId;
+            if (!cgroupExists(containerPath)) {
+                return true; // nothing to do, cgroup does not exist
+            }
+            String[] command = new String[] {"/bin/rmdir", containerPath};
+            if (launchCommand(command, false, "ducc", containerId) == 0) {
+                containerIds.remove(containerId);
+                return true;
+            }
+            return false;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    /**
+     * Runs {@code command}, optionally wrapped as {@code <ducc_ling> -u <userId> -- <command>},
+     * echoes its combined stdout/stderr and waits for it to exit.
+     *
+     * @return the process exit code, or -1 if the command could not be run
+     */
+    private int launchCommand(String[] command, boolean useDuccSpawn, String userId, String containerId) throws Exception {
+        String[] commandLine = null;
+        try {
+            // ducc_ling (C launcher) lets the command run as the given user rather than as ducc.
+            String c_launcher_path =
+                Utils.resolvePlaceholderIfExists(
+                    System.getProperty("ducc.agent.launcher.ducc_spawn_path"), System.getProperties());
+
+            if (useDuccSpawn && c_launcher_path != null) {
+                commandLine = new String[4 + command.length];
+                commandLine[0] = c_launcher_path;
+                commandLine[1] = "-u";
+                commandLine[2] = userId;
+                commandLine[3] = "--";
+                System.arraycopy(command, 0, commandLine, 4, command.length);
+            } else {
+                commandLine = command;
+            }
+            ProcessBuilder processLauncher = new ProcessBuilder(commandLine);
+            // Merge stderr into stdout so the single reader below drains both streams and
+            // error output is not lost. The no-arg redirectErrorStream() is only a getter.
+            processLauncher.redirectErrorStream(true);
+
+            java.lang.Process process = processLauncher.start();
+            BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+            try {
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    if (agentLogger != null) {
+                        agentLogger.info("launchCommand", null, ">>>>" + line);
+                    } else {
+                        System.out.println(">>>>" + line);
+                    }
+                }
+            } finally {
+                reader.close();
+            }
+            return process.waitFor();
+        } catch (Exception e) {
+            StringBuffer sb = new StringBuffer();
+            if (commandLine != null) {
+                for (String cmdPart : commandLine) {
+                    sb.append(cmdPart).append(" ");
+                }
+            }
+            if (agentLogger != null) {
+                agentLogger.error("launchCommand", null, "Unable to Launch Command:" + sb.toString(), e);
+            } else {
+                System.out.println("CGroupsManager.launchCommand()- Unable to Launch Command:" + sb.toString());
+                e.printStackTrace();
+            }
+        }
+        return -1; // failure
+    }
+
+    /**
+     * Adds the names of all directories under cgroupBaseDir to the tracked container ids.
+     *
+     * @return the (live, mutable) set of tracked container ids
+     * @throws Exception declared for compatibility
+     */
+    public Set<String> collectExistingContainers() throws Exception {
+        File duccCGroupBaseDir = new File(cgroupBaseDir);
+        // listFiles() returns null when the path is not a readable directory
+        File[] existingCGroups = duccCGroupBaseDir.listFiles();
+        if (existingCGroups != null) {
+            for (File cgroup : existingCGroups) {
+                if (cgroup.isDirectory()) {
+                    containerIds.add(cgroup.getName());
+                }
+            }
+        }
+        return containerIds;
+    }
+
+    public String getDuccCGroupBaseDir() {
+        return cgroupBaseDir;
+    }
+
+    public String getSubsystems() {
+        return cgroupSubsystems;
+    }
+
+    /** @return true if the given cgroup path exists in the file system */
+    public boolean cgroupExists(String cgroup) throws Exception {
+        return new File(cgroup).exists();
+    }
+}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java Fri Apr 12 16:46:54 2013
@@ -40,10 +40,10 @@ import org.apache.uima.ducc.transport.cm
import org.apache.uima.ducc.transport.cmdline.JavaCommandLine;
import org.apache.uima.ducc.transport.event.ProcessStopDuccEvent;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
-import org.apache.uima.ducc.transport.event.common.ITimeWindow;
-import org.apache.uima.ducc.transport.event.common.TimeWindow;
import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+import org.apache.uima.ducc.transport.event.common.ITimeWindow;
+import org.apache.uima.ducc.transport.event.common.TimeWindow;
public class DuccCommandExecutor extends CommandExecutor {
@@ -73,24 +73,182 @@ public class DuccCommandExecutor extends
// default
return false;
}
-
+ /**
+  * Decides whether the given process should create (and thus own) its cgroup container.
+  *
+  * JD processes (ProcessType.Pop) never own a cgroup and always yield false. For any
+  * other process, returns true only when {@code <cgroup base dir>/<cgroup id>} does not
+  * exist yet.
+  *
+  * NOTE(review): getContainerId() names non-service containers
+  * {@code <workFriendlyId>.<cgroupId>}, but this checks {@code <cgroupId>} alone.
+  * Confirm which path exec() actually creates.
+  *
+  * @param duccProcess process about to be launched
+  * @return true if the caller should create the cgroup container for this process
+  * @throws Exception propagated from the cgroup existence check
+  */
+ private boolean cgroupOwner(IDuccProcess duccProcess ) throws Exception {
+     if ( isJD(duccProcess) ) {
+         return false; // JDs don't own a cgroup in which they run
+     }
+     String containerPath = agent.cgroupsManager.getDuccCGroupBaseDir() + "/" + duccProcess.getCGroup().getId();
+     return !agent.cgroupsManager.cgroupExists(containerPath);
+ }
+ /**
+  * Creates cgroup container {@code containerId} owned by {@code owner} and caps its memory
+  * at {@code duccProcess.getCGroup().getMaxMemoryLimit()}.
+  *
+  * @return true only if the container was created AND its memory limit was set
+  */
+ private boolean createCGroupContainer(IDuccProcess duccProcess, String containerId, String owner ) throws Exception {
+     if ( !agent.cgroupsManager.createContainer( containerId, owner, useDuccSpawn()) ) {
+         return false;
+     }
+     // NOTE(review): if setting the limit fails, the container created above stays in place
+     // without a memory cap; confirm whether the caller removes it.
+     // TODO(review): an earlier draft planned a daemon thread to watch this process's swap
+     // usage; exec() now sets a max swap threshold on the ManagedProcess - confirm that suffices.
+     return agent.cgroupsManager.setContainerMaxMemoryLimit(containerId,
+             owner, useDuccSpawn(), duccProcess.getCGroup().getMaxMemoryLimit());
+ }
+ /** @return true if the process is a Job Driver, which is represented as ProcessType.Pop */
+ private boolean isJD(IDuccProcess duccProcess ) {
+     ProcessType type = duccProcess.getProcessType();
+     return type.equals(ProcessType.Pop);
+ }
+ /**
+  * Builds the cgroup container id for the managed process. Services use their cgroup id
+  * as-is; all other process types are qualified by the owning work item as
+  * {@code <workDuccId friendly>.<cgroup id>}.
+  */
+ private String getContainerId() {
+     boolean isService = ((ManagedProcess) super.managedProcess).getDuccProcess()
+             .getProcessType().equals(ProcessType.Service);
+     ManagedProcess owner = (ManagedProcess) managedProcess;
+     String cgroupId = String.valueOf(owner.getDuccProcess().getCGroup().getId());
+     return isService ? cgroupId : owner.getWorkDuccId().getFriendly() + "." + cgroupId;
+ }
+/**
+ * Runs the command described by cmdLine for the managed process.
+ * <p>
+ * A kill command is handed to stopProcess(). Otherwise the maximum swap the process
+ * may use is computed and recorded on the managed process, and the process is started.
+ * When agent.useCgroups is set, the command is prefixed with
+ * {@code /usr/bin/cgexec -g <subsystems>:ducc/<containerId>} so the process starts inside
+ * its cgroup container, which is created first if it does not exist yet.
+ *
+ * @return the managed process this executor drives
+ * @throws Exception any failure; it is logged when a DUCC process is attached, then rethrown
+ */
public Process exec(ICommandLine cmdLine, Map<String, String> processEnv)
throws Exception {
String methodName = "exec";
try {
String[] cmd = getDeployableCommandLine(cmdLine,processEnv);
if ( isKillCommand(cmdLine) ) {
- logger.info(methodName, null, "Killing process");
+ logger.info(methodName, null, "Killing process");
stopProcess(cmdLine, cmd);
} else {
- startProcess(cmdLine, cmd, processEnv);
+ IDuccProcess duccProcess = ((ManagedProcess) managedProcess).getDuccProcess();
+
+
+ // Calculate how much swap space the process is allowed to use. The calculation is based on
+ // the percentage of real memory the process is assigned. The process is entitled to the
+ // same percentage of the swap.
+ // Normalize node's total memory as it is expressed in KB. The calculation below is based on bytes.
+ // NOTE(review): agent.getNodeInfo() is filled in lazily by the node metrics processor; if a
+ // launch can arrive before the first node metrics sample, this dereference throws NPE.
+ double percentOfTotal = ((double)duccProcess.getCGroup().getMaxMemoryLimit())/
+ (agent.getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal()*1024); // need bytes
+
+
+ // Subtract 1GB from the total swap on this node to leave room for the OS's own swapping.
+ // getSwapTotal() returns swap space in KB, so 1GB is expressed as 1048576 KB.
+ // NOTE(review): on a node with less than 1GB of swap this goes negative, which makes
+ // maxProcessSwapUsage negative, so any non-zero swap use exceeds the threshold.
+ long adjustedTotalSwapAvailable =
+ agent.getNodeInfo().getNodeMetrics().getNodeMemory().getSwapTotal() - 1048576;
+ // calculate the portion (in bytes) of swap this process is entitled to
+ long maxProcessSwapUsage =
+ (long) (adjustedTotalSwapAvailable*percentOfTotal)*1024;
+ // Record how much swap this process is entitled to. If it exceeds this number the Agent
+ // will kill the process.
+ ((ManagedProcess) managedProcess).setMaxSwapThreshold(maxProcessSwapUsage);
+ logger.info(methodName, null, "---Process DuccId:"+duccProcess.getDuccId()+
+ " CGroup.getMaxMemoryLimit():"+((duccProcess.getCGroup().getMaxMemoryLimit()/1024)/1024)+" MBs"+
+ " Node Memory Total:"+(agent.getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal()/1024)+" MBs"+
+ " Percentage Of Real Memory:"+percentOfTotal+
+ " Adjusted Total Swap Available On Node:"+adjustedTotalSwapAvailable/1024+" MBs"+
+ " Process Entitled To Max:"+(maxProcessSwapUsage/1024)/1024+" MBs of Swap"
+ );
+
+
+ // When cgroups are enabled, launch the process inside the cgroup container named by
+ // getContainerId(), creating that container first if it does not exist yet.
+ if ( agent.useCgroups ) {
+
+
+ // cgroup container id
+ String containerId = getContainerId();
+ logger.info(methodName, null, "Creating CGroup with ID:"+containerId);
+ if ( !agent.cgroupsManager.cgroupExists(agent.cgroupsManager.getDuccCGroupBaseDir()+"/"+containerId) ) {
+
+ // create the container and apply its memory limit
+ try {
+ if ( createCGroupContainer(duccProcess, containerId, ((ManagedProcess)super.managedProcess).getOwner()) ) {
+ logger.info(methodName, null, "Created CGroup with ID:"+containerId+" With Memory Limit="+((ManagedProcess)super.managedProcess).getDuccProcess().getCGroup().getMaxMemoryLimit()+" Bytes");
+ } else {
+ logger.info(methodName, null, "Failed To Create CGroup with ID:"+containerId);
+ }
+ } catch( Exception e) {
+ logger.error(methodName, null, e);
+
+ }
+ } else {
+ logger.info(methodName, null, "CGroup Exists with ID:"+containerId);
+
+ }
+ // NOTE(review): if creating the container failed or threw above, the process is still
+ // launched through cgexec below.
+ // Prefix the deployable command with cgexec so the process starts inside the container.
+ String[] cgroupCmd = new String[cmd.length+3];
+ cgroupCmd[0] = "/usr/bin/cgexec";
+ cgroupCmd[1] = "-g";
+ cgroupCmd[2] = agent.cgroupsManager.getSubsystems()+":ducc/"+containerId;
+ int inx = 3;
+ for ( String cmdPart : cmd ) {
+ cgroupCmd[inx++] = cmdPart;
+ }
+ startProcess(cmdLine, cgroupCmd, processEnv);
+ } else {
+ // don't use CGroups
+ startProcess(cmdLine, cmd, processEnv);
+ }
+
}
return managedProcess;
} catch (Exception e) {
- if ( ((ManagedProcess)super.managedProcess).getDuccProcess() != null ) {
- DuccId duccId = ((ManagedProcess)super.managedProcess).getDuccId();
- logger.error(methodName, duccId, ((ManagedProcess)super.managedProcess).getDuccProcess().getDuccId(), e, new Object[]{});
- }
+ if ( ((ManagedProcess)super.managedProcess).getDuccProcess() != null ) {
+ DuccId duccId = ((ManagedProcess)super.managedProcess).getDuccId();
+ logger.error(methodName, duccId, ((ManagedProcess)super.managedProcess).getDuccProcess().getDuccId(), e, new Object[]{});
+ }
throw e;
}
}
@@ -160,6 +318,7 @@ public class DuccCommandExecutor extends
} else {
logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),"------------ Agent Dispatched STOP Request to Process with PID:"+((ManagedProcess) managedProcess).getDuccProcess().getPID()+" Process State: "+((ManagedProcess) managedProcess).getDuccProcess().getProcessState()+" .Process Not In Running State");
}
+
} catch (TimeoutException tex) { // on time out kill the process
if ( ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Running)) {
logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),"------------ Agent Timed-out Waiting for Process with PID:"+((ManagedProcess) managedProcess).getDuccProcess().getPID()+" to Stop. Process State:"+((ManagedProcess) managedProcess).getDuccProcess().getProcessState()+" .Process did not stop in alloted time of "+maxTimeToWaitForProcessToStop+" millis");
@@ -178,47 +337,47 @@ public class DuccCommandExecutor extends
+/**
+ * Starts cmd through a ProcessBuilder and records the run time window on the DUCC process.
+ * <p>
+ * The child's environment is the builder's default environment enriched with processEnv and,
+ * when cmdLine is an ACommandLine, with that command line's own environment. Pop and Service
+ * processes also get an init time window that opens and closes at launch time.
+ *
+ * @throws Exception whatever doExec throws; the run window end is stamped regardless
+ */
private void startProcess(ICommandLine cmdLine,String[] cmd, Map<String, String> processEnv) throws Exception {
String methodName = "startProcess";
-
+
ITimeWindow twr = new TimeWindow();
- String millis;
- millis = TimeStamp.getCurrentMillis();
-
- ((ManagedProcess) managedProcess).getDuccProcess().setTimeWindowRun(twr);
- twr.setStart(millis);
- ProcessBuilder pb = new ProcessBuilder(cmd);
-
- if ( ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Pop) ||
- ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Service) ) {
- ITimeWindow twi = new TimeWindow();
- ((ManagedProcess) managedProcess).getDuccProcess().setTimeWindowInit(twi);
- twi.setStart(millis);
- twi.setEnd(millis);
- }
- Map<String, String> env = pb.environment();
- // enrich Process environment
- env.putAll(processEnv);
- if( cmdLine instanceof ACommandLine ) {
- // enrich Process environment with one from a given command line
- env.putAll(((ACommandLine)cmdLine).getEnvironment());
- }
- if ( logger.isTrace()) {
- // <dump>
- Iterator<String> iterator = env.keySet().iterator();
- while(iterator.hasNext()) {
- String key = iterator.next();
- String value = env.get(key);
- String message = "key:"+key+" "+"value:"+value;
- logger.trace(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), message);
- }
- }
- try {
- doExec(pb, cmd, isKillCommand(cmdLine));
- } catch(Exception e) {
- throw e;
- } finally {
- millis = TimeStamp.getCurrentMillis();
- twr.setEnd(millis);
+ String millis;
+ millis = TimeStamp.getCurrentMillis();
+
+ // The run window opens now and is closed in the finally block below.
+ ((ManagedProcess) managedProcess).getDuccProcess().setTimeWindowRun(twr);
+ twr.setStart(millis);
+ ProcessBuilder pb = new ProcessBuilder(cmd);
+
+ // Pop and Service processes get an init window that opens and closes at launch time.
+ if ( ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Pop) ||
+ ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Service) ) {
+ ITimeWindow twi = new TimeWindow();
+ ((ManagedProcess) managedProcess).getDuccProcess().setTimeWindowInit(twi);
+ twi.setStart(millis);
+ twi.setEnd(millis);
+ }
+ Map<String, String> env = pb.environment();
+ // enrich Process environment
+ env.putAll(processEnv);
+ if( cmdLine instanceof ACommandLine ) {
+ // enrich Process environment with one from a given command line
+ env.putAll(((ACommandLine)cmdLine).getEnvironment());
+ }
+ if ( logger.isTrace()) {
+ // <dump>
+ Iterator<String> iterator = env.keySet().iterator();
+ while(iterator.hasNext()) {
+ String key = iterator.next();
+ String value = env.get(key);
+ String message = "key:"+key+" "+"value:"+value;
+ logger.trace(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), message);
}
+ }
+ // Exceptions from doExec propagate unchanged; the finally block always closes the run window.
+ try {
+ doExec(pb, cmd, isKillCommand(cmdLine));
+ } catch(Exception e) {
+ throw e;
+ } finally {
+ millis = TimeStamp.getCurrentMillis();
+ twr.setEnd(millis);
+ }
}
private void doExec(ProcessBuilder pb, String[] cmd, boolean isKillCmd) throws Exception {
String methodName = "doExec";
@@ -240,7 +399,14 @@ public class DuccCommandExecutor extends
exitCode = process.waitFor();
if ( !isKillCommand(cmdLine) ) {
logger.info(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), ">>>>>>>>>>>>> Process with PID:"+((ManagedProcess)super.managedProcess).getDuccProcess().getPID()+" Terminated");
+ // Process is dead, determine if the cgroup container should be destroyed as well.
+ if ( agent.useCgroups ) {
+ String containerId = getContainerId();
+ agent.cgroupsManager.destroyContainer(containerId);
+ logger.info(methodName, null, "Removed CGroup Container with ID:"+containerId);
+ }
}
+
} catch( NullPointerException ex) {
((ManagedProcess)super.managedProcess).getDuccProcess().setProcessState(ProcessState.Failed);
StringBuffer sb = new StringBuffer();
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java Fri Apr 12 16:46:54 2013
@@ -38,6 +38,7 @@ import org.apache.uima.ducc.transport.ev
import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+import org.apache.uima.ducc.transport.event.common.ProcessMemoryAssignment;
@@ -137,18 +138,24 @@ public class ManagedProcess implements P
private transient DuccLogger logger;
- private long processMemoryAssignment;
+
+ // Maximum swap, in bytes, this process may use; assigned through setMaxSwapThreshold()
+ // by the launcher and compared against current swap usage by the agent's metrics processor.
+ private long maxSwapThreshold;
+
+ // Memory assignment for this process; the shorter constructors pass a
+ // default-constructed ProcessMemoryAssignment.
+ private ProcessMemoryAssignment processMemoryAssignment;
+ /**
+  * Creates a managed process with no lifecycle observer, no logger and a
+  * default-constructed ProcessMemoryAssignment.
+  */
public ManagedProcess(IDuccProcess process, ICommandLine commandLine) {
- this(process, commandLine, null, null,0);
+ this(process, commandLine, null, null, new ProcessMemoryAssignment());
}
+ /**
+  * Same as the two-argument constructor, additionally recording whether this is an agent process.
+  */
public ManagedProcess(IDuccProcess process, ICommandLine commandLine,boolean agentProcess) {
- this(process, commandLine, null, null,0);
+ this(process, commandLine, null, null,new ProcessMemoryAssignment());
this.agentProcess = agentProcess;
}
- public ManagedProcess(IDuccProcess process, ICommandLine commandLine,ProcessLifecycleObserver observer, DuccLogger logger, long processMemoryAssignment) {
+ public ManagedProcess(IDuccProcess process, ICommandLine commandLine,ProcessLifecycleObserver observer, DuccLogger logger, ProcessMemoryAssignment processMemoryAssignment) {
this.commandLine = commandLine;
this.duccProcess = process;
this.observer = observer;
@@ -603,7 +610,7 @@ public class ManagedProcess implements P
this.future = future;
}
+ /**
+  * Returns the memory assignment held by this managed process.
+  */
- public long getProcessMemoryAssignment() {
+ public ProcessMemoryAssignment getProcessMemoryAssignment() {
return processMemoryAssignment;
}
@@ -614,4 +621,12 @@ public class ManagedProcess implements P
+ /**
+  * Sets the socket endpoint string of this process.
+  */
public void setSocketEndpoint(String socketEndpoint) {
this.socketEndpoint = socketEndpoint;
}
+
+ /**
+  * Returns the maximum swap, in bytes, this process is allowed to use.
+  */
+ public long getMaxSwapThreshold() {
+   return this.maxSwapThreshold;
+ }
+
+ /**
+  * Records the maximum swap, in bytes, this process is allowed to use.
+  *
+  * @param maxSwapThreshold new threshold in bytes
+  */
+ public void setMaxSwapThreshold(long maxSwapThreshold) {
+   this.maxSwapThreshold = maxSwapThreshold;
+ }
}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java?rev=1467346&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java Fri Apr 12 16:46:54 2013
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.io.RandomAccessFile;
+import java.util.concurrent.Callable;
+
+import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessMemoryPageLoadUsage;
+import org.apache.uima.ducc.common.agent.metrics.swap.ProcessMemoryPageLoadUsage;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+
+public class ProcessMajorFaultCollector extends AbstractMetricCollector implements
+ Callable<ProcessMemoryPageLoadUsage> {
+
+ public ProcessMajorFaultCollector(DuccLogger logger, String pid,
+ RandomAccessFile fileHandle, int howMany, int offset) {
+ super(fileHandle, howMany, offset);
+ }
+
+ public ProcessMemoryPageLoadUsage call() throws Exception {
+ try {
+ super.parseMetricFile();
+ return new DuccProcessMemoryPageLoadUsage(super.metricFileContents,
+ super.metricFieldOffsets, super.metricFieldLengths);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+/*
+ private String execTopShell() throws Exception {
+ List<String> command = new ArrayList<String>();
+ command.add("top");
+ command.add("-b");
+ command.add("-n");
+ command.add("1");
+ command.add("-p");
+ command.add(pid);
+
+ ProcessBuilder builder = new ProcessBuilder(command);
+ Process process = builder.start();
+ InputStream is = process.getInputStream();
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
+ String line;
+ int count = 0;
+ String cpu = "";
+ try {
+ while ((line = br.readLine()) != null) {
+ if (count == 7) {
+ String[] values = line.trim().split("\\s+");
+ cpu = values[9];
+ process.destroy();
+ break;
+ }
+ count++;
+ }
+ } finally {
+ if (is != null) {
+ is.close();
+ }
+ }
+ process.waitFor();
+ return cpu;
+ }
+ */
+}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java?rev=1467346&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java Fri Apr 12 16:46:54 2013
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.io.RandomAccessFile;
+import java.util.concurrent.Callable;
+
+import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessMemoryPageLoadUsage;
+import org.apache.uima.ducc.common.agent.metrics.swap.ProcessMemoryPageLoadUsage;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+
+public class ProcessSwapUsageCollector extends AbstractMetricCollector implements
+ Callable<ProcessMemoryPageLoadUsage> {
+
+ public ProcessSwapUsageCollector(DuccLogger logger, String pid,
+ RandomAccessFile fileHandle, int howMany, int offset) {
+ super(fileHandle, howMany, offset);
+ }
+
+ public ProcessMemoryPageLoadUsage call() throws Exception {
+ try {
+ super.parseMetricFile();
+ return new DuccProcessMemoryPageLoadUsage(super.metricFileContents,
+ super.metricFieldOffsets, super.metricFieldLengths);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+/*
+ private String execTopShell() throws Exception {
+ List<String> command = new ArrayList<String>();
+ command.add("top");
+ command.add("-b");
+ command.add("-n");
+ command.add("1");
+ command.add("-p");
+ command.add(pid);
+
+ ProcessBuilder builder = new ProcessBuilder(command);
+ Process process = builder.start();
+ InputStream is = process.getInputStream();
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
+ String line;
+ int count = 0;
+ String cpu = "";
+ try {
+ while ((line = br.readLine()) != null) {
+ if (count == 7) {
+ String[] values = line.trim().split("\\s+");
+ cpu = values[9];
+ process.destroy();
+ break;
+ }
+ count++;
+ }
+ } finally {
+ if (is != null) {
+ is.close();
+ }
+ }
+ process.waitFor();
+ return cpu;
+ }
+ */
+}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxNodeMetricsProcessor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxNodeMetricsProcessor.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxNodeMetricsProcessor.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxNodeMetricsProcessor.java Fri Apr 12 16:46:54 2013
@@ -118,6 +118,11 @@ public class LinuxNodeMetricsProcessor e
cpuFuture.get(), nuiFuture.get());
Node node = new DuccNode(agent.getIdentity(), nodeMetrics);
+ // Make the agent aware how much memory is available on the node. Do this once.
+ if ( agent.getNodeInfo() == null ) {
+ agent.setNodeInfo(node);
+ }
+
((DuccNode)node).duccLingExists(agent.duccLingExists());
((DuccNode)node).runWithDuccLing(agent.runWithDuccLing());
logger.info(methodName, null, "... Agent "+node.getNodeIdentity().getName()+" Posting Memory:"
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java Fri Apr 12 16:46:54 2013
@@ -29,11 +29,15 @@ import org.apache.uima.ducc.agent.NodeAg
import org.apache.uima.ducc.agent.launcher.ManagedProcess;
import org.apache.uima.ducc.agent.metrics.collectors.DuccGarbageStatsCollector;
import org.apache.uima.ducc.agent.metrics.collectors.ProcessCpuUsageCollector;
+import org.apache.uima.ducc.agent.metrics.collectors.ProcessMajorFaultCollector;
import org.apache.uima.ducc.agent.metrics.collectors.ProcessResidentMemoryCollector;
import org.apache.uima.ducc.common.agent.metrics.cpu.ProcessCpuUsage;
import org.apache.uima.ducc.common.agent.metrics.memory.ProcessResidentMemory;
+import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessSwapSpaceUsage;
+import org.apache.uima.ducc.common.agent.metrics.swap.ProcessMemoryPageLoadUsage;
import org.apache.uima.ducc.common.node.metrics.ProcessGarbageCollectionStats;
import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
@@ -52,7 +56,7 @@ implements ProcessMetricsProcessor {
private ManagedProcess managedProcess;
private NodeAgent agent;
private int fudgeFactor = 5; // default is 5%
-
+ private int logCounter=0;
public LinuxProcessMetricsProcessor(DuccLogger logger, IDuccProcess process, NodeAgent agent,String statmFilePath, String nodeStatFilePath, String processStatFilePath, ManagedProcess managedProcess) throws FileNotFoundException{
this.logger = logger;
statmFile = new RandomAccessFile(statmFilePath, "r");
@@ -99,32 +103,67 @@ implements ProcessMetricsProcessor {
ProcessCpuUsageCollector processCpuUsageCollector =
new ProcessCpuUsageCollector(logger, process.getPID(), processStatFile,42,0);
+
Future<ProcessCpuUsage> processCpuUsage = pool.submit(processCpuUsageCollector);
+ ProcessMajorFaultCollector processMajorFaultUsageCollector =
+ new ProcessMajorFaultCollector(logger, process.getPID(), processStatFile,42,0);
+
+ Future<ProcessMemoryPageLoadUsage> processMajorFaultUsage = pool.submit(processMajorFaultUsageCollector);
+ String DUCC_HOME = Utils.findDuccHome();
+ // executes script DUCC_HOME/admin/ducc_get_process_swap_usage.sh which sums up swap used by a process
+ DuccProcessSwapSpaceUsage processSwapSpaceUsage =
+ new DuccProcessSwapSpaceUsage(process.getPID(),DUCC_HOME+"/admin/ducc_get_process_swap_usage.sh", logger);
+
logger.trace("process", null, "----------- PID:"+process.getPID()+" Cumulative CPU Time (jiffies):"+processCpuUsage.get().getTotalJiffies());
// Publish cumulative CPU usage
process.setCpuTime(processCpuUsage.get().getTotalJiffies());
- // if the fudgeFactor is negative, don't check if the process exceeded its
- // memory assignment.
+ long majorFaults = processMajorFaultUsage.get().getMajorFaults();
+ // collects process Major faults (swap in memory)
+ process.setMajorFaults(majorFaults);
+ // Current Process Swap Usage in bytes
+ long processSwapUsage = processSwapSpaceUsage.getSwapUsage()*1024;
+ // collects swap usage from /proc/<PID>/smaps file via a script DUCC_HOME/admin/collect_process_swap_usage.sh
+ process.setSwapUsage(processSwapUsage);
+ if ( (logCounter % 100 ) == 0 ) {
+ logger.info("process", null, "----------- PID:"+process.getPID()+" Major Faults:"+majorFaults+" Process Swap Usage:"+processSwapUsage);
+ }
+ logCounter++;
- if ( fudgeFactor > -1 && managedProcess.getProcessMemoryAssignment() > 0 ) {
- // RSS is in terms of pages(blocks) which size is system dependent. Default 4096 bytes
- long rss = (prm.get().get()*(blockSize/1024))/1024; // normalize RSS into MB
- logger.trace("process", null, "*** Process with PID:"+managedProcess.getPid()+ " Assigned Memory (MB): "+ managedProcess.getProcessMemoryAssignment()+" MBs. Current RSS (MB):"+rss);
- // check if process resident memory exceeds its memory assignment calculate in the PM
- if ( rss > managedProcess.getProcessMemoryAssignment() ) {
- logger.error("process", null, "\n\n********************************************************\n\tProcess with PID:"+managedProcess.getPid()+ " Exceeded its max memory assignment (including a fudge factor) of "+ managedProcess.getProcessMemoryAssignment()+" MBs. This Process Resident Memory Size: "+rss+" MBs .Killing process ...\n********************************************************\n\n" );
- try {
- managedProcess.kill(); // mark it for death
- process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededShareSize.toString());
- agent.stopProcess(process);
- } catch( Exception ee) {
- logger.error("process", null,ee);
- }
- return;
- }
- }
- // Publish resident memory
+ if (processSwapUsage > 0 && processSwapUsage > managedProcess.getMaxSwapThreshold()) {
+ logger.error("process", null, "\n\n********************************************************\n\tProcess with PID:"+managedProcess.getPid()+ " Exceeded its max swap usage assignment of "+ managedProcess.getMaxSwapThreshold()+" MBs. This Process Swap Usage is: "+processSwapUsage+" MBs .Killing process ...\n********************************************************\n\n" );
+ try {
+ managedProcess.kill(); // mark it for death
+ process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededSwapThreshold.toString());
+ agent.stopProcess(process);
+ } catch( Exception ee) {
+ logger.error("process", null,ee);
+ }
+ return;
+ } else {
+ // if the fudgeFactor is negative, don't check if the process exceeded its
+ // memory assignment.
+
+ if ( fudgeFactor > -1 && managedProcess.getProcessMemoryAssignment().getMaxMemoryWithFudge() > 0 ) {
+ // RSS is in terms of pages(blocks) which size is system dependent. Default 4096 bytes
+ long rss = (prm.get().get()*(blockSize/1024))/1024; // normalize RSS into MB
+ logger.trace("process", null, "*** Process with PID:"+managedProcess.getPid()+ " Assigned Memory (MB): "+ managedProcess.getProcessMemoryAssignment()+" MBs. Current RSS (MB):"+rss);
+ // check if process resident memory exceeds its memory assignment calculate in the PM
+ if ( rss > managedProcess.getProcessMemoryAssignment().getMaxMemoryWithFudge() ) {
+ logger.error("process", null, "\n\n********************************************************\n\tProcess with PID:"+managedProcess.getPid()+ " Exceeded its max memory assignment (including a fudge factor) of "+ managedProcess.getProcessMemoryAssignment()+" MBs. This Process Resident Memory Size: "+rss+" MBs .Killing process ...\n********************************************************\n\n" );
+ try {
+ managedProcess.kill(); // mark it for death
+ process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededShareSize.toString());
+ agent.stopProcess(process);
+ } catch( Exception ee) {
+ logger.error("process", null,ee);
+ }
+ return;
+ }
+ }
+
+ }
+ // Publish resident memory
process.setResidentMemory((prm.get().get()*blockSize));
ProcessGarbageCollectionStats gcStats =
gcStatsCollector.collect();
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java?rev=1467346&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java Fri Apr 12 16:46:54 2013
@@ -0,0 +1,17 @@
+package org.apache.uima.ducc.common.agent.metrics.swap;
+
+import org.apache.uima.ducc.common.node.metrics.ByteBufferParser;
+
+/**
+ * Page load (major fault) figures read from a parsed process metric buffer.
+ */
+public class DuccProcessMemoryPageLoadUsage extends ByteBufferParser implements
+    ProcessMemoryPageLoadUsage {
+  private static final long serialVersionUID = 1L;
+  // Index of the major-fault field passed to ByteBufferParser.getFieldAsLong().
+  // NOTE(review): in /proc/<pid>/stat (proc(5)) majflt is field 12 counting from 1
+  // (index 11 counting from 0); the next field is cmajflt, the children's count.
+  // Confirm which base ByteBufferParser uses before relying on this value.
+  public static final int MAJORFAULTSFLD=12;
+
+  /**
+   * @param memInfoBuffer raw bytes of the metric file
+   * @param memInfoFieldOffsets field offsets, passed through to ByteBufferParser
+   * @param memInfoFieldLengths field lengths, passed through to ByteBufferParser
+   */
+  public DuccProcessMemoryPageLoadUsage(byte[] memInfoBuffer,
+      int[] memInfoFieldOffsets, int[] memInfoFieldLengths) {
+    super(memInfoBuffer, memInfoFieldOffsets, memInfoFieldLengths);
+  }
+
+  /**
+   * @return the value of field {@link #MAJORFAULTSFLD} as a long
+   */
+  public long getMajorFaults() {
+    return super.getFieldAsLong(MAJORFAULTSFLD);
+  }
+}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java?rev=1467346&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java Fri Apr 12 16:46:54 2013
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.common.agent.metrics.swap;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+
+/**
+ * Collects the swap usage of a process by running an external script as
+ * {@code execScript pid} and parsing the number it prints.
+ */
+public class DuccProcessSwapSpaceUsage implements ProcessSwapSpaceUsage {
+    private final String pid;
+    private final String execScript;
+    private final DuccLogger logger;
+
+    /**
+     * @param pid id of the process to inspect; passed as the script's only argument
+     * @param execScript path of the script that reports the swap usage
+     * @param logger receives errors raised while running the script or parsing its output
+     */
+    public DuccProcessSwapSpaceUsage(String pid, String execScript, DuccLogger logger) {
+        this.pid = pid;
+        this.execScript = execScript;
+        this.logger = logger;
+    }
+
+    /**
+     * Runs the script and parses its combined stdout/stderr line by line,
+     * stopping at end of output or at the first blank line. Lines that are not
+     * valid longs are logged and skipped.
+     *
+     * @return the last value successfully parsed, or 0 if pid or execScript is
+     *         null, the script could not be run, or no line could be parsed
+     */
+    @Override
+    public long getSwapUsage() {
+        long swapusage = 0;
+        if (pid == null || execScript == null) {
+            return swapusage;
+        }
+        Process swapCollectorProcess = null;
+        BufferedReader reader = null;
+        try {
+            ProcessBuilder pb = new ProcessBuilder(execScript, pid);
+            pb.redirectErrorStream(true);
+            swapCollectorProcess = pb.start();
+            // The script takes no input: close its stdin pipe right away.
+            swapCollectorProcess.getOutputStream().close();
+            reader = new BufferedReader(new InputStreamReader(swapCollectorProcess.getInputStream()));
+            String line;
+            while ((line = reader.readLine()) != null && line.trim().length() > 0) {
+                try {
+                    swapusage = Long.parseLong(line.trim());
+                } catch (NumberFormatException e) {
+                    logger.error("getSwapUsage", null, line);
+                }
+            }
+        } catch (Exception e) {
+            logger.error("getSwapUsage", null, e);
+        } finally {
+            if (reader != null) {
+                try {
+                    reader.close();
+                } catch (Exception e) {
+                    logger.error("getSwapUsage", null, e);
+                }
+            }
+            if (swapCollectorProcess != null) {
+                // Release the child: terminate the script if it is still running
+                // (e.g. reading stopped at a blank line) instead of abandoning it.
+                swapCollectorProcess.destroy();
+            }
+        }
+        return swapusage;
+    }
+}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java?rev=1467346&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java Fri Apr 12 16:46:54 2013
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.common.agent.metrics.swap;
+
+/**
+ * Source of a process's major page fault count.
+ */
+public interface ProcessMemoryPageLoadUsage {
+    /**
+     * @return the number of major page faults reported for the process
+     */
+    long getMajorFaults();
+}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessSwapSpaceUsage.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessSwapSpaceUsage.java?rev=1467346&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessSwapSpaceUsage.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessSwapSpaceUsage.java Fri Apr 12 16:46:54 2013
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.common.agent.metrics.swap;
+
+/**
+ * Source of a process's swap space usage.
+ */
+public interface ProcessSwapSpaceUsage {
+    /**
+     * @return the swap space used by the process; units are defined by the implementation
+     */
+    long getSwapUsage();
+}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-pm/src/main/java/org/apache/uima/ducc/pm/ProcessManagerComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-pm/src/main/java/org/apache/uima/ducc/pm/ProcessManagerComponent.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-pm/src/main/java/org/apache/uima/ducc/pm/ProcessManagerComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-pm/src/main/java/org/apache/uima/ducc/pm/ProcessManagerComponent.java Fri Apr 12 16:46:54 2013
@@ -36,6 +36,7 @@ import org.apache.uima.ducc.transport.di
import org.apache.uima.ducc.transport.event.DuccEvent;
import org.apache.uima.ducc.transport.event.DuccJobsStateEvent;
import org.apache.uima.ducc.transport.event.PmStateDuccEvent;
+import org.apache.uima.ducc.transport.event.common.CGroup;
import org.apache.uima.ducc.transport.event.common.DuccJobDeployment;
import org.apache.uima.ducc.transport.event.common.DuccUimaDeploymentDescriptor;
import org.apache.uima.ducc.transport.event.common.DuccUserReservation;
@@ -47,6 +48,7 @@ import org.apache.uima.ducc.transport.ev
import org.apache.uima.ducc.transport.event.common.IDuccReservationMap;
import org.apache.uima.ducc.transport.event.common.IDuccWork;
import org.apache.uima.ducc.transport.event.common.IDuccUnits.MemoryUnits;
+import org.apache.uima.ducc.transport.event.common.ProcessMemoryAssignment;
/**
* The ProcessManager's main role is to receive Orchestrator updates, trim received state and
@@ -94,26 +96,50 @@ implements ProcessManager {
/* New Code */
+ /**
+ * Converts a user memory request into Gigs.
+ *
+ * @param processMemoryAssignment memory amount as a decimal string
+ * @param units units of processMemoryAssignment
+ * @return the request in Gigs; KB and MB amounts are divided down with truncation
+ * @throws NumberFormatException if processMemoryAssignment is not a valid long
+ */
+ private long normalizeMemory(String processMemoryAssignment, MemoryUnits units) {
+ // Get user defined memory assignment for the JP
+ long normalizedProcessMemoryRequirements =
+ Long.parseLong(processMemoryAssignment);
+ // Normalize memory requirements for JPs into Gigs, keeping the arithmetic in long so
+ // large values are neither truncated by an int cast nor overflow an int product.
+ // NOTE(review): KB/MB requests round down (e.g. 1536MB -> 1GB); this presumably has to
+ // agree with how the scheduler sizes shares, so confirm before changing it.
+ if ( units.equals(MemoryUnits.KB ) ) {
+ normalizedProcessMemoryRequirements /= (1024L * 1024L);
+ } else if ( units.equals(MemoryUnits.MB ) ) {
+ normalizedProcessMemoryRequirements /= 1024L;
+ } else if ( units.equals(MemoryUnits.GB ) ) {
+ // already normalized
+ } else if ( units.equals(MemoryUnits.TB ) ) {
+ normalizedProcessMemoryRequirements *= 1024L;
+ }
+ return normalizedProcessMemoryRequirements;
+ }
+ /**
+ * Returns the number of shares needed to hold the given amount of memory.
+ *
+ * @param normalizedProcessMemoryRequirements memory in Gigs, as returned by normalizeMemory
+ * @return normalizedProcessMemoryRequirements / shareQuantum, rounded up
+ */
+ private int getShares(long normalizedProcessMemoryRequirements ) {
+ int shares = (int) (normalizedProcessMemoryRequirements / shareQuantum); // get number of shares
+ if ( (normalizedProcessMemoryRequirements % shareQuantum) > 0 ) shares++; // ceiling
+ return shares;
+ }
private long calculateProcessMemoryAssignment( String processMemoryAssignment, MemoryUnits units) {
- // Get user defined memory assignment for the JP
- long normalizedProcessMemoryRequirements =
- Long.parseLong(processMemoryAssignment);
- // Normalize memory requirements for JPs into Gigs
- if ( units.equals(MemoryUnits.KB ) ) {
- normalizedProcessMemoryRequirements = (int)normalizedProcessMemoryRequirements/(1024*1024);
- } else if ( units.equals(MemoryUnits.MB ) ) {
- normalizedProcessMemoryRequirements = (int)normalizedProcessMemoryRequirements/1024;
- } else if ( units.equals(MemoryUnits.GB ) ) {
- // already normalized
- } else if ( units.equals(MemoryUnits.TB ) ) {
- normalizedProcessMemoryRequirements = (int)normalizedProcessMemoryRequirements*1024;
- }
- int shares = (int)normalizedProcessMemoryRequirements/shareQuantum; // get number of shares
- if ( (normalizedProcessMemoryRequirements % shareQuantum) > 0 ) shares++; // ciel
- // normalize to get process memory in terms of Megs
- long processAdjustedMemorySize = shares * shareQuantum * 1024;
- // add fudge factor (5% default) to adjusted memory computed above
- processAdjustedMemorySize += (processAdjustedMemorySize * ((double)fudgeFactor/100));
+ // Round the user defined memory assignment for the JP, normalized into Gigs, up to whole shares
+ int shares = getShares(normalizeMemory(processMemoryAssignment, units));
+ // Express the share-aligned assignment in Megs (long arithmetic avoids int overflow).
+ // No fudge factor is applied here.
+ long processAdjustedMemorySize = (long) shares * shareQuantum * 1024;
return processAdjustedMemorySize;
}
@@ -132,30 +156,42 @@ implements ProcessManager {
DuccWorkJob dcj = (DuccWorkJob)entry.getValue();
// Create process list for each job
List<IDuccProcess> jobProcessList = new ArrayList<IDuccProcess>();
+
+ // Size the job's processes: normalize the user request into Gigs, round up to
+ // whole shares and express the share-aligned size in Megs.
+ long normalizedProcessMemoryRequirements = normalizeMemory(dcj.getSchedulingInfo().getShareMemorySize(),dcj.getSchedulingInfo().getShareMemoryUnits());
+ int shares = getShares(normalizedProcessMemoryRequirements);
+ // long arithmetic so the product cannot overflow an int
+ long processAdjustedMemorySize = (long) shares * shareQuantum * 1024;
+ ProcessMemoryAssignment pma = new ProcessMemoryAssignment();
+ pma.setShares(shares);
+ pma.setNormalizedMemoryInMBs(processAdjustedMemorySize);
+
// Copy job processes
for( Entry<DuccId,IDuccProcess> jpProcess : dcj.getProcessMap().getMap().entrySet()) {
jobProcessList.add(jpProcess.getValue());
}
-
+
if ( dcj.getUimaDeployableConfiguration() instanceof DuccUimaDeploymentDescriptor ) {
// Add deployment UIMA AS deployment descriptor path
((JavaCommandLine)dcj.getCommandLine()).
addArgument(((DuccUimaDeploymentDescriptor)dcj.getUimaDeployableConfiguration()).getDeploymentDescriptorPath());
}
+
// Calculate Process memory allocation including a fudge factor (if one is defined). The
// returned value is in terms of Megs.
- long processAdjustedMemorySize =
- calculateProcessMemoryAssignment(dcj.getSchedulingInfo().getShareMemorySize(),
- dcj.getSchedulingInfo().getShareMemoryUnits());
-
+ // add fudge factor (5% default) to adjust memory computed above
+ processAdjustedMemorySize += (processAdjustedMemorySize * ((double)fudgeFactor/100));
+ pma.setMaxMemoryWithFudge(processAdjustedMemorySize);
+
logger.info(methodName,dcj.getDuccId(),"--------------- User Requested Memory For Process:"+dcj.getSchedulingInfo().getShareMemorySize()+dcj.getSchedulingInfo().getShareMemoryUnits()+" PM Calculated Memory Assignment of:"+processAdjustedMemorySize);
ICommandLine driverCmdLine = null;
IDuccProcess driverProcess = null;
switch(dcj.getDuccType()) {
case Job:
driverCmdLine = dcj.getDriver().getCommandLine();
driverProcess = dcj.getDriver().getProcessMap().entrySet().iterator().next().getValue();
break;
case Service:
//logger.info(methodName,null,"!!!!!!!!!!!!! GOT SERVICE");
@@ -164,11 +200,11 @@ implements ProcessManager {
default:
}
jobDeploymentList.add( new DuccJobDeployment(dcj.getDuccId(), driverCmdLine,
dcj.getCommandLine(),
dcj.getStandardInfo(),
driverProcess,
- processAdjustedMemorySize,
+ pma,
jobProcessList ));
} else if (entry.getValue() instanceof DuccWorkReservation ) {
String userId = ((DuccWorkReservation) entry.getValue()).getStandardInfo().getUser();
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccJobDeployment.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccJobDeployment.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccJobDeployment.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccJobDeployment.java Fri Apr 12 16:46:54 2013
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
-*/
+ */
package org.apache.uima.ducc.transport.event.common;
import java.util.ArrayList;
@@ -26,35 +26,37 @@ import org.apache.uima.ducc.transport.cm
import org.apache.uima.ducc.transport.cmdline.JavaCommandLine;
import org.apache.uima.ducc.transport.cmdline.NonJavaCommandLine;
-
public class DuccJobDeployment implements IDuccJobDeployment {
private static final long serialVersionUID = 1L;
private DuccId jobId;
- // at most two command lines can be accommodated
- private ICommandLine[] jdclArray = new JavaCommandLine[1];
- private ICommandLine[] pclArray;// = new JavaCommandLine[2];
+ // at most two command lines can be accommodated
+ private ICommandLine[] jdclArray = new JavaCommandLine[1];
+ private ICommandLine[] pclArray;// = new JavaCommandLine[2];
- private IDuccStandardInfo stdInfo;
+ private IDuccStandardInfo stdInfo;
private List<IDuccProcess> jobProcesses = new ArrayList<IDuccProcess>();
- private long processMemoryAssignment;
+ // Memory sizing (shares, normalized and maximum memory) supplied to the constructor
+ private ProcessMemoryAssignment pma;
- public DuccJobDeployment( DuccId jobId, ICommandLine jdCmdLine, ICommandLine jpCmdLine,
- IDuccStandardInfo stdInfo, IDuccProcess jdProcess, long processMemoryAssignment, List<IDuccProcess> jps ) {
+ public DuccJobDeployment(DuccId jobId, ICommandLine jdCmdLine,
+ ICommandLine jpCmdLine, IDuccStandardInfo stdInfo,
+ IDuccProcess jdProcess, ProcessMemoryAssignment pma,
+ List<IDuccProcess> jps) {
this.jobId = jobId;
-// this.jdclArray = new JavaCommandLine[2];
- if ( jpCmdLine instanceof JavaCommandLine ) {
- this.pclArray = new JavaCommandLine[1];
- } else {
- this.pclArray = new NonJavaCommandLine[1];
- }
+ if (jpCmdLine instanceof JavaCommandLine) {
+ this.pclArray = new JavaCommandLine[1];
+ } else {
+ this.pclArray = new NonJavaCommandLine[1];
+ }
this.jdclArray[0] = jdCmdLine;
this.pclArray[0] = jpCmdLine;
this.stdInfo = stdInfo;
this.jobProcesses.add(jdProcess);
this.jobProcesses.addAll(jps);
- this.processMemoryAssignment = processMemoryAssignment;
+ this.pma = pma;
}
+
public ICommandLine getJdCmdLine() {
return this.jdclArray[0];
}
@@ -74,10 +76,15 @@ public class DuccJobDeployment implement
public List<IDuccProcess> getJpProcessList() {
return this.jobProcesses.subList(1, this.jobProcesses.size());
}
+
public DuccId getJobId() {
return jobId;
}
- public long getProcessMemoryAssignment() {
- return processMemoryAssignment;
- }
+
+ /**
+ * @return the ProcessMemoryAssignment supplied to the constructor
+ */
+ public ProcessMemoryAssignment getProcessMemoryAssignment() {
+ return pma;
+ }
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccProcess.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccProcess.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccProcess.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccProcess.java Fri Apr 12 16:46:54 2013
@@ -60,6 +60,10 @@ public class DuccProcess implements IDuc
private boolean initialized = false;
private int exitCode;
private CGroup cgroup;
+ // Major page fault count, as last set via setMajorFaults()
+ private long majorFaults;
+ // Swap usage, as last set via setSwapUsage()
+ private long swapUsage;
public DuccProcess(DuccId duccId, NodeIdentity nodeIdentity) {
setDuccId(duccId);
@@ -517,4 +521,35 @@ public class DuccProcess implements IDuc
this.node = node;
}
+ /**
+ * Stores the major page fault count of this process.
+ *
+ * @param faultCount major page fault count
+ */
+ public void setMajorFaults(long faultCount) {
+ majorFaults = faultCount;
+ }
+
+ /**
+ * @return the major page fault count last set via setMajorFaults
+ */
+ public long getMajorFaults() {
+ return majorFaults;
+ }
+
+ /**
+ * Stores the swap usage of this process.
+ *
+ * @param swapUsage swap usage, in whatever units the caller reports
+ */
+ public void setSwapUsage(long swapUsage) {
+ this.swapUsage = swapUsage;
+ }
+
+ /**
+ * @return the swap usage last set via setSwapUsage
+ */
+ public long getSwapUsage() {
+ return swapUsage;
+ }
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccJobDeployment.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccJobDeployment.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccJobDeployment.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccJobDeployment.java Fri Apr 12 16:46:54 2013
@@ -62,6 +62,6 @@ public interface IDuccJobDeployment exte
- * Returns memory size assigned by user to this process
- * @return
+ * Returns the memory assignment for this job's processes.
+ * @return shares, normalized memory in MBs and maximum memory including the fudge factor
*/
- public long getProcessMemoryAssignment();
+ public ProcessMemoryAssignment getProcessMemoryAssignment();
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccProcess.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccProcess.java?rev=1467346&r1=1467345&r2=1467346&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccProcess.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IDuccProcess.java Fri Apr 12 16:46:54 2013
@@ -103,6 +103,7 @@ public interface IDuccProcess extends Se
Croaked,
Deallocated,
ExceededShareSize,
+ ExceededSwapThreshold,
FailedInitialization,
InitializationTimeout,
JPHasNoActiveJob,
@@ -119,4 +120,24 @@ public interface IDuccProcess extends Se
public void setCGroup( CGroup cgroup);
public CGroup getCGroup();
+ /**
+ * Stores the major page fault count of this process.
+ * @param faultCount major page fault count
+ */
+ public void setMajorFaults(long faultCount);
+ /**
+ * @return the major page fault count of this process
+ */
+ public long getMajorFaults();
+
+ /**
+ * Stores the swap usage of this process.
+ * @param swapUsage swap usage, in whatever units the caller reports
+ */
+ public void setSwapUsage(long swapUsage);
+ /**
+ * @return the swap usage of this process
+ */
+ public long getSwapUsage();
+
}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/ProcessMemoryAssignment.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/ProcessMemoryAssignment.java?rev=1467346&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/ProcessMemoryAssignment.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/ProcessMemoryAssignment.java Fri Apr 12 16:46:54 2013
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.common;
+
+import java.io.Serializable;
+
+/**
+ * Memory sizing for a process: the number of shares, the share-aligned
+ * memory size in MBs, and the maximum memory including the fudge factor.
+ */
+public class ProcessMemoryAssignment implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private int shares;
+    private long normalizedMemoryInMBs;
+    // Presumably MBs as well: the PM sets it to the normalized size plus the fudge factor.
+    private long maxMemoryWithFudge;
+
+    public int getShares() {
+        return shares;
+    }
+
+    public void setShares(int shares) {
+        this.shares = shares;
+    }
+
+    public long getNormalizedMemoryInMBs() {
+        return normalizedMemoryInMBs;
+    }
+
+    public void setNormalizedMemoryInMBs(long normalizedMemoryInMBs) {
+        this.normalizedMemoryInMBs = normalizedMemoryInMBs;
+    }
+
+    public long getMaxMemoryWithFudge() {
+        return maxMemoryWithFudge;
+    }
+
+    public void setMaxMemoryWithFudge(long maxMemoryWithFudge) {
+        this.maxMemoryWithFudge = maxMemoryWithFudge;
+    }
+
+    @Override
+    public String toString() {
+        return "ProcessMemoryAssignment[shares=" + shares
+                + ", normalizedMemoryInMBs=" + normalizedMemoryInMBs
+                + ", maxMemoryWithFudge=" + maxMemoryWithFudge + "]";
+    }
+}