You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2015/06/12 15:52:12 UTC

svn commit: r1685092 - in /uima/sandbox/uima-ducc/trunk: uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/agent...

Author: cwiklik
Date: Fri Jun 12 13:52:12 2015
New Revision: 1685092

URL: http://svn.apache.org/r1685092
Log:
UIMA-4462 JP notifies its Agent when it detects WI error and before terminating itself. 

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/agent/ProcessStateUpdate.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java?rev=1685092&r1=1685091&r2=1685092&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java Fri Jun 12 13:52:12 2015
@@ -1104,10 +1104,15 @@ public class NodeAgent extends AbstractD
             undeployProcess(processEntry.getValue());
           } 
           else if (duccEvent.getState().equals(ProcessState.Stopping)) { 
-              deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping);
-              processEntry.getValue().
-              	    setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededErrorThreshold.toString());
-              deployedProcess.setStopping();
+        	  if ( duccEvent.getMessage() != null && duccEvent.getMessage().equals(ReasonForStoppingProcess.ExceededErrorThreshold.toString())) {
+                  processEntry.getValue().
+            	    setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededErrorThreshold.toString());
+        	  }
+              if ( !deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Stopped) && 
+            	   !deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Stopping) )  {
+            	  deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping);
+                  deployedProcess.setStopping();
+              }
           }
           if (duccEvent.getUimaPipeline() != null) {
             StringBuffer buffer = new StringBuffer("\t\tUima Pipeline -");

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java?rev=1685092&r1=1685091&r2=1685092&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java Fri Jun 12 13:52:12 2015
@@ -21,7 +21,6 @@ package org.apache.uima.ducc.agent.launc
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintStream;
-
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -36,6 +35,7 @@ import org.apache.uima.ducc.common.utils
 import org.apache.uima.ducc.transport.cmdline.ICommandLine;
 import org.apache.uima.ducc.transport.event.common.DuccProcess;
 import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccReasonForProcessNotRunning.ReasonForStopping;
 import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
 import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess;
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
@@ -424,10 +424,20 @@ public class ManagedProcess implements P
 							getDuccProcess().setReasonForStoppingProcess(
 									errors.trim());
 						} else {
-							// Process terminated unexpectadly. It stopped on its own or due to some
-							// external event not initiated by an agent
-							getDuccProcess().setReasonForStoppingProcess(
-									ReasonForStoppingProcess.Croaked.toString());
+                            // JP should not be marked as CROAKED if it terminates 
+							// due to a process error. On such error, the JP sends Stopping event
+							// to its agent along with a message=ReasonForStoppingProcess.ExceededErrorThreshold. 
+							if ( getDuccProcess().getProcessState().equals(ProcessState.Stopping)) {
+								// the reason was already set while handling JPs Stopping event
+								getDuccProcess().setProcessState(ProcessState.Stopped); // now the JP is dead
+								
+								//getDuccProcess().setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededErrorThreshold.toString());
+							} else {
+								// Process terminated unexpectedly. It stopped on its own due to Ducc framework
+								// error or due to some external event not initiated by an agent
+								getDuccProcess().setReasonForStoppingProcess(
+										ReasonForStoppingProcess.Croaked.toString());
+							}
 						}
 					}
 					

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/agent/ProcessStateUpdate.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/agent/ProcessStateUpdate.java?rev=1685092&r1=1685091&r2=1685092&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/agent/ProcessStateUpdate.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/agent/ProcessStateUpdate.java Fri Jun 12 13:52:12 2015
@@ -36,6 +36,7 @@ public class ProcessStateUpdate implemen
 	String socketEndpoint=null;
 	String duccProcessId;
 	String processJmxUrl;
+	String message;
 	List<IUimaPipelineAEComponent> uimaPipeline;
 	
 	public ProcessStateUpdate(ProcessState state, String pid, String duccProcessId) {
@@ -51,6 +52,7 @@ public class ProcessStateUpdate implemen
 		this.pid = pid;
 		this.duccProcessId = duccProcessId;
 		this.processJmxUrl = processJmxUrl;
+		this.message = processJmxUrl;
 		this.uimaPipeline = uimaPipeline;
 	}
 	/**
@@ -86,6 +88,9 @@ public class ProcessStateUpdate implemen
 	public String getProcessJmxUrl() {
 		return processJmxUrl;
 	}
+	public String getMessage() {
+		return message;
+	}
   public String getSocketEndpoint() {
     return socketEndpoint;
   }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java?rev=1685092&r1=1685091&r2=1685092&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java Fri Jun 12 13:52:12 2015
@@ -94,13 +94,12 @@ implements IAgentSession, IJobProcessMan
 	 * 
 	 */
 	public void notify(ProcessStateUpdate state) {
-		if ( stopped || state.getState().equals(ProcessState.Stopping)) {
+		if ( stopped || (state.getState().equals(ProcessState.Stopping)&& state.getMessage() == null  )) {
 			return;
 		}
 		try {
 		    ProcessStateUpdateDuccEvent duccEvent = 
 				new ProcessStateUpdateDuccEvent(state);
-            //logger.info("notifyAgentWithStatus",null," >>>>>>> UIMA AS Service Deployed - PID:"+pid);
 
             if (endpoint != null ) {
               state.setSocketEndpoint(endpoint);
@@ -108,7 +107,7 @@ implements IAgentSession, IJobProcessMan
 			//	send the process update to the remote
 			dispatcher.dispatch(duccEvent, System.getenv("IP"));
 			String jmx = state.getProcessJmxUrl() == null ? "N/A" : state.getProcessJmxUrl();
-			logger.debug("notifyAgentWithStatus",null,"... Job Process State Changed - PID:"+pid+". Process State: "+state.getState().toString()+". JMX Url:"+jmx+" Dispatched State Update Event to Agent with IP:"+System.getenv("IP"));
+			logger.info("notifyAgentWithStatus",null,"... Job Process State Changed - PID:"+pid+". Process State: "+state.getState().toString()+". JMX Url:"+jmx+" Dispatched State Update Event to Agent with IP:"+System.getenv("IP"));
 		} catch( Exception e) {
 			e.printStackTrace();
 		}

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1685092&r1=1685091&r2=1685092&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Fri Jun 12 13:52:12 2015
@@ -41,6 +41,7 @@ import org.apache.uima.ducc.container.ne
 import org.apache.uima.ducc.container.net.impl.MetaCasTransaction;
 import org.apache.uima.ducc.container.net.impl.PerformanceMetrics;
 import org.apache.uima.ducc.container.net.impl.TransactionId;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess;
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
 
 public class HttpWorkerThread implements Runnable {
@@ -293,7 +294,25 @@ public class HttpWorkerThread implements
 	                        } else {
 		                    	logger.warn("run", null, "Worker thread exiting due to error while processing a WI");
 	                        }
-        					duccComponent.setState(ProcessState.Stopping);
+	        				logger.info("run", null, "JP Terminating Due to WI Error - Notify Agent");
+
+        					// send an update to the agent.
+	                        duccComponent.setState(ProcessState.Stopping, ReasonForStoppingProcess.ExceededErrorThreshold.toString());
+        					// sleep for awhile to let the agent handle 
+	                        // Stopping event. 
+	                        // Reason for the sleep: there may be a race condition
+	                        // here, where the JP sends a Stopping event to 
+	                        // its agent and immediately exits. Before the
+	                        // agent finishes handling of Stopping event its
+	                        // internal thread detects process termination 
+	                        // and may mark the JP as croaked. Sleep should
+	                        // reduce the risk of this race but there is still 
+	                        // a chance that agent doesn't handle Stopping
+	                        // event before it detects JP terminating. Unlikely
+	                        // but theoretically possible.
+	                        try {
+    	                        Thread.sleep(3000);
+        					} catch( InterruptedException e) {}
         					/* *****************************************/
         					/* *****************************************/
         					/* *****************************************/
@@ -301,8 +320,7 @@ public class HttpWorkerThread implements
         					/* *****************************************/
         					logger.warn("run", null,"Terminating Job Process - Work Item Failed");
 
-        					// Stop the JVM hard. Agent will check the exit code
-        					// so make it -1 to mark it as CROAK
+        					// Stop the JVM hard. 
         					Runtime.getRuntime().halt(-1);
         					/* *****************************************/
         					/* *****************************************/

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1685092&r1=1685091&r2=1685092&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Fri Jun 12 13:52:12 2015
@@ -77,15 +77,24 @@ implements IJobProcessor{
 		this.processorInstance = pc;
 	}
 	public void setState(ProcessState state) {
+		setState(state,super.getProcessJmxUrl() );
+	}
+	public void setState(ProcessState state, String message) {
 		synchronized(currentState) {
 			if ( currentState.name().equals(ProcessState.FailedInitialization.name()) ) {
 				return;
 			}
+			if ( message == null ) {
+				message = super.getProcessJmxUrl();
+			}
 			if ( !state.name().equals(currentState.name())) {
 				currentState = state;
-				agent.notify(currentState, super.getProcessJmxUrl());
+				logger.info("setState", null, "Notifying Agent New State:"+state.name());
+
+				agent.notify(currentState, message);
 			} 
 		}
+		
 	}
     public void setThreadSleepTime(int sleepTime) {
     	threadSleepTime = sleepTime;