You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2016/11/03 20:20:37 UTC
svn commit: r1767963 -
/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
Author: cwiklik
Date: Thu Nov 3 20:20:37 2016
New Revision: 1767963
URL: http://svn.apache.org/viewvc?rev=1767963&view=rev
Log:
UIMA-5157 When the agent receives a stop request, it immediately sends SIGTERM to all child processes, waits a while, and then kills itself regardless of whether the child processes have terminated.
Modified:
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java?rev=1767963&r1=1767962&r2=1767963&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java Thu Nov 3 20:20:37 2016
@@ -24,18 +24,20 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
@@ -88,6 +90,7 @@ import org.apache.uima.ducc.transport.ev
import org.apache.uima.ducc.transport.event.common.ITimeWindow;
import org.apache.uima.ducc.transport.event.common.ProcessMemoryAssignment;
import org.apache.uima.ducc.transport.event.common.TimeWindow;
+import org.linkedin.util.exceptions.InternalException;
public class NodeAgent extends AbstractDuccComponent implements Agent, ProcessLifecycleObserver {
public static DuccLogger logger = DuccLogger.getLogger(NodeAgent.class, COMPONENT_NAME);
@@ -1317,6 +1320,104 @@ public class NodeAgent extends AbstractD
}
}
+ /**
+  * Reads an InputStream line by line until end-of-stream and discards
+  * everything it reads. ProcessRunner starts one of these on both the
+  * standard output and the standard error stream of the command it launches.
+  * NOTE(review): presumably this is done so the launched command cannot block
+  * on a full output pipe - confirm.
+  */
+ class AgentStreamConsumer implements Runnable {
+   private final InputStream theStream;
+
+   /**
+    * @param is the stream to drain; it is closed when run() finishes
+    */
+   AgentStreamConsumer(InputStream is) {
+     theStream = is;
+   }
+
+   /**
+    * Drains the stream. Any failure while reading is logged as a warning
+    * and otherwise ignored; the reader is always closed afterwards.
+    */
+   public void run() {
+     String methodName = "AgentStreamConsumer.run";
+
+     BufferedReader bufferedReader = null;
+     try {
+       bufferedReader = new BufferedReader(new InputStreamReader(theStream));
+       // The content is intentionally thrown away; only draining matters.
+       while (bufferedReader.readLine() != null) {
+         // discard the line
+       }
+     } catch (Throwable t) {
+       logger.warn(methodName, null, t);
+     } finally {
+       // The reader is still null if its construction failed, so guard the
+       // close instead of relying on a swallowed NullPointerException.
+       if (bufferedReader != null) {
+         try {
+           bufferedReader.close();
+         } catch (Exception ignored) {
+           // best-effort close; nothing useful can be done at this point
+         }
+       }
+     }
+   }
+ }
+ /**
+  * Signals the agent can send to a child process. Each constant carries
+  * the numeric option string that is passed to /bin/kill.
+  */
+ enum SIGNAL {
+   SIGTERM("-15"),
+   SIGKILL("-9");
+
+   // Immutable: enum constants are shared, so the option must never change.
+   private final String signal;
+
+   SIGNAL(String kind) {
+     signal = kind;
+   }
+
+   /**
+    * @return the kill option for this signal, e.g. "-15" for SIGTERM
+    */
+   public String get() {
+     return signal;
+   }
+ }
+ /**
+  * Sends a signal to a single process by launching
+  * "/bin/kill &lt;signal&gt; &lt;pid&gt;" and waiting for that command to finish.
+  * The command's output and error streams are drained on separate threads.
+  */
+ class ProcessRunner implements Runnable {
+   private final String pid;
+   private final SIGNAL signal;
+
+   /**
+    * @param pid    id of the process to signal
+    * @param signal the signal to deliver
+    */
+   public ProcessRunner(final String pid, SIGNAL signal) {
+     this.pid = pid;
+     this.signal = signal;
+   }
+
+   /**
+    * Launches the kill command and blocks until it exits. Failures are
+    * logged as warnings; they are never propagated to the caller.
+    */
+   public void run() {
+     String methodName = "ProcessRunner.run";
+     String[] killCmd = {"/bin/kill", signal.get(), pid};
+     ProcessBuilder pb = new ProcessBuilder(killCmd);
+     try {
+       // launch kill with the requested signal
+       final Process process = pb.start();
+       Thread inputStreamConsumerThread =
+           new Thread(new AgentStreamConsumer(process.getInputStream()));
+       inputStreamConsumerThread.start();
+
+       Thread errorStreamConsumerThread =
+           new Thread(new AgentStreamConsumer(process.getErrorStream()));
+       errorStreamConsumerThread.start();
+
+       int exitCode = process.waitFor();
+       if (exitCode != 0) {
+         // A non-zero exit means kill could not deliver the signal.
+         logger.info(methodName, null, "/bin/kill " + signal.get() + " " + pid
+             + " exited with code " + exitCode);
+       }
+     } catch (InterruptedException e) {
+       // Restore the interrupt status so the owning executor can see it.
+       Thread.currentThread().interrupt();
+       logger.warn(methodName, null, e);
+     } catch (Exception e) {
+       logger.warn(methodName, null, e);
+     }
+   }
+ }
+
+ /**
+  * This method is called when an agent receives a STOP request. It
+  * sends SIGTERM to every deployed child process that has a PID and
+  * is not already stopping, and marks each such process as stopping.
+  * The signals are delivered asynchronously by ProcessRunner tasks;
+  * this method neither waits for the child processes to terminate
+  * nor terminates the agent itself.
+  */
+ private void stopChildProcesses() {
+   String methodName = "stopChildProcesses";
+   // One pool for all kill requests. A cached pool still runs every
+   // request on its own thread, and shutdown() below lets the submitted
+   // requests finish while releasing the worker threads afterwards.
+   ExecutorService executor = Executors.newCachedThreadPool();
+   try {
+     for (ManagedProcess deployedProcess : deployedProcesses) {
+       String pid = deployedProcess.getDuccProcess().getPID();
+       if (pid == null || pid.trim().length() == 0 || deployedProcess.isStopping()) {
+         // nothing to signal, or a stop is already in progress
+         continue;
+       }
+       logger.info(methodName, null, "....Stopping Process - DuccId:" + deployedProcess.getDuccProcess().getDuccId()
+           + " PID:" + pid+" Sending SIGTERM");
+       deployedProcess.setStopping();
+       executor.execute(new ProcessRunner(pid, SIGNAL.SIGTERM));
+     }
+   } catch (Exception e) {
+     logger.warn(methodName, null, e);
+   } finally {
+     // Does not cancel tasks that were already submitted.
+     executor.shutdown();
+   }
+ }
/**
* Kills a given process
*
@@ -1680,6 +1781,12 @@ public class NodeAgent extends AbstractD
// if ( configurationFactory.getAgentPingDispatcher() != null ) {
// configurationFactory.getAgentPingDispatcher().stop();
// }
+
+ // Dispatch SIGTERM to all child processes
+ stopChildProcesses();
+
+
+ /*
synchronized (monitor) {
Iterator<ManagedProcess> it = deployedProcesses.iterator();
while (it.hasNext()) {
@@ -1726,11 +1833,29 @@ public class NodeAgent extends AbstractD
} catch (Exception e) {
}
}
- logger.info("stop", null, "Agent managed processes have stopped");
+*/
+ logger.info("stop", null, "Agent Sent SIGTERM to ALL Child Processes - Number of Deployed Processes:"+deployedProcesses.size());
+
// Stop publishing inventory. Once the route is down the agent forces last publication
// sending an empty process map.
configurationFactory.stopInventoryRoute();
-
+
+ if ( deployedProcesses.size() > 0 ) {
+ // wait for awhile
+ synchronized (this) {
+ long waittime = 60000;
+ if (configurationFactory.processStopTimeout != null ) {
+ try {
+ waittime = Long.parseLong(configurationFactory.processStopTimeout);
+ } catch( NumberFormatException e) {
+ logger.warn("stop", null, e);
+ }
+ }
+ logger.info("stop", null, "Waiting", waittime, "ms to send final NodeInventory.");
+ wait(waittime);
+ }
+ }
+
// Send an empty process map as the final inventory
HashMap<DuccId, IDuccProcess> emptyMap =
new HashMap<DuccId, IDuccProcess>();
@@ -1738,6 +1863,22 @@ public class NodeAgent extends AbstractD
inventoryDispatcher.dispatch(duccEvent);
logger.info("stop", null, "Agent published final inventory");
+ // Terminate the agent from a separate thread after a fixed delay.
+ Thread t = new Thread( new Runnable() {
+   public void run() {
+     try {
+       logger.info("stop", null, "Agent waiting for 10 seconds before terminating itself via System.exit(1) ");
+       // Thread.sleep() is required here: Object.wait() called without
+       // owning the object's monitor throws IllegalMonitorStateException,
+       // which would skip the delay and exit immediately.
+       Thread.sleep(10000);
+     } catch( InterruptedException e ) {
+       // Interrupted while waiting: fall through and exit right away.
+     } finally{
+       logger.info("stop", null, "Agent calling System.exit(1) ");
+       System.exit(1);
+     }
+   }
+ });
+ t.start();
+
+
// Delay this thread to make sure that at least one last node inventory publish occurs before Agent goes away. Add extra 30 secs
// to the delay to make sure the publish happens.
// synchronized (this) {