You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2019/08/02 17:27:49 UTC
svn commit: r1864247 [10/10] - in
/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent:
./ config/ deploy/ deploy/uima/ event/ exceptions/ launcher/
metrics/collectors/ monitor/ processors/
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java Fri Aug 2 17:27:48 2019
@@ -37,222 +37,227 @@ import org.apache.uima.ducc.transport.ev
*
*/
public class DefaultNodeInventoryProcessor implements NodeInventoryProcessor {
- DuccLogger logger = new DuccLogger(this.getClass(), "AGENT");
- boolean inventoryChanged = true;
- private NodeAgent agent;
- private Map<DuccId, IDuccProcess> previousInventory;
- private int forceInventoryUpdateMaxThreshold = 0;
- private long counter = 0;
-
- public DefaultNodeInventoryProcessor(NodeAgent agent,
- String inventoryPublishRateSkipCount) {
- this.agent = agent;
- try {
- forceInventoryUpdateMaxThreshold = Integer
- .parseInt(inventoryPublishRateSkipCount);
- } catch (Exception e) {
- }
- // Dont allow 0
- if (forceInventoryUpdateMaxThreshold == 0) {
- forceInventoryUpdateMaxThreshold = 1;
- }
- }
-
- /**
- * Get a copy of agent {@code Process} inventory
- */
- public Map<DuccId, IDuccProcess> getInventory() {
- return agent.getInventoryCopy();
- }
-
- public void dispatchInventoryUpdate(DuccEventDispatcher dispatcher, String targetEndpoint, Map<DuccId, IDuccProcess> inventory) throws Exception {
- NodeInventoryUpdateDuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(inventory,agent.getLastORSequence(), agent.getIdentity());
- dispatcher.dispatch(targetEndpoint, duccEvent);
- logger.info("dispatchInventoryUpdate", null, "Agent dispatched inventory update event to endpoint:"+targetEndpoint);
- }
-
- /**
- *
- */
- public void process(Exchange outgoingMessage) throws Exception {
- String methodName = "process";
- // Get a deep copy of agent's inventory
- Map<DuccId, IDuccProcess> inventory = getInventory();
- // Determine if the inventory changed since the last publishing was done
- // First check if the inventory expanded or shrunk. If the same in size,
- // compare process states and PID. If either of the two changed for any
- // of the processes trigger immediate publish. If no changes found,
- // publish
- // according to skip counter
- // (ducc.agent.node.inventory.publish.rate.skip)
- // configured in ducc.properties.
- if (previousInventory != null) {
- if (agent.getEventListener().forceInvotoryUpdate()) {
- inventoryChanged = true;
- agent.getEventListener().resetForceInventoryUpdateFlag();
- }
- if (inventory.size() != previousInventory.size()) {
- inventoryChanged = true;
- } else {
- // Inventory maps are equal in size, check if all processes in
- // the current
- // inventory exist in the previous inventory snapshot. If not,
- // it means that
- // that perhaps a new process was added and one was removed. In
- // this case,
- // force the publish, since there was a change.
- for (Map.Entry<DuccId, IDuccProcess> currentProcess : inventory.entrySet()) {
- // Check if a process in the current inventory exists in a
- // previous
- // inventory snapshot
- if (previousInventory.containsKey(currentProcess.getKey())) {
- IDuccProcess previousProcess = previousInventory.get(currentProcess.getKey());
- // check if either PID or process state has changed
- if (currentProcess.getValue().getPID() != null && previousProcess.getPID() == null) {
- inventoryChanged = true;
- break;
- } else if (!currentProcess.getValue().getProcessState()
- .equals(previousProcess.getProcessState())) {
- inventoryChanged = true;
- break;
- } else {
- List<IUimaPipelineAEComponent> breakdown = currentProcess.getValue()
- .getUimaPipelineComponents();
- if (breakdown != null && breakdown.size() > 0) {
- List<IUimaPipelineAEComponent> previousBreakdown = previousProcess
- .getUimaPipelineComponents();
- if (previousBreakdown == null || previousBreakdown.size() == 0
- || breakdown.size() != previousBreakdown.size()) {
- inventoryChanged = true;
- } else {
- for (IUimaPipelineAEComponent uimaAeState : breakdown) {
- boolean found = false;
- for (IUimaPipelineAEComponent previousUimaAeState : previousBreakdown) {
- if (uimaAeState.getAeName().equals(previousUimaAeState.getAeName())) {
- found = true;
- if (!uimaAeState.getAeState().equals(previousUimaAeState.getAeState())
- || uimaAeState.getInitializationTime() != previousUimaAeState
- .getInitializationTime()) {
- inventoryChanged = true;
- break;
- }
- }
- }
- if (!found) {
- inventoryChanged = true;
- }
-
- if (inventoryChanged) {
- break;
- }
-
- }
- }
-
- }
- }
- } else {
- // New inventory contains a process not in the previous
- // snapshot
- inventoryChanged = true;
- break;
- }
- }
- }
- }
-
- // Get this inventory snapshot
- previousInventory = inventory;
- // Broadcast inventory if there is a change or configured number of
- // epochs
- // passed since the last broadcast. This is configured in
- // ducc.properties with
- // property ducc.agent.node.inventory.publish.rate.skip
- try {
- if (inventory.size() > 0 && (inventoryChanged || // if there is
- // inventory
- // change,
- // publish
- forceInventoryUpdateMaxThreshold == 0 || // skip rate in
- // ducc.properties
- // is zero,
- // publish
- (counter > 0 && (counter % forceInventoryUpdateMaxThreshold) == 0))) { // if
- // reached
- // skip
- // rate,
- // publish
-
- StringBuffer sb = new StringBuffer("Node Inventory (" + inventory.size() + ")");
- for (Map.Entry<DuccId, IDuccProcess> p : inventory.entrySet()) {
- /*
- * long endInitLong = 0; String endInit = ""; ITimeWindow wInit =
- * p.getValue().getTimeWindowInit(); if(wInit != null) { endInit =
- * wInit.getEnd(); endInitLong = wInit.getEndLong(); } long startRunLong = 0;
- * String startRun = ""; ITimeWindow wRun = p.getValue().getTimeWindowRun();
- * if(wRun != null) { startRun = wRun.getStart(); startRunLong =
- * wRun.getStartLong(); } if(endInitLong > startRunLong) {
- * logger.warn(methodName, null,
- * "endInit:"+endInitLong+" "+"startRun:"+startRunLong); }
- */
- if (p.getValue().getUimaPipelineComponents() == null) {
- p.getValue().setUimaPipelineComponents(new ArrayList<IUimaPipelineAEComponent>());
- }
- if (!p.getValue().getProcessState().equals(ProcessState.Initializing)) {
- p.getValue().getUimaPipelineComponents().clear();
- }
- int pipelineInitStats = (p.getValue().getUimaPipelineComponents() == null) ? 0
- : p.getValue().getUimaPipelineComponents().size();
- StringBuffer gcInfo = new StringBuffer();
- if (p.getValue().getGarbageCollectionStats() != null) {
- gcInfo.append(" GC Total=")
- .append(p.getValue().getGarbageCollectionStats().getCollectionCount())
- .append(" GC Time=")
- .append(p.getValue().getGarbageCollectionStats().getCollectionTime()).append(" ");
-
- }
- sb.append("\n\t[Process Type=").append(p.getValue().getProcessType()).append(" DUCC ID=")
- .append(p.getValue().getDuccId()).append(" PID=").append(p.getValue().getPID())
- .append(" State=").append(p.getValue().getProcessState()).append(" Resident Memory=")
- .append(p.getValue().getResidentMemory()).append(gcInfo.toString())
- .append(" Init Stats List Size:" + pipelineInitStats)
- .append(" Reason: " + p.getValue().getReasonForStoppingProcess()).append("] ");
- if (p.getValue().getProcessState().equals(ProcessState.Stopped)
- || p.getValue().getProcessState().equals(ProcessState.Failed)
- || p.getValue().getProcessState().equals(ProcessState.Killed)) {
- sb.append(" Reason:" + p.getValue().getReasonForStoppingProcess());
- sb.append(" Extended Reason:" + p.getValue().getExtendedReasonForStoppingProcess());
- }
-
- if (!p.getValue().getProcessState().equals(ProcessState.Running)
- && !p.getValue().getProcessState().equals(ProcessState.Initializing)) {
- sb.append(" Exit Code=" + p.getValue().getProcessExitCode());
- }
-
- }
- logger.info(methodName, null,
- "Agent " + agent.getIdentity().getCanonicalName() + " Posting Inventory:" + sb.toString());
- outgoingMessage.getIn().setBody(
- new NodeInventoryUpdateDuccEvent(inventory, agent.getLastORSequence(), agent.getIdentity()));
-
- } else {
- // Add null to the body of the message. A filter
- // defined in the Camel route (AgentConfiguration.java)
- // has a predicate to check for null body and throws
- // away such a message.
- outgoingMessage.getIn().setBody(null);
- }
- } catch (Exception e) {
- logger.error(methodName, null, e);
- } finally {
- if (inventoryChanged) {
- counter = 0;
- } else {
- counter++;
- }
- inventoryChanged = false;
- }
+ DuccLogger logger = new DuccLogger(this.getClass(), "AGENT");
- }
+ boolean inventoryChanged = true;
+
+ private NodeAgent agent;
+
+ private Map<DuccId, IDuccProcess> previousInventory;
+
+ private int forceInventoryUpdateMaxThreshold = 0;
+
+ private long counter = 0;
+
+ public DefaultNodeInventoryProcessor(NodeAgent agent, String inventoryPublishRateSkipCount) {
+ this.agent = agent;
+ try {
+ forceInventoryUpdateMaxThreshold = Integer.parseInt(inventoryPublishRateSkipCount);
+ } catch (Exception e) {
+ }
+ // Don't allow 0 (also the value left in place when the skip count fails to parse)
+ if (forceInventoryUpdateMaxThreshold == 0) {
+ forceInventoryUpdateMaxThreshold = 1;
+ }
+ }
+
+ /**
+ * Get a copy of agent {@code Process} inventory
+ */
+ public Map<DuccId, IDuccProcess> getInventory() {
+ return agent.getInventoryCopy();
+ }
+
+ public void dispatchInventoryUpdate(DuccEventDispatcher dispatcher, String targetEndpoint,
+ Map<DuccId, IDuccProcess> inventory) throws Exception {
+ NodeInventoryUpdateDuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(inventory,
+ agent.getLastORSequence(), agent.getIdentity());
+ dispatcher.dispatch(targetEndpoint, duccEvent);
+ logger.info("dispatchInventoryUpdate", null,
+ "Agent dispatched inventory update event to endpoint:" + targetEndpoint);
+ }
+
+ /**
+ * Sets the body to an inventory update event if a publish is due, otherwise to null.
+ */
+ public void process(Exchange outgoingMessage) throws Exception {
+ String methodName = "process";
+ // Get a deep copy of agent's inventory
+ Map<DuccId, IDuccProcess> inventory = getInventory();
+ // Determine if the inventory changed since the last publishing was done
+ // First check if the inventory expanded or shrunk. If the same in size,
+ // compare process states and PID. If either of the two changed for any
+ // of the processes trigger immediate publish. If no changes found,
+ // publish
+ // according to skip counter
+ // (ducc.agent.node.inventory.publish.rate.skip)
+ // configured in ducc.properties.
+ if (previousInventory != null) {
+ if (agent.getEventListener().forceInvotoryUpdate()) {
+ inventoryChanged = true;
+ agent.getEventListener().resetForceInventoryUpdateFlag();
+ }
+ if (inventory.size() != previousInventory.size()) {
+ inventoryChanged = true;
+ } else {
+ // Inventory maps are equal in size, check if all processes in
+ // the current
+ // inventory exist in the previous inventory snapshot. If not,
+ // it means
+ // that perhaps a new process was added and one was removed. In
+ // this case,
+ // force the publish, since there was a change.
+ for (Map.Entry<DuccId, IDuccProcess> currentProcess : inventory.entrySet()) {
+ // Check if a process in the current inventory exists in a
+ // previous
+ // inventory snapshot
+ if (previousInventory.containsKey(currentProcess.getKey())) {
+ IDuccProcess previousProcess = previousInventory.get(currentProcess.getKey());
+ // check if either PID or process state has changed
+ if (currentProcess.getValue().getPID() != null && previousProcess.getPID() == null) {
+ inventoryChanged = true;
+ break;
+ } else if (!currentProcess.getValue().getProcessState()
+ .equals(previousProcess.getProcessState())) {
+ inventoryChanged = true;
+ break;
+ } else {
+ List<IUimaPipelineAEComponent> breakdown = currentProcess.getValue()
+ .getUimaPipelineComponents();
+ if (breakdown != null && breakdown.size() > 0) {
+ List<IUimaPipelineAEComponent> previousBreakdown = previousProcess
+ .getUimaPipelineComponents();
+ if (previousBreakdown == null || previousBreakdown.size() == 0
+ || breakdown.size() != previousBreakdown.size()) {
+ inventoryChanged = true;
+ } else {
+ for (IUimaPipelineAEComponent uimaAeState : breakdown) {
+ boolean found = false;
+ for (IUimaPipelineAEComponent previousUimaAeState : previousBreakdown) {
+ if (uimaAeState.getAeName().equals(previousUimaAeState.getAeName())) {
+ found = true;
+ if (!uimaAeState.getAeState().equals(previousUimaAeState.getAeState())
+ || uimaAeState.getInitializationTime() != previousUimaAeState
+ .getInitializationTime()) {
+ inventoryChanged = true;
+ break;
+ }
+ }
+ }
+ if (!found) {
+ inventoryChanged = true;
+ }
+
+ if (inventoryChanged) {
+ break;
+ }
+
+ }
+ }
+
+ }
+ }
+ } else {
+ // New inventory contains a process not in the previous
+ // snapshot
+ inventoryChanged = true;
+ break;
+ }
+ }
+ }
+ }
+
+ // Get this inventory snapshot
+ previousInventory = inventory;
+ // Broadcast inventory if there is a change or configured number of
+ // epochs
+ // passed since the last broadcast. This is configured in
+ // ducc.properties with
+ // property ducc.agent.node.inventory.publish.rate.skip
+ try {
+ if (inventory.size() > 0 && (inventoryChanged || // if there is
+ // inventory
+ // change,
+ // publish
+ forceInventoryUpdateMaxThreshold == 0 || // skip rate in
+ // ducc.properties
+ // is zero,
+ // publish
+ (counter > 0 && (counter % forceInventoryUpdateMaxThreshold) == 0))) { // if
+ // reached
+ // skip
+ // rate,
+ // publish
+
+ StringBuffer sb = new StringBuffer("Node Inventory (" + inventory.size() + ")");
+ for (Map.Entry<DuccId, IDuccProcess> p : inventory.entrySet()) {
+ /*
+ * long endInitLong = 0; String endInit = ""; ITimeWindow wInit =
+ * p.getValue().getTimeWindowInit(); if(wInit != null) { endInit = wInit.getEnd();
+ * endInitLong = wInit.getEndLong(); } long startRunLong = 0; String startRun = "";
+ * ITimeWindow wRun = p.getValue().getTimeWindowRun(); if(wRun != null) { startRun =
+ * wRun.getStart(); startRunLong = wRun.getStartLong(); } if(endInitLong > startRunLong) {
+ * logger.warn(methodName, null, "endInit:"+endInitLong+" "+"startRun:"+startRunLong); }
+ */
+ if (p.getValue().getUimaPipelineComponents() == null) {
+ p.getValue().setUimaPipelineComponents(new ArrayList<IUimaPipelineAEComponent>());
+ }
+ if (!p.getValue().getProcessState().equals(ProcessState.Initializing)) {
+ p.getValue().getUimaPipelineComponents().clear();
+ }
+ int pipelineInitStats = (p.getValue().getUimaPipelineComponents() == null) ? 0
+ : p.getValue().getUimaPipelineComponents().size();
+ StringBuffer gcInfo = new StringBuffer();
+ if (p.getValue().getGarbageCollectionStats() != null) {
+ gcInfo.append(" GC Total=")
+ .append(p.getValue().getGarbageCollectionStats().getCollectionCount())
+ .append(" GC Time=")
+ .append(p.getValue().getGarbageCollectionStats().getCollectionTime())
+ .append(" ");
+
+ }
+ sb.append("\n\t[Process Type=").append(p.getValue().getProcessType()).append(" DUCC ID=")
+ .append(p.getValue().getDuccId()).append(" PID=").append(p.getValue().getPID())
+ .append(" State=").append(p.getValue().getProcessState())
+ .append(" Resident Memory=").append(p.getValue().getResidentMemory())
+ .append(gcInfo.toString()).append(" Init Stats List Size:" + pipelineInitStats)
+ .append(" Reason: " + p.getValue().getReasonForStoppingProcess()).append("] ");
+ if (p.getValue().getProcessState().equals(ProcessState.Stopped)
+ || p.getValue().getProcessState().equals(ProcessState.Failed)
+ || p.getValue().getProcessState().equals(ProcessState.Killed)) {
+ sb.append(" Reason:" + p.getValue().getReasonForStoppingProcess());
+ sb.append(" Extended Reason:" + p.getValue().getExtendedReasonForStoppingProcess());
+ }
+
+ if (!p.getValue().getProcessState().equals(ProcessState.Running)
+ && !p.getValue().getProcessState().equals(ProcessState.Initializing)) {
+ sb.append(" Exit Code=" + p.getValue().getProcessExitCode());
+ }
+
+ }
+ logger.info(methodName, null, "Agent " + agent.getIdentity().getCanonicalName()
+ + " Posting Inventory:" + sb.toString());
+ outgoingMessage.getIn().setBody(new NodeInventoryUpdateDuccEvent(inventory,
+ agent.getLastORSequence(), agent.getIdentity()));
+
+ } else {
+ // Add null to the body of the message. A filter
+ // defined in the Camel route (AgentConfiguration.java)
+ // has a predicate to check for null body and throws
+ // away such a message.
+ outgoingMessage.getIn().setBody(null);
+ }
+ } catch (Exception e) {
+ logger.error(methodName, null, e);
+ } finally {
+ if (inventoryChanged) {
+ counter = 0;
+ } else {
+ counter++;
+ }
+ inventoryChanged = false;
+ }
+
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeMetricsProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeMetricsProcessor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeMetricsProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeMetricsProcessor.java Fri Aug 2 17:27:48 2019
@@ -40,68 +40,67 @@ import org.apache.uima.ducc.common.node.
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.transport.event.NodeMetricsUpdateDuccEvent;
+public class DefaultNodeMetricsProcessor extends BaseProcessor implements NodeMetricsProcessor {
+ private NodeAgent agent;
-public class DefaultNodeMetricsProcessor extends BaseProcessor implements
- NodeMetricsProcessor {
- private NodeAgent agent;
-
- private ExecutorService pool = Executors.newFixedThreadPool(1);
-
- DuccLogger logger = DuccLogger.getLogger(this.getClass(), Agent.COMPONENT_NAME);
-
- /*
- public DefaultNodeMetricsProcessor(final NodeAgent agent) throws Exception {
- this.agent = agent;
- }
- */
- public void setAgent(NodeAgent agent ) {
- this.agent = agent;
- }
- public void process(Exchange exchange) throws Exception {
- String methodName = "process";
- try {
-
- DefaultNodeMemoryCollector collector = new DefaultNodeMemoryCollector();
- Future<NodeMemory> nmiFuture = pool.submit(collector);
-
- DefaultNodeLoadAverageCollector loadAvgCollector =
- new DefaultNodeLoadAverageCollector();
- Future<NodeLoadAverage> loadFuture = pool.submit(loadAvgCollector);
-
- NodeCpuCollector cpuCollector = new NodeCpuCollector();
-// Future<NodeCpuInfo> cpuFuture = pool.submit(cpuCollector);
-
- NodeCpuInfo cpuInfo = new NodeCpuInfo(agent.numProcessors, cpuCollector.call().getCurrentLoad());
-
- NodeUsersCollector nodeUsersCollector = new NodeUsersCollector(agent, logger);
- Future<TreeMap<String,NodeUsersInfo>> nuiFuture = pool.submit(nodeUsersCollector);
- boolean cpuReportingEnabled = agent.cgroupsManager.isCpuReportingEnabled();
-
- NodeMetrics nodeMetrics =
- new NodeMetrics(agent.getIdentity(), nmiFuture.get(), loadFuture.get(),
- cpuInfo, nuiFuture.get(), cpuReportingEnabled);
- if ( agent.isStopping()) {
- nodeMetrics.disableNode(); // sends Unavailable status to clients (RM,WS)
- logger.info(methodName, null,">>>>>>>>>>>>>>>>> Agent publishing State="+nodeMetrics.getNodeStatus()+" in Outgoing NodeMetrics");
- }
- //Node node = new DuccNode(new NodeIdentity(), nodeMetrics);
- // jrc 2011-07-30 I think this needs to be agent.getIdentity(), not create a new identity.
- Node node = new DuccNode(agent.getIdentity(), nodeMetrics, agent.useCgroups);
-
- // Make the agent aware how much memory is available on the node. Do this once.
- if ( agent.getNodeInfo() == null ) {
- agent.setNodeInfo(node);
- }
- logger.info(methodName, null, "... Agent "+node.getNodeIdentity().getCanonicalName()+" Posting Users:"+
- node.getNodeMetrics().getNodeUsersMap().size());
-
- NodeMetricsUpdateDuccEvent event = new NodeMetricsUpdateDuccEvent(node,agent.getInventoryRef().size());
- exchange.getIn().setBody(event, NodeMetricsUpdateDuccEvent.class);
-
- } catch( Exception e) {
- e.printStackTrace();
- }
+ private ExecutorService pool = Executors.newFixedThreadPool(1);
- }
+ DuccLogger logger = DuccLogger.getLogger(this.getClass(), Agent.COMPONENT_NAME);
+
+ /*
+ * public DefaultNodeMetricsProcessor(final NodeAgent agent) throws Exception { this.agent =
+ * agent; }
+ */
+ public void setAgent(NodeAgent agent) {
+ this.agent = agent;
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ String methodName = "process";
+ try {
+
+ DefaultNodeMemoryCollector collector = new DefaultNodeMemoryCollector();
+ Future<NodeMemory> nmiFuture = pool.submit(collector);
+
+ DefaultNodeLoadAverageCollector loadAvgCollector = new DefaultNodeLoadAverageCollector();
+ Future<NodeLoadAverage> loadFuture = pool.submit(loadAvgCollector);
+
+ NodeCpuCollector cpuCollector = new NodeCpuCollector();
+ // Future<NodeCpuInfo> cpuFuture = pool.submit(cpuCollector);
+
+ NodeCpuInfo cpuInfo = new NodeCpuInfo(agent.numProcessors,
+ cpuCollector.call().getCurrentLoad());
+
+ NodeUsersCollector nodeUsersCollector = new NodeUsersCollector(agent, logger);
+ Future<TreeMap<String, NodeUsersInfo>> nuiFuture = pool.submit(nodeUsersCollector);
+ boolean cpuReportingEnabled = agent.cgroupsManager.isCpuReportingEnabled();
+
+ NodeMetrics nodeMetrics = new NodeMetrics(agent.getIdentity(), nmiFuture.get(),
+ loadFuture.get(), cpuInfo, nuiFuture.get(), cpuReportingEnabled);
+ if (agent.isStopping()) {
+ nodeMetrics.disableNode(); // sends Unavailable status to clients (RM,WS)
+ logger.info(methodName, null, ">>>>>>>>>>>>>>>>> Agent publishing State="
+ + nodeMetrics.getNodeStatus() + " in Outgoing NodeMetrics");
+ }
+ // Node node = new DuccNode(new NodeIdentity(), nodeMetrics);
+ // jrc 2011-07-30 I think this needs to be agent.getIdentity(), not create a new identity.
+ Node node = new DuccNode(agent.getIdentity(), nodeMetrics, agent.useCgroups);
+
+ // Make the agent aware how much memory is available on the node. Do this once.
+ if (agent.getNodeInfo() == null) {
+ agent.setNodeInfo(node);
+ }
+ logger.info(methodName, null, "... Agent " + node.getNodeIdentity().getCanonicalName()
+ + " Posting Users:" + node.getNodeMetrics().getNodeUsersMap().size());
+
+ NodeMetricsUpdateDuccEvent event = new NodeMetricsUpdateDuccEvent(node,
+ agent.getInventoryRef().size());
+ exchange.getIn().setBody(event, NodeMetricsUpdateDuccEvent.class);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultProcessMetricsProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultProcessMetricsProcessor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultProcessMetricsProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultProcessMetricsProcessor.java Fri Aug 2 17:27:48 2019
@@ -22,21 +22,22 @@ import org.apache.camel.Exchange;
import org.apache.uima.ducc.agent.NodeAgent;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
-
public class DefaultProcessMetricsProcessor implements ProcessMetricsProcessor {
- // private NodeAgent agent;
- // private IDuccProcess process;
-
- public DefaultProcessMetricsProcessor( IDuccProcess process, NodeAgent agent) {
- // this.agent = agent;
- // this.process = process;
- }
- public void process(Exchange arg0) throws Exception {
- }
- @Override
- public void stop() {
- // TODO Auto-generated method stub
-
- }
+ // private NodeAgent agent;
+ // private IDuccProcess process;
+
+ public DefaultProcessMetricsProcessor(IDuccProcess process, NodeAgent agent) {
+ // this.agent = agent;
+ // this.process = process;
+ }
+
+ public void process(Exchange arg0) throws Exception {
+ }
+
+ @Override
+ public void stop() {
+ // TODO Auto-generated method stub
+
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxNodeMetricsProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxNodeMetricsProcessor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxNodeMetricsProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxNodeMetricsProcessor.java Fri Aug 2 17:27:48 2019
@@ -41,160 +41,171 @@ import org.apache.uima.ducc.common.node.
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.transport.event.NodeMetricsUpdateDuccEvent;
+public class LinuxNodeMetricsProcessor extends BaseProcessor implements NodeMetricsProcessor {
+ DuccLogger logger = DuccLogger.getLogger(this.getClass(), Agent.COMPONENT_NAME);
-public class LinuxNodeMetricsProcessor extends BaseProcessor implements
- NodeMetricsProcessor {
- DuccLogger logger = DuccLogger.getLogger(this.getClass(), Agent.COMPONENT_NAME);
- public static String[] MeminfoTargetFields = new String[] {"MemTotal:","MemFree:","SwapTotal:","SwapFree:"};
-
- private NodeAgent agent;
- private String osname;
- private String osversion;
- private String osarch;
- private final ExecutorService pool;
- private RandomAccessFile memInfoFile;
- private RandomAccessFile loadAvgFile;
- //private Node node;
- private int swapThreshold = 0;
-// public LinuxNodeMetricsProcessor(NodeAgent agent, String memInfoFilePath,
-// String loadAvgFilePath) throws FileNotFoundException {
- public LinuxNodeMetricsProcessor() {
- super();
-// this.agent = agent;
- pool = Executors.newCachedThreadPool();
- // open files and keep them open until stop() is called
-// memInfoFile = new RandomAccessFile(memInfoFilePath, "r");
-// loadAvgFile = new RandomAccessFile(loadAvgFilePath, "r");
- //node = new DuccNode(agent.getIdentity(), null);
-
- osname = System.getProperty("os.name");
- osversion = System.getProperty("os.version");
- osarch = System.getProperty("os.arch");
-
- if ( System.getProperty("ducc.node.min.swap.threshold") != null ) {
- try {
- swapThreshold = Integer.valueOf(System.getProperty("ducc.node.min.swap.threshold"));
- logger.info("ctor", null, "Ducc Node Min Swap Threshold:"+swapThreshold);
- } catch( Exception e) {
- }
- }
- }
- public void setAgent(NodeAgent agent) {
- this.agent = agent;
- }
- public void initMemInfo(String memInfoFilePath) throws Exception {
- this.memInfoFile = new RandomAccessFile(memInfoFilePath, "r");
-
- }
- public void initLoadAvg(String loadAvgFilePath) throws Exception {
- this.loadAvgFile = new RandomAccessFile(loadAvgFilePath, "r");
+ public static String[] MeminfoTargetFields = new String[] { "MemTotal:", "MemFree:", "SwapTotal:",
+ "SwapFree:" };
+
+ private NodeAgent agent;
+
+ private String osname;
+
+ private String osversion;
+
+ private String osarch;
+
+ private final ExecutorService pool;
+
+ private RandomAccessFile memInfoFile;
+
+ private RandomAccessFile loadAvgFile;
+
+ // private Node node;
+ private int swapThreshold = 0;
+
+ // public LinuxNodeMetricsProcessor(NodeAgent agent, String memInfoFilePath,
+ // String loadAvgFilePath) throws FileNotFoundException {
+ public LinuxNodeMetricsProcessor() {
+ super();
+ // this.agent = agent;
+ pool = Executors.newCachedThreadPool();
+ // open files and keep them open until stop() is called
+ // memInfoFile = new RandomAccessFile(memInfoFilePath, "r");
+ // loadAvgFile = new RandomAccessFile(loadAvgFilePath, "r");
+ // node = new DuccNode(agent.getIdentity(), null);
+
+ osname = System.getProperty("os.name");
+ osversion = System.getProperty("os.version");
+ osarch = System.getProperty("os.arch");
+
+ if (System.getProperty("ducc.node.min.swap.threshold") != null) {
+ try {
+ swapThreshold = Integer.valueOf(System.getProperty("ducc.node.min.swap.threshold"));
+ logger.info("ctor", null, "Ducc Node Min Swap Threshold:" + swapThreshold);
+ } catch (Exception e) {
+ }
}
+ }
- public void stop() {
- try {
- if (memInfoFile != null) {
- memInfoFile.close();
- }
- if (loadAvgFile != null) {
- loadAvgFile.close();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- /**
- * Collects node's metrics and dumps it to a JMS topic. Currently collects
- * memory utilization from /proc/meminfo and average load from
- * /proc/loadavg. This method is called from NodeAgentStatsGenerator at
- * fixed intervals.
- *
- */
+ public void setAgent(NodeAgent agent) {
+ this.agent = agent;
+ }
+
+ public void initMemInfo(String memInfoFilePath) throws Exception {
+ this.memInfoFile = new RandomAccessFile(memInfoFilePath, "r");
+
+ }
+
+ public void initLoadAvg(String loadAvgFilePath) throws Exception {
+ this.loadAvgFile = new RandomAccessFile(loadAvgFilePath, "r");
+ }
+
+ public void stop() {
+ try {
+ if (memInfoFile != null) {
+ memInfoFile.close();
+ }
+ if (loadAvgFile != null) {
+ loadAvgFile.close();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Collects the node's metrics and dumps them to a JMS topic. Currently collects memory utilization from
+ * /proc/meminfo and average load from /proc/loadavg. This method is called from
+ * NodeAgentStatsGenerator at fixed intervals.
+ *
+ */
public void process(Exchange e) {
- String methodName = "process";
- try {
- // every 10th node metrics publication log the status of CGroups
- if ( ( NodeAgent.logCounter.incrementAndGet() % 10 ) == 0 ) {
- if ( agent.useCgroups ) {
- logger.info(methodName, null, "\t****\n\t**** Agent CGroups status: enabled");
-
- } else {
- logger.info(methodName, null, "\t****\n\t**** Agent CGroups status: disabled. Reason:"+NodeAgent.cgroupFailureReason);
-
- }
- }
-
- NodeMemInfoCollector memCollector = new NodeMemInfoCollector(MeminfoTargetFields);
- Future<NodeMemory> nmiFuture = pool.submit(memCollector);
-
- NodeLoadAverageCollector loadAvgCollector = new NodeLoadAverageCollector();
-
- Future<NodeLoadAverage> loadFuture = pool.submit(loadAvgCollector);
- NodeCpuCollector cpuCollector = new NodeCpuCollector();
-// Future<NodeCpuInfo> cpuFuture = pool.submit(cpuCollector);
- NodeCpuInfo cpuInfo = new NodeCpuInfo(agent.numProcessors, String.valueOf(cpuCollector.call()));
-
- e.getIn().setHeader("node", agent.getIdentity().getCanonicalName());
- NodeMemory memInfo = nmiFuture.get();
- TreeMap<String, NodeUsersInfo> users = null;
- // begin collecting user processes and activate rogue process detector
- // only after the agent receives the first Ducc state publication.
- if ( agent.receivedDuccState ) {
- NodeUsersCollector nodeUsersCollector = new NodeUsersCollector(agent, logger);
-
- logger.debug(methodName, null, "... Agent Collecting User Processes");
-
- Future<TreeMap<String,NodeUsersInfo>> nuiFuture =
- pool.submit(nodeUsersCollector);
- users = nuiFuture.get();
- } else {
- users = new TreeMap<String, NodeUsersInfo>();
- }
- NodeLoadAverage lav = loadFuture.get();
- boolean cpuReportingEnabled = false;
- if ( agent.cgroupsManager != null ) {
- cpuReportingEnabled = agent.cgroupsManager.isCpuReportingEnabled();
- }
- NodeMetrics nodeMetrics = new NodeMetrics(agent.getIdentity(), memInfo, lav,
- cpuInfo, users, cpuReportingEnabled);
- if ( agent.isStopping()) {
- nodeMetrics.disableNode(); // sends Unavailable status to clients (RM,WS)
- }
-
- Node node = new DuccNode(agent.getIdentity(), nodeMetrics, agent.useCgroups);
- // Make the agent aware how much memory is available on the node. Do this once.
- if ( agent.getNodeInfo() == null ) {
- agent.setNodeInfo(node);
- }
-
- ((DuccNode)node).duccLingExists(agent.duccLingExists());
- ((DuccNode)node).runWithDuccLing(agent.runWithDuccLing());
- logger.info(methodName, null, "... Agent "+node.getNodeIdentity().getCanonicalName()+
- " OS Name:" + osname +
- " OS Version:" + osversion +
- " OS Arch:" + osarch +
- " CPU Count:" + cpuInfo.getAvailableProcessors() +
- " CPU Load Average:" +lav.getLoadAvg1() +
- " Posting Memory (KB):"
- + node.getNodeMetrics().getNodeMemory().getMemTotal()+
- " Memory Free (KB):"+node.getNodeMetrics().getNodeMemory().getMemFree()+
- " Swap Total (KB):"+node.getNodeMetrics().getNodeMemory().getSwapTotal()+
- " Swap Free (KB):"+node.getNodeMetrics().getNodeMemory().getSwapFree()+
- " Low Swap Threshold Defined in ducc.properties (KB):"+swapThreshold +
- " CPU Reporting Enabled:"+cpuReportingEnabled +
- " Node Status:"+nodeMetrics.getNodeStatus()) ;
-
- logger.trace(methodName, null, "... Agent "+node.getNodeIdentity().getCanonicalName()+" Posting Users:"+
- node.getNodeMetrics().getNodeUsersMap().size());
- // Check if swap free is less than defined minimum threshold (check ducc.properties)
- if ( swapThreshold > 0 && ( node.getNodeMetrics().getNodeMemory().getSwapFree() < swapThreshold)) {
- agent.killProcessDueToLowSwapSpace(swapThreshold);
- }
- NodeMetricsUpdateDuccEvent updateEvent = new NodeMetricsUpdateDuccEvent(node,agent.getInventoryRef().size());
- e.getIn().setBody(updateEvent, NodeMetricsUpdateDuccEvent.class);
-
- } catch (Exception ex) {
- logger.error(methodName, null, ex, new Object[] { "Agent" });
- }
- }
+ String methodName = "process";
+ try {
+ // every 10th node metrics publication log the status of CGroups
+ if ((NodeAgent.logCounter.incrementAndGet() % 10) == 0) {
+ if (agent.useCgroups) {
+ logger.info(methodName, null, "\t****\n\t**** Agent CGroups status: enabled");
+
+ } else {
+ logger.info(methodName, null, "\t****\n\t**** Agent CGroups status: disabled. Reason:"
+ + NodeAgent.cgroupFailureReason);
+
+ }
+ }
+
+ NodeMemInfoCollector memCollector = new NodeMemInfoCollector(MeminfoTargetFields);
+ Future<NodeMemory> nmiFuture = pool.submit(memCollector);
+
+ NodeLoadAverageCollector loadAvgCollector = new NodeLoadAverageCollector();
+
+ Future<NodeLoadAverage> loadFuture = pool.submit(loadAvgCollector);
+ NodeCpuCollector cpuCollector = new NodeCpuCollector();
+ // Future<NodeCpuInfo> cpuFuture = pool.submit(cpuCollector);
+ NodeCpuInfo cpuInfo = new NodeCpuInfo(agent.numProcessors,
+ String.valueOf(cpuCollector.call()));
+
+ e.getIn().setHeader("node", agent.getIdentity().getCanonicalName());
+ NodeMemory memInfo = nmiFuture.get();
+ TreeMap<String, NodeUsersInfo> users = null;
+ // begin collecting user processes and activate rogue process detector
+ // only after the agent receives the first Ducc state publication.
+ if (agent.receivedDuccState) {
+ NodeUsersCollector nodeUsersCollector = new NodeUsersCollector(agent, logger);
+
+ logger.debug(methodName, null, "... Agent Collecting User Processes");
+
+ Future<TreeMap<String, NodeUsersInfo>> nuiFuture = pool.submit(nodeUsersCollector);
+ users = nuiFuture.get();
+ } else {
+ users = new TreeMap<String, NodeUsersInfo>();
+ }
+ NodeLoadAverage lav = loadFuture.get();
+ boolean cpuReportingEnabled = false;
+ if (agent.cgroupsManager != null) {
+ cpuReportingEnabled = agent.cgroupsManager.isCpuReportingEnabled();
+ }
+ NodeMetrics nodeMetrics = new NodeMetrics(agent.getIdentity(), memInfo, lav, cpuInfo, users,
+ cpuReportingEnabled);
+ if (agent.isStopping()) {
+ nodeMetrics.disableNode(); // sends Unavailable status to clients (RM,WS)
+ }
+
+ Node node = new DuccNode(agent.getIdentity(), nodeMetrics, agent.useCgroups);
+ // Make the agent aware how much memory is available on the node. Do this once.
+ if (agent.getNodeInfo() == null) {
+ agent.setNodeInfo(node);
+ }
+
+ ((DuccNode) node).duccLingExists(agent.duccLingExists());
+ ((DuccNode) node).runWithDuccLing(agent.runWithDuccLing());
+ logger.info(methodName, null,
+ "... Agent " + node.getNodeIdentity().getCanonicalName() + " OS Name:" + osname
+ + " OS Version:" + osversion + " OS Arch:" + osarch + " CPU Count:"
+ + cpuInfo.getAvailableProcessors() + " CPU Load Average:" + lav.getLoadAvg1()
+ + " Posting Memory (KB):"
+ + node.getNodeMetrics().getNodeMemory().getMemTotal() + " Memory Free (KB):"
+ + node.getNodeMetrics().getNodeMemory().getMemFree() + " Swap Total (KB):"
+ + node.getNodeMetrics().getNodeMemory().getSwapTotal() + " Swap Free (KB):"
+ + node.getNodeMetrics().getNodeMemory().getSwapFree()
+ + " Low Swap Threshold Defined in ducc.properties (KB):" + swapThreshold
+ + " CPU Reporting Enabled:" + cpuReportingEnabled + " Node Status:"
+ + nodeMetrics.getNodeStatus());
+
+ logger.trace(methodName, null, "... Agent " + node.getNodeIdentity().getCanonicalName()
+ + " Posting Users:" + node.getNodeMetrics().getNodeUsersMap().size());
+ // Check if swap free is less than defined minimum threshold (check ducc.properties)
+ if (swapThreshold > 0
+ && (node.getNodeMetrics().getNodeMemory().getSwapFree() < swapThreshold)) {
+ agent.killProcessDueToLowSwapSpace(swapThreshold);
+ }
+ NodeMetricsUpdateDuccEvent updateEvent = new NodeMetricsUpdateDuccEvent(node,
+ agent.getInventoryRef().size());
+ e.getIn().setBody(updateEvent, NodeMetricsUpdateDuccEvent.class);
+
+ } catch (Exception ex) {
+ logger.error(methodName, null, ex, new Object[] { "Agent" });
+ }
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java Fri Aug 2 17:27:48 2019
@@ -43,164 +43,159 @@ import org.apache.uima.ducc.transport.ev
import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
-public class LinuxProcessMetricsProcessor extends BaseProcessor implements
- ProcessMetricsProcessor {
+public class LinuxProcessMetricsProcessor extends BaseProcessor implements ProcessMetricsProcessor {
+
+ private long previousCPUReadingInNanos = 0;
+
+ private long previousSnapshotTime = 0;
+
+ private final ExecutorService pool;
+
+ private IDuccProcess process;
+
+ private DuccGarbageStatsCollector gcStatsCollector;
+
+ private int blockSize = 4096; // default, OS specific
+
+ private DuccLogger logger;
+
+ private ManagedProcess managedProcess;
+
+ private NodeAgent agent;
+
+ private int fudgeFactor = 5; // default is 5%
+
+ private volatile boolean closed = true;
+
+ private long percentCPU = 0;
+
+ /**
+  * Creates a metrics collector for a single managed process.
+  *
+  * @param logger
+  *          logger used for all messages emitted by this processor
+  * @param process
+  *          the process whose metrics are collected and updated
+  * @param agent
+  *          the owning agent; supplies the OS page size and the cgroups manager
+  * @param managedProcess
+  *          the managed process; it is handed a reference to this processor
+  * @throws FileNotFoundException
+  *           declared in the signature; kept for caller compatibility
+  */
+ public LinuxProcessMetricsProcessor(DuccLogger logger, IDuccProcess process, NodeAgent agent,
+         ManagedProcess managedProcess) throws FileNotFoundException {
+   this.logger = logger;
+   this.managedProcess = managedProcess;
+   this.agent = agent;
+   pool = Executors.newCachedThreadPool();
+   this.process = process;
+   gcStatsCollector = new DuccGarbageStatsCollector(logger, process);
+
+   // Keep a reference to this processor on the managed process so that close() can be
+   // called when the process terminates.
+   managedProcess.setMetricsProcessor(this);
+
+   blockSize = agent.getOSPageSize();
+
+   // Read the property once; a malformed value keeps the default fudge factor.
+   String configuredFudgeFactor = System.getProperty("ducc.agent.share.size.fudge.factor");
+   if (configuredFudgeFactor != null) {
+     try {
+       fudgeFactor = Integer.parseInt(configuredFudgeFactor);
+     } catch (NumberFormatException e) {
+       // Report through the agent log rather than stderr.
+       logger.error("LinuxProcessMetricsProcessor.ctor", null, e);
+     }
+   }
+   closed = false;
+ }
+
+ public void stop() {
+ try {
+ if (pool != null) {
+ pool.shutdown();
+ }
+ } catch (Exception e) {
+ logger.error("LinuxProcessMetricsProcessor.stop()", null, e);
+
+ }
+ }
+
+ /**
+  * Marks this processor as closed, so that process() stops collecting, and shuts the
+  * thread pool down via stop(). Failures are logged, never rethrown.
+  */
+ public void close() {
+   closed = true;
+   try {
+     this.stop();
+   } catch (Exception e) {
+     // Same reporting channel as stop(): the agent log, not stderr.
+     logger.error("LinuxProcessMetricsProcessor.close()", null, e);
+   }
+ }
+
+ /**
+  * Tells whether metrics should still be collected (and collection errors reported) for a
+  * process in the given state.
+  *
+  * The decision is made on the state passed in, so it is based on one consistent value
+  * instead of re-reading the process state for every comparison.
+  *
+  * @param state
+  *          the process state to evaluate
+  * @return false when the state is Stopped, Killed, Failed or Stopping; true otherwise
+  */
+ private boolean collectStats(ProcessState state) {
+   boolean terminating = ProcessState.Stopped.equals(state)
+           || ProcessState.Killed.equals(state)
+           || ProcessState.Failed.equals(state)
+           || ProcessState.Stopping.equals(state);
+   return !terminating;
+ }
+
+ private long getSwapUsage() throws Exception {
+ long swapUsage = -1;
+ if (agent.useCgroups) {
+
+ String containerId = agent.cgroupsManager.getContainerId(managedProcess);
+
+ ProcessSwapUsageCollector processSwapCollector = new ProcessSwapUsageCollector(logger,
+ agent.cgroupsManager, containerId);
+ logger.debug("LinuxProcessMetricsProcessor.getSwapUsage", null,
+ "Fetching Swap Usage PID:" + process.getPID());
+ Future<ProcessSwapSpaceUsage> processFaults = pool.submit(processSwapCollector);
+ swapUsage = processFaults.get().getSwapUsage();
+ logger.debug("LinuxProcessMetricsProcessor.getSwapUsage", null,
+ " Process Swap Usage:" + swapUsage);
+ }
+ return swapUsage;
+ }
+
+ private long getFaults() throws Exception {
+ long faults = -1;
+ if (agent.useCgroups) {
+ String containerId = agent.cgroupsManager.getContainerId(managedProcess);
+
+ ProcessMajorFaultCollector processFaultsCollector = new ProcessMajorFaultCollector(logger,
+ agent.cgroupsManager, containerId);
+ logger.debug("LinuxProcessMetricsProcessor.getFaults", null,
+ "Fetching Page Faults PID:" + process.getPID());
+ Future<ProcessMemoryPageLoadUsage> processFaults = pool.submit(processFaultsCollector);
+ faults = processFaults.get().getMajorFaults();
+ logger.debug("LinuxProcessMetricsProcessor.getFaults", null,
+ " Process Faults (pgpgin):" + faults);
+ }
+ return faults;
+ }
+
+ private long getRss() throws Exception {
+ long rss = -1;
+ if (agent.useCgroups) {
+ String containerId = agent.cgroupsManager.getContainerId(managedProcess);
+
+ ProcessResidentMemoryCollector processRSSCollector = new ProcessResidentMemoryCollector(
+ logger, agent.cgroupsManager, containerId);
+ logger.debug("LinuxProcessMetricsProcessor.getRss", null,
+ "Fetching RSS Usage for PID:" + process.getPID());
+ Future<ProcessResidentMemory> processRss = pool.submit(processRSSCollector);
+ rss = processRss.get().get();
+ logger.debug("LinuxProcessMetricsProcessor.getRss", null, " Process RSS:" + rss);
+ }
+ return rss;
+ }
+
+ private long getCpuUsage() throws Exception {
+ long cpuUsage = -1;
+ if (agent.useCgroups) {
+ String containerId = agent.cgroupsManager.getContainerId(managedProcess);
+
+ Future<ProcessCpuUsage> processCpuUsage = null;
+ ProcessCpuUsageCollector processCpuUsageCollector = new ProcessCpuUsageCollector(logger,
+ agent.cgroupsManager, containerId);
+ logger.debug("LinuxProcessMetricsProcessor.getCpuUsage", null,
+ "Fetching CPU Usage for PID:" + process.getPID());
+ processCpuUsage = pool.submit(processCpuUsageCollector);
+ long cpuUsageInNanos = processCpuUsage.get().getCpuUsage();
+ // cpuUsage comes from cpuacct.usage and is in nanos
+ cpuUsage = cpuUsageInNanos;// / 1000000 ; // normalize into millis
+ logger.debug("LinuxProcessMetricsProcessor.getCpuUsage", null, "CPU USAGE:" + cpuUsageInNanos
+ + " CLOCK RATE:" + agent.cpuClockRate + " Total CPU USAGE:" + cpuUsage);
+ }
+ return cpuUsage;
+ }
- private long previousCPUReadingInNanos = 0;
-
- private long previousSnapshotTime = 0;
-
- private final ExecutorService pool;
-
- private IDuccProcess process;
-
- private DuccGarbageStatsCollector gcStatsCollector;
-
- private int blockSize = 4096; // default, OS specific
-
- private DuccLogger logger;
-
- private ManagedProcess managedProcess;
-
- private NodeAgent agent;
-
- private int fudgeFactor = 5; // default is 5%
-
- private volatile boolean closed = true;
-
-
- private long percentCPU = 0;
-
-
- public LinuxProcessMetricsProcessor(DuccLogger logger,
- IDuccProcess process, NodeAgent agent, ManagedProcess managedProcess) throws FileNotFoundException {
- this.logger = logger;
- this.managedProcess = managedProcess;
- this.agent = agent;
- pool = Executors.newCachedThreadPool();
- this.process = process;
- gcStatsCollector = new DuccGarbageStatsCollector(logger, process);
-
- // keep a refernce to this so that we can call close() when the process
- // terminates. We need to
- // close fds to stat and statm files
- managedProcess.setMetricsProcessor(this);
-
- blockSize = agent.getOSPageSize();
-
- if (System.getProperty("ducc.agent.share.size.fudge.factor") != null) {
- try {
- fudgeFactor = Integer.parseInt(System
- .getProperty("ducc.agent.share.size.fudge.factor"));
- } catch (NumberFormatException e) {
- e.printStackTrace();
- }
- }
- closed = false;
- }
-
- public void stop() {
- try {
- if (pool != null) {
- pool.shutdown();
- }
- } catch (Exception e) {
- logger.error("LinuxProcessMetricsProcessor.stop()", null, e);
-
- }
- }
-
- public void close() {
- closed = true;
- try {
- this.stop();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private boolean collectStats(ProcessState state) {
- if (process.getProcessState().equals(ProcessState.Stopped)
- || process.getProcessState().equals(ProcessState.Killed)
- || process.getProcessState().equals(ProcessState.Failed)
- || process.getProcessState().equals(ProcessState.Stopping)) {
- return false; // dont collect stats
- }
- return true;
- }
-
- private long getSwapUsage() throws Exception {
- long swapUsage = -1;
- if (agent.useCgroups) {
-
- String containerId = agent.cgroupsManager
- .getContainerId(managedProcess);
-
- ProcessSwapUsageCollector processSwapCollector = new ProcessSwapUsageCollector(
- logger, agent.cgroupsManager, containerId);
- logger.debug("LinuxProcessMetricsProcessor.getSwapUsage", null,
- "Fetching Swap Usage PID:" + process.getPID());
- Future<ProcessSwapSpaceUsage> processFaults = pool
- .submit(processSwapCollector);
- swapUsage = processFaults.get().getSwapUsage();
- logger.debug("LinuxProcessMetricsProcessor.getSwapUsage", null,
- " Process Swap Usage:" + swapUsage);
- }
- return swapUsage;
- }
-
- private long getFaults() throws Exception {
- long faults = -1;
- if (agent.useCgroups) {
- String containerId = agent.cgroupsManager.getContainerId(managedProcess);
-
- ProcessMajorFaultCollector processFaultsCollector =
- new ProcessMajorFaultCollector(logger, agent.cgroupsManager, containerId);
- logger.debug("LinuxProcessMetricsProcessor.getFaults",null,"Fetching Page Faults PID:"+process.getPID());
- Future<ProcessMemoryPageLoadUsage> processFaults = pool.submit(processFaultsCollector);
- faults = processFaults.get().getMajorFaults();
- logger.debug(
- "LinuxProcessMetricsProcessor.getFaults",null," Process Faults (pgpgin):"+faults);
- }
- return faults;
- }
- private long getRss() throws Exception {
- long rss = -1;
- if (agent.useCgroups) {
- String containerId = agent.cgroupsManager.getContainerId(managedProcess);
-
- ProcessResidentMemoryCollector processRSSCollector =
- new ProcessResidentMemoryCollector(logger, agent.cgroupsManager, containerId);
- logger.debug("LinuxProcessMetricsProcessor.getRss",null,"Fetching RSS Usage for PID:"+process.getPID());
- Future<ProcessResidentMemory> processRss = pool.submit(processRSSCollector);
- rss = processRss.get().get();
- logger.debug(
- "LinuxProcessMetricsProcessor.getRss",null," Process RSS:"+rss);
- }
- return rss;
- }
-
- private long getCpuUsage() throws Exception {
- long cpuUsage=-1;
- if (agent.useCgroups) {
- String containerId = agent.cgroupsManager.getContainerId(managedProcess);
-
- Future<ProcessCpuUsage> processCpuUsage = null;
- ProcessCpuUsageCollector processCpuUsageCollector =
- new ProcessCpuUsageCollector(logger, agent.cgroupsManager, containerId);
- logger.debug("LinuxProcessMetricsProcessor.getCpuUsage",null,"Fetching CPU Usage for PID:"+process.getPID());
- processCpuUsage = pool
- .submit(processCpuUsageCollector);
- long cpuUsageInNanos = processCpuUsage.get().getCpuUsage();
- // cpuUsage comes from cpuacct.usage and is in nanos
- cpuUsage = cpuUsageInNanos;// / 1000000 ; // normalize into millis
- logger.debug(
- "LinuxProcessMetricsProcessor.getCpuUsage",null,
- "CPU USAGE:"+cpuUsageInNanos+ " CLOCK RATE:"+agent.cpuClockRate+" Total CPU USAGE:"+cpuUsage);
- }
- return cpuUsage;
- }
-
private long getCpuTime(long totalCpuUsageInNanos) throws Exception {
long cp = -1;
if (agent.useCgroups) {
@@ -215,9 +210,9 @@ public class LinuxProcessMetricsProcesso
if (process.getTimeWindowRun() != null) {
timeRunning += process.getTimeWindowRun().getElapsedMillis();
}
- long totalCpuUsageInMillis = totalCpuUsageInNanos/1000000;
+ long totalCpuUsageInMillis = totalCpuUsageInNanos / 1000000;
// normalize time in running state into seconds
- percentCPU = Math.round(100*( (totalCpuUsageInMillis*1.0)/ (timeRunning*1.0)));
+ percentCPU = Math.round(100 * ((totalCpuUsageInMillis * 1.0) / (timeRunning * 1.0)));
cp = percentCPU;
@@ -229,229 +224,192 @@ public class LinuxProcessMetricsProcesso
return cp;
}
- private long getCurrentCpu(long totalCpuUsageInNanos ) {
- long currentCpu=-1;
- // publish current CPU usage by computing a delta from the last time
- // CPU data was fetched.
- if ( agent.useCgroups) {
- //long totalCpuUsageInMillis = totalCpuUsageInNanos/1000000;
- long millisCPU = (totalCpuUsageInNanos - previousCPUReadingInNanos)/1000000 ;
- long millisRun = System.currentTimeMillis() - previousSnapshotTime ;
- currentCpu = Math.round(100*( (millisCPU*1.0)/(millisRun*1.0)) );
- previousCPUReadingInNanos = totalCpuUsageInNanos;
- previousSnapshotTime = System.currentTimeMillis();
- }
-
- return currentCpu;
- }
-
-
- private void killProcsIfExceedingMemoryThreshold() throws Exception {
- if ( !agent.useCgroups ) {
- return;
- }
- if (process.getSwapUsage() > 0
- && process.getSwapUsage() > managedProcess
- .getMaxSwapThreshold()) {
- } else {
- String containerId = agent.cgroupsManager
- .getContainerId(managedProcess);
-
- String[] cgroupPids = agent.cgroupsManager
- .getPidsInCgroup(containerId);
- logger.debug("LinuxProcessMetricsProcessor.process",null,"Container ID:"+containerId+" cgroup pids "+cgroupPids.length);
-
- // Use Memory Guard only if cgroups are disabled and fudge
- // factor > -1
-
- if ( fudgeFactor > -1
- && managedProcess.getProcessMemoryAssignment()
- .getMaxMemoryWithFudge() > 0) {
-
- long rss = (process.getResidentMemory() / 1024) / 1024; // normalize RSS into MB
-
- logger.trace(
- "process",
- null,
- "*** Process with PID:"
- + managedProcess.getPid()
- + " Assigned Memory (MB): "
- + managedProcess
- .getProcessMemoryAssignment()
- + " MBs. Current RSS (MB):" + rss);
- // check if process resident memory exceeds its memory
- // assignment calculate in the PM
- if (rss > managedProcess.getProcessMemoryAssignment()
- .getMaxMemoryWithFudge()) {
- logger.error(
- "process",
- null,
- "\n\n********************************************************\n\tProcess with PID:"
- + managedProcess.getPid()
- + " Exceeded its max memory assignment (including a fudge factor) of "
- + managedProcess
- .getProcessMemoryAssignment()
- .getMaxMemoryWithFudge()
- + " MBs. This Process Resident Memory Size: "
- + rss
- + " MBs .Killing process ...\n********************************************************\n\n");
- try {
- managedProcess.kill(); // mark it for death
- process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededShareSize
- .toString());
- agent.stopProcess(process);
-
- if (agent.useCgroups) {
- for (String pid : cgroupPids) {
- // skip the main process that was just
- // killed above. Only kill
- // its child processes.
- if (pid.equals(managedProcess
- .getDuccProcess().getPID())) {
- continue;
- }
- killChildProcess(pid, "-15");
- }
- }
- } catch (Exception ee) {
- if (!collectStats(process.getProcessState())) {
- return;
- }
- logger.error("process", null, ee);
- }
- return;
- }
- }
-
- }
-
- }
-
- private ProcessGarbageCollectionStats getGCStats() throws Exception {
-// if (!process.getProcessType().equals(ProcessType.Pop)) {
- if ( process.getProcessJmxUrl() != null
- && process.getProcessJmxUrl().trim().length() > 0 ) {
- logger.debug("LinuxProcessMetricsProcessor.getGCStats", null, "Collecting GC Stats");
- ProcessGarbageCollectionStats gcStats = gcStatsCollector
- .collect();
- return gcStats;
- }
- return new ProcessGarbageCollectionStats();
- }
- public boolean processIsActive() {
- return process.getProcessState().equals(ProcessState.Starting)
- ||
- process.getProcessState().equals(ProcessState.Started)
- ||
- process.getProcessState().equals(ProcessState.Initializing)
- ||
- process.getProcessState().equals(ProcessState.Running);
- }
- public void process(Exchange e) {
- // if process is stopping or already dead dont collect metrics. The
- // Camel route has just been stopped.
- if (closed || !processIsActive()) {
- logger.info("LinuxProcessMetricsProcessor.process", null,"Process with PID:"+process.getPID() +" not in Running or Initializing state. Terminating Process Metrics Collector");
- // Spin a thread to terminate Camel route which called this processor.
- // This thread will stop the route since the process for which metrics
- // collection was started is no longer running. This is a defensive
- // measure in cases when a route is not stopped as part of process
- // deallocation.
- Thread t = new Thread( new Runnable() {
- public void run() {
- agent.onProcessExit(process);
- }
- });
- t.start();
- return;
- }
- try {
-
- process.setSwapUsage(getSwapUsage());
- process.setMajorFaults(getFaults());
-
- long rssInBytes = getRss();
- process.setResidentMemory(rssInBytes);
-
- long totalCpuUsageInNanos = getCpuUsage();
-
- // set CPU time in terms of %
- process.setCpuTime(getCpuTime(totalCpuUsageInNanos));
-
- process.setCurrentCPU(getCurrentCpu(totalCpuUsageInNanos));
-
- ProcessGarbageCollectionStats gcStats = getGCStats();
- process.setGarbageCollectionStats(gcStats);
- logger.info(
- "process",
- null,
- "----------- PID:" + process.getPID() + " RSS:"
- + ((rssInBytes > -1) ? (rssInBytes / (1024 * 1024))+ " MB" : "-1")
- + " Total CPU Time (%):" + process.getCpuTime()
- + " Delta CPU Time (%):" + process.getCurrentCPU()
- + " Major Faults:" + process.getMajorFaults()
- + " Process Swap Usage:" + process.getSwapUsage()
- + " Max Swap Usage Allowed:"
- + managedProcess.getMaxSwapThreshold()
- + " Total GC Collection Count :"
- + gcStats.getCollectionCount()
- + " Total GC Collection Time :"
- + gcStats.getCollectionTime());
-
- killProcsIfExceedingMemoryThreshold();
-
- } catch (Exception exc) {
- if (!collectStats(process.getProcessState())) {
- return;
- }
- logger.error("LinuxProcessMetricsProcessor.process", null, exc);
- }
- }
-
- private void killChildProcess(final String pid, final String signal) {
- // spawn a thread that will do kill -15, wait for 1 minute and kill the
- // process
- // hard if it is still alive
- (new Thread() {
- public void run() {
- String c_launcher_path = Utils
- .resolvePlaceholderIfExists(
- System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
- System.getProperties());
- try {
- String[] killCmd = null;
- String useSpawn = System
- .getProperty("ducc.agent.launcher.use.ducc_spawn");
- if (useSpawn != null
- && useSpawn.toLowerCase().equals("true")) {
- killCmd = new String[] {
- c_launcher_path,
- "-u",
- ((ManagedProcess) managedProcess).getOwner(),
- "--",
- "/bin/kill",
- signal,
- ((ManagedProcess) managedProcess)
- .getDuccProcess().getPID() };
- } else {
- killCmd = new String[] {
- "/bin/kill",
- "-15",
- ((ManagedProcess) managedProcess)
- .getDuccProcess().getPID() };
- }
- ProcessBuilder pb = new ProcessBuilder(killCmd);
- Process p = pb.start();
- p.wait(1000 * 60); // wait for 1 minute and whack the
- // process if still alive
- p.destroy();
- } catch (Exception e) {
- logger.error("killChildProcess",
- managedProcess.getWorkDuccId(), e);
- }
- }
- }).start();
+ /**
+  * Computes the CPU usage (in percent) since the previous call, as the delta between the
+  * current and the previously recorded total CPU reading divided by the wall-clock time
+  * elapsed between the two calls. The current reading and time are then remembered for
+  * the next call.
+  *
+  * @param totalCpuUsageInNanos
+  *          total CPU usage of the process so far, in nanoseconds
+  * @return the percentage for the last interval; 0 when no wall-clock time has elapsed
+  *         (or the clock moved backwards); -1 when cgroups are not in use
+  */
+ private long getCurrentCpu(long totalCpuUsageInNanos) {
+   if (!agent.useCgroups) {
+     return -1;
+   }
+   // Read the clock once so the interval and the stored snapshot use the same instant.
+   long now = System.currentTimeMillis();
+   long millisCPU = (totalCpuUsageInNanos - previousCPUReadingInNanos) / 1000000;
+   long millisRun = now - previousSnapshotTime;
+   previousCPUReadingInNanos = totalCpuUsageInNanos;
+   previousSnapshotTime = now;
+
+   // A zero interval would divide by zero (Infinity/NaN -> Long.MAX_VALUE or 0 after
+   // rounding) and a negative one would yield a negative percentage.
+   if (millisRun <= 0) {
+     return 0;
+   }
+   return Math.round(100 * ((millisCPU * 1.0) / (millisRun * 1.0)));
+ }
+
+ /**
+  * Memory guard: stops the managed process, and requests a kill for the other pids found
+  * in its cgroup, when the process resident memory exceeds its memory assignment
+  * (including the fudge factor).
+  *
+  * Nothing is done when cgroups are not in use, when the process swap usage is already
+  * above the managed process's max swap threshold, when the fudge factor is negative, or
+  * when no positive memory cap is assigned.
+  *
+  * @throws Exception
+  *           if the cgroup container or its pids cannot be determined
+  */
+ private void killProcsIfExceedingMemoryThreshold() throws Exception {
+   if (!agent.useCgroups) {
+     return;
+   }
+   // This method takes no action for a process that is over its swap threshold.
+   if (process.getSwapUsage() > 0
+           && process.getSwapUsage() > managedProcess.getMaxSwapThreshold()) {
+     return;
+   }
+
+   String containerId = agent.cgroupsManager.getContainerId(managedProcess);
+   String[] cgroupPids = agent.cgroupsManager.getPidsInCgroup(containerId);
+   logger.debug("LinuxProcessMetricsProcessor.process", null,
+           "Container ID:" + containerId + " cgroup pids " + cgroupPids.length);
+
+   // The guard is active only with a non-negative fudge factor and a positive cap.
+   boolean guardEnabled = fudgeFactor > -1
+           && managedProcess.getProcessMemoryAssignment().getMaxMemoryWithFudge() > 0;
+   if (!guardEnabled) {
+     return;
+   }
+
+   long rss = (process.getResidentMemory() / 1024) / 1024; // normalize RSS into MB
+
+   logger.trace("process", null,
+           "*** Process with PID:" + managedProcess.getPid() + " Assigned Memory (MB): "
+                   + managedProcess.getProcessMemoryAssignment() + " MBs. Current RSS (MB):"
+                   + rss);
+
+   // Compare resident memory against the assignment that includes the fudge factor.
+   boolean overLimit = rss > managedProcess.getProcessMemoryAssignment().getMaxMemoryWithFudge();
+   if (!overLimit) {
+     return;
+   }
+
+   logger.error("process", null,
+           "\n\n********************************************************\n\tProcess with PID:"
+                   + managedProcess.getPid()
+                   + " Exceeded its max memory assignment (including a fudge factor) of "
+                   + managedProcess.getProcessMemoryAssignment().getMaxMemoryWithFudge()
+                   + " MBs. This Process Resident Memory Size: " + rss
+                   + " MBs .Killing process ...\n********************************************************\n\n");
+   try {
+     managedProcess.kill(); // mark it for death
+     process.setReasonForStoppingProcess(
+             ReasonForStoppingProcess.ExceededShareSize.toString());
+     agent.stopProcess(process);
+
+     // The main process was just stopped above; only its child processes remain.
+     for (String pid : cgroupPids) {
+       if (pid.equals(managedProcess.getDuccProcess().getPID())) {
+         continue;
+       }
+       killChildProcess(pid, "-15");
+     }
+   } catch (Exception ee) {
+     // Errors are not reported for a process that is already stopping or dead.
+     if (collectStats(process.getProcessState())) {
+       logger.error("process", null, ee);
+     }
+   }
+ }
+
+ /**
+  * Collects garbage collection statistics for the process when it has a non-blank JMX URL.
+  *
+  * @return the collected statistics, or a newly created ProcessGarbageCollectionStats when
+  *         the process has no JMX URL
+  * @throws Exception
+  *           if the collection fails
+  */
+ private ProcessGarbageCollectionStats getGCStats() throws Exception {
+   boolean hasJmxUrl = process.getProcessJmxUrl() != null
+           && process.getProcessJmxUrl().trim().length() > 0;
+   if (!hasJmxUrl) {
+     return new ProcessGarbageCollectionStats();
+   }
+   logger.debug("LinuxProcessMetricsProcessor.getGCStats", null, "Collecting GC Stats");
+   return gcStatsCollector.collect();
+ }
+
+ /**
+  * Tells whether the process is in one of the states in which metrics are collected.
+  *
+  * @return true when the process state is Starting, Started, Initializing or Running
+  */
+ public boolean processIsActive() {
+   if (process.getProcessState().equals(ProcessState.Starting)) {
+     return true;
+   }
+   if (process.getProcessState().equals(ProcessState.Started)) {
+     return true;
+   }
+   if (process.getProcessState().equals(ProcessState.Initializing)) {
+     return true;
+   }
+   return process.getProcessState().equals(ProcessState.Running);
+ }
+
+ /**
+  * Collects swap, page fault, resident memory, CPU and GC metrics for the process, stores
+  * them on the process object, logs a summary and then runs the memory guard.
+  *
+  * When this processor is closed or the process is no longer active, no metrics are
+  * collected; instead agent.onProcessExit() is invoked on a separate thread.
+  *
+  * @param e
+  *          the exchange passed by the caller; it is not read by this method
+  */
+ public void process(Exchange e) {
+   // If the process is stopping or already dead, don't collect metrics.
+   if (closed || !processIsActive()) {
+     logger.info("LinuxProcessMetricsProcessor.process", null, "Process with PID:"
+             + process.getPID()
+             + " not in Running or Initializing state. Terminating Process Metrics Collector");
+     // Defensive measure for the case where the route that calls this processor was not
+     // stopped as part of process deallocation: notify the agent from its own thread.
+     Runnable exitNotifier = new Runnable() {
+       public void run() {
+         agent.onProcessExit(process);
+       }
+     };
+     new Thread(exitNotifier).start();
+     return;
+   }
+   try {
+     process.setSwapUsage(getSwapUsage());
+     process.setMajorFaults(getFaults());
+
+     final long residentBytes = getRss();
+     process.setResidentMemory(residentBytes);
+
+     final long cpuNanos = getCpuUsage();
+     // CPU time is published as a percentage.
+     process.setCpuTime(getCpuTime(cpuNanos));
+     process.setCurrentCPU(getCurrentCpu(cpuNanos));
+
+     final ProcessGarbageCollectionStats gc = getGCStats();
+     process.setGarbageCollectionStats(gc);
+
+     StringBuilder summary = new StringBuilder();
+     summary.append("----------- PID:").append(process.getPID());
+     summary.append(" RSS:");
+     if (residentBytes > -1) {
+       summary.append(residentBytes / (1024 * 1024)).append(" MB");
+     } else {
+       summary.append("-1");
+     }
+     summary.append(" Total CPU Time (%):").append(process.getCpuTime());
+     summary.append(" Delta CPU Time (%):").append(process.getCurrentCPU());
+     summary.append(" Major Faults:").append(process.getMajorFaults());
+     summary.append(" Process Swap Usage:").append(process.getSwapUsage());
+     summary.append(" Max Swap Usage Allowed:").append(managedProcess.getMaxSwapThreshold());
+     summary.append(" Total GC Collection Count :").append(gc.getCollectionCount());
+     summary.append(" Total GC Collection Time :").append(gc.getCollectionTime());
+     logger.info("process", null, summary.toString());
+
+     killProcsIfExceedingMemoryThreshold();
+
+   } catch (Exception exc) {
+     // Errors are not reported for a process that is already stopping or dead.
+     if (collectStats(process.getProcessState())) {
+       logger.error("LinuxProcessMetricsProcessor.process", null, exc);
+     }
+   }
+ }
+
+ /**
+  * Sends the given signal to the given pid from a separate thread by running /bin/kill,
+  * either directly or through ducc_spawn (as the process owner) when the
+  * ducc.agent.launcher.use.ducc_spawn system property is "true".
+  *
+  * The thread waits up to one minute for the kill command to finish and destroys the
+  * command if it has not completed by then. Failures are logged, never rethrown.
+  *
+  * @param pid
+  *          pid of the process to signal
+  * @param signal
+  *          signal argument passed to /bin/kill, e.g. "-15"
+  */
+ private void killChildProcess(final String pid, final String signal) {
+   (new Thread() {
+     @Override
+     public void run() {
+       try {
+         String[] killCmd;
+         String useSpawn = System.getProperty("ducc.agent.launcher.use.ducc_spawn");
+         if ("true".equalsIgnoreCase(useSpawn)) {
+           String c_launcher_path = Utils.resolvePlaceholderIfExists(
+                   System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
+                   System.getProperties());
+           killCmd = new String[] { c_launcher_path, "-u", managedProcess.getOwner(), "--",
+               "/bin/kill", signal, pid };
+         } else {
+           killCmd = new String[] { "/bin/kill", signal, pid };
+         }
+         ProcessBuilder pb = new ProcessBuilder(killCmd);
+         Process p = pb.start();
+         // Process.waitFor, not Object.wait: wait() without owning the monitor always
+         // throws IllegalMonitorStateException.
+         if (!p.waitFor(1, java.util.concurrent.TimeUnit.MINUTES)) {
+           p.destroy(); // the kill command itself is stuck; give up on it
+         }
+       } catch (InterruptedException e) {
+         Thread.currentThread().interrupt(); // preserve the interrupt status
+         logger.error("killChildProcess", managedProcess.getWorkDuccId(), e);
+       } catch (Exception e) {
+         logger.error("killChildProcess", managedProcess.getWorkDuccId(), e);
+       }
+     }
+   }).start();
- }
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/NodeInventoryProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/NodeInventoryProcessor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/NodeInventoryProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/NodeInventoryProcessor.java Fri Aug 2 17:27:48 2019
@@ -24,7 +24,6 @@ import org.apache.camel.Processor;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
-
public interface NodeInventoryProcessor extends Processor {
- public Map<DuccId, IDuccProcess> getInventory();
+ /**
+ * Returns the process inventory as a map keyed by {@link DuccId}.
+ *
+ * NOTE(review): DefaultNodeInventoryProcessor documents its implementation as returning a copy
+ * of the agent's inventory; whether every implementation must return a copy is not stated
+ * here - confirm before relying on it.
+ *
+ * @return map of {@link DuccId} to {@link IDuccProcess}
+ */
+ public Map<DuccId, IDuccProcess> getInventory();
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/NodeMetricsProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/NodeMetricsProcessor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/NodeMetricsProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/NodeMetricsProcessor.java Fri Aug 2 17:27:48 2019
@@ -22,5 +22,5 @@ import org.apache.camel.Processor;
import org.apache.uima.ducc.agent.NodeAgent;
public interface NodeMetricsProcessor extends Processor {
- public void setAgent(NodeAgent agent);
+ /**
+ * Supplies the {@link NodeAgent} to this processor.
+ *
+ * @param agent
+ *          the node agent; NOTE(review): how implementations use it, and whether null is
+ *          accepted, is not visible from this interface - confirm against implementations
+ */
+ public void setAgent(NodeAgent agent);
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/ProcessLifecycleProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/ProcessLifecycleProcessor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/ProcessLifecycleProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/ProcessLifecycleProcessor.java Fri Aug 2 17:27:48 2019
@@ -21,25 +21,32 @@ package org.apache.uima.ducc.agent.proce
import org.apache.camel.Exchange;
public class ProcessLifecycleProcessor extends BaseProcessor {
-
- //private static final String windowsCommandSeparator = "&";
- //private static final String nixCommandSeparator = " ; ";
- public ProcessLifecycleProcessor() {
- super();
- }
- /**
- * Method called by Camel if a message is of type:
- * <ul> StartProcessEvent</ul>
- * <ul> StopProcessEvent</ul>
- * <ul> PurgeProcessEvent</ul>
- *
- * This method checks if a message contains this host name (or IP) as a target. If this agent
- * is not a target, the message is ignored.
- *
- */
- public void process(final Exchange e) throws Exception {
- System.out.println("... Agent Received Request");
- }
+ // private static final String windowsCommandSeparator = "&";
+ // private static final String nixCommandSeparator = " ; ";
+ /**
+ * Creates the processor. It performs no work of its own beyond invoking the
+ * {@code BaseProcessor} constructor.
+ */
+ public ProcessLifecycleProcessor() {
+ super();
+ }
+
+ /**
+ * Method called by Camel if a message is of type:
+ *
+ * <ul>
+ * <li>StartProcessEvent</li>
+ * <li>StopProcessEvent</li>
+ * <li>PurgeProcessEvent</li>
+ * </ul>
+ *
+ * The current body only prints a fixed message to standard output and does not read the
+ * exchange. NOTE(review): earlier documentation stated that this method ignores messages which
+ * do not name this host (or its IP) as a target; no such check is visible in this method -
+ * confirm whether it lives elsewhere or is still to be implemented.
+ *
+ * @param e
+ *          the Camel exchange delivered to this processor; unused by the current body
+ * @throws Exception
+ *           declared by the signature; nothing in the current body throws a checked exception
+ */
+ public void process(final Exchange e) throws Exception {
+ System.out.println("... Agent Received Request");
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/ProcessMetricsProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/ProcessMetricsProcessor.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/ProcessMetricsProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/ProcessMetricsProcessor.java Fri Aug 2 17:27:48 2019
@@ -21,5 +21,5 @@ package org.apache.uima.ducc.agent.proce
import org.apache.camel.Processor;
public interface ProcessMetricsProcessor extends Processor {
- public void stop();
+ /**
+ * Stops this process metrics processor.
+ *
+ * NOTE(review): what stopping entails is implementation-specific and not visible from this
+ * interface - confirm against the implementations.
+ */
+ public void stop();
}