You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2019/06/21 18:08:35 UTC

svn commit: r1861797 - in /uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent: NodeAgent.java config/AgentConfiguration.java processors/DefaultNodeInventoryProcessor.java

Author: cwiklik
Date: Fri Jun 21 18:08:35 2019
New Revision: 1861797

URL: http://svn.apache.org/viewvc?rev=1861797&view=rev
Log:
UIMA-6061 Modified to send inventory update to the OR as soon as child process state changes

Modified:
    uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
    uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
    uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java

Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java?rev=1861797&r1=1861796&r2=1861797&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java Fri Jun 21 18:08:35 2019
@@ -57,6 +57,7 @@ import org.apache.camel.Route;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.commons.lang.SerializationUtils;
+import org.apache.hadoop.conf.Configured;
 import org.apache.uima.ducc.agent.config.AgentConfiguration;
 import org.apache.uima.ducc.agent.event.AgentEventListener;
 import org.apache.uima.ducc.agent.event.ProcessLifecycleObserver;
@@ -69,6 +70,7 @@ import org.apache.uima.ducc.agent.launch
 import org.apache.uima.ducc.agent.launcher.SigKillCommand;
 import org.apache.uima.ducc.agent.launcher.SigTermCommand;
 import org.apache.uima.ducc.agent.metrics.collectors.NodeUsersCollector;
+import org.apache.uima.ducc.agent.processors.DefaultNodeInventoryProcessor;
 import org.apache.uima.ducc.common.Node;
 import org.apache.uima.ducc.common.NodeIdentity;
 import org.apache.uima.ducc.common.admin.event.DuccAdminEvent;
@@ -1354,8 +1356,22 @@ public class NodeAgent extends AbstractD
             return;
           } else if (changeState(processEntry.getValue().getProcessState())) {
         	  logger.info(methodName, null,"=============== PID:"+processEntry.getValue().getPID()+" Changing State - current state:"+processEntry.getValue().getProcessState()+" New State:"+duccEvent.getState());
-    		  processEntry.getValue().setProcessState(duccEvent.getState());
-        	  
+  		      processEntry.getValue().setProcessState(duccEvent.getState());
+  		      DuccEventDispatcher dispatcher = 
+  		    		configurationFactory.getORDispatcher(super.getContext());
+  		      try {
+      		    DefaultNodeInventoryProcessor processor = 
+      		    		configurationFactory.nodeInventoryProcessor(this);
+          		Map<DuccId, IDuccProcess> inventoryCopy
+          			= (Map<DuccId, IDuccProcess>)SerializationUtils.clone((ConcurrentHashMap<DuccId, IDuccProcess>) inventory);
+ 
+          		processor.dispatchInventoryUpdate(dispatcher, configurationFactory.getInventoryUpdateEndpoint(), inventoryCopy);
+      		    logger.info(methodName, null,"Sent Node Inventory Update to the OR - process PID:"+processEntry.getValue().getPID());
+
+  		      } catch( Exception e) {
+  		    	logger.warn("", null,e);
+  		      }
+   		    
             // if the process is Stopping, it must have hit an error threshold
           }
           // Check if MemoryCollector should be created for this

Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java?rev=1861797&r1=1861796&r2=1861797&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java Fri Jun 21 18:08:35 2019
@@ -24,6 +24,7 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Map.Entry;
 
 import javax.annotation.PostConstruct;
@@ -136,6 +137,7 @@ public class AgentConfiguration {
   @Autowired
   CommonConfiguration common;
 
+  DefaultNodeInventoryProcessor inventoryProcessor=null;
   /**
    * Creates {@code AgentEventListener} that will handle incoming messages.
    *
@@ -499,7 +501,9 @@ public class AgentConfiguration {
     }
     return null;
   }
-
+  public String getInventoryUpdateEndpoint() {
+	  return common.nodeInventoryEndpoint;
+  }
   public void startNodeMetrics(NodeAgent agent) throws Exception {
 
   	  nodeMetricsProcessor.setAgent(agent);
@@ -544,8 +548,11 @@ public class AgentConfiguration {
 
   }
 
-  public NodeInventoryProcessor nodeInventoryProcessor(NodeAgent agent) {
-    return new DefaultNodeInventoryProcessor(agent, inventoryPublishRateSkipCount);
+  public DefaultNodeInventoryProcessor nodeInventoryProcessor(NodeAgent agent) {
+	  if ( Objects.isNull(inventoryProcessor) ) {
+		  inventoryProcessor = new DefaultNodeInventoryProcessor(agent, inventoryPublishRateSkipCount);
+	  }
+    return inventoryProcessor;
   }
 
   public void stopInventoryRoute() {

Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java?rev=1861797&r1=1861796&r2=1861797&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java Fri Jun 21 18:08:35 2019
@@ -19,7 +19,6 @@
 package org.apache.uima.ducc.agent.processors;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -28,6 +27,7 @@ import org.apache.uima.ducc.agent.NodeAg
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.id.DuccId;
 import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
 import org.apache.uima.ducc.transport.event.NodeInventoryUpdateDuccEvent;
 import org.apache.uima.ducc.transport.event.common.IDuccProcess;
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
@@ -65,6 +65,12 @@ public class DefaultNodeInventoryProcess
 		return agent.getInventoryCopy();
 	}
 
+	public void dispatchInventoryUpdate(DuccEventDispatcher dispatcher, String targetEndpoint, Map<DuccId, IDuccProcess> inventory) throws Exception {
+		NodeInventoryUpdateDuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(inventory,agent.getLastORSequence(), agent.getIdentity());
+		dispatcher.dispatch(targetEndpoint, duccEvent);
+		logger.info("dispatchInventoryUpdate", null, "Agent dispatched inventory update event to endpoint:"+targetEndpoint);
+	}
+
 	/**
 	 * 
 	 */
@@ -81,7 +87,7 @@ public class DefaultNodeInventoryProcess
 		// (ducc.agent.node.inventory.publish.rate.skip)
 		// configured in ducc.properties.
 		if (previousInventory != null) {
-			if ( agent.getEventListener().forceInvotoryUpdate()) {
+			if (agent.getEventListener().forceInvotoryUpdate()) {
 				inventoryChanged = true;
 				agent.getEventListener().resetForceInventoryUpdateFlag();
 			}
@@ -95,17 +101,14 @@ public class DefaultNodeInventoryProcess
 				// that perhaps a new process was added and one was removed. In
 				// this case,
 				// force the publish, since there was a change.
-				for (Map.Entry<DuccId, IDuccProcess> currentProcess : inventory
-						.entrySet()) {
+				for (Map.Entry<DuccId, IDuccProcess> currentProcess : inventory.entrySet()) {
 					// Check if a process in the current inventory exists in a
 					// previous
 					// inventory snapshot
 					if (previousInventory.containsKey(currentProcess.getKey())) {
-						IDuccProcess previousProcess = previousInventory
-								.get(currentProcess.getKey());
+						IDuccProcess previousProcess = previousInventory.get(currentProcess.getKey());
 						// check if either PID or process state has changed
-						if (currentProcess.getValue().getPID() != null
-								&& previousProcess.getPID() == null) {
+						if (currentProcess.getValue().getPID() != null && previousProcess.getPID() == null) {
 							inventoryChanged = true;
 							break;
 						} else if (!currentProcess.getValue().getProcessState()
@@ -113,30 +116,22 @@ public class DefaultNodeInventoryProcess
 							inventoryChanged = true;
 							break;
 						} else {
-							List<IUimaPipelineAEComponent> breakdown = currentProcess
-									.getValue().getUimaPipelineComponents();
+							List<IUimaPipelineAEComponent> breakdown = currentProcess.getValue()
+									.getUimaPipelineComponents();
 							if (breakdown != null && breakdown.size() > 0) {
 								List<IUimaPipelineAEComponent> previousBreakdown = previousProcess
 										.getUimaPipelineComponents();
-								if (previousBreakdown == null
-										|| previousBreakdown.size() == 0
-										|| breakdown.size() != previousBreakdown
-												.size()) {
+								if (previousBreakdown == null || previousBreakdown.size() == 0
+										|| breakdown.size() != previousBreakdown.size()) {
 									inventoryChanged = true;
 								} else {
 									for (IUimaPipelineAEComponent uimaAeState : breakdown) {
 										boolean found = false;
 										for (IUimaPipelineAEComponent previousUimaAeState : previousBreakdown) {
-											if (uimaAeState.getAeName().equals(
-													previousUimaAeState
-															.getAeName())) {
+											if (uimaAeState.getAeName().equals(previousUimaAeState.getAeName())) {
 												found = true;
-												if (!uimaAeState
-														.getAeState()
-														.equals(previousUimaAeState
-																.getAeState())
-														|| uimaAeState
-																.getInitializationTime() != previousUimaAeState
+												if (!uimaAeState.getAeState().equals(previousUimaAeState.getAeState())
+														|| uimaAeState.getInitializationTime() != previousUimaAeState
 																.getInitializationTime()) {
 													inventoryChanged = true;
 													break;
@@ -188,79 +183,57 @@ public class DefaultNodeInventoryProcess
 																							// rate,
 																							// publish
 
-				StringBuffer sb = new StringBuffer("Node Inventory ("
-						+ inventory.size() + ")");
+				StringBuffer sb = new StringBuffer("Node Inventory (" + inventory.size() + ")");
 				for (Map.Entry<DuccId, IDuccProcess> p : inventory.entrySet()) {
 					/*
-					 * long endInitLong = 0; String endInit = ""; ITimeWindow
-					 * wInit = p.getValue().getTimeWindowInit(); if(wInit !=
-					 * null) { endInit = wInit.getEnd(); endInitLong =
-					 * wInit.getEndLong(); } long startRunLong = 0; String
-					 * startRun = ""; ITimeWindow wRun =
-					 * p.getValue().getTimeWindowRun(); if(wRun != null) {
-					 * startRun = wRun.getStart(); startRunLong =
+					 * long endInitLong = 0; String endInit = ""; ITimeWindow wInit =
+					 * p.getValue().getTimeWindowInit(); if(wInit != null) { endInit =
+					 * wInit.getEnd(); endInitLong = wInit.getEndLong(); } long startRunLong = 0;
+					 * String startRun = ""; ITimeWindow wRun = p.getValue().getTimeWindowRun();
+					 * if(wRun != null) { startRun = wRun.getStart(); startRunLong =
 					 * wRun.getStartLong(); } if(endInitLong > startRunLong) {
 					 * logger.warn(methodName, null,
 					 * "endInit:"+endInitLong+" "+"startRun:"+startRunLong); }
 					 */
 					if (p.getValue().getUimaPipelineComponents() == null) {
-						p.getValue().setUimaPipelineComponents(
-								new ArrayList<IUimaPipelineAEComponent>());
+						p.getValue().setUimaPipelineComponents(new ArrayList<IUimaPipelineAEComponent>());
 					}
-					if ( !p.getValue().getProcessState().equals(ProcessState.Initializing)) {
+					if (!p.getValue().getProcessState().equals(ProcessState.Initializing)) {
 						p.getValue().getUimaPipelineComponents().clear();
 					}
-					int pipelineInitStats = (p.getValue()
-							.getUimaPipelineComponents() == null) ? 0 : p
-							.getValue().getUimaPipelineComponents().size();
+					int pipelineInitStats = (p.getValue().getUimaPipelineComponents() == null) ? 0
+							: p.getValue().getUimaPipelineComponents().size();
 					StringBuffer gcInfo = new StringBuffer();
-					if (p.getValue().getGarbageCollectionStats() != null ) {
+					if (p.getValue().getGarbageCollectionStats() != null) {
 						gcInfo.append(" GC Total=")
 								.append(p.getValue().getGarbageCollectionStats().getCollectionCount())
 								.append(" GC Time=")
-								.append(p.getValue().getGarbageCollectionStats().getCollectionTime())
-								.append(" ");
-
+								.append(p.getValue().getGarbageCollectionStats().getCollectionTime()).append(" ");
 
 					}
-					sb.append("\n\t[Process Type=")
-							.append(p.getValue().getProcessType())
-							.append(" DUCC ID=")
-							.append(p.getValue().getDuccId())
-							.append(" PID=")
-							.append(p.getValue().getPID())
-							.append(" State=")
-							.append(p.getValue().getProcessState())
-							.append(" Resident Memory=")
-							.append(p.getValue().getResidentMemory())
-							.append(gcInfo.toString())
-							.append(" Init Stats List Size:"
-									+ pipelineInitStats)
-							.append(" Reason: "+p.getValue().getReasonForStoppingProcess())		
-							.append("] ");
-					if (p.getValue().getProcessState()
-							.equals(ProcessState.Stopped)
-							|| p.getValue().getProcessState()
-									.equals(ProcessState.Failed)
-							|| p.getValue().getProcessState()
-									.equals(ProcessState.Killed)) {
-						sb.append(" Reason:"
-								+ p.getValue().getReasonForStoppingProcess());
-						sb.append(" Extended Reason:"+p.getValue().getExtendedReasonForStoppingProcess());
+					sb.append("\n\t[Process Type=").append(p.getValue().getProcessType()).append(" DUCC ID=")
+							.append(p.getValue().getDuccId()).append(" PID=").append(p.getValue().getPID())
+							.append(" State=").append(p.getValue().getProcessState()).append(" Resident Memory=")
+							.append(p.getValue().getResidentMemory()).append(gcInfo.toString())
+							.append(" Init Stats List Size:" + pipelineInitStats)
+							.append(" Reason: " + p.getValue().getReasonForStoppingProcess()).append("] ");
+					if (p.getValue().getProcessState().equals(ProcessState.Stopped)
+							|| p.getValue().getProcessState().equals(ProcessState.Failed)
+							|| p.getValue().getProcessState().equals(ProcessState.Killed)) {
+						sb.append(" Reason:" + p.getValue().getReasonForStoppingProcess());
+						sb.append(" Extended Reason:" + p.getValue().getExtendedReasonForStoppingProcess());
 					}
 
-					if (!p.getValue().getProcessState()
-							.equals(ProcessState.Running)
-							&& !p.getValue().getProcessState()
-					                .equals(ProcessState.Initializing)) {
-					    sb.append(" Exit Code=" + p.getValue().getProcessExitCode());
-					}					
+					if (!p.getValue().getProcessState().equals(ProcessState.Running)
+							&& !p.getValue().getProcessState().equals(ProcessState.Initializing)) {
+						sb.append(" Exit Code=" + p.getValue().getProcessExitCode());
+					}
 
 				}
-				logger.info(methodName, null, "Agent "
-						+ agent.getIdentity().getCanonicalName() + " Posting Inventory:"
-						+ sb.toString());
-				outgoingMessage.getIn().setBody(new NodeInventoryUpdateDuccEvent(inventory,agent.getLastORSequence(), agent.getIdentity()));
+				logger.info(methodName, null,
+						"Agent " + agent.getIdentity().getCanonicalName() + " Posting Inventory:" + sb.toString());
+				outgoingMessage.getIn().setBody(
+						new NodeInventoryUpdateDuccEvent(inventory, agent.getLastORSequence(), agent.getIdentity()));
 
 			} else {
 				// Add null to the body of the message. A filter
@@ -279,6 +252,7 @@ public class DefaultNodeInventoryProcess
 			}
 			inventoryChanged = false;
 		}
+
 	}
 
 }