You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2019/08/02 17:27:49 UTC

svn commit: r1864247 [6/10] - in /uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent: ./ config/ deploy/ deploy/uima/ event/ exceptions/ launcher/ metrics/collectors/ monitor/ processors/

Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsTest.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsTest.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsTest.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsTest.java Fri Aug  2 17:27:48 2019
@@ -28,117 +28,126 @@ import org.apache.uima.ducc.agent.NodeAg
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.id.DuccIdFactory;
 
-
 public class CGroupsTest {
-	 public static DuccLogger logger = DuccLogger.getLogger(NodeAgent.class, "CGroupsTest");
-	 CGroupsManager cgroupsManager = null;
-	 DuccIdFactory idFactory = null;
-	 Lock lock = new ReentrantLock();
-	 
-	public static void main(String[] args) {
-		try {
-			CGroupsTest tester = new CGroupsTest();
-			tester.initialize();
-			if ( args.length > 1) {
-				// run concurrent threads 
-				tester.run(Long.parseLong(args[0]), true);
-			} else {
-				// run sequentially 
-				tester.run(Long.parseLong(args[0]), false);
-			}
-		} catch( Exception e) {
-			e.printStackTrace();
-		}
-	}
-	public void run(long howMany, boolean concurrent) {
-		try {
-			CGroupsTest tester = new CGroupsTest();
-			tester.initialize();
-			ExecutorService executor = Executors.newCachedThreadPool();
-			// more than 1 arg to this program = concurrent
-			if ( concurrent ) {
-				for (int i = 0; i < howMany; i++) {
-					WorkerThread t = new WorkerThread();
-					executor.execute(t);
-					// if the wait below is removed, cgroup creation fails
-					synchronized(t) {
-						// NOTE: waiting for 100ms seems to make cgcreate working.
-						// Tested 10ms and got a failure to create cgroup. Weird.
-						t.wait(100);
-					}
-				}
-			} else {
-				for (int i = 0; i < howMany; i++) {
-					WorkerThread t = new WorkerThread();
-					Future<?> f = executor.submit(t);
-					f.get();
-
-				}
-			}
-			executor.shutdownNow();
-		} catch( Exception e) {
-			e.printStackTrace();
-		}
-		
-	}
-	public void initialize() throws Exception {
-
-		idFactory = new DuccIdFactory();
-		String cgroupsUtilsDirs = System.getProperty("ducc.agent.launcher.cgroups.utils.dir");
-      	String cgUtilsPath=null;
-      	if (cgroupsUtilsDirs == null) {
-        	cgUtilsPath = "/bin";  // default
-        } 
-        // get the top level cgroup folder from ducc.properties. If
-        // not defined, use /cgroup/ducc as default
-        String cgroupsBaseDir = System.getProperty("ducc.agent.launcher.cgroups.basedir");
-        if (cgroupsBaseDir == null) {
-          cgroupsBaseDir = "/cgroup/ducc";
-        }
-        String cgroupsSubsystems = "memory,cpu";
-		long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
+  public static DuccLogger logger = DuccLogger.getLogger(NodeAgent.class, "CGroupsTest");
+
+  CGroupsManager cgroupsManager = null;
+
+  DuccIdFactory idFactory = null;
 
-		cgroupsManager = 
-				new CGroupsManager(cgUtilsPath, cgroupsBaseDir, cgroupsSubsystems, logger, maxTimeToWaitForProcessToStop);
+  Lock lock = new ReentrantLock();
 
-		
-	}
-	
-	public class WorkerThread implements Runnable {
-		public WorkerThread() {
-			
-		}
-	public void run() {
-		try {
-			String containerId;
-			lock.lock();
-			containerId = idFactory.next().toString()+"."+idFactory.next().toString();
-			
-			System.out.println(">>>> Thread::"+Thread.currentThread().getId()+" creating cgroup with id:"+containerId);
-			if ( !cgroupsManager.createContainer(containerId, System.getProperty("user.name"), 
-					cgroupsManager.getUserGroupName(System.getProperty("user.name")),
-					true) ) {
-				System.out.println("Thread::"+Thread.currentThread().getId()+" Failure to create cgroup with id:"+containerId);
-				System.exit(-1);
-				
-			} else {
-				if ( cgroupsManager.cgroupExists(cgroupsManager.getDuccCGroupBaseDir() + "/" + containerId) ) {
-					System.out.println("Thread::"+Thread.currentThread().getId()+" Success creating cgroup with id:"+containerId);
-				
-				cgroupsManager.setContainerSwappiness(containerId, cgroupsManager.getDuccUid(), true, 10);
-				} else {
-					System.out.println("Failed to validate existance of cgroup with id:"+containerId);
-					System.exit(-1);
-				}
-			}
-			cgroupsManager.destroyContainer(containerId, cgroupsManager.getDuccUid(), NodeAgent.SIGTERM);
-			System.out.println("Cgroup "+containerId+" Removed");
-		} catch( Exception e ) {
-			e.printStackTrace();
-			//System.exit(-1);
-		} finally {
-			lock.unlock();
-		}
-	}
-	}
+  public static void main(String[] args) {
+    try {
+      CGroupsTest tester = new CGroupsTest();
+      tester.initialize();
+      if (args.length > 1) {
+        // run concurrent threads
+        tester.run(Long.parseLong(args[0]), true);
+      } else {
+        // run sequentially
+        tester.run(Long.parseLong(args[0]), false);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  public void run(long howMany, boolean concurrent) {
+    try {
+      CGroupsTest tester = new CGroupsTest();
+      tester.initialize();
+      ExecutorService executor = Executors.newCachedThreadPool();
+      // more than 1 arg to this program = concurrent
+      if (concurrent) {
+        for (int i = 0; i < howMany; i++) {
+          WorkerThread t = new WorkerThread();
+          executor.execute(t);
+          // if the wait below is removed, cgroup creation fails
+          synchronized (t) {
+            // NOTE: waiting for 100ms seems to make cgcreate working.
+            // Tested 10ms and got a failure to create cgroup. Weird.
+            t.wait(100);
+          }
+        }
+      } else {
+        for (int i = 0; i < howMany; i++) {
+          WorkerThread t = new WorkerThread();
+          Future<?> f = executor.submit(t);
+          f.get();
+
+        }
+      }
+      executor.shutdownNow();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+  }
+
+  public void initialize() throws Exception {
+
+    idFactory = new DuccIdFactory();
+    String cgroupsUtilsDirs = System.getProperty("ducc.agent.launcher.cgroups.utils.dir");
+    String cgUtilsPath = null;
+    if (cgroupsUtilsDirs == null) {
+      cgUtilsPath = "/bin"; // default
+    }
+    // get the top level cgroup folder from ducc.properties. If
+    // not defined, use /cgroup/ducc as default
+    String cgroupsBaseDir = System.getProperty("ducc.agent.launcher.cgroups.basedir");
+    if (cgroupsBaseDir == null) {
+      cgroupsBaseDir = "/cgroup/ducc";
+    }
+    String cgroupsSubsystems = "memory,cpu";
+    long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
+
+    cgroupsManager = new CGroupsManager(cgUtilsPath, cgroupsBaseDir, cgroupsSubsystems, logger,
+            maxTimeToWaitForProcessToStop);
+
+  }
+
+  public class WorkerThread implements Runnable {
+    public WorkerThread() {
+
+    }
+
+    public void run() {
+      try {
+        String containerId;
+        lock.lock();
+        containerId = idFactory.next().toString() + "." + idFactory.next().toString();
+
+        System.out.println(">>>> Thread::" + Thread.currentThread().getId()
+                + " creating cgroup with id:" + containerId);
+        if (!cgroupsManager.createContainer(containerId, System.getProperty("user.name"),
+                cgroupsManager.getUserGroupName(System.getProperty("user.name")), true)) {
+          System.out.println("Thread::" + Thread.currentThread().getId()
+                  + " Failure to create cgroup with id:" + containerId);
+          System.exit(-1);
+
+        } else {
+          if (cgroupsManager
+                  .cgroupExists(cgroupsManager.getDuccCGroupBaseDir() + "/" + containerId)) {
+            System.out.println("Thread::" + Thread.currentThread().getId()
+                    + " Success creating cgroup with id:" + containerId);
+
+            cgroupsManager.setContainerSwappiness(containerId, cgroupsManager.getDuccUid(), true,
+                    10);
+          } else {
+            System.out.println("Failed to validate existance of cgroup with id:" + containerId);
+            System.exit(-1);
+          }
+        }
+        cgroupsManager.destroyContainer(containerId, cgroupsManager.getDuccUid(),
+                NodeAgent.SIGTERM);
+        System.out.println("Cgroup " + containerId + " Removed");
+      } catch (Exception e) {
+        e.printStackTrace();
+        // System.exit(-1);
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
 }

Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CommandBuilder.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CommandBuilder.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CommandBuilder.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CommandBuilder.java Fri Aug  2 17:27:48 2019
@@ -24,39 +24,35 @@ import org.apache.uima.internal.util.Arr
 
 public class CommandBuilder {
 
-	private static final String ducclingPath = Utils.resolvePlaceholderIfExists(
-			System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
-			System.getProperties());
-	
-	public CommandBuilder() {
-		
-	}
-	private static boolean useDuccling(ManagedProcess process) {
-		if ( process.isAgentProcess() || Utils.isWindows()) {
-			return false;
-		}
-		// On non-windows check if we should spawn the process via ducc_ling
-		String useDuccling = System
-				.getProperty("ducc.agent.launcher.use.ducc_spawn");
-		return ("true".equalsIgnoreCase(useDuccling) ? true : false );
-	}
-
-	public static String[] deployableStopCommand(ICommandLine cmdLine, ManagedProcess process) {
-		String[] cmd;
-		// Duccling, with no logging, always run by ducc, no need for
-		// workingdir
-		String[] ducclingNolog = new String[] { ducclingPath, "-u",
-				process.getOwner(), "--" };
-		
-		if ( useDuccling(process) ) {
-			cmd = (String[]) ArrayUtils.combine( ducclingNolog,
-					(String[]) ArrayUtils.combine(new String[] { cmdLine.getExecutable() },
-					cmdLine.getCommandLine()));
-		} else {
-			cmd = (String[]) ArrayUtils.combine(
-					new String[] { cmdLine.getExecutable() },
-					cmdLine.getCommandLine());
-		}
-		return cmd;
-	}
+  private static final String ducclingPath = Utils.resolvePlaceholderIfExists(
+          System.getProperty("ducc.agent.launcher.ducc_spawn_path"), System.getProperties());
+
+  public CommandBuilder() {
+
+  }
+
+  private static boolean useDuccling(ManagedProcess process) {
+    if (process.isAgentProcess() || Utils.isWindows()) {
+      return false;
+    }
+    // On non-windows check if we should spawn the process via ducc_ling
+    String useDuccling = System.getProperty("ducc.agent.launcher.use.ducc_spawn");
+    return ("true".equalsIgnoreCase(useDuccling) ? true : false);
+  }
+
+  public static String[] deployableStopCommand(ICommandLine cmdLine, ManagedProcess process) {
+    String[] cmd;
+    // Duccling, with no logging, always run by ducc, no need for
+    // workingdir
+    String[] ducclingNolog = new String[] { ducclingPath, "-u", process.getOwner(), "--" };
+
+    if (useDuccling(process)) {
+      cmd = (String[]) ArrayUtils.combine(ducclingNolog, (String[]) ArrayUtils
+              .combine(new String[] { cmdLine.getExecutable() }, cmdLine.getCommandLine()));
+    } else {
+      cmd = (String[]) ArrayUtils.combine(new String[] { cmdLine.getExecutable() },
+              cmdLine.getCommandLine());
+    }
+    return cmd;
+  }
 }

Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CommandExecutor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CommandExecutor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CommandExecutor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CommandExecutor.java Fri Aug  2 17:27:48 2019
@@ -34,210 +34,186 @@ import org.apache.uima.ducc.transport.ev
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
 
 public abstract class CommandExecutor implements Callable<Process> {
-	protected Process managedProcess = null;
-	protected String host;
-	protected String ip;
-	protected ICommandLine cmdLine;
-	protected NodeAgent agent;
-
-	public abstract void stop() throws Exception;
-
-	public abstract Process exec(ICommandLine commandLine,
-			Map<String, String> processEnv) throws Exception;
-
-	public CommandExecutor(NodeAgent agent, ICommandLine cmdLine, String host,
-			String ip, Process managedProcess) throws Exception {
-		this.agent = agent;
-		this.host = host;
-		this.ip = ip;
-		this.managedProcess = managedProcess;
-		this.cmdLine = cmdLine;
-	}
-
-	/**
-	 * Called after process is launched. Assign PID, state and finally drain
-	 * process streams.
-	 * 
-	 * @param process
-	 *            - launched process
-	 */
-	protected void postExecStep(java.lang.Process process, DuccLogger logger,
-			boolean isKillCmd) {
-	    String methodName="postExecStep";
-
-	    if (!isKillCmd) {
-	      	int pid = Utils.getPID(process);
-			if (pid != -1) {
-				((ManagedProcess) managedProcess).setPid(String.valueOf(pid));
-				((ManagedProcess) managedProcess).getDuccProcess().setPID(
-						String.valueOf(pid));
-				/*
-				boolean isAPorJD = ((ManagedProcess) managedProcess).isJd()
-						|| ((ManagedProcess) managedProcess).getDuccProcess()
-								.getProcessType().equals(ProcessType.Pop);
-*/
-				// JDs and APs dond't report internal status to the agent
-				// (initializing or running) so assume these start and enter
-				// Running state
-				/*
-				if (isAPorJD
-						&& !((ManagedProcess) managedProcess).getDuccProcess()
-								.getProcessState().equals(ProcessState.Stopped)) {
-					((ManagedProcess) managedProcess).getDuccProcess()
-							.setProcessState(ProcessState.Running);
-				}
-*/
-				if ( !((ManagedProcess) managedProcess).getDuccProcess()
-				.getProcessState().equals(ProcessState.Stopped) ||
-				!((ManagedProcess) managedProcess).getDuccProcess()
-				.getProcessState().equals(ProcessState.Stopping) ||
-				!((ManagedProcess) managedProcess).getDuccProcess()
-				.getProcessState().equals(ProcessState.Failed) ||
-				!((ManagedProcess) managedProcess).getDuccProcess()
-				.getProcessState().equals(ProcessState.FailedInitialization)
-						) {
-	                ((ManagedProcess) managedProcess).getDuccProcess()
-			            .setProcessState(ProcessState.Started);
-	                
-				}
-				((ManagedProcess) managedProcess).setPid(String.valueOf(pid));
-				((ManagedProcess) managedProcess)
-				     .getDuccProcess().setPID(String.valueOf(pid));
-
-				try {
-					synchronized (this) {
-						// wait for 5 seconds before starting the camel route
-						// responsible for collecting process related stats.
-						// Allow
-						// enough time for the process to start.
-						wait(5000);
-					}
-				        logger.info(methodName, null,
-						">>>>>>>>> PID:"
-								+ String.valueOf(pid)
-								+ " Process State:"
-								+ ((ManagedProcess) managedProcess)
-										.getDuccProcess().getProcessState());
-
-				if ( !((ManagedProcess) managedProcess).getDuccProcess()
-				       .getProcessState().equals(ProcessState.Stopped) &&
-				     !((ManagedProcess) managedProcess).getDuccProcess()
-				       .getProcessState().equals(ProcessState.Stopping) &&
-				     !((ManagedProcess) managedProcess).getDuccProcess()
-				       .getProcessState().equals(ProcessState.Failed) &&
-				     !((ManagedProcess) managedProcess).getDuccProcess()
-				       .getProcessState().equals(ProcessState.FailedInitialization)
-						) {
-					RouteBuilder rb = agent.new ProcessMemoryUsageRoute(agent,
-							((ManagedProcess) managedProcess).getDuccProcess(),
-							(ManagedProcess) managedProcess);
-					agent.getContext().addRoutes(rb);
-					agent.getContext().startRoute(String.valueOf(pid));
-					logger.info(methodName, null,
-							"Started Process Metric Gathering Thread For PID:"
-									+ String.valueOf(pid));
-
-					StringBuffer sb = new StringBuffer();
-					for (Route route : agent.getContext().getRoutes()) {
-						sb.append("Camel Context - RouteId:" + route.getId()
-								+ "\n");
-					}
-					logger.info(methodName, null, sb.toString());
-
-					logger.info(methodName, null,
-							"Started Process Metric Gathering Thread For PID:"
-									+ String.valueOf(pid));
-
-
-				}
-
-				/*
-					RouteBuilder rb = agent.new ProcessMemoryUsageRoute(agent,
-							((ManagedProcess) managedProcess).getDuccProcess(),
-							(ManagedProcess) managedProcess);
-					agent.getContext().addRoutes(rb);
-					agent.getContext().startRoute(String.valueOf(pid));
-					logger.info(methodName, null,
-							"Started Process Metric Gathering Thread For PID:"
-									+ String.valueOf(pid));
-
-					StringBuffer sb = new StringBuffer();
-					for (Route route : agent.getContext().getRoutes()) {
-						sb.append("Camel Context - RouteId:" + route.getId()
-								+ "\n");
-					}
-					logger.info(methodName, null, sb.toString());
-
-					logger.info(methodName, null,
-							"Started Process Metric Gathering Thread For PID:"
-									+ String.valueOf(pid));
-				*/
-				} catch (Exception e) {
-					logger.error("postExecStep", null, e);
-				}
-
-			}
-	    }
-
-	    // Drain process streams in dedicated threads.
-	    ((ManagedProcess) managedProcess).drainProcessStreams(process, logger,System.out, isKillCmd);
-	}
-
-	/**
-	 * Called by Executor to exec a process.
-	 */
-	public Process call() throws Exception {
-		Process deployedProcess = null;
-		try {
-			// ICommandLine commandLine = ((ManagedProcess)
-			// managedProcess).getCommandLine();
-			Map<String, String> env = new HashMap<String, String>();
-			if (!isKillCommand(cmdLine)) {
-			  // UIMA-4935 Moved setting of JpUniqueId to DuccComamndExecutor where the cmdLine is not shared
-				// Enrich environment for the new process. Via these settings
-				// the UIMA AS
-				// service wrapper can notify the agent of its state.
-				env.put(IDuccUser.EnvironmentVariable.DUCC_IP.value(), ip);
-				env.put(IDuccUser.EnvironmentVariable.DUCC_NODENAME.value(), host);
-				// Add "friendly" process name for coordination with JD and OR
-				env.put(IDuccUser.EnvironmentVariable.DUCC_ID_PROCESS.value(), ((ManagedProcess) managedProcess)
-						.getDuccId().getFriendly()+"");
-				// Add unique process id. The process will send this along with its state update
-				env.put(IDuccUser.EnvironmentVariable.DUCC_PROCESS_UNIQUEID.value(), ((ManagedProcess) managedProcess).getDuccId().getUnique());
-
-				if (((ManagedProcess) managedProcess).getDuccProcess()
-						.getProcessType()
-						.equals(ProcessType.Job_Uima_AS_Process)) {
-					IDuccStandardInfo processInfo = ((ManagedProcess) managedProcess)
-							.getProcessInfo();
-					long maxInitTime = 0;
-
-					if (processInfo != null) {
-						maxInitTime = processInfo
-								.getProcessInitializationTimeMax();
-					}
-					agent.getLogger().info("CommandExecutor.call",
-							((ManagedProcess) managedProcess).getWorkDuccId(),
-							"Starting Process Initialization Monitor with Max Process Initialization Time:" + maxInitTime);
-					((ManagedProcess) managedProcess)
-							.startInitializationTimer(maxInitTime); 
-				}
-			}
-			deployedProcess = exec(cmdLine, env);
-		} catch (Exception e) {
-			e.printStackTrace();
-		} finally {
-			if (((ManagedProcess) managedProcess).getDuccProcess()
-					.getProcessType().equals(ProcessType.Job_Uima_AS_Process)) {
-				((ManagedProcess) managedProcess).stopInitializationTimer();
-			}
-		}
-		return deployedProcess;
-	}
-
-	protected boolean isKillCommand(ICommandLine cmdLine) {
-		return (cmdLine.getExecutable() != null && (cmdLine.getExecutable()
-				.startsWith("/bin/kill") || cmdLine.getExecutable().startsWith(
-				"taskkill")));
-	}
+  protected Process managedProcess = null;
+
+  protected String host;
+
+  protected String ip;
+
+  protected ICommandLine cmdLine;
+
+  protected NodeAgent agent;
+
+  public abstract void stop() throws Exception;
+
+  public abstract Process exec(ICommandLine commandLine, Map<String, String> processEnv)
+          throws Exception;
+
+  public CommandExecutor(NodeAgent agent, ICommandLine cmdLine, String host, String ip,
+          Process managedProcess) throws Exception {
+    this.agent = agent;
+    this.host = host;
+    this.ip = ip;
+    this.managedProcess = managedProcess;
+    this.cmdLine = cmdLine;
+  }
+
+  /**
+   * Called after process is launched. Assign PID, state and finally drain process streams.
+   * 
+   * @param process
+   *          - launched process
+   */
+  protected void postExecStep(java.lang.Process process, DuccLogger logger, boolean isKillCmd) {
+    String methodName = "postExecStep";
+
+    if (!isKillCmd) {
+      int pid = Utils.getPID(process);
+      if (pid != -1) {
+        ((ManagedProcess) managedProcess).setPid(String.valueOf(pid));
+        ((ManagedProcess) managedProcess).getDuccProcess().setPID(String.valueOf(pid));
+        /*
+         * boolean isAPorJD = ((ManagedProcess) managedProcess).isJd() || ((ManagedProcess)
+         * managedProcess).getDuccProcess() .getProcessType().equals(ProcessType.Pop);
+         */
+        // JDs and APs dond't report internal status to the agent
+        // (initializing or running) so assume these start and enter
+        // Running state
+        /*
+         * if (isAPorJD && !((ManagedProcess) managedProcess).getDuccProcess()
+         * .getProcessState().equals(ProcessState.Stopped)) { ((ManagedProcess)
+         * managedProcess).getDuccProcess() .setProcessState(ProcessState.Running); }
+         */
+        if (!((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
+                .equals(ProcessState.Stopped)
+                || !((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
+                        .equals(ProcessState.Stopping)
+                || !((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
+                        .equals(ProcessState.Failed)
+                || !((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
+                        .equals(ProcessState.FailedInitialization)) {
+          ((ManagedProcess) managedProcess).getDuccProcess().setProcessState(ProcessState.Started);
+
+        }
+        ((ManagedProcess) managedProcess).setPid(String.valueOf(pid));
+        ((ManagedProcess) managedProcess).getDuccProcess().setPID(String.valueOf(pid));
+
+        try {
+          synchronized (this) {
+            // wait for 5 seconds before starting the camel route
+            // responsible for collecting process related stats.
+            // Allow
+            // enough time for the process to start.
+            wait(5000);
+          }
+          logger.info(methodName, null, ">>>>>>>>> PID:" + String.valueOf(pid) + " Process State:"
+                  + ((ManagedProcess) managedProcess).getDuccProcess().getProcessState());
+
+          if (!((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
+                  .equals(ProcessState.Stopped)
+                  && !((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
+                          .equals(ProcessState.Stopping)
+                  && !((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
+                          .equals(ProcessState.Failed)
+                  && !((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
+                          .equals(ProcessState.FailedInitialization)) {
+            RouteBuilder rb = agent.new ProcessMemoryUsageRoute(agent,
+                    ((ManagedProcess) managedProcess).getDuccProcess(),
+                    (ManagedProcess) managedProcess);
+            agent.getContext().addRoutes(rb);
+            agent.getContext().startRoute(String.valueOf(pid));
+            logger.info(methodName, null,
+                    "Started Process Metric Gathering Thread For PID:" + String.valueOf(pid));
+
+            StringBuffer sb = new StringBuffer();
+            for (Route route : agent.getContext().getRoutes()) {
+              sb.append("Camel Context - RouteId:" + route.getId() + "\n");
+            }
+            logger.info(methodName, null, sb.toString());
+
+            logger.info(methodName, null,
+                    "Started Process Metric Gathering Thread For PID:" + String.valueOf(pid));
+
+          }
+
+          /*
+           * RouteBuilder rb = agent.new ProcessMemoryUsageRoute(agent, ((ManagedProcess)
+           * managedProcess).getDuccProcess(), (ManagedProcess) managedProcess);
+           * agent.getContext().addRoutes(rb); agent.getContext().startRoute(String.valueOf(pid));
+           * logger.info(methodName, null, "Started Process Metric Gathering Thread For PID:" +
+           * String.valueOf(pid));
+           * 
+           * StringBuffer sb = new StringBuffer(); for (Route route :
+           * agent.getContext().getRoutes()) { sb.append("Camel Context - RouteId:" + route.getId()
+           * + "\n"); } logger.info(methodName, null, sb.toString());
+           * 
+           * logger.info(methodName, null, "Started Process Metric Gathering Thread For PID:" +
+           * String.valueOf(pid));
+           */
+        } catch (Exception e) {
+          logger.error("postExecStep", null, e);
+        }
+
+      }
+    }
+
+    // Drain process streams in dedicated threads.
+    ((ManagedProcess) managedProcess).drainProcessStreams(process, logger, System.out, isKillCmd);
+  }
+
+  /**
+   * Called by Executor to exec a process.
+   */
+  public Process call() throws Exception {
+    Process deployedProcess = null;
+    try {
+      // ICommandLine commandLine = ((ManagedProcess)
+      // managedProcess).getCommandLine();
+      Map<String, String> env = new HashMap<String, String>();
+      if (!isKillCommand(cmdLine)) {
+        // UIMA-4935 Moved setting of JpUniqueId to DuccComamndExecutor where the cmdLine is not
+        // shared
+        // Enrich environment for the new process. Via these settings
+        // the UIMA AS
+        // service wrapper can notify the agent of its state.
+        env.put(IDuccUser.EnvironmentVariable.DUCC_IP.value(), ip);
+        env.put(IDuccUser.EnvironmentVariable.DUCC_NODENAME.value(), host);
+        // Add "friendly" process name for coordination with JD and OR
+        env.put(IDuccUser.EnvironmentVariable.DUCC_ID_PROCESS.value(),
+                ((ManagedProcess) managedProcess).getDuccId().getFriendly() + "");
+        // Add unique process id. The process will send this along with its state update
+        env.put(IDuccUser.EnvironmentVariable.DUCC_PROCESS_UNIQUEID.value(),
+                ((ManagedProcess) managedProcess).getDuccId().getUnique());
+
+        if (((ManagedProcess) managedProcess).getDuccProcess().getProcessType()
+                .equals(ProcessType.Job_Uima_AS_Process)) {
+          IDuccStandardInfo processInfo = ((ManagedProcess) managedProcess).getProcessInfo();
+          long maxInitTime = 0;
+
+          if (processInfo != null) {
+            maxInitTime = processInfo.getProcessInitializationTimeMax();
+          }
+          agent.getLogger().info("CommandExecutor.call",
+                  ((ManagedProcess) managedProcess).getWorkDuccId(),
+                  "Starting Process Initialization Monitor with Max Process Initialization Time:"
+                          + maxInitTime);
+          ((ManagedProcess) managedProcess).startInitializationTimer(maxInitTime);
+        }
+      }
+      deployedProcess = exec(cmdLine, env);
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (((ManagedProcess) managedProcess).getDuccProcess().getProcessType()
+              .equals(ProcessType.Job_Uima_AS_Process)) {
+        ((ManagedProcess) managedProcess).stopInitializationTimer();
+      }
+    }
+    return deployedProcess;
+  }
+
+  protected boolean isKillCommand(ICommandLine cmdLine) {
+    return (cmdLine.getExecutable() != null && (cmdLine.getExecutable().startsWith("/bin/kill")
+            || cmdLine.getExecutable().startsWith("taskkill")));
+  }
 }

Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DefunctProcessDetector.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DefunctProcessDetector.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DefunctProcessDetector.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DefunctProcessDetector.java Fri Aug  2 17:27:48 2019
@@ -31,81 +31,87 @@ import org.apache.uima.ducc.transport.ev
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
 
 public class DefunctProcessDetector implements Runnable {
-	ManagedProcess childProcess;
-	DuccLogger logger;
-	public DefunctProcessDetector(ManagedProcess process, DuccLogger logger) {
-		childProcess = process;
-		this.logger = logger;
-	}
-	private  boolean isDefunctProcess(String pid) throws Exception {
-		boolean zombie = false;
-		InputStreamReader in = null;
-		String[] command = {
-				"/bin/ps",
-				"-ef",
-				pid};
-		try {
-			ProcessBuilder pb = new ProcessBuilder();
-			pb.command(command);
-
-			pb.redirectErrorStream(true);
-			java.lang.Process childProcess = pb.start();
-			in = new InputStreamReader(childProcess.getInputStream());
-			BufferedReader reader = new BufferedReader(in);
-			String line = null;
-			while ((line = reader.readLine()) != null) {
-				// the ps line will have string <defunct> if the 
-				// process is a zombie
-				zombie = (line.indexOf("defunct") > 0);
-				if ( zombie ) {
-   				   logger.info("DefunctProcessDetector.isDefunctProcess", null, "Process with PID:"+pid+" Is Defunct - OS reports:"+line);
-				    break;
-				}
-			}
-		} catch (Exception e) {
-			throw e;
-		} finally {
-			if (in != null) {
-				try {
-					in.close();
-				} catch (Exception e) {
-				}
-			}
-		}
-		return zombie;
-	}
-
-	public void run() {
-		try {
-			if (isDefunctProcess(childProcess.getPid())) {
-				logger.info("DefunctProcessDetector.run()", childProcess.getDuccProcess().getDuccId(), "Process with PID:"+childProcess.getPid()+" Is Defunct - Changing State to Stopped");
-				childProcess.getDuccProcess().setProcessState(ProcessState.Stopped);
-				childProcess.getDuccProcess().setReasonForStoppingProcess(ReasonForStoppingProcess.Defunct.name());
-			} else {
-				logger.debug("DefunctProcessDetector.run()", childProcess.getDuccProcess().getDuccId(), "Process with PID:"+childProcess.getPid()+" Not Defunct");
-
-			}
-		} catch( Exception e) {
-			logger.error("DefunctProcessDetector.run()", childProcess.getDuccProcess().getDuccId(), e);
-		}
-		
-	}
-	public static void main(String[] args) {
-		try {
-			DuccLogger logger = new DuccLogger(DefunctProcessDetector.class);
-			DuccIdFactory factory = new DuccIdFactory();
-			DuccId duccId = factory.next();
-			NodeIdentity nid = new NodeIdentity();
-			IDuccProcess process = new DuccProcess(duccId, nid);
-			ManagedProcess p = new ManagedProcess(process, null);
-			process.setProcessState(ProcessState.Initializing);
-			process.setPID(args[0]);
-			p.setPid(args[0]);
-			DefunctProcessDetector detector = new DefunctProcessDetector(p, logger);
-			detector.run();
-		} catch( Exception e) {
-			e.printStackTrace();
-		}
-	}
+  ManagedProcess childProcess;
+
+  DuccLogger logger;
+
+  public DefunctProcessDetector(ManagedProcess process, DuccLogger logger) {
+    childProcess = process;
+    this.logger = logger;
+  }
+
+  private boolean isDefunctProcess(String pid) throws Exception {
+    boolean zombie = false;
+    InputStreamReader in = null;
+    String[] command = { "/bin/ps", "-ef", pid };
+    try {
+      ProcessBuilder pb = new ProcessBuilder();
+      pb.command(command);
+
+      pb.redirectErrorStream(true);
+      java.lang.Process childProcess = pb.start();
+      in = new InputStreamReader(childProcess.getInputStream());
+      BufferedReader reader = new BufferedReader(in);
+      String line = null;
+      while ((line = reader.readLine()) != null) {
+        // the ps line will have string <defunct> if the
+        // process is a zombie
+        zombie = (line.indexOf("defunct") > 0);
+        if (zombie) {
+          logger.info("DefunctProcessDetector.isDefunctProcess", null,
+                  "Process with PID:" + pid + " Is Defunct - OS reports:" + line);
+          break;
+        }
+      }
+    } catch (Exception e) {
+      throw e;
+    } finally {
+      if (in != null) {
+        try {
+          in.close();
+        } catch (Exception e) {
+        }
+      }
+    }
+    return zombie;
+  }
+
+  public void run() {
+    try {
+      if (isDefunctProcess(childProcess.getPid())) {
+        logger.info("DefunctProcessDetector.run()", childProcess.getDuccProcess().getDuccId(),
+                "Process with PID:" + childProcess.getPid()
+                        + " Is Defunct - Changing State to Stopped");
+        childProcess.getDuccProcess().setProcessState(ProcessState.Stopped);
+        childProcess.getDuccProcess()
+                .setReasonForStoppingProcess(ReasonForStoppingProcess.Defunct.name());
+      } else {
+        logger.debug("DefunctProcessDetector.run()", childProcess.getDuccProcess().getDuccId(),
+                "Process with PID:" + childProcess.getPid() + " Not Defunct");
+
+      }
+    } catch (Exception e) {
+      logger.error("DefunctProcessDetector.run()", childProcess.getDuccProcess().getDuccId(), e);
+    }
+
+  }
+
+  public static void main(String[] args) {
+    try {
+      DuccLogger logger = new DuccLogger(DefunctProcessDetector.class);
+      DuccIdFactory factory = new DuccIdFactory();
+      DuccId duccId = factory.next();
+      NodeIdentity nid = new NodeIdentity();
+      IDuccProcess process = new DuccProcess(duccId, nid);
+      ManagedProcess p = new ManagedProcess(process, null);
+      process.setProcessState(ProcessState.Initializing);
+      process.setPID(args[0]);
+      p.setPid(args[0]);
+      DefunctProcessDetector detector = new DefunctProcessDetector(p, logger);
+      detector.run();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
 
 }