You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2016/03/30 18:40:09 UTC

svn commit: r1737134 - in /uima/sandbox/uima-ducc/trunk: src/main/resources/ uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/ uima-ducc-agent/src/main/java/org/apache/uima/ducc/a...

Author: cwiklik
Date: Wed Mar 30 16:40:08 2016
New Revision: 1737134

URL: http://svn.apache.org/viewvc?rev=1737134&view=rev
Log:
UIMA-4862 refactored CGroupsManager. Parameterized retry count and delay factor

Modified:
    uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java

Modified: uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties?rev=1737134&r1=1737133&r2=1737134&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties (original)
+++ uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties Wed Mar 30 16:40:08 2016
@@ -449,6 +449,10 @@ ducc.agent.launcher.cgroups.utils.dir=/u
 # Set cgroup memory.swappiness
 ducc.agent.launcher.cgroups.swappiness=10
 
+# number of retries to use when cgcreate fails
+ducc.agent.launcher.cgroups.max.retry.count=10
+# delay factor which will be used to increase amount of time to wait in between retries
+ducc.agent.launcher.cgroups.retry.delay.factor=2000
 # exclusion file to enable node based exclusion for cgroups and aps
 # syntax:  <node>=cgroups,ap
 # the above will exclude node from using cgroups and/or prevent deployment of APs

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java?rev=1737134&r1=1737133&r2=1737134&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java Wed Mar 30 16:40:08 2016
@@ -290,6 +290,7 @@ public class NodeAgent extends AbstractD
         		}
 
                 cgroupsManager = new CGroupsManager(cgUtilsPath, cgroupsBaseDir, cgroupsSubsystems, logger, maxTimeToWaitForProcessToStop);
+                cgroupsManager.configure(this);
                 // check if cgroups base directory exists in the filesystem
                 // which means that cgroups
                 // and cgroups convenience package are installed and the

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java?rev=1737134&r1=1737133&r2=1737134&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java Wed Mar 30 16:40:08 2016
@@ -105,6 +105,14 @@ public class AgentConfiguration {
   @Value("#{ systemProperties['ducc.agent.launcher.cgroups.swappiness'] }")
   public String nodeSwappiness;
 
+  // Get number of retries to use when cgcreate fails
+  @Value("#{ systemProperties['ducc.agent.launcher.cgroups.max.retry.count'] }")
+  public String maxRetryCount;
+
+  // Get delay factor which will be used to increase amount of time to wait in between retries
+  @Value("#{ systemProperties['ducc.agent.launcher.cgroups.retry.delay.factor'] }")
+  public String retryDelayFactor;
+
   
   @Autowired
   DuccTransportConfiguration agentTransport;

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java?rev=1737134&r1=1737133&r2=1737134&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java Wed Mar 30 16:40:08 2016
@@ -60,12 +60,15 @@ public class CGroupsManager {
    		 return cmd;
    	 }
     };
+    
 	private Set<String> containerIds = new LinkedHashSet<String>();
 	private String cgroupBaseDir = "";
 	private String cgroupUtilsDir=null;
 	private String cgroupSubsystems = ""; // comma separated list of subsystems
+	private long retryMax = 4;
+	private long delayFactor = 2000;  // 2 secs in millis
 											// eg. memory,cpu
-    private long maxTimeToWaitForProcessToStop;
+        private long maxTimeToWaitForProcessToStop;
     
 	/**
 	 * @param args
@@ -105,6 +108,16 @@ public class CGroupsManager {
 		this.agentLogger = agentLogger;
 		this.maxTimeToWaitForProcessToStop = maxTimeToWaitForProcessToStop;
 	}
+	public void configure(NodeAgent agent ) {
+		if ( agent != null ) {
+			if ( agent.configurationFactory.maxRetryCount != null ) {
+				retryMax = Integer.valueOf(agent.configurationFactory.maxRetryCount);
+			}
+			if ( agent.configurationFactory.retryDelayFactor != null ) {
+				delayFactor = Integer.valueOf(agent.configurationFactory.retryDelayFactor);
+			}
+		}
+	}
 	public Validator validator( String cgroupsBaseDir,String containerId, String uid, boolean useDuccling) {
 		return new Validator(this, cgroupsBaseDir, containerId, uid, useDuccling);
 	}
@@ -270,46 +283,28 @@ public class CGroupsManager {
 
 	public void kill(final String user, final String pid, final int signal) {
 		final String methodName = "kill";
-		InputStream is = null;
-		BufferedReader reader = null;
+		//InputStream is = null;
+		//BufferedReader reader = null;
 		try {
-		    //			String c_launcher_path = Utils.resolvePlaceholderIfExists(
-		    //			System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
-		    //			System.getProperties());
 			String cmdLine;
 			String arg;
-			//	boolean useDuccling = false;
 			if (Utils.isWindows()) {
 				cmdLine = "taskkill";
 				arg = "/PID";
 			} else {
-
-			    //  String useSpawn = System
-			    //			.getProperty("ducc.agent.launcher.use.ducc_spawn");
-			    //	if (useSpawn != null && useSpawn.toLowerCase().equals("true")) {
-			    //		useDuccling = true;
-			    //	}
-
 				cmdLine = "/bin/kill";
 				arg = "-"+signal;
 			}
-			//String[] duccling_nolog;
-			//if (useDuccling) {
-			//	duccling_nolog = new String[] { c_launcher_path, "-u", user,
-			//						"--", cmdLine, arg, pid };
-		//} else {
-		//	duccling_nolog = new String[] { cmdLine, arg, pid };
-		//	}
-			String[] commandLine  = new String[] { cmdLine, arg, pid };
-			// if (kill != null && Boolean.parseBoolean(kill) == true) {
+			
+			String[] command  = new String[] { cmdLine, arg, pid };
+			launchCommand(command, "ducc");
+			/*
 			ProcessBuilder pb = new ProcessBuilder(commandLine);
-			//			ProcessBuilder pb = new ProcessBuilder(duccling_nolog);
 			pb.redirectErrorStream(true);
 			java.lang.Process killedProcess = pb.start();
 			is = killedProcess.getInputStream();
 			reader = new BufferedReader(
 					new InputStreamReader(is));
-			// String line = null;
 			// read the next line from kill command
 			while (reader.readLine() != null) {
 				// dont care about the output, just drain the buffers
@@ -317,7 +312,6 @@ public class CGroupsManager {
 			is.close();
 			killedProcess.waitFor();
 			StringBuffer sb = new StringBuffer();
-			//			for (String part : duccling_nolog) {
 			for (String part : commandLine) {
 				sb.append(part).append(" ");
 			}
@@ -330,14 +324,22 @@ public class CGroupsManager {
 						"--------- Killed CGroup Process:" + pid + " Owned by:" + user
 								+ " Command:" + sb.toString());
 			}
+			*/
+			StringBuffer sb = new StringBuffer();
+			for (String part : command) {
+				sb.append(part).append(" ");
+			}
+			if (agentLogger == null) {
+				System.out.println("--------- Killed Process:" + pid
+						+ " Owned by:" + user + " Command:" + sb.toString());
+
+			} else {
+				agentLogger.info(methodName, null,
+						"--------- Killed CGroup Process:" + pid + " Owned by:" + user
+								+ " Command:" + sb.toString());
+			}
 		} catch (Exception e) {
 			agentLogger.error(methodName, null,e );
-		} finally {
-			if ( reader != null ) {
-				try {
-					reader.close();
-				} catch( Exception e) {}
-			}
 		}
 	}
     public String getContainerId(ManagedProcess managedProcess) {
@@ -366,60 +368,30 @@ public class CGroupsManager {
 	 */
 	public  boolean createContainer(String containerId, String userId,
 			boolean useDuccSpawn) throws Exception {
-		int retryMax = 3;
-		int retryCount = 2;
-		int delay = 30;  // in millis
-		Object sleepMonitor = new Object();
-		synchronized(CGroupsManager.class) {
-			// for some strange reason cgcreate fails to create a cgroup when
-			// the command is run in quick succession. Adding delay below seems
-			// to fix this. Note that the code below is synchronized using 
-			// class level locking which prevents two threads from running 
-			// cgcreate at the same time, yet the failures occur. Strange!
-			
-			try {
-				synchronized(sleepMonitor) {
-					sleepMonitor.wait(delay*(retryMax-retryCount));
-				}
-			} catch( InterruptedException ie) {}
-			while( retryCount > 0 ) {
-				try {
-					agentLogger.info("createContainer", null, "Creating CGroup Container:" + containerId);
-					
-					String[] command = new String[] { cgroupUtilsDir+"/cgcreate", "-t",
+
+		String message = "";
+		agentLogger.info("createContainer", null, "Creating CGroup Container:" + containerId);
+		String[] command = new String[] { cgroupUtilsDir+"/cgcreate", "-t",
 							"ducc", "-a", "ducc", "-g",
 							cgroupSubsystems + ":ducc/" + containerId };
-					int retCode = launchCommand(command, useDuccSpawn, "ducc",
-							containerId);
-					// Starting with libcgroup v.0.38, the cgcreate fails
-					// with exit code = 96 even though the cgroup gets
-					// created! The following code treats such return code
-					// as success. In case there is an error, subsequent
-					// cgset or cgexec will fail.
-					if (retCode == 0 || retCode == 96) {
-						containerIds.add(containerId);
-						agentLogger.info("createContainer", null, ">>>>"
-								+ "SUCCESS - Created CGroup Container:" + containerId);
-						return true;
-
-					} else {
-						System.out.println("RETCODE:"+retCode);
-						agentLogger.info("createContainer", null, ">>>>"
-								+ "FAILURE - return code:"+retCode+" Unable To Create CGroup Container:"
-								+ containerId+" Retrying in "+delay+" millis - retry#"+(retryMax-retryCount));
-					}
-				} catch (Exception e) {
-					agentLogger.error("createContainer", null, ">>>>"
-							+ "FAILURE - Unable To Create CGroup Container:"
-							+ containerId+" Retrying in "+delay*(retryMax-retryCount)+" millis - retry#"+(retryMax-retryCount), e);
-				}
-				System.out.println("FAILURE >>> Unable To Create CGroup Container:"
-						+ containerId+" Retrying in "+delay+" millis - retry#"+(retryMax-retryCount));
-				retryCount--;
-
-				
-			}
+		int retCode = launchCommand(command, "ducc");
+		if ( cgroupExists(cgroupBaseDir + "/" + containerId)) {
+			// Starting with libcgroup v.0.38, the cgcreate fails
+			// with exit code = 96 even though the cgroup gets
+			// created! The following code treats such return code
+			// as success. In case there is an error, subsequent
+			// cgset or cgexec will fail.
+			if (retCode == 0 || retCode == 96) {
+				containerIds.add(containerId);
+				agentLogger.info("createContainer", null, ">>>>"
+						+ "SUCCESS - Created CGroup Container:" + containerId+". The cgcreate return code:"+retCode);
+				return true;
+			} 
+		} else {
+			message = ">>> CGroup Container:"+containerId+ " not found in "+cgroupBaseDir;
 		}
+		agentLogger.error("createContainer", null, message);
+		System.out.println(message);
 		return false;
 	}
 
@@ -448,8 +420,7 @@ public class CGroupsManager {
 			String[] command = new String[] { cgroupUtilsDir+"/cgset", "-r",
 					"memory.limit_in_bytes=" + containerMaxSize,
 					"ducc/" + containerId };
-			int retCode = launchCommand(command, useDuccSpawn, "ducc",
-					containerId);
+			int retCode = launchCommand(command, "ducc");
 			if (retCode == 0) {
 				agentLogger.info("setContainerMaxMemoryLimit", null, ">>>>"
 						+ "SUCCESS - Created CGroup Limit on Container:"
@@ -493,8 +464,7 @@ public class CGroupsManager {
 			String[] command = new String[] { cgroupUtilsDir+"/cgset", "-r",
 					"cpu.shares=" + containerCpuShares,
 					"ducc/" + containerId };
-			int retCode = launchCommand(command, useDuccSpawn, "ducc",
-					containerId);
+			int retCode = launchCommand(command, "ducc");
 			if (retCode == 0) {
 				agentLogger.info("setContainerCpuShares", null, ">>>>"
 						+ "SUCCESS - Created CGroup with CPU Shares="+containerCpuShares+" on Container:"
@@ -539,8 +509,7 @@ public class CGroupsManager {
 			String[] command = new String[] { cgroupUtilsDir+"/cgset", "-r",
 					"memory.swappiness=" + swappiness,
 					"ducc/" + containerId };
-			int retCode = launchCommand(command, useDuccSpawn, "ducc",
-					containerId);
+			int retCode = launchCommand(command, "ducc");
 			if (retCode == 0) {
 				agentLogger.info("setContainerSwappiness", null, ">>>>"
 						+ "SUCCESS - Updated CGroup with Memory Swappiness="+swappiness+" on Container:"
@@ -613,7 +582,7 @@ public class CGroupsManager {
 				
 				String[] command = new String[] { "/bin/rmdir",
 						cgroupBaseDir + "/" + containerId };
-				int retCode = launchCommand(command, false, "ducc", containerId);
+				int retCode = launchCommand(command, "ducc");
 				if (retCode == 0) {
 					containerIds.remove(containerId);
 					return true;
@@ -627,109 +596,87 @@ public class CGroupsManager {
 		}
 	}
 
-	private int launchCommand(String[] command, boolean useDuccSpawn,
-			String userId, String containerId) throws Exception {
-	    //		String[] commandLine = null;
-		InputStreamReader in = null;
-		BufferedReader reader = null;
-		try {
-			//
-			// Use ducc_ling (c code) as a launcher for the actual process. The
-			// ducc_ling
-			// allows the process to run as a specified user in order to write
-			// out logs in
-			// user's space as oppose to ducc space.
-		    
-		    //			String c_launcher_path = Utils.resolvePlaceholderIfExists(
-		    //			System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
-		    //			System.getProperties());
-		    
-     	                StringBuffer sb = new StringBuffer();
-
-			/*
-			if (useDuccSpawn && c_launcher_path != null) {
-				commandLine = new String[4 + command.length];
-				commandLine[0] = c_launcher_path;
-				sb.append(c_launcher_path).append(" ");
-				commandLine[1] = "-u";
-				sb.append("-u ");
-				commandLine[2] = userId;
-				sb.append(userId);
-				commandLine[3] = "--";
-				sb.append(" -- ");
-				int j = 0;
-				for (int i = 4; i < commandLine.length; i++) {
-					sb.append(command[j]).append(" ");
-					commandLine[i] = command[j++];
-				}
-			} else {
-				commandLine = command;
+	private int launchCommand(String[] command,	String userId) throws Exception {
+		
+		int retryCount=0;
+		Object sleepMonitor = new Object();
+		synchronized(CGroupsManager.class) {
+			long delay = delayFactor;//
+			while( retryCount <= retryMax ) {
+				String message = "";
+				InputStreamReader in = null;
+				BufferedReader reader = null;
+				 StringBuffer sb = new StringBuffer();
 				if ( command != null ) {
-   				    for (int i = 0; i < command.length; i++) {
-					    sb.append(command[i]).append(" ");
+				    for (int i = 0; i < command.length; i++) {
+				    	sb.append(command[i]).append(" ");
 				    }
 				}
-			}
-			*/
+				
+				try {
+					agentLogger.info("launchCommand", null, "Launching Process - Commandline:"+sb.toString());
+					ProcessBuilder processLauncher = new ProcessBuilder();
+					processLauncher.command(command);
+					processLauncher.redirectErrorStream(true);
+					java.lang.Process process = processLauncher.start();
+
+					in = new InputStreamReader(
+							process.getInputStream());
+					reader = new BufferedReader(in);
+					String line;
+					agentLogger.info("launchCommand", null, "Consuming Process Streams");
+					while ((line = reader.readLine()) != null) {
+						agentLogger.info("launchCommand", null, ">>>>" + line);
+						System.out.println(line);
+					}
+					agentLogger.info("launchCommand", null, "Waiting for Process to Exit");
+					int retCode = process.waitFor();
+					
+					// Starting with libcgroup v.0.38, the cgcreate fails
+					// with exit code = 96 even though the cgroup gets
+					// created! The following code treats such return code
+					// as success. In case there is an error, subsequent
+					// cgset or cgexec will fail.
+					if (retCode == 0 || retCode == 96) {
+						System.out.println("--------- Returning Code:"+retCode+" Command:"+sb.toString());
 
-			//commandLine = command;
-			if ( command != null ) {
-			    for (int i = 0; i < command.length; i++) {
-				sb.append(command[i]).append(" ");
-			    }
-			}
-			agentLogger.info("launchCommand", null, "Launching Process - Commandline:"+sb.toString());
-			
-			ProcessBuilder processLauncher = new ProcessBuilder();
-			processLauncher.command(command);
-			//			processLauncher.command(commandLine);
-			processLauncher.redirectErrorStream(true);
-
-			java.lang.Process process = processLauncher.start();
-
-			in = new InputStreamReader(
-					process.getInputStream());
-			reader = new BufferedReader(in);
-			String line;
-			agentLogger.info("launchCommand", null, "Consuming Process Streams");
-			while ((line = reader.readLine()) != null) {
-				agentLogger.info("launchCommand", null, ">>>>" + line);
-				System.out.println(line);
-			}
-			agentLogger.info("launchCommand", null, "Waiting for Process to Exit");
-			int retCode = process.waitFor();
-			System.out.println("--------- Returning Code:"+retCode);
-			return retCode;
+						return retCode;
+					} else {
+						message = ">>>>"
+								+ "FAILURE - return code:"+retCode+" Unable To exec command:"+sb.toString()
+								+ " Retrying in "+delay+" millis - retry#"+(retryCount+1);
+					}
 
-		} catch (Exception e) {
-			StringBuffer sb = new StringBuffer();
-			//			if (commandLine != null) {
-			if (command != null) {
-			    //				for (String cmdPart : commandLine) {
-				for (String cmdPart : command) {
-					sb.append(cmdPart).append(" ");
+				} catch (Exception e) {
+					e.printStackTrace();
+					message =  ">>>>"
+							+ "FAILURE - Unable To exec command:"+sb.toString()
+						    +" Retrying in "+delay+" millis - retry#"+(retryCount+1);
+				} finally {
+					if ( reader != null ) {
+						try {
+							reader.close();
+						} catch( Exception exx) {}
+					}
 				}
-			}
-			if (agentLogger != null) {
-				agentLogger.error("launchCommand", null,
-						"Unable to Launch Command:" + sb.toString(), e);
-				System.out
-				.println("CGroupsManager.launchCommand()- Unable to Launch Command:"
-						+ sb.toString());
-		e.printStackTrace();
-			} else {
-				System.out
-						.println("CGroupsManager.launchCommand()- Unable to Launch Command:"
-								+ sb.toString());
-				e.printStackTrace();
-			}
-
-		} finally {
-			if ( reader != null ) {
-				try {
-					reader.close();
-				} catch( Exception exx) {}
-			}
+				if ( retryMax == 0 ) {
+					agentLogger.error("launchCommand", null, ">>>>"
+							+ "Not configured to retry command:"+sb.toString());
+					break; 
+				}
+				agentLogger.error("launchCommand", null, message);
+				System.out.println(message);
+			    try {
+		           synchronized(sleepMonitor) {
+				      sleepMonitor.wait(delay);
+	  	           } 
+		        } catch( InterruptedException ie) {}
+
+				retryCount++;
+				delay += delayFactor;
+			}  // while
+			
+			
 		}
 		return -1; // failure
 	}