You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2016/11/03 20:20:37 UTC

svn commit: r1767963 - /uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java

Author: cwiklik
Date: Thu Nov  3 20:20:37 2016
New Revision: 1767963

URL: http://svn.apache.org/viewvc?rev=1767963&view=rev
Log:
UIMA-5157 When the agent receives a stop request, it immediately sends SIGTERM to all child processes, waits a while, and then kills itself regardless of whether the child processes have terminated.

Modified:
    uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java

Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java?rev=1767963&r1=1767962&r2=1767963&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java Thu Nov  3 20:20:37 2016
@@ -24,18 +24,20 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileReader;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
@@ -88,6 +90,7 @@ import org.apache.uima.ducc.transport.ev
 import org.apache.uima.ducc.transport.event.common.ITimeWindow;
 import org.apache.uima.ducc.transport.event.common.ProcessMemoryAssignment;
 import org.apache.uima.ducc.transport.event.common.TimeWindow;
+import org.linkedin.util.exceptions.InternalException;
 
 public class NodeAgent extends AbstractDuccComponent implements Agent, ProcessLifecycleObserver {
   public static DuccLogger logger = DuccLogger.getLogger(NodeAgent.class, COMPONENT_NAME);
@@ -1317,6 +1320,104 @@ public class NodeAgent extends AbstractD
     }
   }
 
+  /**
+   * Reads an {@link InputStream} line by line until end of stream and
+   * discards everything it reads. Presumably used so that a child process
+   * cannot stall on an unread output pipe.
+   */
+  class AgentStreamConsumer implements Runnable {
+    private final InputStream theStream;
+
+    /**
+     * @param is the stream to drain
+     */
+    AgentStreamConsumer(InputStream is) {
+      theStream = is;
+    }
+
+    /**
+     * Drains the stream. Any failure while reading is logged as a warning
+     * and never propagated to the caller.
+     */
+    public void run() {
+      String methodName = "AgentStreamConsumer.run";
+
+      BufferedReader bufferedReader = null;
+      try {
+        bufferedReader = new BufferedReader(new InputStreamReader(theStream));
+        while (bufferedReader.readLine() != null) {
+          // the content is intentionally thrown away
+        }
+      } catch (Throwable t) {
+        logger.warn(methodName, null, t);
+      } finally {
+        // The reader is still null when its construction failed, so guard
+        // the close instead of relying on a swallowed NullPointerException.
+        if (bufferedReader != null) {
+          try {
+            bufferedReader.close();
+          } catch (Exception ignored) {
+            // best effort: nothing useful can be done if close fails
+          }
+        }
+      }
+    }
+  }
+  /**
+   * Signals the agent can send to a process. Each constant carries the
+   * option string that is handed to the kill command for that signal.
+   */
+  enum SIGNAL {
+    SIGTERM("-15"),
+    SIGKILL("-9");
+
+    // Option string for this signal, e.g. "-15". Final so that the shared
+    // enum constants cannot be modified after construction.
+    final String signal;
+
+    SIGNAL(String kind) {
+      signal = kind;
+    }
+
+    /**
+     * @return the option string for this signal
+     */
+    public String get() {
+      return signal;
+    }
+  }
+  /**
+   * Sends a signal to a single process by running
+   * <code>/bin/kill &lt;signal&gt; &lt;pid&gt;</code> and waiting for that
+   * command to complete.
+   */
+  class ProcessRunner implements Runnable {
+    final String pid;
+    final SIGNAL signal;
+
+    /**
+     * @param pid    id of the process to signal
+     * @param signal the signal to send
+     */
+    public ProcessRunner(final String pid, SIGNAL signal) {
+      this.pid = pid;
+      this.signal = signal;
+    }
+
+    /**
+     * Launches the kill command, drains its stdout and stderr on separate
+     * threads and blocks until the command exits. Failures are logged as
+     * warnings and never propagated.
+     */
+    public void run() {
+      String methodName = "ProcessRunner.run";
+      String[] killCmd = { "/bin/kill", signal.get(), pid };
+      ProcessBuilder pb = new ProcessBuilder(killCmd);
+      try {
+        final Process process = pb.start();
+
+        Thread inputStreamConsumerThread =
+            new Thread(new AgentStreamConsumer(process.getInputStream()));
+        inputStreamConsumerThread.start();
+
+        Thread errorStreamConsumerThread =
+            new Thread(new AgentStreamConsumer(process.getErrorStream()));
+        errorStreamConsumerThread.start();
+
+        process.waitFor();
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so the owner of this thread can see it.
+        Thread.currentThread().interrupt();
+        logger.warn(methodName, null, e);
+      } catch (Exception e) {
+        logger.warn(methodName, null, e);
+      }
+    }
+  }
+  
+  
+  /**
+   * Called when the agent receives a STOP request. Dispatches SIGTERM to
+   * every deployed process that has a PID and is not already stopping, and
+   * marks each such process as stopping. The signals are delivered
+   * asynchronously: this method does not wait for the processes to exit.
+   * Any failure is logged as a warning.
+   */
+  private void stopChildProcesses() {
+    String methodName = "stopChildProcesses";
+    // A single pool serves all kill commands. Creating one executor per
+    // process and never shutting it down leaked a worker thread each time.
+    ExecutorService executor = Executors.newCachedThreadPool();
+    try {
+      for (ManagedProcess deployedProcess : deployedProcesses) {
+        String pid = deployedProcess.getDuccProcess().getPID();
+        if (pid == null || pid.trim().length() == 0 || deployedProcess.isStopping()) {
+          continue;
+        }
+        logger.info(methodName, null, "....Stopping Process - DuccId:" + deployedProcess.getDuccProcess().getDuccId()
+                + " PID:" + pid+" Sending SIGTERM");
+        deployedProcess.setStopping();
+        executor.execute(new ProcessRunner(pid, SIGNAL.SIGTERM));
+      }
+    } catch (Exception e) {
+      logger.warn(methodName, null, e);
+    } finally {
+      // shutdown() lets the kill commands already submitted run to
+      // completion and then releases the worker threads.
+      executor.shutdown();
+    }
+  }
   /**
    * Kills a given process
    * 
@@ -1680,6 +1781,12 @@ public class NodeAgent extends AbstractD
     // if ( configurationFactory.getAgentPingDispatcher() != null ) {
     // configurationFactory.getAgentPingDispatcher().stop();
     // }
+    
+    // Dispatch SIGTERM to all child processes
+    stopChildProcesses();
+    
+    
+    /*    
     synchronized (monitor) {
       Iterator<ManagedProcess> it = deployedProcesses.iterator();
       while (it.hasNext()) {
@@ -1726,11 +1833,29 @@ public class NodeAgent extends AbstractD
       } catch (Exception e) {
       }
     }
-    logger.info("stop", null, "Agent managed processes have stopped");
+*/
+    logger.info("stop", null, "Agent Sent SIGTERM to ALL Child Processes - Number of Deployed Processes:"+deployedProcesses.size());
+    
     // Stop publishing inventory. Once the route is down the agent forces last publication
     // sending an empty process map.
     configurationFactory.stopInventoryRoute();
-    
+
+    if ( deployedProcesses.size() > 0 ) {
+    	// wait for awhile 
+      synchronized (this) {
+    	  long waittime = 60000;
+    	  if (configurationFactory.processStopTimeout != null ) {
+  	         try {
+  	    		 waittime = Long.parseLong(configurationFactory.processStopTimeout);
+  	         } catch( NumberFormatException e) {
+  	        	 logger.warn("stop", null, e);
+  	         }
+    	  }
+         logger.info("stop", null, "Waiting", waittime, "ms to send final NodeInventory.");
+         wait(waittime);
+      }
+    }
+
     // Send an empty process map as the final inventory 
     HashMap<DuccId, IDuccProcess> emptyMap = 
     		new HashMap<DuccId, IDuccProcess>();
@@ -1738,6 +1863,22 @@ public class NodeAgent extends AbstractD
     inventoryDispatcher.dispatch(duccEvent);
     logger.info("stop", null, "Agent published final inventory");
     
+    // Watchdog thread: pauses for 10 seconds and then terminates the JVM.
+    Thread t = new Thread( new Runnable() {
+      public void run() {
+        try {
+          logger.info("stop", null, "Agent waiting for 10 seconds before terminating itself via System.exit(1) ");
+          // Use sleep(), not wait(): calling wait() on an object whose
+          // monitor this thread does not own throws
+          // IllegalMonitorStateException immediately, so the agent used to
+          // exit without any delay at all.
+          Thread.sleep(10000);
+        } catch( InterruptedException e ) {
+          // Interrupted early: fall through and exit right away.
+        } finally {
+          logger.info("stop", null, "Agent calling System.exit(1) ");
+          System.exit(1);
+        }
+      }
+    });
+    t.start();
+    
+    
     // Delay this thread to make sure that at least one last node inventory publish occurs before Agent goes away. Add extra 30 secs 
     // to the delay to make sure the publish happens.
 //    synchronized (this) {