You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2014/04/11 20:20:08 UTC

svn commit: r1586738 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent: NodeAgent.java processors/LinuxProcessMetricsProcessor.java

Author: cwiklik
Date: Fri Apr 11 18:20:08 2014
New Revision: 1586738

URL: http://svn.apache.org/r1586738
Log:
UIMA-3735 when stopping a process first stop its Camel route that collects metrics

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java?rev=1586738&r1=1586737&r2=1586738&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java Fri Apr 11 18:20:08 2014
@@ -42,7 +42,6 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.model.RouteDefinition;
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.uima.ducc.agent.config.AgentConfiguration;
 import org.apache.uima.ducc.agent.event.ProcessLifecycleObserver;
@@ -911,7 +910,11 @@ public class NodeAgent extends AbstractD
           if (processEntry.getValue().isDeallocated()) {
             // stop collecting process stats from /proc/<pid>/statm
             if (duccEvent.getPid() != null) {
-              super.getContext().stopRoute(duccEvent.getPid());
+            	try {
+                    super.getContext().stopRoute(duccEvent.getPid());
+            	} catch( Exception e) {
+            		logger.error(methodName, null, "Unable to stop Camel route for PID:"+duccEvent.getPid());
+            	}
             }
             return;
           }
@@ -978,7 +981,14 @@ public class NodeAgent extends AbstractD
           } else if (duccEvent.getState().equals(ProcessState.Stopped)
                   || duccEvent.getState().equals(ProcessState.Failed)
                   || duccEvent.getState().equals(ProcessState.Killed)) {
-            super.getContext().stopRoute(duccEvent.getPid());
+            	try {
+              	  // stop collecting process stats from /proc/<pid>/statm
+                    super.getContext().stopRoute(duccEvent.getPid());
+            	} catch( Exception e) {
+                    logger.error(methodName, null, "....Unable to stop Camel route for PID:" + duccEvent.getPid());
+            	}
+
+//            super.getContext().stopRoute(duccEvent.getPid());
             
             // remove route from context, otherwise the routes accumulate over time causing memory leak
             super.getContext().removeRoute(duccEvent.getPid());
@@ -1011,7 +1021,11 @@ public class NodeAgent extends AbstractD
             deployedProcess.kill();
             logger.info(methodName, null, ">>>> Agent Handling Process FailedInitialization. PID:"
                     + duccEvent.getPid() + " Killing Process");
-            super.getContext().stopRoute(duccEvent.getPid());
+           try {
+               super.getContext().stopRoute(duccEvent.getPid());
+           } catch( Exception e) {
+        	   logger.error(methodName, null, "Unable to stop Camel route for PID:"+duccEvent.getPid());
+           }
             logger.info(methodName, null,
                     "----------- Agent Stopped ProcessMemoryUsagePollingRouter for Process:"
                             + duccEvent.getPid() + ". Process Failed Initialization");
@@ -1025,7 +1039,7 @@ public class NodeAgent extends AbstractD
             deployedProcess.kill();
             logger.info(methodName, null, ">>>> Agent Handling Process InitializationTimeout. PID:"
                     + duccEvent.getPid() + " Killing Process");
-            super.getContext().stopRoute(duccEvent.getPid());
+          
             logger.info(methodName, null,
                     "----------- Agent Stopped ProcessMemoryUsagePollingRouter for Process:"
                             + duccEvent.getPid()
@@ -1134,6 +1148,13 @@ public class NodeAgent extends AbstractD
           logger.info(methodName, null, "....Undeploying Process - DuccId:" + process.getDuccId()
                   + " PID:" + pid);
           if (pid != null) {
+        	try {
+          	  // stop collecting process stats from /proc/<pid>/statm
+                super.getContext().stopRoute(pid);
+                logger.info(methodName, null, "Stopped Camel Route Collecting Metrics For PID:"+pid);
+        	} catch( Exception e) {
+                logger.error(methodName, null, "....Unable to stop Camel route for PID:" + pid);
+        	}
             // Mark the process as stopping. When the process exits,
             // the agent can determine
             // if the process died on its own (due to say, user code
@@ -1211,8 +1232,13 @@ public class NodeAgent extends AbstractD
       ProcessStateUpdateDuccEvent event = new ProcessStateUpdateDuccEvent(processStateUpdate);
       // cleanup Camel route associated with a process that just stopped
       if ( process.getPID() != null && super.getContext().getRoute(process.getPID()) != null ) {
-          super.getContext().stopRoute(process.getPID());
-          
+//          super.getContext().stopRoute(process.getPID());
+      	try {
+        	  // stop collecting process stats from /proc/<pid>/statm
+              super.getContext().stopRoute(process.getPID());
+      	} catch( Exception e) {
+              logger.error(methodName, null, "....Unable to stop Camel route for PID:" + process.getPID());
+      	}
           // remove route from context, otherwise the routes accumulate over time causing memory leak
           super.getContext().removeRoute(process.getPID());
           StringBuffer sb = new StringBuffer("\n");

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java?rev=1586738&r1=1586737&r2=1586738&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java Fri Apr 11 18:20:08 2014
@@ -130,11 +130,33 @@ public class LinuxProcessMetricsProcesso
     }
   }
 
+  /**
+   * Returns false when {@code state} is Stopped, Killed, Failed or Stopping; true otherwise.
+   */
+  private boolean collectStats(ProcessState state) {
+    return !(ProcessState.Stopped.equals(state)
+        || ProcessState.Killed.equals(state)
+        || ProcessState.Failed.equals(state)
+        || ProcessState.Stopping.equals(state));
+  }
   public void process(Exchange e) {
 	  if ( closed ) { // files closed 
 		  return;
 	  }
-    if (process.getProcessState().equals(ProcessState.Initializing)
+	  // if process is stopping or already dead dont collect metrics. The Camel
+	  // route has just been stopped.
+	  if ( !collectStats(process.getProcessState() ) ) {
+		  return;
+	  }
+//	    if (process.getProcessState().equals(ProcessState.Stopped)
+//	            || process.getProcessState().equals(ProcessState.Killed)
+//	            || process.getProcessState().equals(ProcessState.Failed)
+//	            || process.getProcessState().equals(ProcessState.Stopping)  ) {
+//	    	return;  // dont collect stats
+//	    }
+
+	  
+	  if (process.getProcessState().equals(ProcessState.Initializing)
             || process.getProcessState().equals(ProcessState.Running))
       try {
 
@@ -163,6 +185,11 @@ public class LinuxProcessMetricsProcesso
                 
                 ProcessMajorFaultCollector processMajorFaultUsageCollector = 
                 		new ProcessMajorFaultCollector(logger, pid);
+          	  // if process is stopping or already dead dont collect metrics. The Camel
+          	  // route has just been stopped.
+          	  if ( !collectStats(process.getProcessState() ) ) {
+          		  return;
+          	  }
 
                 processMajorFaultUsage = pool
                         .submit(processMajorFaultUsageCollector);
@@ -171,6 +198,12 @@ public class LinuxProcessMetricsProcesso
                 ProcessCpuUsageCollector processCpuUsageCollector = new ProcessCpuUsageCollector(logger,
                         pid, processStatFile, 42, 0);
 
+          	  // if process is stopping or already dead dont collect metrics. The Camel
+          	  // route has just been stopped.
+          	  if ( !collectStats(process.getProcessState() ) ) {
+          		  return;
+          	  }
+
                 processCpuUsage = pool.submit(processCpuUsageCollector);
                 totalCpuUsage += (processCpuUsage.get().getTotalJiffies()/ agent.cpuClockRate);
                 
@@ -178,6 +211,12 @@ public class LinuxProcessMetricsProcesso
                         new RandomAccessFile("/proc/" + pid + "/statm", "r");
                 ProcessResidentMemoryCollector collector = new ProcessResidentMemoryCollector(rStatmFile, 2,
                         0);
+          	  // if process is stopping or already dead dont collect metrics. The Camel
+          	  // route has just been stopped.
+          	  if ( !collectStats(process.getProcessState() ) ) {
+          		  return;
+          	  }
+
                 Future<ProcessResidentMemory> prm = pool.submit(collector);
                
                 totalRss += prm.get().get();
@@ -193,6 +232,11 @@ public class LinuxProcessMetricsProcesso
              ProcessMajorFaultCollector processMajorFaultUsageCollector = 
             		 new ProcessMajorFaultCollector(logger, process.getPID());
 
+       	  // if process is stopping or already dead dont collect metrics. The Camel
+       	  // route has just been stopped.
+       	  if ( !collectStats(process.getProcessState() ) ) {
+       		  return;
+       	  }
              processMajorFaultUsage = pool
                      .submit(processMajorFaultUsageCollector);
              totalFaults = processMajorFaultUsage.get().getMajorFaults();
@@ -200,11 +244,22 @@ public class LinuxProcessMetricsProcesso
              ProcessCpuUsageCollector processCpuUsageCollector = new ProcessCpuUsageCollector(logger,
                      process.getPID(), processStatFile, 42, 0);
 
+       	  // if process is stopping or already dead dont collect metrics. The Camel
+       	  // route has just been stopped.
+       	  if ( !collectStats(process.getProcessState() ) ) {
+       		  return;
+       	  }
              processCpuUsage = pool.submit(processCpuUsageCollector);
              totalCpuUsage = processCpuUsage.get().getTotalJiffies()/ agent.cpuClockRate;
              
              ProcessResidentMemoryCollector collector = new ProcessResidentMemoryCollector(statmFile, 2,
                      0);
+       	  // if process is stopping or already dead dont collect metrics. The Camel
+       	  // route has just been stopped.
+       	  if ( !collectStats(process.getProcessState() ) ) {
+       		  return;
+       	  }
+
              Future<ProcessResidentMemory> prm = pool.submit(collector);
              
              totalRss = prm.get().get();