You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2019/06/21 18:08:35 UTC
svn commit: r1861797 - in
/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent:
NodeAgent.java config/AgentConfiguration.java
processors/DefaultNodeInventoryProcessor.java
Author: cwiklik
Date: Fri Jun 21 18:08:35 2019
New Revision: 1861797
URL: http://svn.apache.org/viewvc?rev=1861797&view=rev
Log:
UIMA-6061 Modified the agent to send an inventory update to the OR as soon as a child process's state changes
Modified:
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java?rev=1861797&r1=1861796&r2=1861797&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java Fri Jun 21 18:08:35 2019
@@ -57,6 +57,7 @@ import org.apache.camel.Route;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.builder.RouteBuilder;
import org.apache.commons.lang.SerializationUtils;
+import org.apache.hadoop.conf.Configured;
import org.apache.uima.ducc.agent.config.AgentConfiguration;
import org.apache.uima.ducc.agent.event.AgentEventListener;
import org.apache.uima.ducc.agent.event.ProcessLifecycleObserver;
@@ -69,6 +70,7 @@ import org.apache.uima.ducc.agent.launch
import org.apache.uima.ducc.agent.launcher.SigKillCommand;
import org.apache.uima.ducc.agent.launcher.SigTermCommand;
import org.apache.uima.ducc.agent.metrics.collectors.NodeUsersCollector;
+import org.apache.uima.ducc.agent.processors.DefaultNodeInventoryProcessor;
import org.apache.uima.ducc.common.Node;
import org.apache.uima.ducc.common.NodeIdentity;
import org.apache.uima.ducc.common.admin.event.DuccAdminEvent;
@@ -1354,8 +1356,22 @@ public class NodeAgent extends AbstractD
return;
} else if (changeState(processEntry.getValue().getProcessState())) {
logger.info(methodName, null,"=============== PID:"+processEntry.getValue().getPID()+" Changing State - current state:"+processEntry.getValue().getProcessState()+" New State:"+duccEvent.getState());
- processEntry.getValue().setProcessState(duccEvent.getState());
-
+ processEntry.getValue().setProcessState(duccEvent.getState());
+ DuccEventDispatcher dispatcher =
+ configurationFactory.getORDispatcher(super.getContext());
+ try {
+ DefaultNodeInventoryProcessor processor =
+ configurationFactory.nodeInventoryProcessor(this);
+ Map<DuccId, IDuccProcess> inventoryCopy
+ = (Map<DuccId, IDuccProcess>)SerializationUtils.clone((ConcurrentHashMap<DuccId, IDuccProcess>) inventory);
+
+ processor.dispatchInventoryUpdate(dispatcher, configurationFactory.getInventoryUpdateEndpoint(), inventoryCopy);
+ logger.info(methodName, null,"Sent Node Inventory Update to the OR - process PID:"+processEntry.getValue().getPID());
+
+ } catch( Exception e) {
+ logger.warn("", null,e);
+ }
+
// if the process is Stopping, it must have hit an error threshold
}
// Check if MemoryCollector should be created for this
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java?rev=1861797&r1=1861796&r2=1861797&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java Fri Jun 21 18:08:35 2019
@@ -24,6 +24,7 @@ import java.net.Socket;
import java.net.SocketException;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Map.Entry;
import javax.annotation.PostConstruct;
@@ -136,6 +137,7 @@ public class AgentConfiguration {
@Autowired
CommonConfiguration common;
+ DefaultNodeInventoryProcessor inventoryProcessor=null;
/**
* Creates {@code AgentEventListener} that will handle incoming messages.
*
@@ -499,7 +501,9 @@ public class AgentConfiguration {
}
return null;
}
-
+ public String getInventoryUpdateEndpoint() {
+ return common.nodeInventoryEndpoint;
+ }
public void startNodeMetrics(NodeAgent agent) throws Exception {
nodeMetricsProcessor.setAgent(agent);
@@ -544,8 +548,11 @@ public class AgentConfiguration {
}
- public NodeInventoryProcessor nodeInventoryProcessor(NodeAgent agent) {
- return new DefaultNodeInventoryProcessor(agent, inventoryPublishRateSkipCount);
+ public DefaultNodeInventoryProcessor nodeInventoryProcessor(NodeAgent agent) {
+ if ( Objects.isNull(inventoryProcessor) ) {
+ inventoryProcessor = new DefaultNodeInventoryProcessor(agent, inventoryPublishRateSkipCount);
+ }
+ return inventoryProcessor;
}
public void stopInventoryRoute() {
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java?rev=1861797&r1=1861796&r2=1861797&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java Fri Jun 21 18:08:35 2019
@@ -19,7 +19,6 @@
package org.apache.uima.ducc.agent.processors;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -28,6 +27,7 @@ import org.apache.uima.ducc.agent.NodeAg
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
import org.apache.uima.ducc.transport.event.NodeInventoryUpdateDuccEvent;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
@@ -65,6 +65,12 @@ public class DefaultNodeInventoryProcess
return agent.getInventoryCopy();
}
+ public void dispatchInventoryUpdate(DuccEventDispatcher dispatcher, String targetEndpoint, Map<DuccId, IDuccProcess> inventory) throws Exception {
+ NodeInventoryUpdateDuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(inventory,agent.getLastORSequence(), agent.getIdentity());
+ dispatcher.dispatch(targetEndpoint, duccEvent);
+ logger.info("dispatchInventoryUpdate", null, "Agent dispatched inventory update event to endpoint:"+targetEndpoint);
+ }
+
/**
*
*/
@@ -81,7 +87,7 @@ public class DefaultNodeInventoryProcess
// (ducc.agent.node.inventory.publish.rate.skip)
// configured in ducc.properties.
if (previousInventory != null) {
- if ( agent.getEventListener().forceInvotoryUpdate()) {
+ if (agent.getEventListener().forceInvotoryUpdate()) {
inventoryChanged = true;
agent.getEventListener().resetForceInventoryUpdateFlag();
}
@@ -95,17 +101,14 @@ public class DefaultNodeInventoryProcess
// that perhaps a new process was added and one was removed. In
// this case,
// force the publish, since there was a change.
- for (Map.Entry<DuccId, IDuccProcess> currentProcess : inventory
- .entrySet()) {
+ for (Map.Entry<DuccId, IDuccProcess> currentProcess : inventory.entrySet()) {
// Check if a process in the current inventory exists in a
// previous
// inventory snapshot
if (previousInventory.containsKey(currentProcess.getKey())) {
- IDuccProcess previousProcess = previousInventory
- .get(currentProcess.getKey());
+ IDuccProcess previousProcess = previousInventory.get(currentProcess.getKey());
// check if either PID or process state has changed
- if (currentProcess.getValue().getPID() != null
- && previousProcess.getPID() == null) {
+ if (currentProcess.getValue().getPID() != null && previousProcess.getPID() == null) {
inventoryChanged = true;
break;
} else if (!currentProcess.getValue().getProcessState()
@@ -113,30 +116,22 @@ public class DefaultNodeInventoryProcess
inventoryChanged = true;
break;
} else {
- List<IUimaPipelineAEComponent> breakdown = currentProcess
- .getValue().getUimaPipelineComponents();
+ List<IUimaPipelineAEComponent> breakdown = currentProcess.getValue()
+ .getUimaPipelineComponents();
if (breakdown != null && breakdown.size() > 0) {
List<IUimaPipelineAEComponent> previousBreakdown = previousProcess
.getUimaPipelineComponents();
- if (previousBreakdown == null
- || previousBreakdown.size() == 0
- || breakdown.size() != previousBreakdown
- .size()) {
+ if (previousBreakdown == null || previousBreakdown.size() == 0
+ || breakdown.size() != previousBreakdown.size()) {
inventoryChanged = true;
} else {
for (IUimaPipelineAEComponent uimaAeState : breakdown) {
boolean found = false;
for (IUimaPipelineAEComponent previousUimaAeState : previousBreakdown) {
- if (uimaAeState.getAeName().equals(
- previousUimaAeState
- .getAeName())) {
+ if (uimaAeState.getAeName().equals(previousUimaAeState.getAeName())) {
found = true;
- if (!uimaAeState
- .getAeState()
- .equals(previousUimaAeState
- .getAeState())
- || uimaAeState
- .getInitializationTime() != previousUimaAeState
+ if (!uimaAeState.getAeState().equals(previousUimaAeState.getAeState())
+ || uimaAeState.getInitializationTime() != previousUimaAeState
.getInitializationTime()) {
inventoryChanged = true;
break;
@@ -188,79 +183,57 @@ public class DefaultNodeInventoryProcess
// rate,
// publish
- StringBuffer sb = new StringBuffer("Node Inventory ("
- + inventory.size() + ")");
+ StringBuffer sb = new StringBuffer("Node Inventory (" + inventory.size() + ")");
for (Map.Entry<DuccId, IDuccProcess> p : inventory.entrySet()) {
/*
- * long endInitLong = 0; String endInit = ""; ITimeWindow
- * wInit = p.getValue().getTimeWindowInit(); if(wInit !=
- * null) { endInit = wInit.getEnd(); endInitLong =
- * wInit.getEndLong(); } long startRunLong = 0; String
- * startRun = ""; ITimeWindow wRun =
- * p.getValue().getTimeWindowRun(); if(wRun != null) {
- * startRun = wRun.getStart(); startRunLong =
+ * long endInitLong = 0; String endInit = ""; ITimeWindow wInit =
+ * p.getValue().getTimeWindowInit(); if(wInit != null) { endInit =
+ * wInit.getEnd(); endInitLong = wInit.getEndLong(); } long startRunLong = 0;
+ * String startRun = ""; ITimeWindow wRun = p.getValue().getTimeWindowRun();
+ * if(wRun != null) { startRun = wRun.getStart(); startRunLong =
* wRun.getStartLong(); } if(endInitLong > startRunLong) {
* logger.warn(methodName, null,
* "endInit:"+endInitLong+" "+"startRun:"+startRunLong); }
*/
if (p.getValue().getUimaPipelineComponents() == null) {
- p.getValue().setUimaPipelineComponents(
- new ArrayList<IUimaPipelineAEComponent>());
+ p.getValue().setUimaPipelineComponents(new ArrayList<IUimaPipelineAEComponent>());
}
- if ( !p.getValue().getProcessState().equals(ProcessState.Initializing)) {
+ if (!p.getValue().getProcessState().equals(ProcessState.Initializing)) {
p.getValue().getUimaPipelineComponents().clear();
}
- int pipelineInitStats = (p.getValue()
- .getUimaPipelineComponents() == null) ? 0 : p
- .getValue().getUimaPipelineComponents().size();
+ int pipelineInitStats = (p.getValue().getUimaPipelineComponents() == null) ? 0
+ : p.getValue().getUimaPipelineComponents().size();
StringBuffer gcInfo = new StringBuffer();
- if (p.getValue().getGarbageCollectionStats() != null ) {
+ if (p.getValue().getGarbageCollectionStats() != null) {
gcInfo.append(" GC Total=")
.append(p.getValue().getGarbageCollectionStats().getCollectionCount())
.append(" GC Time=")
- .append(p.getValue().getGarbageCollectionStats().getCollectionTime())
- .append(" ");
-
+ .append(p.getValue().getGarbageCollectionStats().getCollectionTime()).append(" ");
}
- sb.append("\n\t[Process Type=")
- .append(p.getValue().getProcessType())
- .append(" DUCC ID=")
- .append(p.getValue().getDuccId())
- .append(" PID=")
- .append(p.getValue().getPID())
- .append(" State=")
- .append(p.getValue().getProcessState())
- .append(" Resident Memory=")
- .append(p.getValue().getResidentMemory())
- .append(gcInfo.toString())
- .append(" Init Stats List Size:"
- + pipelineInitStats)
- .append(" Reason: "+p.getValue().getReasonForStoppingProcess())
- .append("] ");
- if (p.getValue().getProcessState()
- .equals(ProcessState.Stopped)
- || p.getValue().getProcessState()
- .equals(ProcessState.Failed)
- || p.getValue().getProcessState()
- .equals(ProcessState.Killed)) {
- sb.append(" Reason:"
- + p.getValue().getReasonForStoppingProcess());
- sb.append(" Extended Reason:"+p.getValue().getExtendedReasonForStoppingProcess());
+ sb.append("\n\t[Process Type=").append(p.getValue().getProcessType()).append(" DUCC ID=")
+ .append(p.getValue().getDuccId()).append(" PID=").append(p.getValue().getPID())
+ .append(" State=").append(p.getValue().getProcessState()).append(" Resident Memory=")
+ .append(p.getValue().getResidentMemory()).append(gcInfo.toString())
+ .append(" Init Stats List Size:" + pipelineInitStats)
+ .append(" Reason: " + p.getValue().getReasonForStoppingProcess()).append("] ");
+ if (p.getValue().getProcessState().equals(ProcessState.Stopped)
+ || p.getValue().getProcessState().equals(ProcessState.Failed)
+ || p.getValue().getProcessState().equals(ProcessState.Killed)) {
+ sb.append(" Reason:" + p.getValue().getReasonForStoppingProcess());
+ sb.append(" Extended Reason:" + p.getValue().getExtendedReasonForStoppingProcess());
}
- if (!p.getValue().getProcessState()
- .equals(ProcessState.Running)
- && !p.getValue().getProcessState()
- .equals(ProcessState.Initializing)) {
- sb.append(" Exit Code=" + p.getValue().getProcessExitCode());
- }
+ if (!p.getValue().getProcessState().equals(ProcessState.Running)
+ && !p.getValue().getProcessState().equals(ProcessState.Initializing)) {
+ sb.append(" Exit Code=" + p.getValue().getProcessExitCode());
+ }
}
- logger.info(methodName, null, "Agent "
- + agent.getIdentity().getCanonicalName() + " Posting Inventory:"
- + sb.toString());
- outgoingMessage.getIn().setBody(new NodeInventoryUpdateDuccEvent(inventory,agent.getLastORSequence(), agent.getIdentity()));
+ logger.info(methodName, null,
+ "Agent " + agent.getIdentity().getCanonicalName() + " Posting Inventory:" + sb.toString());
+ outgoingMessage.getIn().setBody(
+ new NodeInventoryUpdateDuccEvent(inventory, agent.getLastORSequence(), agent.getIdentity()));
} else {
// Add null to the body of the message. A filter
@@ -279,6 +252,7 @@ public class DefaultNodeInventoryProcess
}
inventoryChanged = false;
}
+
}
}