You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2017/01/18 14:38:34 UTC

svn commit: r1779333 - /uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java

Author: cwiklik
Date: Wed Jan 18 14:38:34 2017
New Revision: 1779333

URL: http://svn.apache.org/viewvc?rev=1779333&view=rev
Log:
UIMA-5255 refactored to detect sim links to cpu and cpuacct

Modified:
    uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java

Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java?rev=1779333&r1=1779332&r2=1779333&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java Wed Jan 18 14:38:34 2017
@@ -25,6 +25,9 @@ import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
@@ -57,8 +60,6 @@ public class CGroupsManager {
 	private static final String CGDuccMemoryPath = "/memory/"+SYSTEM+"/";
 	private static final String CGDuccCpuPath = "/cpu/"+SYSTEM+"/";
 	private static final String CGProcsFile = "/cgroup.procs";
-//	private static final String CGDuccCpuAcctPath = "/cpu/"+SYSTEM+"/";
-	
 	// legacy means that the cgonfig points to <cgroup location>/ducc
 	private boolean legacyCgConfig = false;
 	
@@ -81,6 +82,9 @@ public class CGroupsManager {
 	// stores cgroup utils location like cgcreate, cgset, etc
 	private String cgroupUtilsDir=null;
 	// stores comma separated list of subsystems like cpu,memory
+	
+	private boolean cpuInfoSymlinked = true;
+
 	private String cgroupSubsystems = ""; // comma separated list of subsystems
 	private long retryMax = 4;
 	private long delayFactor = 2000;  // 2 secs in millis
@@ -88,29 +92,37 @@ public class CGroupsManager {
     private static  String fetchCgroupsBaseDir(String mounts) {
   	  String cbaseDir=null;
   	  BufferedReader br = null;
+  	  List<String> cgroupsEntries = new ArrayList<String>();
   	  try {
-  		  FileInputStream fis = new FileInputStream(mounts);
-  			//Construct BufferedReader from InputStreamReader
-  		  br = new BufferedReader(new InputStreamReader(fis));
-  		 
-  		String line = null;
-  		while ((line = br.readLine()) != null) {
-  			System.out.println(line);
-  			if ( line.trim().startsWith("cgroup") ) { 
+  	  	  List<String> lines = Files.readAllLines(Paths.get(mounts),Charset.defaultCharset());
+  	  	  for( String line : lines ) {
+  	  		// trim the list to just cgroups  
+  	  		if ( line.trim().startsWith("cgroup") ) {
+  	  			cgroupsEntries.add(line);
+  	  		}
+  	  	  }
+  	      // check if this is legacy cgroups installation looking like
+		  // cgroup /cgroup cgroup rw,relatime,memory,cpuacct,cpu 0 0
+  	  	  if ( cgroupsEntries.size() == 1) {
+				String[] cgroupsInfo = cgroupsEntries.get(0).split(" ");
+				// return the mount point minus the memory part
+				cbaseDir = cgroupsInfo[1].trim();
+  	  	  } else {
+  	  	      // check if this is recent cgroups installation looking like
+			  // cgroup /cgroup/memory cgroup rw,relatime,memory 0 0
+              //     OR
+			  // cgroup /sys/fs/cgroup/memory cgroup rw ...
+				
+			  // return the mount point minus the memory part
+  	  		  for(String line : cgroupsEntries) {
   				String[] cgroupsInfo = line.split(" ");
-  				if ( cgroupsInfo[1].indexOf("/memory") > -1 ) {
-  					// return the mount point minus the memory part
+  			    if ( cgroupsInfo[1].indexOf("/memory") != -1 ) {
   					cbaseDir = cgroupsInfo[1].substring(0, cgroupsInfo[1].indexOf("/memory") );
-  				} else if ( cgroupsInfo[1].indexOf("cpu") > -1){
-  					// return the mount point minus the memory part
-  					cbaseDir = cgroupsInfo[1].substring(0, cgroupsInfo[1].indexOf("/cpu") );
-  				} else {
-  					cbaseDir = cgroupsInfo[1].trim();
-  				}
-  			    break;
-  			}
-  		}
-  		 
+  			    	break;
+  			    }
+  	  		  }
+  	  	  }
+		 
   	  } catch( Exception e) {
   		  e.printStackTrace();
   	  } finally {
@@ -126,45 +138,81 @@ public class CGroupsManager {
 	 * @param args
 	 */
 	public static void main(String[] args) {
-		try {
-
-//			String cgBaseDir = "/cgroup"; //"/sys/fs/cgroup";//fetchCgroupsBaseDir("/proc/mounts");
-			String cgBaseDir = "/sys/fs/cgroup";//fetchCgroupsBaseDir("/proc/mounts");
-			
-			CGroupsManager cgMgr = new CGroupsManager("/usr/bin",cgBaseDir, "memory",
-					null, 10000);
-			System.out.println("Cgroups Accounting Enabled:"+cgMgr.isCpuReportingEnabled());
-			
-			/*
-			cgMgr.validator(cgBaseDir, "test2", System.getProperty("user.name"),false)
-          .cgcreate();
-
-			System.out.println("Cgroups Installed:"
-					+ cgMgr.cgroupExists("/cgroup/ducc"));
-			Set<String> containers = cgMgr.collectExistingContainers();
-			for (String containerId : containers) {
-				System.out.println("Existing CGroup Container ID:"
-						+ containerId);
-			}
-			cgMgr.createContainer(args[0], args[2], cgMgr.getUserGroupName(args[2]),true);
-			cgMgr.setContainerMaxMemoryLimit(args[0], args[2], true,
-					Long.parseLong(args[1]));
-			synchronized (cgMgr) {
-				cgMgr.wait(60000);
-			}
-			cgMgr.destroyContainer(args[0], args[2], NodeAgent.SIGKILL);
-*/
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
+    	String cgroupsUtilsDirs = "/usr/bin,/bin"; //System.getProperty("ducc.agent.launcher.cgroups.utils.dir");
+    	String cgUtilsPath = null;
+    	DuccLogger logger = DuccLogger.getLogger(CGroupsManager.class, "CGroupsManager");
+    	//boolean useCgroups = false;
+
+    	String[] paths = cgroupsUtilsDirs.split(",");
+    	for( String path : paths ) {
+    		File file = new File(path.trim()+"/cgexec");
+    		if ( file.exists() ) {
+    			cgUtilsPath = path;
+    			break;
+    		}
+    	}
+
+    	String cgroupsBaseDir = fetchCgroupsBaseDir("/proc/mounts");
+        
+        if ( cgUtilsPath == null ) {
+            System.out.println("------- CGroups Disabled - Unable to Find Cgroups Utils Directory. Add/Modify ducc.agent.launcher.cgroups.utils.dir property in ducc.properties");
+        } else if ( cgroupsBaseDir == null || cgroupsBaseDir.trim().length() == 0) {
+            System.out.println("------- CGroups Disabled - Unable to Find Cgroups Root Directory in /proc/mounts");
+        	
+        } else {
+        	System.out.println("Agent found cgroups runtime in "+cgUtilsPath+" cgroups base dir="+cgroupsBaseDir);
+
+        	String cgroupsSubsystems = "memory,cpu";
+
+            long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
+
+            CGroupsManager cgroupsManager = 
+            		new CGroupsManager(cgUtilsPath, cgroupsBaseDir, cgroupsSubsystems, logger, maxTimeToWaitForProcessToStop);
+            // check if cgroups base directory exists in the filesystem
+            // which means that cgroups
+            // and cgroups convenience package are installed and the
+            // daemon is up and running.
+              
+            try {
+               if (cgroupsManager.cgroupExists(cgroupsBaseDir)) {
+                    	System.out.println("Agent found cgroup base directory in "+cgroupsBaseDir);
+
+                   String containerId = "test";
+            	  // validate cgroups by creating a dummy cgroup. The code checks if cgroup actually got created by
+            	  // verifying existence of test cgroup file. The second step in verification is to check if 
+            	  // CPU control is working. Configured in cgconfig.conf, the CPU control allows for setting 
+            	  // cpu.shares. The code will attempt to set the shares and subsequently tries to read the
+            	  // value from cpu.shares file to make sure the values match. Any exception in the above steps
+            	  // will cause cgroups to be disabled.
+            	  //
+            	  cgroupsManager.validator(cgroupsBaseDir, containerId, System.getProperty("user.name"),false)
+            	              .cgcreate()
+            	              .cgset(100);   // write cpu.shares=100 and validate
+            	  
+            	  // cleanup dummy cgroup
+            	  cgroupsManager.destroyContainer(containerId, System.getProperty("user.name"), -9);
+               }
+                    
+              } catch( Exception ee) {
+            	  ee.printStackTrace();
+              }
+        }
 	}
-	public CGroupsManager(String cgroupUtilsDir, String cgroupBaseDir, String cgroupSubsystems,
+	public CGroupsManager(String cgroupUtilsDir, String cgroupBaseDir, String subsystems,
 			DuccLogger agentLogger, long maxTimeToWaitForProcessToStop) {
 		this.cgroupUtilsDir = cgroupUtilsDir;
 		this.cgroupBaseDir = cgroupBaseDir;
-		this.cgroupSubsystems = cgroupSubsystems;
+		this.cgroupSubsystems = subsystems;
 		this.agentLogger = agentLogger;
 		this.maxTimeToWaitForProcessToStop = maxTimeToWaitForProcessToStop;
+
+		// on some systems cpu and cpuacct may be linked to the same directory. In such
+		// cases we need adjust cgdelete command to only include memory,cpu as submodules:
+		// 
+		
+		
+		
+		
 		// determine what cgroup base location should be. For legacy cgconfig
 		// it will be <cgroup folder>/ducc
 		try {
@@ -183,6 +231,37 @@ public class CGroupsManager {
 			// if there is an error here, the new cgconfig is assumed and subject
 			// to additional testing on agent startup.
 		}
+		cpuInfoSymlinked = symbolicLinksForCpu();
+		String location = getCGroupLocation("cpuacct").trim();
+		if ( !legacyCgConfig ) {
+			if (!location.endsWith(System.getProperty("file.separator") )) {
+				location += System.getProperty("file.separator");
+			} 
+			location += SYSTEM+System.getProperty("file.separator");
+		}
+		System.out.println("------------- Location:"+location);
+		/*
+		if ( !legacyCgConfig ) {
+			location = cgroupBaseDir; //SYSTEM+System.getProperty("file.separator");
+			if ( !location.endsWith(System.getProperty("file.separator"))) {
+				location = location + System.getProperty("file.separator");
+			}
+			location += SYSTEM+System.getProperty("file.separator");
+		} else {
+			location = getCGroupLocation("cpuacct").trim();
+			if ( !location.endsWith(System.getProperty("file.separator"))) {
+				location = location + System.getProperty("file.separator");
+			}
+			
+		}
+*/
+		
+		
+		File cpuacctUsageFile = new File(location+"cpuacct.usage");
+		if ( cpuacctUsageFile.exists()) {
+			System.out.println("Got cpuacct.usage file");
+			this.cgroupSubsystems += ",cpuacct";
+		} 
 	}
 
 	/**
@@ -208,7 +287,75 @@ public class CGroupsManager {
 		} 
 		return location + subsystem;
 	}
-	
+	private boolean isCpuSubmodule(String[] parts, String submoduleName) {
+		return ( parts.length > 10 && parts[10].equals(submoduleName) );
+	}
+	private boolean symbolicLinksForCpu() {
+//	   	File f = new File("/sys/fs/cgroup");
+	   	File f = new File(cgroupBaseDir);
+	   	if ( !f.exists()) {
+	   		return false;
+	   	}
+    	InputStreamReader isr = null;
+    	BufferedReader reader = null;
+    	try {
+//    		String cmd[] = {"/usr/bin/ls","-l","/sys/fs/cgroup"};
+    		String cmd[] = {"/bin/ls","-l",cgroupBaseDir};
+		    StringBuffer sb = new StringBuffer();
+		    for (String s : cmd) {
+			   sb.append(s).append(" ");
+			}
+			agentLogger.info("symbolicLinksForCpu", null, "Launching Process - Commandline:"+sb.toString());
+
+    		ProcessBuilder processLauncher = new ProcessBuilder();
+			processLauncher.command(cmd);
+			processLauncher.redirectErrorStream(true);
+			java.lang.Process process = processLauncher.start();
+			isr = new InputStreamReader(process.getInputStream());
+			reader = new BufferedReader(isr);
+			String line;
+			String cpuLinkDir = "";
+			String cpuacctLinkDir = "";
+			
+			agentLogger.info("symbolicLinksForCpu", null, "Consuming Process Streams");
+			while ((line = reader.readLine()) != null) {
+				agentLogger.info("symbolicLinksForCpu", null, ">>>>" + line);
+				//groupName = line.trim();
+				String parts[] = line.split(" ");
+				if ( parts.length > 0 && parts[0].charAt(0)=='l') {  // link
+					if ( isCpuSubmodule(parts,"cpu") ) {
+						cpuLinkDir = parts[parts.length-1];
+					} else if (isCpuSubmodule(parts,"cpuacct")	) {
+					    cpuacctLinkDir = parts[parts.length-1];
+					}
+					// check if we got what we were looking for. If so, no need to iterate more
+					if ( cpuLinkDir.length() > 0 && cpuacctLinkDir.length() > 0 ) {
+						break;
+					}
+			    }
+			}
+			
+			agentLogger.info("symbolicLinksForCpu", null, "Waiting for Process to Exit");
+			int retCode = process.waitFor();
+			agentLogger.info("symbolicLinksForCpu", null, "Pocess Exit Code="+retCode);
+			if ( cpuLinkDir.length() > 0 && cpuacctLinkDir.length() > 0) {
+				if ( cpuLinkDir.trim().equals(cpuLinkDir.trim())) {
+					return true;  // both cpu and cpuacct link to the same dir
+				}
+			}
+    	    		
+    	} catch( Exception e) {
+			agentLogger.error("symbolicLinksForCpu", null, e);
+    	
+    	} finally {
+    		if ( reader != null ) {
+    			try {
+        			reader.close();
+    			} catch(Exception e) {}
+    		}
+    	}
+    	return false;
+	}
 	public void configure(NodeAgent agent ) {
 		if ( agent != null ) {
 			if ( agent.configurationFactory.maxRetryCount != null ) {
@@ -767,6 +914,39 @@ public class CGroupsManager {
 		}
 		return childCount;
 	}
+	private String adjustSubsystems() {
+		
+		
+		// if cpu and cpuacct are sym linked to the same dir, remove cpuacct part
+		// from the submodule list as it causes the cgdelete to throw an error.
+		if ( cpuInfoSymlinked && cgroupSubsystems.indexOf(",cpuacct") > -1 ) { 
+			return cgroupSubsystems.substring(0,cgroupSubsystems.indexOf(",cpuacct") );
+			
+			
+			/*
+			StringBuffer sb = new StringBuffer();
+			if ( cgroupSubsystems.indexOf("cpuacct") > -1 ) {
+				String[] subsystems = cgroupSubsystems.trim().split(",");
+				//StringUtils.j
+				Iterator<String> it = Arrays.asList(subsystems).iterator();
+				if ( it.hasNext()) {
+					while(it.hasNext() ) {
+						String subsystem = it.next();
+						if ( !"cpuacct".equals(subsystem)) {
+							sb.append(subsystem);
+							if ( it.hasNext()) {
+								sb.append(",");
+							}
+						}
+					}
+				}
+				return sb.toString();
+			}
+			*/
+		}
+		
+		return cgroupSubsystems;
+	}
 	/**
 	 * Removes cgroup container with a given id. Cgroups are implemented as a
 	 * virtual file system. All is needed here is just rmdir.
@@ -800,7 +980,10 @@ public class CGroupsManager {
 				}
 				// Any process remaining in a cgroup will be killed hard
 				killChildProcesses(containerId, userId, NodeAgent.SIGKILL);
-				String[] command = new String[] { cgroupUtilsDir+"/cgdelete",cgroupSubsystems + ":"+SYSTEM+"/" + containerId };
+				//String subsystems =cgroupSubsystems.substring(0,cgroupSubsystems.indexOf(",cpuacct") );
+				String subsystems = adjustSubsystems();
+
+				String[] command = new String[] { cgroupUtilsDir+"/cgdelete",subsystems + ":"+SYSTEM+"/" + containerId };
 				int retCode = launchCommand(command);
 				if ( cgroupExists(getCGroupLocation(CGDuccMemoryPath) + containerId)) {
 					agentLogger.info("destroyContainer", null, "Failed to remove Container "+containerId+" Using cgdelete command. Exit code:"+retCode);
@@ -809,12 +992,6 @@ public class CGroupsManager {
 					containerIds.remove(containerId);
 					return true;
 				}
-//				if (retCode == 0) {
-//					containerIds.remove(containerId);
-//					return true;
-//				} else {
-//					return false;
-//				}
 			}
 			return true; // nothing to do, cgroup does not exist
 		} catch (Exception e) {