You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2019/08/02 17:27:49 UTC
svn commit: r1864247 [6/10] - in
/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent:
./ config/ deploy/ deploy/uima/ event/ exceptions/ launcher/
metrics/collectors/ monitor/ processors/
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsTest.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsTest.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsTest.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsTest.java Fri Aug 2 17:27:48 2019
@@ -28,117 +28,126 @@ import org.apache.uima.ducc.agent.NodeAg
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.id.DuccIdFactory;
-
public class CGroupsTest {
- public static DuccLogger logger = DuccLogger.getLogger(NodeAgent.class, "CGroupsTest");
- CGroupsManager cgroupsManager = null;
- DuccIdFactory idFactory = null;
- Lock lock = new ReentrantLock();
-
- public static void main(String[] args) {
- try {
- CGroupsTest tester = new CGroupsTest();
- tester.initialize();
- if ( args.length > 1) {
- // run concurrent threads
- tester.run(Long.parseLong(args[0]), true);
- } else {
- // run sequentially
- tester.run(Long.parseLong(args[0]), false);
- }
- } catch( Exception e) {
- e.printStackTrace();
- }
- }
- public void run(long howMany, boolean concurrent) {
- try {
- CGroupsTest tester = new CGroupsTest();
- tester.initialize();
- ExecutorService executor = Executors.newCachedThreadPool();
- // more than 1 arg to this program = concurrent
- if ( concurrent ) {
- for (int i = 0; i < howMany; i++) {
- WorkerThread t = new WorkerThread();
- executor.execute(t);
- // if the wait below is removed, cgroup creation fails
- synchronized(t) {
- // NOTE: waiting for 100ms seems to make cgcreate working.
- // Tested 10ms and got a failure to create cgroup. Weird.
- t.wait(100);
- }
- }
- } else {
- for (int i = 0; i < howMany; i++) {
- WorkerThread t = new WorkerThread();
- Future<?> f = executor.submit(t);
- f.get();
-
- }
- }
- executor.shutdownNow();
- } catch( Exception e) {
- e.printStackTrace();
- }
-
- }
- public void initialize() throws Exception {
-
- idFactory = new DuccIdFactory();
- String cgroupsUtilsDirs = System.getProperty("ducc.agent.launcher.cgroups.utils.dir");
- String cgUtilsPath=null;
- if (cgroupsUtilsDirs == null) {
- cgUtilsPath = "/bin"; // default
- }
- // get the top level cgroup folder from ducc.properties. If
- // not defined, use /cgroup/ducc as default
- String cgroupsBaseDir = System.getProperty("ducc.agent.launcher.cgroups.basedir");
- if (cgroupsBaseDir == null) {
- cgroupsBaseDir = "/cgroup/ducc";
- }
- String cgroupsSubsystems = "memory,cpu";
- long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
+ public static DuccLogger logger = DuccLogger.getLogger(NodeAgent.class, "CGroupsTest");
+
+ CGroupsManager cgroupsManager = null;
+
+ DuccIdFactory idFactory = null;
- cgroupsManager =
- new CGroupsManager(cgUtilsPath, cgroupsBaseDir, cgroupsSubsystems, logger, maxTimeToWaitForProcessToStop);
+ Lock lock = new ReentrantLock();
-
- }
-
- public class WorkerThread implements Runnable {
- public WorkerThread() {
-
- }
- public void run() {
- try {
- String containerId;
- lock.lock();
- containerId = idFactory.next().toString()+"."+idFactory.next().toString();
-
- System.out.println(">>>> Thread::"+Thread.currentThread().getId()+" creating cgroup with id:"+containerId);
- if ( !cgroupsManager.createContainer(containerId, System.getProperty("user.name"),
- cgroupsManager.getUserGroupName(System.getProperty("user.name")),
- true) ) {
- System.out.println("Thread::"+Thread.currentThread().getId()+" Failure to create cgroup with id:"+containerId);
- System.exit(-1);
-
- } else {
- if ( cgroupsManager.cgroupExists(cgroupsManager.getDuccCGroupBaseDir() + "/" + containerId) ) {
- System.out.println("Thread::"+Thread.currentThread().getId()+" Success creating cgroup with id:"+containerId);
-
- cgroupsManager.setContainerSwappiness(containerId, cgroupsManager.getDuccUid(), true, 10);
- } else {
- System.out.println("Failed to validate existance of cgroup with id:"+containerId);
- System.exit(-1);
- }
- }
- cgroupsManager.destroyContainer(containerId, cgroupsManager.getDuccUid(), NodeAgent.SIGTERM);
- System.out.println("Cgroup "+containerId+" Removed");
- } catch( Exception e ) {
- e.printStackTrace();
- //System.exit(-1);
- } finally {
- lock.unlock();
- }
- }
- }
+  /**
+   * Command-line entry point for the cgroup create/destroy exerciser.
+   *
+   * @param args
+   *          args[0] is the number of cgroups to create; passing any second argument switches
+   *          from sequential to concurrent execution
+   */
+  public static void main(String[] args) {
+    if (args.length < 1) {
+      // args[0] is mandatory; without this guard the run ends in an
+      // ArrayIndexOutOfBoundsException stack trace instead of a usable message.
+      System.out.println("Usage: CGroupsTest <howMany> [concurrent]");
+      return;
+    }
+    try {
+      CGroupsTest tester = new CGroupsTest();
+      tester.initialize();
+      // more than one argument = run concurrent threads, otherwise run sequentially
+      tester.run(Long.parseLong(args[0]), args.length > 1);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Submits {@code howMany} WorkerThread tasks to a cached thread pool.
+   *
+   * @param howMany
+   *          number of worker tasks to submit
+   * @param concurrent
+   *          when true, tasks are handed to the pool without waiting for completion, pausing
+   *          100ms between submissions; when false, each task is awaited before the next one is
+   *          submitted
+   */
+  public void run(long howMany, boolean concurrent) {
+    // WorkerThread is an inner class bound to this instance, so the workers use this tester's
+    // cgroupsManager/idFactory/lock. The second CGroupsTest that used to be created and
+    // initialized here was never referenced and has been dropped.
+    ExecutorService executor = Executors.newCachedThreadPool();
+    try {
+      if (concurrent) {
+        // long counter: an int counter compared against a long bound can overflow and never end
+        for (long i = 0; i < howMany; i++) {
+          WorkerThread t = new WorkerThread();
+          executor.execute(t);
+          // if the wait below is removed, cgroup creation fails
+          synchronized (t) {
+            // NOTE: waiting for 100ms seems to make cgcreate working.
+            // Tested 10ms and got a failure to create cgroup. Weird.
+            t.wait(100);
+          }
+        }
+      } else {
+        for (long i = 0; i < howMany; i++) {
+          Future<?> f = executor.submit(new WorkerThread());
+          f.get();
+        }
+      }
+    } catch (InterruptedException e) {
+      // keep the interrupt visible to whoever called us
+      Thread.currentThread().interrupt();
+      e.printStackTrace();
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      // Always release the pool threads, including when submission fails part-way.
+      // NOTE(review): shutdownNow() interrupts workers that are still running in concurrent
+      // mode - confirm that is acceptable for in-flight cgroup operations.
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Creates the id factory and the CGroupsManager used by the worker tasks.
+   *
+   * Reads two system properties: {@code ducc.agent.launcher.cgroups.utils.dir} (default
+   * {@code /bin}) and {@code ducc.agent.launcher.cgroups.basedir} (default {@code /cgroup/ducc}).
+   *
+   * @throws Exception
+   *           propagated from the construction of the collaborators
+   */
+  public void initialize() throws Exception {
+    idFactory = new DuccIdFactory();
+
+    // Use the configured utilities location when present. Previously the property was read but
+    // its value was never used, so the manager received null whenever the property was set.
+    // NOTE(review): the value is passed through unchanged - confirm CGroupsManager accepts the
+    // format this property is configured with.
+    String cgUtilsPath = System.getProperty("ducc.agent.launcher.cgroups.utils.dir", "/bin");
+
+    // top level cgroup folder; /cgroup/ducc is used when the property is not defined
+    String cgroupsBaseDir = System.getProperty("ducc.agent.launcher.cgroups.basedir",
+            "/cgroup/ducc");
+
+    String cgroupsSubsystems = "memory,cpu";
+    long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
+
+    cgroupsManager = new CGroupsManager(cgUtilsPath, cgroupsBaseDir, cgroupsSubsystems, logger,
+            maxTimeToWaitForProcessToStop);
+  }
+
+ public class WorkerThread implements Runnable {
+ public WorkerThread() {
+
+ }
+
+ public void run() {
+ try {
+ String containerId;
+ lock.lock();
+ containerId = idFactory.next().toString() + "." + idFactory.next().toString();
+
+ System.out.println(">>>> Thread::" + Thread.currentThread().getId()
+ + " creating cgroup with id:" + containerId);
+ if (!cgroupsManager.createContainer(containerId, System.getProperty("user.name"),
+ cgroupsManager.getUserGroupName(System.getProperty("user.name")), true)) {
+ System.out.println("Thread::" + Thread.currentThread().getId()
+ + " Failure to create cgroup with id:" + containerId);
+ System.exit(-1);
+
+ } else {
+ if (cgroupsManager
+ .cgroupExists(cgroupsManager.getDuccCGroupBaseDir() + "/" + containerId)) {
+ System.out.println("Thread::" + Thread.currentThread().getId()
+ + " Success creating cgroup with id:" + containerId);
+
+ cgroupsManager.setContainerSwappiness(containerId, cgroupsManager.getDuccUid(), true,
+ 10);
+ } else {
+ System.out.println("Failed to validate existance of cgroup with id:" + containerId);
+ System.exit(-1);
+ }
+ }
+ cgroupsManager.destroyContainer(containerId, cgroupsManager.getDuccUid(),
+ NodeAgent.SIGTERM);
+ System.out.println("Cgroup " + containerId + " Removed");
+ } catch (Exception e) {
+ e.printStackTrace();
+ // System.exit(-1);
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CommandBuilder.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CommandBuilder.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CommandBuilder.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CommandBuilder.java Fri Aug 2 17:27:48 2019
@@ -24,39 +24,35 @@ import org.apache.uima.internal.util.Arr
public class CommandBuilder {
- private static final String ducclingPath = Utils.resolvePlaceholderIfExists(
- System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
- System.getProperties());
-
- public CommandBuilder() {
-
- }
- private static boolean useDuccling(ManagedProcess process) {
- if ( process.isAgentProcess() || Utils.isWindows()) {
- return false;
- }
- // On non-windows check if we should spawn the process via ducc_ling
- String useDuccling = System
- .getProperty("ducc.agent.launcher.use.ducc_spawn");
- return ("true".equalsIgnoreCase(useDuccling) ? true : false );
- }
-
- public static String[] deployableStopCommand(ICommandLine cmdLine, ManagedProcess process) {
- String[] cmd;
- // Duccling, with no logging, always run by ducc, no need for
- // workingdir
- String[] ducclingNolog = new String[] { ducclingPath, "-u",
- process.getOwner(), "--" };
-
- if ( useDuccling(process) ) {
- cmd = (String[]) ArrayUtils.combine( ducclingNolog,
- (String[]) ArrayUtils.combine(new String[] { cmdLine.getExecutable() },
- cmdLine.getCommandLine()));
- } else {
- cmd = (String[]) ArrayUtils.combine(
- new String[] { cmdLine.getExecutable() },
- cmdLine.getCommandLine());
- }
- return cmd;
- }
+ private static final String ducclingPath = Utils.resolvePlaceholderIfExists(
+ System.getProperty("ducc.agent.launcher.ducc_spawn_path"), System.getProperties());
+
+ public CommandBuilder() {
+
+ }
+
+  /**
+   * Tells whether a process should be spawned via ducc_ling.
+   *
+   * @param process
+   *          the process about to be launched or stopped
+   * @return false for agent processes and on Windows; otherwise true only when system property
+   *         {@code ducc.agent.launcher.use.ducc_spawn} is "true" (case-insensitive)
+   */
+  private static boolean useDuccling(ManagedProcess process) {
+    boolean spawnCandidate = !process.isAgentProcess() && !Utils.isWindows();
+    // The property is consulted only for non-agent processes on non-Windows platforms.
+    return spawnCandidate
+            && Boolean.parseBoolean(System.getProperty("ducc.agent.launcher.use.ducc_spawn"));
+  }
+
+  /**
+   * Builds the argument vector used to stop a process.
+   *
+   * @param cmdLine
+   *          supplies the executable and its arguments
+   * @param process
+   *          the process being stopped; its owner is used in the ducc_ling prefix
+   * @return the executable followed by its arguments, preceded by the ducc_ling prefix when
+   *         {@code useDuccling(process)} is true
+   */
+  public static String[] deployableStopCommand(ICommandLine cmdLine, ManagedProcess process) {
+    // Duccling prefix with no logging and no workingdir
+    String[] spawnPrefix = { ducclingPath, "-u", process.getOwner(), "--" };
+    boolean viaDuccling = useDuccling(process);
+
+    String[] executable = { cmdLine.getExecutable() };
+    String[] plainCommand = (String[]) ArrayUtils.combine(executable, cmdLine.getCommandLine());
+    if (!viaDuccling) {
+      return plainCommand;
+    }
+    return (String[]) ArrayUtils.combine(spawnPrefix, plainCommand);
+  }
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CommandExecutor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CommandExecutor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CommandExecutor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CommandExecutor.java Fri Aug 2 17:27:48 2019
@@ -34,210 +34,186 @@ import org.apache.uima.ducc.transport.ev
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
public abstract class CommandExecutor implements Callable<Process> {
- protected Process managedProcess = null;
- protected String host;
- protected String ip;
- protected ICommandLine cmdLine;
- protected NodeAgent agent;
-
- public abstract void stop() throws Exception;
-
- public abstract Process exec(ICommandLine commandLine,
- Map<String, String> processEnv) throws Exception;
-
- public CommandExecutor(NodeAgent agent, ICommandLine cmdLine, String host,
- String ip, Process managedProcess) throws Exception {
- this.agent = agent;
- this.host = host;
- this.ip = ip;
- this.managedProcess = managedProcess;
- this.cmdLine = cmdLine;
- }
-
- /**
- * Called after process is launched. Assign PID, state and finally drain
- * process streams.
- *
- * @param process
- * - launched process
- */
- protected void postExecStep(java.lang.Process process, DuccLogger logger,
- boolean isKillCmd) {
- String methodName="postExecStep";
-
- if (!isKillCmd) {
- int pid = Utils.getPID(process);
- if (pid != -1) {
- ((ManagedProcess) managedProcess).setPid(String.valueOf(pid));
- ((ManagedProcess) managedProcess).getDuccProcess().setPID(
- String.valueOf(pid));
- /*
- boolean isAPorJD = ((ManagedProcess) managedProcess).isJd()
- || ((ManagedProcess) managedProcess).getDuccProcess()
- .getProcessType().equals(ProcessType.Pop);
-*/
- // JDs and APs dond't report internal status to the agent
- // (initializing or running) so assume these start and enter
- // Running state
- /*
- if (isAPorJD
- && !((ManagedProcess) managedProcess).getDuccProcess()
- .getProcessState().equals(ProcessState.Stopped)) {
- ((ManagedProcess) managedProcess).getDuccProcess()
- .setProcessState(ProcessState.Running);
- }
-*/
- if ( !((ManagedProcess) managedProcess).getDuccProcess()
- .getProcessState().equals(ProcessState.Stopped) ||
- !((ManagedProcess) managedProcess).getDuccProcess()
- .getProcessState().equals(ProcessState.Stopping) ||
- !((ManagedProcess) managedProcess).getDuccProcess()
- .getProcessState().equals(ProcessState.Failed) ||
- !((ManagedProcess) managedProcess).getDuccProcess()
- .getProcessState().equals(ProcessState.FailedInitialization)
- ) {
- ((ManagedProcess) managedProcess).getDuccProcess()
- .setProcessState(ProcessState.Started);
-
- }
- ((ManagedProcess) managedProcess).setPid(String.valueOf(pid));
- ((ManagedProcess) managedProcess)
- .getDuccProcess().setPID(String.valueOf(pid));
-
- try {
- synchronized (this) {
- // wait for 5 seconds before starting the camel route
- // responsible for collecting process related stats.
- // Allow
- // enough time for the process to start.
- wait(5000);
- }
- logger.info(methodName, null,
- ">>>>>>>>> PID:"
- + String.valueOf(pid)
- + " Process State:"
- + ((ManagedProcess) managedProcess)
- .getDuccProcess().getProcessState());
-
- if ( !((ManagedProcess) managedProcess).getDuccProcess()
- .getProcessState().equals(ProcessState.Stopped) &&
- !((ManagedProcess) managedProcess).getDuccProcess()
- .getProcessState().equals(ProcessState.Stopping) &&
- !((ManagedProcess) managedProcess).getDuccProcess()
- .getProcessState().equals(ProcessState.Failed) &&
- !((ManagedProcess) managedProcess).getDuccProcess()
- .getProcessState().equals(ProcessState.FailedInitialization)
- ) {
- RouteBuilder rb = agent.new ProcessMemoryUsageRoute(agent,
- ((ManagedProcess) managedProcess).getDuccProcess(),
- (ManagedProcess) managedProcess);
- agent.getContext().addRoutes(rb);
- agent.getContext().startRoute(String.valueOf(pid));
- logger.info(methodName, null,
- "Started Process Metric Gathering Thread For PID:"
- + String.valueOf(pid));
-
- StringBuffer sb = new StringBuffer();
- for (Route route : agent.getContext().getRoutes()) {
- sb.append("Camel Context - RouteId:" + route.getId()
- + "\n");
- }
- logger.info(methodName, null, sb.toString());
-
- logger.info(methodName, null,
- "Started Process Metric Gathering Thread For PID:"
- + String.valueOf(pid));
-
-
- }
-
- /*
- RouteBuilder rb = agent.new ProcessMemoryUsageRoute(agent,
- ((ManagedProcess) managedProcess).getDuccProcess(),
- (ManagedProcess) managedProcess);
- agent.getContext().addRoutes(rb);
- agent.getContext().startRoute(String.valueOf(pid));
- logger.info(methodName, null,
- "Started Process Metric Gathering Thread For PID:"
- + String.valueOf(pid));
-
- StringBuffer sb = new StringBuffer();
- for (Route route : agent.getContext().getRoutes()) {
- sb.append("Camel Context - RouteId:" + route.getId()
- + "\n");
- }
- logger.info(methodName, null, sb.toString());
-
- logger.info(methodName, null,
- "Started Process Metric Gathering Thread For PID:"
- + String.valueOf(pid));
- */
- } catch (Exception e) {
- logger.error("postExecStep", null, e);
- }
-
- }
- }
-
- // Drain process streams in dedicated threads.
- ((ManagedProcess) managedProcess).drainProcessStreams(process, logger,System.out, isKillCmd);
- }
-
- /**
- * Called by Executor to exec a process.
- */
- public Process call() throws Exception {
- Process deployedProcess = null;
- try {
- // ICommandLine commandLine = ((ManagedProcess)
- // managedProcess).getCommandLine();
- Map<String, String> env = new HashMap<String, String>();
- if (!isKillCommand(cmdLine)) {
- // UIMA-4935 Moved setting of JpUniqueId to DuccComamndExecutor where the cmdLine is not shared
- // Enrich environment for the new process. Via these settings
- // the UIMA AS
- // service wrapper can notify the agent of its state.
- env.put(IDuccUser.EnvironmentVariable.DUCC_IP.value(), ip);
- env.put(IDuccUser.EnvironmentVariable.DUCC_NODENAME.value(), host);
- // Add "friendly" process name for coordination with JD and OR
- env.put(IDuccUser.EnvironmentVariable.DUCC_ID_PROCESS.value(), ((ManagedProcess) managedProcess)
- .getDuccId().getFriendly()+"");
- // Add unique process id. The process will send this along with its state update
- env.put(IDuccUser.EnvironmentVariable.DUCC_PROCESS_UNIQUEID.value(), ((ManagedProcess) managedProcess).getDuccId().getUnique());
-
- if (((ManagedProcess) managedProcess).getDuccProcess()
- .getProcessType()
- .equals(ProcessType.Job_Uima_AS_Process)) {
- IDuccStandardInfo processInfo = ((ManagedProcess) managedProcess)
- .getProcessInfo();
- long maxInitTime = 0;
-
- if (processInfo != null) {
- maxInitTime = processInfo
- .getProcessInitializationTimeMax();
- }
- agent.getLogger().info("CommandExecutor.call",
- ((ManagedProcess) managedProcess).getWorkDuccId(),
- "Starting Process Initialization Monitor with Max Process Initialization Time:" + maxInitTime);
- ((ManagedProcess) managedProcess)
- .startInitializationTimer(maxInitTime);
- }
- }
- deployedProcess = exec(cmdLine, env);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (((ManagedProcess) managedProcess).getDuccProcess()
- .getProcessType().equals(ProcessType.Job_Uima_AS_Process)) {
- ((ManagedProcess) managedProcess).stopInitializationTimer();
- }
- }
- return deployedProcess;
- }
-
- protected boolean isKillCommand(ICommandLine cmdLine) {
- return (cmdLine.getExecutable() != null && (cmdLine.getExecutable()
- .startsWith("/bin/kill") || cmdLine.getExecutable().startsWith(
- "taskkill")));
- }
+ protected Process managedProcess = null;
+
+ protected String host;
+
+ protected String ip;
+
+ protected ICommandLine cmdLine;
+
+ protected NodeAgent agent;
+
+ public abstract void stop() throws Exception;
+
+ public abstract Process exec(ICommandLine commandLine, Map<String, String> processEnv)
+ throws Exception;
+
+  /**
+   * Stores the collaborators needed to launch a command.
+   *
+   * @param agent
+   *          the owning node agent
+   * @param cmdLine
+   *          the command line to execute
+   * @param host
+   *          value stored in the host field
+   * @param ip
+   *          value stored in the ip field
+   * @param managedProcess
+   *          the process this executor manages
+   */
+  public CommandExecutor(NodeAgent agent, ICommandLine cmdLine, String host, String ip,
+          Process managedProcess) throws Exception {
+    this.managedProcess = managedProcess;
+    this.cmdLine = cmdLine;
+    this.agent = agent;
+    this.ip = ip;
+    this.host = host;
+  }
+
+  /**
+   * Called after process is launched. Assign PID, state and finally drain process streams.
+   *
+   * @param process
+   *          - launched process
+   * @param logger
+   *          - logger used for progress and error messages
+   * @param isKillCmd
+   *          - true when the launched command is a kill command; PID/state bookkeeping and the
+   *          metrics route are skipped in that case and only the streams are drained
+   */
+  protected void postExecStep(java.lang.Process process, DuccLogger logger, boolean isKillCmd) {
+    String methodName = "postExecStep";
+    ManagedProcess mp = (ManagedProcess) managedProcess;
+
+    if (!isKillCmd) {
+      int pid = Utils.getPID(process);
+      if (pid != -1) {
+        String pidAsString = String.valueOf(pid);
+        mp.setPid(pidAsString);
+        mp.getDuccProcess().setPID(pidAsString);
+
+        // Promote to Started only if the process has not already been stopped or failed.
+        // The previous guard joined the four "not equal" tests with ||, which is always true,
+        // so a Stopped/Failed state was unconditionally overwritten with Started.
+        if (!isStoppedOrFailed(mp)) {
+          mp.getDuccProcess().setProcessState(ProcessState.Started);
+        }
+
+        try {
+          synchronized (this) {
+            // wait for 5 seconds before starting the camel route
+            // responsible for collecting process related stats.
+            // Allow enough time for the process to start.
+            wait(5000);
+          }
+          logger.info(methodName, null, ">>>>>>>>> PID:" + pidAsString + " Process State:"
+                  + mp.getDuccProcess().getProcessState());
+
+          // The state may have changed during the wait, so check again before adding a route.
+          if (!isStoppedOrFailed(mp)) {
+            RouteBuilder rb = agent.new ProcessMemoryUsageRoute(agent, mp.getDuccProcess(), mp);
+            agent.getContext().addRoutes(rb);
+            agent.getContext().startRoute(pidAsString);
+            logger.info(methodName, null,
+                    "Started Process Metric Gathering Thread For PID:" + pidAsString);
+
+            StringBuilder sb = new StringBuilder();
+            for (Route route : agent.getContext().getRoutes()) {
+              sb.append("Camel Context - RouteId:").append(route.getId()).append("\n");
+            }
+            logger.info(methodName, null, sb.toString());
+          }
+        } catch (Exception e) {
+          logger.error(methodName, null, e);
+        }
+      }
+    }
+
+    // Drain process streams in dedicated threads.
+    mp.drainProcessStreams(process, logger, System.out, isKillCmd);
+  }
+
+  /**
+   * Tells whether the process is in one of the states Stopped, Stopping, Failed or
+   * FailedInitialization.
+   */
+  private static boolean isStoppedOrFailed(ManagedProcess mp) {
+    ProcessState state = mp.getDuccProcess().getProcessState();
+    return ProcessState.Stopped.equals(state) || ProcessState.Stopping.equals(state)
+            || ProcessState.Failed.equals(state)
+            || ProcessState.FailedInitialization.equals(state);
+  }
+
+  /**
+   * Called by Executor to exec a process.
+   *
+   * For anything other than a kill command the environment passed to {@code exec} is enriched
+   * with DUCC_IP, DUCC_NODENAME, DUCC_ID_PROCESS and DUCC_PROCESS_UNIQUEID, and for a
+   * Job_Uima_AS_Process the initialization timer is started before the launch. That timer is
+   * stopped again once {@code exec} has returned or failed.
+   *
+   * @return the process returned by {@code exec}, or null when the launch failed
+   */
+  public Process call() throws Exception {
+    ManagedProcess mp = (ManagedProcess) managedProcess;
+    Process deployedProcess = null;
+    try {
+      Map<String, String> env = new HashMap<String, String>();
+      if (!isKillCommand(cmdLine)) {
+        // UIMA-4935 Moved setting of JpUniqueId to DuccComamndExecutor where the cmdLine is not
+        // shared
+        // Enrich environment for the new process. Via these settings the UIMA AS
+        // service wrapper can notify the agent of its state.
+        env.put(IDuccUser.EnvironmentVariable.DUCC_IP.value(), ip);
+        env.put(IDuccUser.EnvironmentVariable.DUCC_NODENAME.value(), host);
+        // Add "friendly" process name for coordination with JD and OR
+        env.put(IDuccUser.EnvironmentVariable.DUCC_ID_PROCESS.value(),
+                mp.getDuccId().getFriendly() + "");
+        // Add unique process id. The process will send this along with its state update
+        env.put(IDuccUser.EnvironmentVariable.DUCC_PROCESS_UNIQUEID.value(),
+                mp.getDuccId().getUnique());
+
+        if (mp.getDuccProcess().getProcessType().equals(ProcessType.Job_Uima_AS_Process)) {
+          IDuccStandardInfo processInfo = mp.getProcessInfo();
+          long maxInitTime = 0;
+
+          if (processInfo != null) {
+            maxInitTime = processInfo.getProcessInitializationTimeMax();
+          }
+          agent.getLogger().info("CommandExecutor.call", mp.getWorkDuccId(),
+                  "Starting Process Initialization Monitor with Max Process Initialization Time:"
+                          + maxInitTime);
+          mp.startInitializationTimer(maxInitTime);
+        }
+      }
+      deployedProcess = exec(cmdLine, env);
+    } catch (Exception e) {
+      // Record the launch failure in the agent log rather than on stderr only; the caller
+      // still receives null, as before.
+      agent.getLogger().error("CommandExecutor.call", mp.getWorkDuccId(), e);
+    } finally {
+      if (mp.getDuccProcess().getProcessType().equals(ProcessType.Job_Uima_AS_Process)) {
+        mp.stopInitializationTimer();
+      }
+    }
+    return deployedProcess;
+  }
+
+  /**
+   * Tells whether the command line runs a kill utility.
+   *
+   * @param cmdLine
+   *          command line to inspect
+   * @return true when the executable is non-null and starts with "/bin/kill" or "taskkill"
+   */
+  protected boolean isKillCommand(ICommandLine cmdLine) {
+    String executable = cmdLine.getExecutable();
+    if (executable == null) {
+      return false;
+    }
+    return executable.startsWith("/bin/kill") || executable.startsWith("taskkill");
+  }
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DefunctProcessDetector.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DefunctProcessDetector.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DefunctProcessDetector.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DefunctProcessDetector.java Fri Aug 2 17:27:48 2019
@@ -31,81 +31,87 @@ import org.apache.uima.ducc.transport.ev
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
public class DefunctProcessDetector implements Runnable {
- ManagedProcess childProcess;
- DuccLogger logger;
- public DefunctProcessDetector(ManagedProcess process, DuccLogger logger) {
- childProcess = process;
- this.logger = logger;
- }
- private boolean isDefunctProcess(String pid) throws Exception {
- boolean zombie = false;
- InputStreamReader in = null;
- String[] command = {
- "/bin/ps",
- "-ef",
- pid};
- try {
- ProcessBuilder pb = new ProcessBuilder();
- pb.command(command);
-
- pb.redirectErrorStream(true);
- java.lang.Process childProcess = pb.start();
- in = new InputStreamReader(childProcess.getInputStream());
- BufferedReader reader = new BufferedReader(in);
- String line = null;
- while ((line = reader.readLine()) != null) {
- // the ps line will have string <defunct> if the
- // process is a zombie
- zombie = (line.indexOf("defunct") > 0);
- if ( zombie ) {
- logger.info("DefunctProcessDetector.isDefunctProcess", null, "Process with PID:"+pid+" Is Defunct - OS reports:"+line);
- break;
- }
- }
- } catch (Exception e) {
- throw e;
- } finally {
- if (in != null) {
- try {
- in.close();
- } catch (Exception e) {
- }
- }
- }
- return zombie;
- }
-
- public void run() {
- try {
- if (isDefunctProcess(childProcess.getPid())) {
- logger.info("DefunctProcessDetector.run()", childProcess.getDuccProcess().getDuccId(), "Process with PID:"+childProcess.getPid()+" Is Defunct - Changing State to Stopped");
- childProcess.getDuccProcess().setProcessState(ProcessState.Stopped);
- childProcess.getDuccProcess().setReasonForStoppingProcess(ReasonForStoppingProcess.Defunct.name());
- } else {
- logger.debug("DefunctProcessDetector.run()", childProcess.getDuccProcess().getDuccId(), "Process with PID:"+childProcess.getPid()+" Not Defunct");
-
- }
- } catch( Exception e) {
- logger.error("DefunctProcessDetector.run()", childProcess.getDuccProcess().getDuccId(), e);
- }
-
- }
- public static void main(String[] args) {
- try {
- DuccLogger logger = new DuccLogger(DefunctProcessDetector.class);
- DuccIdFactory factory = new DuccIdFactory();
- DuccId duccId = factory.next();
- NodeIdentity nid = new NodeIdentity();
- IDuccProcess process = new DuccProcess(duccId, nid);
- ManagedProcess p = new ManagedProcess(process, null);
- process.setProcessState(ProcessState.Initializing);
- process.setPID(args[0]);
- p.setPid(args[0]);
- DefunctProcessDetector detector = new DefunctProcessDetector(p, logger);
- detector.run();
- } catch( Exception e) {
- e.printStackTrace();
- }
- }
+ ManagedProcess childProcess;
+
+ DuccLogger logger;
+
+  /**
+   * @param process
+   *          the process whose PID is checked when this detector runs
+   * @param logger
+   *          logger that receives the detector's messages
+   */
+  public DefunctProcessDetector(ManagedProcess process, DuccLogger logger) {
+    this.logger = logger;
+    this.childProcess = process;
+  }
+
+  /**
+   * Runs {@code /bin/ps -ef <pid>} and scans its output for the word "defunct".
+   *
+   * @param pid
+   *          process id handed to ps
+   * @return true when an output line contains "defunct" at an index greater than zero
+   * @throws Exception
+   *           when ps cannot be started or its output cannot be read
+   */
+  private boolean isDefunctProcess(String pid) throws Exception {
+    boolean zombie = false;
+    String[] command = { "/bin/ps", "-ef", pid };
+    ProcessBuilder pb = new ProcessBuilder(command);
+    // stderr is merged into stdout so a single reader sees everything ps prints
+    pb.redirectErrorStream(true);
+
+    // named psProcess so the local no longer shadows the childProcess field
+    java.lang.Process psProcess = pb.start();
+    BufferedReader reader = null;
+    try {
+      reader = new BufferedReader(new InputStreamReader(psProcess.getInputStream()));
+      String line;
+      while ((line = reader.readLine()) != null) {
+        // the ps line will have string <defunct> if the
+        // process is a zombie
+        if (line.indexOf("defunct") > 0) {
+          zombie = true;
+          logger.info("DefunctProcessDetector.isDefunctProcess", null,
+                  "Process with PID:" + pid + " Is Defunct - OS reports:" + line);
+          break;
+        }
+      }
+    } finally {
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (Exception ignored) {
+          // best effort: a failure to close must not mask the scan result
+        }
+      }
+      // Release the ps helper process, which may still be running after an early break.
+      psProcess.destroy();
+    }
+    return zombie;
+  }
+
+ public void run() {
+ try {
+ if (isDefunctProcess(childProcess.getPid())) {
+ logger.info("DefunctProcessDetector.run()", childProcess.getDuccProcess().getDuccId(),
+ "Process with PID:" + childProcess.getPid()
+ + " Is Defunct - Changing State to Stopped");
+ childProcess.getDuccProcess().setProcessState(ProcessState.Stopped);
+ childProcess.getDuccProcess()
+ .setReasonForStoppingProcess(ReasonForStoppingProcess.Defunct.name());
+ } else {
+ logger.debug("DefunctProcessDetector.run()", childProcess.getDuccProcess().getDuccId(),
+ "Process with PID:" + childProcess.getPid() + " Not Defunct");
+
+ }
+ } catch (Exception e) {
+ logger.error("DefunctProcessDetector.run()", childProcess.getDuccProcess().getDuccId(), e);
+ }
+
+ }
+
+  /**
+   * Stand-alone check of a single PID.
+   *
+   * @param args
+   *          args[0] is the PID to test
+   */
+  public static void main(String[] args) {
+    if (args.length < 1) {
+      // args[0] is mandatory; without this guard the run ends in an
+      // ArrayIndexOutOfBoundsException stack trace instead of a usable message.
+      System.out.println("Usage: DefunctProcessDetector <pid>");
+      return;
+    }
+    try {
+      String pid = args[0];
+      DuccLogger logger = new DuccLogger(DefunctProcessDetector.class);
+      DuccIdFactory factory = new DuccIdFactory();
+      DuccId duccId = factory.next();
+      NodeIdentity nid = new NodeIdentity();
+      IDuccProcess process = new DuccProcess(duccId, nid);
+      ManagedProcess p = new ManagedProcess(process, null);
+      process.setProcessState(ProcessState.Initializing);
+      process.setPID(pid);
+      p.setPid(pid);
+      DefunctProcessDetector detector = new DefunctProcessDetector(p, logger);
+      detector.run();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
}