You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2019/08/02 17:27:49 UTC
svn commit: r1864247 [3/10] - in
/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent:
./ config/ deploy/ deploy/uima/ event/ exceptions/ launcher/
metrics/collectors/ monitor/ processors/
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgentInfo.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgentInfo.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgentInfo.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgentInfo.java Fri Aug 2 17:27:48 2019
@@ -21,143 +21,148 @@ package org.apache.uima.ducc.agent;
import java.io.Serializable;
import java.util.Properties;
-
public class NodeAgentInfo implements Serializable {
- // UNREACHABLE is set if the agent fails to send a ping within a window
- public enum AgentState { INITIALIZING, READY, STOPPED, UNREACHABLE, FAILED };
- public static final String OSName = "os.name";
- public static final String OSVersion = "os.version";
- public static final String OSArchitecture = "os.arch";
- public static final String NodeCpus = "node.cpus";
- public static final String JavaVendor = "java.vendor";
- public static final String JavaVersion = "java.version";
-
- private static final long serialVersionUID = 1L;
- private String hostname;
- private String ip = "N/A";
- private int jmxPort;
- private Properties properties = new Properties();
- private String id;
-// private ProcessGroup[] groups;
- private AgentState status = AgentState.INITIALIZING;
- private String agentLog= "N/A";
- private boolean firstHeartbeat = true;
-
- public NodeAgentInfo(String hostname, String id) {
- super();
- this.hostname = hostname;
- this.id = id;
- }
- public boolean isFirstHeartbeat() {
- return firstHeartbeat;
- }
-
- /**
- * @return the status
- */
- public AgentState getStatus() {
- return status;
- }
-
- public String getAgentLog() {
- return agentLog;
- }
-
- public void setAgentLog(String agentLog) {
- this.agentLog = agentLog;
- }
-
- public String getIp() {
- return ip;
- }
-
- public void setIp(String ip) {
- this.ip = ip;
- }
-
- /**
- * @param status the status to set
- */
- public void setStatus(AgentState status) {
- this.status = status;
- }
-
- public Properties getProperties() {
- return properties;
- }
-
- public void setProperties( Properties properties ) {
- this.properties = properties;
- }
-
- public String getProperty(String key) {
- return properties.getProperty(key);
- }
-
- public void setProperty( String key, String value) {
- properties.put(key, value);
- }
- /**
- * @return the jmxPort
- */
- public int getJmxPort() {
- return jmxPort;
- }
-
- /**
- * @param jmxPort the jmxPort to set
- */
- public void setJmxPort(int jmxPort) {
- this.jmxPort = jmxPort;
- }
-
-
- /**
- * @return the hostname
- */
- public String getHostname() {
- return hostname;
- }
- /**
- * @return the id
- */
- public String getId() {
- return id;
- }
-
-// public void setProcessGroups(ProcessGroup[] groups) {
-// this.groups = groups;
-// }
-// public ProcessGroup[] getProcessGroups() {
-// return groups;
-// }
- public void dump() {
- if (firstHeartbeat ) {
- firstHeartbeat = false;
- System.out
- .println("+++++++++++ Controller Received Agent Info. \n\tNode:"
- + getHostname()
- + "\n\tAgent Node IP:"
- + getIp()
- + "\n\tAgent Node ID:"
- + getId()
- + "\n\tAgent Log:"
- + getAgentLog()
- + "\n\tAgent Jmx Port:"
- + getJmxPort()
- + "\n\tAgent Node OS:"
- + getProperty(NodeAgentInfo.OSName)
- + "\n\tAgent Node OS Level:"
- + getProperty(NodeAgentInfo.OSVersion)
- + "\n\tAgent Node OS Architecture:"
- + getProperty(NodeAgentInfo.OSArchitecture)
- + "\n\tAgent Node CPU Count:"
- + getProperty(NodeAgentInfo.NodeCpus)
- + "\n\tAgent Node Java Vendor:"
- + getProperty(NodeAgentInfo.JavaVendor)
- + "\n\tAgent Node Java Version:"
- + getProperty(NodeAgentInfo.JavaVersion));
- }
- }
+ // UNREACHABLE is set if the agent fails to send a ping within a window
+ public enum AgentState {
+ INITIALIZING, READY, STOPPED, UNREACHABLE, FAILED
+ };
+
+ public static final String OSName = "os.name";
+
+ public static final String OSVersion = "os.version";
+
+ public static final String OSArchitecture = "os.arch";
+
+ public static final String NodeCpus = "node.cpus";
+
+ public static final String JavaVendor = "java.vendor";
+
+ public static final String JavaVersion = "java.version";
+
+ private static final long serialVersionUID = 1L;
+
+ private String hostname;
+
+ private String ip = "N/A";
+
+ private int jmxPort;
+
+ private Properties properties = new Properties();
+
+ private String id;
+
+ // private ProcessGroup[] groups;
+ private AgentState status = AgentState.INITIALIZING;
+
+ private String agentLog = "N/A";
+
+ private boolean firstHeartbeat = true;
+
+ public NodeAgentInfo(String hostname, String id) {
+ super();
+ this.hostname = hostname;
+ this.id = id;
+ }
+
+ public boolean isFirstHeartbeat() {
+ return firstHeartbeat;
+ }
+
+ /**
+ * @return the status
+ */
+ public AgentState getStatus() {
+ return status;
+ }
+
+ public String getAgentLog() {
+ return agentLog;
+ }
+
+ public void setAgentLog(String agentLog) {
+ this.agentLog = agentLog;
+ }
+
+ public String getIp() {
+ return ip;
+ }
+
+ public void setIp(String ip) {
+ this.ip = ip;
+ }
+
+ /**
+ * @param status
+ * the status to set
+ */
+ public void setStatus(AgentState status) {
+ this.status = status;
+ }
+
+ public Properties getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Properties properties) {
+ this.properties = properties;
+ }
+
+ public String getProperty(String key) {
+ return properties.getProperty(key);
+ }
+
+ public void setProperty(String key, String value) {
+ properties.put(key, value);
+ }
+
+ /**
+ * @return the jmxPort
+ */
+ public int getJmxPort() {
+ return jmxPort;
+ }
+
+ /**
+ * @param jmxPort
+ * the jmxPort to set
+ */
+ public void setJmxPort(int jmxPort) {
+ this.jmxPort = jmxPort;
+ }
+
+ /**
+ * @return the hostname
+ */
+ public String getHostname() {
+ return hostname;
+ }
+
+ /**
+ * @return the id
+ */
+ public String getId() {
+ return id;
+ }
+
+ // public void setProcessGroups(ProcessGroup[] groups) {
+ // this.groups = groups;
+ // }
+ // public ProcessGroup[] getProcessGroups() {
+ // return groups;
+ // }
+ public void dump() {
+ if (firstHeartbeat) {
+ firstHeartbeat = false;
+ System.out.println("+++++++++++ Controller Received Agent Info. \n\tNode:" + getHostname()
+ + "\n\tAgent Node IP:" + getIp() + "\n\tAgent Node ID:" + getId() + "\n\tAgent Log:"
+ + getAgentLog() + "\n\tAgent Jmx Port:" + getJmxPort() + "\n\tAgent Node OS:"
+ + getProperty(NodeAgentInfo.OSName) + "\n\tAgent Node OS Level:"
+ + getProperty(NodeAgentInfo.OSVersion) + "\n\tAgent Node OS Architecture:"
+ + getProperty(NodeAgentInfo.OSArchitecture) + "\n\tAgent Node CPU Count:"
+ + getProperty(NodeAgentInfo.NodeCpus) + "\n\tAgent Node Java Vendor:"
+ + getProperty(NodeAgentInfo.JavaVendor) + "\n\tAgent Node Java Version:"
+ + getProperty(NodeAgentInfo.JavaVersion));
+ }
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeMetricsGenerator.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeMetricsGenerator.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeMetricsGenerator.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeMetricsGenerator.java Fri Aug 2 17:27:48 2019
@@ -22,26 +22,26 @@ import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.uima.ducc.agent.processors.LinuxNodeMetricsProcessor;
-
-
public class NodeMetricsGenerator {
- public NodeMetricsGenerator( int refreshRate, int timeToLive) {
+ public NodeMetricsGenerator(int refreshRate, int timeToLive) {
}
- protected LinuxNodeMetricsProcessor configure( CamelContext context, String brokerUrl, final String ducc_node_metrics_endpoint) throws Exception {
+
+ protected LinuxNodeMetricsProcessor configure(CamelContext context, String brokerUrl,
+ final String ducc_node_metrics_endpoint) throws Exception {
context.addRoutes(new RouteBuilder() {
public void configure() {
-// from("timer:nodeMetricsTimer?fixedRate=true&period=" + refreshRate).startupOrder(3)
-// .setHeader(DuccExchange.Event, constant(DuccExchange.NodeStatsEvent))
-// .setHeader(DuccExchange.DUCCNODENAME, constant(NodeAgent.getI().getHostname()))
-// .setHeader(DuccExchange.DUCCNODEIP, constant(NodeAgent.getAgentInfo().getIp()))
-// .process(nodeMetricsProcessor).to(
-// ducc_node_metrics_endpoint+"?explicitQosEnabled=true&timeToLive="+timeToLive);
+ // from("timer:nodeMetricsTimer?fixedRate=true&period=" + refreshRate).startupOrder(3)
+ // .setHeader(DuccExchange.Event, constant(DuccExchange.NodeStatsEvent))
+ // .setHeader(DuccExchange.DUCCNODENAME, constant(NodeAgent.getI().getHostname()))
+ // .setHeader(DuccExchange.DUCCNODEIP, constant(NodeAgent.getAgentInfo().getIp()))
+ // .process(nodeMetricsProcessor).to(
+ // ducc_node_metrics_endpoint+"?explicitQosEnabled=true&timeToLive="+timeToLive);
}
});
- // return nodeMetricsProcessor;
+ // return nodeMetricsProcessor;
return null;
}
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ProcessLifecycleController.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ProcessLifecycleController.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ProcessLifecycleController.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ProcessLifecycleController.java Fri Aug 2 17:27:48 2019
@@ -24,8 +24,9 @@ import org.apache.uima.ducc.transport.ev
import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
import org.apache.uima.ducc.transport.event.common.ProcessMemoryAssignment;
-
public interface ProcessLifecycleController {
- public void startProcess(IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info, DuccId workDuccId, ProcessMemoryAssignment pma, boolean isPreemptable);
- public void stopProcess( IDuccProcess process );
+ public void startProcess(IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info,
+ DuccId workDuccId, ProcessMemoryAssignment pma, boolean isPreemptable);
+
+ public void stopProcess(IDuccProcess process);
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ProcessReaperTask.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ProcessReaperTask.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ProcessReaperTask.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/ProcessReaperTask.java Fri Aug 2 17:27:48 2019
@@ -25,28 +25,30 @@ import org.apache.uima.ducc.agent.launch
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess;
-
public class ProcessReaperTask extends TimerTask {
private NodeAgent agent;
+
private DuccLogger logger;
-
-
+
public ProcessReaperTask(NodeAgent agent, DuccLogger logger) {
this.agent = agent;
this.logger = logger;
}
+
public void run() {
- if ( agent.deployedProcesses.size() > 0 ) {
- logger.warn("ProcessReaperTask.run()", null, "Agent timed out waiting for a Ping Message. Assuming network connectivity problem and killing all running JPs");
+ if (agent.deployedProcesses.size() > 0) {
+ logger.warn("ProcessReaperTask.run()", null,
+ "Agent timed out waiting for a Ping Message. Assuming network connectivity problem and killing all running JPs");
Iterator<ManagedProcess> it = agent.deployedProcesses.iterator();
- while( it.hasNext()) {
+ while (it.hasNext()) {
ManagedProcess mp = it.next();
mp.kill();
- mp.getDuccProcess().setReasonForStoppingProcess(ReasonForStoppingProcess.AgentTimedOutWaitingForORState.toString());
+ mp.getDuccProcess().setReasonForStoppingProcess(
+ ReasonForStoppingProcess.AgentTimedOutWaitingForORState.toString());
String pid = mp.getDuccProcess().getPID();
agent.stopProcess(mp.getDuccProcess());
- logger.info("ProcessReaperTask.run()", null, "Agent calling stopProcess:"+pid);
+ logger.info("ProcessReaperTask.run()", null, "Agent calling stopProcess:" + pid);
}
}
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/RogueProcessReaper.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/RogueProcessReaper.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/RogueProcessReaper.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/RogueProcessReaper.java Fri Aug 2 17:27:48 2019
@@ -71,8 +71,8 @@ public class RogueProcessReaper {
// check if purge delay is defined in ducc.properties.
if (System.getProperty("ducc.agent.rogue.process.purge.delay") != null) {
try {
- maxSecondsBeforeEntryExpires = Integer.valueOf(System
- .getProperty("ducc.agent.rogue.process.purge.delay"));
+ maxSecondsBeforeEntryExpires = Integer
+ .valueOf(System.getProperty("ducc.agent.rogue.process.purge.delay"));
} catch (Exception e) {
if (logger == null) {
e.printStackTrace();
@@ -92,8 +92,8 @@ public class RogueProcessReaper {
System.out.println("ducc.agent.rogue.process.kill=" + doKillRogueProcess);
} else {
- logger.info("RogueProcessReaper.ctor", null, "ducc.agent.rogue.process.kill="
- + doKillRogueProcess);
+ logger.info("RogueProcessReaper.ctor", null,
+ "ducc.agent.rogue.process.kill=" + doKillRogueProcess);
}
@@ -119,8 +119,8 @@ public class RogueProcessReaper {
.println("PID:" + pid + " Not Rogue Yet - It takes 3 iterations to make it Rogue");
} else {
- logger.info("submitRogueProcessForKill", null, "PID:" + pid
- + " Not Rogue Yet - It takes 3 iterations to make it Rogue");
+ logger.info("submitRogueProcessForKill", null,
+ "PID:" + pid + " Not Rogue Yet - It takes 3 iterations to make it Rogue");
}
return;
@@ -129,13 +129,13 @@ public class RogueProcessReaper {
try {
// Dont kill the process immediately. Kill if this method is called "counterValue"
// number of times.
- long counter=0;
+ long counter = 0;
if (logger != null) {
logger.info(methodName, null,
"Decrementing Counter - Current Value:" + entry.counter.getCount());
}
- if ( entry.counter.getCount() > 0) {
- counter = entry.countDown();
+ if (entry.counter.getCount() > 0) {
+ counter = entry.countDown();
}
// check if the rogue process needs to be killed
if (counter <= 0 && !entry.isKilled()) {
@@ -143,8 +143,8 @@ public class RogueProcessReaper {
System.out.println("Process Scheduled for Kill PID:" + pid + " Owner:" + user + " ");
} else {
- logger.info(methodName, null, "Process Scheduled for Kill PID:" + pid + " Owner:"
- + user + " ");
+ logger.info(methodName, null,
+ "Process Scheduled for Kill PID:" + pid + " Owner:" + user + " ");
}
entry.resetCounter(counterValue);
@@ -154,10 +154,10 @@ public class RogueProcessReaper {
if (logger == null) {
System.out
.println("Process ***NOT*** Scheduled for Kill PID:" + pid + " Owner:" + user);
-
+
} else {
- logger.info(methodName, null, "Process ***NOT*** Scheduled for Kill PID:" + pid
- + " Owner:" + user);
+ logger.info(methodName, null,
+ "Process ***NOT*** Scheduled for Kill PID:" + pid + " Owner:" + user);
}
@@ -165,12 +165,12 @@ public class RogueProcessReaper {
if (entry.isKilled() && entry.countDownCleanupCounter() == 0) {
if (logger == null) {
- System.out.println("Removing Entry From RougeProcessMap for PID:" + pid + " Owner:"
- + user);
+ System.out.println(
+ "Removing Entry From RougeProcessMap for PID:" + pid + " Owner:" + user);
} else {
- logger.info(methodName, null, "Removing Entry From RougeProcessMap for PID:" + pid
- + " Owner:" + user);
+ logger.info(methodName, null,
+ "Removing Entry From RougeProcessMap for PID:" + pid + " Owner:" + user);
}
userRogueProcessMap.remove(pid);
@@ -180,22 +180,14 @@ public class RogueProcessReaper {
}
} else {
if (logger == null) {
- System.out
- .println("Ducc Not Configured to Kill Rogue Proces (PID:)"
- + pid
- + " Owner:"
- + user
- + ". Change (or define) ducc.agent.rogue.process.reaper.script property in ducc.properties if you want rogue processes to be cleaned up.");
+ System.out.println("Ducc Not Configured to Kill Rogue Proces (PID:)" + pid + " Owner:"
+ + user
+ + ". Change (or define) ducc.agent.rogue.process.reaper.script property in ducc.properties if you want rogue processes to be cleaned up.");
} else {
- logger.info(
- methodName,
- null,
- "Ducc Not Configured to Kill Rogue Proces (PID:)"
- + pid
- + " Owner:"
- + user
- + ". Change (or define) ducc.agent.rogue.process.reaper.script property in ducc.properties if you want rogue processes to be cleaned up.");
+ logger.info(methodName, null, "Ducc Not Configured to Kill Rogue Proces (PID:)" + pid
+ + " Owner:" + user
+ + ". Change (or define) ducc.agent.rogue.process.reaper.script property in ducc.properties if you want rogue processes to be cleaned up.");
}
}
@@ -298,8 +290,8 @@ public class RogueProcessReaper {
}
new Thread(new Runnable() {
public void run() {
- InputStream is = null;
- BufferedReader reader = null;
+ InputStream is = null;
+ BufferedReader reader = null;
try {
String[] repearScriptCommand = new String[] { reaperScript, pid };
ProcessBuilder pb = new ProcessBuilder(repearScriptCommand);
@@ -325,36 +317,37 @@ public class RogueProcessReaper {
reader = new BufferedReader(new InputStreamReader(is));
String scriptOutput = "";
// read the next line from stdout and stderr
- while ( (scriptOutput = reader.readLine()) != null) {
- if ( Objects.nonNull(logger)) {
- logger.info(methodName, null,scriptOutput);
- } else {
- System.out.println(">>>>"+scriptOutput);
- }
+ while ((scriptOutput = reader.readLine()) != null) {
+ if (Objects.nonNull(logger)) {
+ logger.info(methodName, null, scriptOutput);
+ } else {
+ System.out.println(">>>>" + scriptOutput);
+ }
}
-
+
sb.setLength(0);
if (logger == null) {
System.out.println("--------- Rogue Process Reaper (for PID:" + pid + ") Terminated");
} else {
- logger.info(methodName, null, "--------- Rogue Process Reaper (for PID:" + pid
- + ") Terminated");
+ logger.info(methodName, null,
+ "--------- Rogue Process Reaper (for PID:" + pid + ") Terminated");
}
} catch (Exception e) {
logger.error(methodName, null, e);
} finally {
-
+
synchronized (this) {
processMap.remove(pid);
}
- if ( reader != null ) {
- try {
- reader.close();
- } catch( Exception exx){}
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (Exception exx) {
+ }
}
}
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java Fri Aug 2 17:27:48 2019
@@ -79,6 +79,7 @@ public class AgentConfiguration {
DuccLogger logger = new DuccLogger(this.getClass(), "Agent");
NodeAgent agent = null;
+
NodeMetricsProcessor nodeMetricsProcessor;
// fetch the name of an endpoint where the JM expects incoming requests
@@ -94,11 +95,13 @@ public class AgentConfiguration {
private RouteBuilder inventoryRouteBuilder;
private ServerSocket serviceStateUpdateServer = null;
-
+
private Thread serverThread = null;
- /* Deprecated
- @Value("#{ systemProperties['ducc.agent.launcher.thread.pool.size'] }")
- String launcherThreadPoolSize;
+ /*
+ * Deprecated
+ *
+ * @Value("#{ systemProperties['ducc.agent.launcher.thread.pool.size'] }") String
+ * launcherThreadPoolSize;
*/
@Value("#{ systemProperties['ducc.agent.launcher.process.stop.timeout'] }")
@@ -127,7 +130,6 @@ public class AgentConfiguration {
@Value("#{ systemProperties['ducc.agent.launcher.cgroups.retry.delay.factor'] }")
public String retryDelayFactor;
-
@Autowired
DuccTransportConfiguration agentTransport;
@@ -137,7 +139,8 @@ public class AgentConfiguration {
@Autowired
CommonConfiguration common;
- DefaultNodeInventoryProcessor inventoryProcessor=null;
+ DefaultNodeInventoryProcessor inventoryProcessor = null;
+
/**
* Creates {@code AgentEventListener} that will handle incoming messages.
*
@@ -235,11 +238,10 @@ public class AgentConfiguration {
final AgentEventListener delegate) {
return new RouteBuilder() {
public void configure() {
- onException(Throwable.class).maximumRedeliveries(0)
- .handled(false)
+ onException(Throwable.class).maximumRedeliveries(0).handled(false)
.process(new ErrorProcessor());
from(common.agentRequestEndpoint).routeId("IncomingRequestsRoute")
- // .process(new DebugProcessor())
+ // .process(new DebugProcessor())
.bean(delegate);
}
};
@@ -267,11 +269,11 @@ public class AgentConfiguration {
.process(new ErrorProcessor()).stop();
from(common.managedProcessStateUpdateEndpoint).routeId("ManageProcessStateUpdateRoute")
- .choice().when(filter).bean(delegate).end();
+ .choice().when(filter).bean(delegate).end();
}
};
}
-
+
public class DebugProcessor implements Processor {
public void process(Exchange exchange) throws Exception {
@@ -323,7 +325,7 @@ public class AgentConfiguration {
sb.append(entry.getKey()).append("=").append(entry.getValue()).append("\n");
}
logger.info("StateUpdateDebugProcessor.process", null, "Headers:\n\t" + sb.toString());
- logger.info("StateUpdateDebugProcessor.process", null, "Body:"+exchange.getIn().getBody());
+ logger.info("StateUpdateDebugProcessor.process", null, "Body:" + exchange.getIn().getBody());
}
}
@@ -393,61 +395,64 @@ public class AgentConfiguration {
return new Launcher();
}
- public DuccEventDispatcher getCommonProcessDispatcher(CamelContext camelContext) throws Exception {
+ public DuccEventDispatcher getCommonProcessDispatcher(CamelContext camelContext)
+ throws Exception {
return agentTransport.duccEventDispatcher(logger, common.managedServiceEndpoint, camelContext);
}
public DuccEventDispatcher getORDispatcher(CamelContext camelContext) throws Exception {
- return agentTransport.duccEventDispatcher(logger, common.nodeInventoryEndpoint, camelContext);
+ return agentTransport.duccEventDispatcher(logger, common.nodeInventoryEndpoint, camelContext);
}
public int getNodeInventoryPublishDelay() {
- return Integer.parseInt(common.nodeInventoryPublishRate);
+ return Integer.parseInt(common.nodeInventoryPublishRate);
}
+
/**
* Starts state update server to handle AP service state update.
*
- * @param State update handler
+ * @param l
+ * the listener handed to each worker thread that processes an AP state update
* @throws Exception
*/
private void startAPServiceStateUpdateSocketServer(final AgentEventListener l) throws Exception {
- int port = Utils.findFreePort();
-
- serviceStateUpdateServer = new ServerSocket(port);
- // Publish State Update Port for AP's. This port will be added to the AP
- // environment before a launch
- System.setProperty("AGENT_AP_STATE_UPDATE_PORT",String.valueOf(port));
- // spin a server thread which will handle AP state update messages
- serverThread = new Thread( new Runnable() {
- public void run() {
-
- while(!agent.isStopping()) {
- try {
- Socket client = serviceStateUpdateServer.accept();
- // AP connected, get its status report. Handling of the status
- // will be done in a dedicated thread to allow concurrent processing.
- // When handling of the state update is done, the socket will be closed
- ServiceUpdateWorkerThread worker = new ServiceUpdateWorkerThread(client, l);
- worker.start();
- } catch( SocketException e) {
- // ignore, clients can come and go
- } catch( Exception e) {
- logger.error("startAPServiceStateUpdateSocketServer", null, e);
- } finally {
- }
-
- }
- logger.info("startAPServiceStateUpdateSocketServer", null, "State Update Server Stopped");
- }
- });
- serverThread.start();
- logger.info("startSocketServer", null, "Started AP Service State Update Server on Port"+port);
+ int port = Utils.findFreePort();
+
+ serviceStateUpdateServer = new ServerSocket(port);
+ // Publish State Update Port for AP's. This port will be added to the AP
+ // environment before a launch
+ System.setProperty("AGENT_AP_STATE_UPDATE_PORT", String.valueOf(port));
+ // spin a server thread which will handle AP state update messages
+ serverThread = new Thread(new Runnable() {
+ public void run() {
+
+ while (!agent.isStopping()) {
+ try {
+ Socket client = serviceStateUpdateServer.accept();
+ // AP connected, get its status report. Handling of the status
+ // will be done in a dedicated thread to allow concurrent processing.
+ // When handling of the state update is done, the socket will be closed
+ ServiceUpdateWorkerThread worker = new ServiceUpdateWorkerThread(client, l);
+ worker.start();
+ } catch (SocketException e) {
+ // ignore, clients can come and go
+ } catch (Exception e) {
+ logger.error("startAPServiceStateUpdateSocketServer", null, e);
+ } finally {
+ }
+
+ }
+ logger.info("startAPServiceStateUpdateSocketServer", null, "State Update Server Stopped");
+ }
+ });
+ serverThread.start();
+ logger.info("startSocketServer", null, "Started AP Service State Update Server on Port" + port);
}
+
@Bean
public NodeAgent nodeAgent() throws Exception {
try {
-
-
+
camelContext = common.camelContext();
camelContext.disableJMX();
@@ -455,19 +460,19 @@ public class AgentConfiguration {
// optionally configures Camel Context for JMS. Checks the 'agentRequestEndpoint' to
// to determine type of transport. If the the endpoint starts with "activemq:", a
// special ActiveMQ component will be activated to enable JMS transport
- agentTransport.configureJMSTransport(logger,common.agentRequestEndpoint, camelContext);
+ agentTransport.configureJMSTransport(logger, common.agentRequestEndpoint, camelContext);
AgentEventListener delegateListener = agentDelegateListener(agent);
agent.setAgentEventListener(delegateListener);
-
+
agent.setStateChangeEndpoint(common.daemonsStateChangeEndpoint);
// Create server to receive status update from APs. The JPs report their status
// via a Camel Mina-based route. The APs report to a different port handled
// by the code below. The APs pass in status as String whereas the JPs pass in
// status as DuccEvent. The Camel Mina-based route cannot serve both. Mina route
- // must be configured differently to accept String in a body.
- startAPServiceStateUpdateSocketServer(delegateListener);
+ // must be configured differently to accept String in a body.
+ startAPServiceStateUpdateSocketServer(delegateListener);
if (common.managedProcessStateUpdateEndpointType != null
&& common.managedProcessStateUpdateEndpointType.equalsIgnoreCase("socket")) {
@@ -485,15 +490,15 @@ public class AgentConfiguration {
.addRoutes(this.routeBuilderForManagedProcessStateUpdate(agent, delegateListener));
camelContext.addRoutes(this.routeBuilderForIncomingRequests(agent, delegateListener));
- inventoryRouteBuilder =
- (this.routeBuilderForNodeInventoryPost(agent,
- common.nodeInventoryEndpoint, Integer.parseInt(common.nodeInventoryPublishRate)));
+ inventoryRouteBuilder = (this.routeBuilderForNodeInventoryPost(agent,
+ common.nodeInventoryEndpoint, Integer.parseInt(common.nodeInventoryPublishRate)));
camelContext.addRoutes(inventoryRouteBuilder);
- logger.info("nodeAgent", null, "------- Agent Initialized - Identity Name:"
- + agent.getIdentity().getCanonicalName() + " IP:" + agent.getIdentity().getIp()
- + " JP State Update Endpoint:" + common.managedProcessStateUpdateEndpoint);
+ logger.info("nodeAgent", null,
+ "------- Agent Initialized - Identity Name:" + agent.getIdentity().getCanonicalName()
+ + " IP:" + agent.getIdentity().getIp() + " JP State Update Endpoint:"
+ + common.managedProcessStateUpdateEndpoint);
return agent;
} catch (Exception e) {
@@ -501,39 +506,44 @@ public class AgentConfiguration {
}
return null;
}
+
public String getInventoryUpdateEndpoint() {
- return common.nodeInventoryEndpoint;
+ return common.nodeInventoryEndpoint;
}
+
public void startNodeMetrics(NodeAgent agent) throws Exception {
- nodeMetricsProcessor.setAgent(agent);
- metricsRouteBuilder = this.routeBuilderForNodeMetricsPost(agent, common.nodeMetricsEndpoint,
- Integer.parseInt(common.nodeMetricsPublishRate));
- camelContext.addRoutes(metricsRouteBuilder);
+ nodeMetricsProcessor.setAgent(agent);
+ metricsRouteBuilder = this.routeBuilderForNodeMetricsPost(agent, common.nodeMetricsEndpoint,
+ Integer.parseInt(common.nodeMetricsPublishRate));
+ camelContext.addRoutes(metricsRouteBuilder);
}
+
public void stopRoutes() throws Exception {
- serviceStateUpdateServer.close();
- List<RouteDefinition> routes =
- camelContext.getRouteDefinitions();
- for( RouteDefinition rd : routes ) {
- logger.info("AgentConfigureation.stopRoutes", null,"Stopping route:"+rd.getLabel()+" : "+rd.getId());
- rd.stop();
- logger.info("AgentConfigureation.stopRoutes", null,"Stopped route:"+rd.getLabel()+" : "+rd.getId());
- }
- // camelContext.stop();
- // logger.info("AgentConfigureation.stopRoutes", null,"Camel Context stopped");
+ serviceStateUpdateServer.close();
+ List<RouteDefinition> routes = camelContext.getRouteDefinitions();
+ for (RouteDefinition rd : routes) {
+ logger.info("AgentConfigureation.stopRoutes", null,
+ "Stopping route:" + rd.getLabel() + " : " + rd.getId());
+ rd.stop();
+ logger.info("AgentConfigureation.stopRoutes", null,
+ "Stopped route:" + rd.getLabel() + " : " + rd.getId());
+ }
+ // camelContext.stop();
+ // logger.info("AgentConfigureation.stopRoutes", null,"Camel Context stopped");
}
+
@Bean
@PostConstruct
public NodeMetricsProcessor nodeMetricsProcessor() throws Exception {
if (Utils.isLinux()) {
- nodeMetricsProcessor = new LinuxNodeMetricsProcessor();
- ((LinuxNodeMetricsProcessor)nodeMetricsProcessor).initMemInfo("/proc/meminfo");
- ((LinuxNodeMetricsProcessor)nodeMetricsProcessor).initLoadAvg("/proc/loadavg");
+ nodeMetricsProcessor = new LinuxNodeMetricsProcessor();
+ ((LinuxNodeMetricsProcessor) nodeMetricsProcessor).initMemInfo("/proc/meminfo");
+ ((LinuxNodeMetricsProcessor) nodeMetricsProcessor).initLoadAvg("/proc/loadavg");
} else {
- nodeMetricsProcessor = new DefaultNodeMetricsProcessor();
+ nodeMetricsProcessor = new DefaultNodeMetricsProcessor();
}
return nodeMetricsProcessor;
}
@@ -549,52 +559,55 @@ public class AgentConfiguration {
}
public DefaultNodeInventoryProcessor nodeInventoryProcessor(NodeAgent agent) {
- if ( Objects.isNull(inventoryProcessor) ) {
- inventoryProcessor = new DefaultNodeInventoryProcessor(agent, inventoryPublishRateSkipCount);
- }
+ if (Objects.isNull(inventoryProcessor)) {
+ inventoryProcessor = new DefaultNodeInventoryProcessor(agent, inventoryPublishRateSkipCount);
+ }
return inventoryProcessor;
}
public void stopInventoryRoute() {
- stopRoute(inventoryRouteBuilder.getRouteCollection(),">>>> Agent Stopped Publishing Inventory");
+ stopRoute(inventoryRouteBuilder.getRouteCollection(),
+ ">>>> Agent Stopped Publishing Inventory");
}
public void stopMetricsRoute() {
- stopRoute(metricsRouteBuilder.getRouteCollection(),">>>> Agent Stopped Publishing Metrics");
-// try {
-// RoutesDefinition rsd = metricsRouteBuilder.getRouteCollection();
-// for (RouteDefinition rd : rsd.getRoutes()) {
-// camelContext.stopRoute(rd.getId());
-// camelContext.removeRoute(rd.getId());
-// logger.error(methodName, null, ">>>> Agent Stopped Metrics Publishing");
-// }
-//
-// } catch (Exception e) {
-// logger.error(methodName, null, e);
-// }
+ stopRoute(metricsRouteBuilder.getRouteCollection(), ">>>> Agent Stopped Publishing Metrics");
+ // try {
+ // RoutesDefinition rsd = metricsRouteBuilder.getRouteCollection();
+ // for (RouteDefinition rd : rsd.getRoutes()) {
+ // camelContext.stopRoute(rd.getId());
+ // camelContext.removeRoute(rd.getId());
+ // logger.error(methodName, null, ">>>> Agent Stopped Metrics Publishing");
+ // }
+ //
+ // } catch (Exception e) {
+ // logger.error(methodName, null, e);
+ // }
}
public void stopRoute(RoutesDefinition rsd, String logMsg) {
- String methodName = "stopRoute";
- try {
- for (RouteDefinition rd : rsd.getRoutes()) {
- camelContext.stopRoute(rd.getId());
- camelContext.removeRoute(rd.getId());
- logger.info(methodName, null, logMsg);
- }
-
- } catch (Exception e) {
- logger.error(methodName, null, e);
- }
- }
+ String methodName = "stopRoute";
+ try {
+ for (RouteDefinition rd : rsd.getRoutes()) {
+ camelContext.stopRoute(rd.getId());
+ camelContext.removeRoute(rd.getId());
+ logger.info(methodName, null, logMsg);
+ }
+
+ } catch (Exception e) {
+ logger.error(methodName, null, e);
+ }
+ }
+
public void stop() {
- try {
- serviceStateUpdateServer.close();
-
- } catch( Exception e) {
-
- }
+ try {
+ serviceStateUpdateServer.close();
+
+ } catch (Exception e) {
+
+ }
}
+
private class DuccNodeFilter implements Predicate {
private NodeAgent agent = null;
@@ -613,9 +626,10 @@ public class AgentConfiguration {
} else {
try {
String nodes = (String) exchange.getIn().getHeader(DuccExchange.TARGET_NODES_HEADER_NAME);
- logger.trace(methodName, null, ">>>>>>>>> Agent: [" + agent.getIdentity().getIp()
- + "] Received a Message. Is Agent target for message:" + result
- + ". Target Agents:" + nodes);
+ logger.trace(methodName, null,
+ ">>>>>>>>> Agent: [" + agent.getIdentity().getIp()
+ + "] Received a Message. Is Agent target for message:" + result
+ + ". Target Agents:" + nodes);
result = Utils.isTargetNodeForMessage(nodes, agent.getIdentity().getNodeIdentities());
} catch (Throwable e) {
e.printStackTrace();
@@ -625,32 +639,36 @@ public class AgentConfiguration {
return result;
}
}
-
+
class ServiceUpdateWorkerThread extends Thread {
- private Socket socket;
- private AgentEventListener updateHandler;
- ServiceUpdateWorkerThread(Socket socket, AgentEventListener l) {
- this.socket = socket;
- updateHandler = l;
- }
-
- public void run() {
- try {
- logger.info("nodeAgent.ServiceUpdateWorkerThread.run()", null,">>>>> Agent Reading from Service Socket");
- DataInputStream dis = new DataInputStream(socket.getInputStream());
- String state = dis.readUTF();
- updateHandler.onProcessStateUpdate(state);
- logger.info("nodeAgent.ServiceUpdateWorkerThread.run()", null,">>>>> Agent Received State Update:"+state);
- } catch( Exception e) {
- logger.error("nodeAgent.ServiceUpdateWorkerThread.run()", null,e);
- } finally {
- try {
- socket.close();
- } catch(Exception e) {
- logger.error("nodeAgent.ServiceUpdateWorkerThread.run()", null,e);
- }
- }
- }
+ private Socket socket;
+
+ private AgentEventListener updateHandler;
+
+ ServiceUpdateWorkerThread(Socket socket, AgentEventListener l) {
+ this.socket = socket;
+ updateHandler = l;
+ }
+
+ public void run() {
+ try {
+ logger.info("nodeAgent.ServiceUpdateWorkerThread.run()", null,
+ ">>>>> Agent Reading from Service Socket");
+ DataInputStream dis = new DataInputStream(socket.getInputStream());
+ String state = dis.readUTF();
+ updateHandler.onProcessStateUpdate(state);
+ logger.info("nodeAgent.ServiceUpdateWorkerThread.run()", null,
+ ">>>>> Agent Received State Update:" + state);
+ } catch (Exception e) {
+ logger.error("nodeAgent.ServiceUpdateWorkerThread.run()", null, e);
+ } finally {
+ try {
+ socket.close();
+ } catch (Exception e) {
+ logger.error("nodeAgent.ServiceUpdateWorkerThread.run()", null, e);
+ }
+ }
+ }
}
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/AbstractManagedService.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/AbstractManagedService.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/AbstractManagedService.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/AbstractManagedService.java Fri Aug 2 17:27:48 2019
@@ -26,110 +26,123 @@ import org.apache.uima.ducc.common.main.
import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
-
public abstract class AbstractManagedService extends AbstractDuccComponent
-implements ManagedService {
- public static final String ManagedServiceNotificationInterval = "uima.process.notify.interval";
- private long notificationInterval = 5000;
- protected ProcessState currentState = ProcessState.Undefined;
- protected ProcessState previousState = ProcessState.Undefined;
- // public static ManagedServiceContext serviceContext=null;
- public boolean useJmx = false;
- public ServiceStateNotificationAdapter serviceAdapter = null;
-
- public abstract void quiesceAndStop();
- public abstract void deploy(String[] args) throws Exception;
-
- protected AbstractManagedService(ServiceStateNotificationAdapter serviceAdapter, CamelContext context) {
- super("UimaProcess", context);
- this.serviceAdapter = serviceAdapter;
- // serviceContext = new ManagedServiceContext(this);
- }
- /**
- * @return the notificationInterval
- */
- public long getNotificationInterval() {
- return notificationInterval;
- }
-
- /**
- * @param notificationInterval
- * the notificationInterval to set
- */
- public void setNotificationInterval(long notificationInterval) {
- this.notificationInterval = notificationInterval;
- }
-
-
- public void initialize() throws Exception {
-
-// ServiceShutdownHook shutdownHook = new ServiceShutdownHook(this);
- // serviceDeployer);
-// Runtime.getRuntime().addShutdownHook(shutdownHook);
-// System.out.println("Managed Service Wrapper Registered Shutdown Hook");
- }
-
- public void notifyAgentWithStatus(ProcessState state) {
- serviceAdapter.notifyAgentWithStatus(state);
- }
- public void notifyAgentWithStatus(ProcessState state, String message) {
- serviceAdapter.notifyAgentWithStatus(state, message);
- }
- public void notifyAgentWithStatus(List<IUimaPipelineAEComponent> pipeline) {
- serviceAdapter.notifyAgentWithStatus(pipeline);
- }
- protected void stopIt() {
- if ( serviceAdapter != null ) {
- //serviceAdapter.stop();
- }
- }
- /**
- * Returns state of this process( INITIALIZING, RUNNING, FAILED, STOPPED )
- */
- public ProcessState getServiceState() {
- return currentState;
- }
- @Override
- public void start(DuccService service, String[] args) throws Exception {
- try {
- super.start(service, args);
- deploy(args);
- } catch( Exception e) {
- currentState = ProcessState.FailedInitialization;
- notifyAgentWithStatus(ProcessState.FailedInitialization);
- throw e;
- }
- }
- public void stop() {
- if ( super.isStopping() ) {
- return; // already stopping - nothing to do
- }
- try {
- System.out.println("... AbstractManagedService - Stopping Service Adapter");
- serviceAdapter.stop();
- System.out.println("... AbstractManagedService - Calling super.stop() ");
- super.stop();
- } catch( Exception e) {
- e.printStackTrace();
- }
- }
- static class ServiceShutdownHook extends Thread {
- private AbstractManagedService managedService;
-
- public ServiceShutdownHook(AbstractManagedService service) {
- this.managedService = service;
- }
-
- public void run() {
- try {
- System.out
- .println("Uima AS Service Wrapper Caught Kill Signal - Initiating Quiesce and Stop");
- managedService.quiesceAndStop();
- managedService.stopIt();
-
- } catch (Exception e) {
- }
- }
- }
+ implements ManagedService {
+ public static final String ManagedServiceNotificationInterval = "uima.process.notify.interval";
+
+ private long notificationInterval = 5000;
+
+ protected ProcessState currentState = ProcessState.Undefined;
+
+ protected ProcessState previousState = ProcessState.Undefined;
+
+ // public static ManagedServiceContext serviceContext=null;
+ public boolean useJmx = false;
+
+ public ServiceStateNotificationAdapter serviceAdapter = null;
+
+ public abstract void quiesceAndStop();
+
+ public abstract void deploy(String[] args) throws Exception;
+
+ protected AbstractManagedService(ServiceStateNotificationAdapter serviceAdapter,
+ CamelContext context) {
+ super("UimaProcess", context);
+ this.serviceAdapter = serviceAdapter;
+ // serviceContext = new ManagedServiceContext(this);
+ }
+
+ /**
+ * @return the notificationInterval
+ */
+ public long getNotificationInterval() {
+ return notificationInterval;
+ }
+
+ /**
+ * @param notificationInterval
+ * the notificationInterval to set
+ */
+ public void setNotificationInterval(long notificationInterval) {
+ this.notificationInterval = notificationInterval;
+ }
+
+ public void initialize() throws Exception {
+
+ // ServiceShutdownHook shutdownHook = new ServiceShutdownHook(this);
+ // serviceDeployer);
+ // Runtime.getRuntime().addShutdownHook(shutdownHook);
+ // System.out.println("Managed Service Wrapper Registered Shutdown Hook");
+ }
+
+ public void notifyAgentWithStatus(ProcessState state) {
+ serviceAdapter.notifyAgentWithStatus(state);
+ }
+
+ public void notifyAgentWithStatus(ProcessState state, String message) {
+ serviceAdapter.notifyAgentWithStatus(state, message);
+ }
+
+ public void notifyAgentWithStatus(List<IUimaPipelineAEComponent> pipeline) {
+ serviceAdapter.notifyAgentWithStatus(pipeline);
+ }
+
+ protected void stopIt() {
+ if (serviceAdapter != null) {
+ // serviceAdapter.stop();
+ }
+ }
+
+ /**
+ * Returns state of this process( INITIALIZING, RUNNING, FAILED, STOPPED )
+ */
+ public ProcessState getServiceState() {
+ return currentState;
+ }
+
+ @Override
+ public void start(DuccService service, String[] args) throws Exception {
+ try {
+ super.start(service, args);
+ deploy(args);
+ } catch (Exception e) {
+ currentState = ProcessState.FailedInitialization;
+ notifyAgentWithStatus(ProcessState.FailedInitialization);
+ throw e;
+ }
+ }
+
+ /**
+  * Stops the service adapter and then the underlying component. Returns immediately if the
+  * component is already stopping. A failure while stopping the adapter (or a missing adapter)
+  * is reported but does not prevent the call to super.stop().
+  */
+ public void stop() {
+   if (super.isStopping()) {
+     return; // already stopping - nothing to do
+   }
+   try {
+     // The adapter may be absent; stopIt() makes the same null check
+     if (serviceAdapter != null) {
+       System.out.println("... AbstractManagedService - Stopping Service Adapter");
+       serviceAdapter.stop();
+     }
+   } catch (Exception e) {
+     e.printStackTrace();
+   }
+   // Kept in its own try block so an adapter failure cannot skip the component shutdown
+   try {
+     System.out.println("... AbstractManagedService - Calling super.stop() ");
+     super.stop();
+   } catch (Exception e) {
+     e.printStackTrace();
+   }
+ }
+
+ /**
+  * Thread that quiesces and stops the wrapped managed service when run. It appears intended
+  * for use as a JVM shutdown hook; the registration in initialize() is currently commented out.
+  */
+ static class ServiceShutdownHook extends Thread {
+   private final AbstractManagedService managedService;
+
+   /**
+    * @param service
+    *          the managed service to quiesce and stop when this thread runs
+    */
+   public ServiceShutdownHook(AbstractManagedService service) {
+     this.managedService = service;
+   }
+
+   /**
+    * Calls quiesceAndStop() followed by stopIt() on the managed service. Any exception is
+    * reported on stderr and not rethrown.
+    */
+   @Override
+   public void run() {
+     try {
+       System.out.println(
+               "Uima AS Service Wrapper Caught Kill Signal - Initiating Quiesce and Stop");
+       managedService.quiesceAndStop();
+       managedService.stopIt();
+     } catch (Exception e) {
+       // Report the failure instead of dropping it silently
+       e.printStackTrace();
+     }
+   }
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/DuccWorkHelper.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/DuccWorkHelper.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/DuccWorkHelper.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/DuccWorkHelper.java Fri Aug 2 17:27:48 2019
@@ -18,6 +18,7 @@
*/
package org.apache.uima.ducc.agent.deploy;
+
import org.apache.uima.ducc.agent.Agent;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
@@ -30,75 +31,73 @@ import org.apache.uima.ducc.transport.ev
public class DuccWorkHelper {
- public static DuccLogger logger = DuccLogger.getLogger(DuccWorkHelper.class, Agent.COMPONENT_NAME);
-
- private IDuccEventDispatcher dispatcher = null;
- private DuccId jobid = null;
- private String orchestrator = "orchestrator";
-
- public DuccWorkHelper() {
- init();
- }
-
- private void init() {
- String location = "init";
- try {
- String targetUrl = getTargetUrl();
- dispatcher = new DuccEventHttpDispatcher(targetUrl);
- }
- catch(Exception e) {
- logger.error(location, jobid, e);
- }
- }
-
- private String getServer() {
- return orchestrator;
- }
-
- private String getTargetUrl() {
- String location = "getTargetUrl";
- String targetUrl = null;
- String server = getServer();
- String host = DuccPropertiesResolver.get("ducc.head");
- String port = DuccPropertiesResolver.get("ducc." + server + ".http.port");
- if ( host == null || port == null ) {
- String message = "ducc." + server + ".http.node and/or .port not set in ducc.properties";
- throw new IllegalStateException(message);
+ public static DuccLogger logger = DuccLogger.getLogger(DuccWorkHelper.class,
+ Agent.COMPONENT_NAME);
+
+ private IDuccEventDispatcher dispatcher = null;
+
+ private DuccId jobid = null;
+
+ private String orchestrator = "orchestrator";
+
+ public DuccWorkHelper() {
+ init();
+ }
+
+ private void init() {
+ String location = "init";
+ try {
+ String targetUrl = getTargetUrl();
+ dispatcher = new DuccEventHttpDispatcher(targetUrl);
+ } catch (Exception e) {
+ logger.error(location, jobid, e);
+ }
+ }
+
+ private String getServer() {
+ return orchestrator;
+ }
+
+ /**
+  * Builds the HTTP URL used to reach the server returned by getServer(). The host is read
+  * from the {@code ducc.head} property and the port from {@code ducc.<server>.http.port}; the
+  * URL path is the first two characters of the server name. The resulting URL is logged.
+  *
+  * @return the target URL in the form http://host:port/xx
+  * @throws IllegalStateException
+  *           if the host or the port property is not set
+  */
+ private String getTargetUrl() {
+   String location = "getTargetUrl";
+   String server = getServer();
+   String portKey = "ducc." + server + ".http.port";
+   String host = DuccPropertiesResolver.get("ducc.head");
+   String port = DuccPropertiesResolver.get(portKey);
+   if (host == null || port == null) {
+     // Name the properties that are actually looked up so the message is actionable
+     String message = "ducc.head and/or " + portKey + " not set in ducc.properties";
+     throw new IllegalStateException(message);
+   }
+   String targetUrl = "http://" + host + ":" + port + "/" + server.substring(0, 2);
+   logger.info(location, jobid, targetUrl);
+   return targetUrl;
+ }
+
+ public IDuccWork fetch(DuccId duccId) {
+ String location = "fetch";
+ IDuccWork dw = null;
+ if (duccId != null) {
+ DuccWorkRequestEvent dwRequestEvent = new DuccWorkRequestEvent(duccId);
+ DuccWorkReplyEvent dwReplyEvent = null;
+ try {
+ dwReplyEvent = (DuccWorkReplyEvent) dispatcher.dispatchAndWaitForDuccReply(dwRequestEvent);
+ if (dwReplyEvent != null) {
+ dw = dwReplyEvent.getDw();
+ if (dw == null) {
+ logger.debug(location, duccId, "value is null");
+ } else {
+ logger.debug(location, duccId, "state is " + dw.getStateObject());
+ }
+ } else {
+ logger.debug(location, duccId, "reply is null");
}
- targetUrl = "http://" + host + ":" + port + "/" + server.substring(0, 2);
- logger.info(location, jobid, targetUrl);
- return targetUrl;
- }
-
- public IDuccWork fetch(DuccId duccId) {
- String location = "fetch";
- IDuccWork dw = null;
- if(duccId != null) {
- DuccWorkRequestEvent dwRequestEvent = new DuccWorkRequestEvent(duccId);
- DuccWorkReplyEvent dwReplyEvent = null;
- try {
- dwReplyEvent = (DuccWorkReplyEvent) dispatcher.dispatchAndWaitForDuccReply(dwRequestEvent);
- if(dwReplyEvent != null) {
- dw = dwReplyEvent.getDw();
- if(dw == null) {
- logger.debug(location, duccId, "value is null");
- }
- else {
- logger.debug(location, duccId, "state is "+dw.getStateObject());
- }
- }
- else {
- logger.debug(location, duccId, "reply is null");
- }
- }
- catch (Exception e) {
- logger.error(location, duccId, "Error while communicating with the OR:\n"+e);
- }
- }
- else {
- logger.debug(location, duccId, "key is null");
- }
- return dw;
- }
-
+ } catch (Exception e) {
+ logger.error(location, duccId, "Error while communicating with the OR:\n" + e);
+ }
+ } else {
+ logger.debug(location, duccId, "key is null");
+ }
+ return dw;
+ }
+
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ManagedService.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ManagedService.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ManagedService.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ManagedService.java Fri Aug 2 17:27:48 2019
@@ -20,9 +20,10 @@ package org.apache.uima.ducc.agent.deplo
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
-
public interface ManagedService {
- public void stopService();
- public void killService();
- public void onServiceStateChange(ProcessState serviceState);
+ public void stopService();
+
+ public void killService();
+
+ public void onServiceStateChange(ProcessState serviceState);
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ServiceAdapter.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ServiceAdapter.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ServiceAdapter.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ServiceAdapter.java Fri Aug 2 17:27:48 2019
@@ -29,98 +29,109 @@ import org.apache.uima.ducc.transport.di
import org.apache.uima.ducc.transport.event.ProcessStateUpdateDuccEvent;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
-
/**
- * Responsible for delegating state changes received from UIMA AS to a JMS endpoint.
+ * Responsible for delegating state changes received from UIMA AS to a JMS endpoint.
*
*/
public class ServiceAdapter implements ServiceStateNotificationAdapter {
- DuccLogger logger = DuccLogger.getLogger(this.getClass(), "UIMA AS Service");
+ DuccLogger logger = DuccLogger.getLogger(this.getClass(), "UIMA AS Service");
+
+ // Dispatcher is responsible for sending state update event to jms endpoint
+ private DuccEventDispatcher dispatcher;
+
+ // Caches process PID
+ private String pid = null;
+
+ // Unique ID assigned to the process. This is different from OS PID
+ private String duccProcessId;
+
+ private ProcessState state;
+
+ private String endpoint;
+
+ private Object stateLock = new Object();
+
+ /**
+ * JMS based adapter C'tor
+ *
+ * @param dispatcher
+ * - initialized instance of {@link DuccEventDispatcher}
+ * @param duccProcessId
+ * - unique ID assigned by Ducc infrastructure
+ */
+ public ServiceAdapter(DuccEventDispatcher dispatcher, String duccProcessId, String endpoint) {
+ this.dispatcher = dispatcher;
+ this.duccProcessId = duccProcessId;
+ this.endpoint = endpoint;
+ }
- // Dispatcher is responsible for sending state update event to jms endpoint
- private DuccEventDispatcher dispatcher;
- // Caches process PID
- private String pid=null;
- // Unique ID assigned to the process. This is different from OS PID
- private String duccProcessId;
-
- private ProcessState state;
-
- private String endpoint;
-
- private Object stateLock = new Object();
-
- /**
- * JMS based adapter C'tor
- *
- * @param dispatcher - initialized instance of {@link DuccEventDispatcher}
- * @param duccProcessId - unique ID assigned by Ducc infrastructure
- */
- public ServiceAdapter(DuccEventDispatcher dispatcher, String duccProcessId, String endpoint) {
- this.dispatcher = dispatcher;
- this.duccProcessId = duccProcessId;
- this.endpoint = endpoint;
- }
- public void notifyAgentWithStatus(ProcessState state) {
- notifyAgentWithStatus(state, null);
- }
- public void notifyAgentWithStatus(ProcessState state, String message) {
- synchronized( stateLock ) {
- this.state = state;
- if ( pid == null ) {
- // Get the PID once and cache for future reference
- pid = Utils.getPID();
- }
- ProcessStateUpdate processUpdate = null;
- if ( message == null ) {
- processUpdate = new ProcessStateUpdate(state, pid, duccProcessId,null);
- } else {
- processUpdate = new ProcessStateUpdate(state, pid, duccProcessId,message, null);
- }
- //System.out.println("................. >>> ProcessStateUpdate==NULL?"+(processUpdate==null)+" JmxUrl="+processJmxUrl);
- if (endpoint != null ) {
- processUpdate.setSocketEndpoint(endpoint);
- }
- this.notifyAgentWithStatus(processUpdate);
- }
- }
- /**
- * Called on UIMA AS status change. Sends a {@link ProcessStateUpdateDuccEvent} message
- * via configured dispatcher to a configured endpoint.
- *
- */
- public void notifyAgentWithStatus(ProcessStateUpdate state) {
- try {
- ProcessStateUpdateDuccEvent duccEvent =
- new ProcessStateUpdateDuccEvent(state);
- logger.info("notifyAgentWithStatus",null," >>>>>>> UIMA AS Service Deployed - PID:"+pid);
+ public void notifyAgentWithStatus(ProcessState state) {
+ notifyAgentWithStatus(state, null);
+ }
- if (endpoint != null ) {
+ public void notifyAgentWithStatus(ProcessState state, String message) {
+ synchronized (stateLock) {
+ this.state = state;
+ if (pid == null) {
+ // Get the PID once and cache for future reference
+ pid = Utils.getPID();
+ }
+ ProcessStateUpdate processUpdate = null;
+ if (message == null) {
+ processUpdate = new ProcessStateUpdate(state, pid, duccProcessId, null);
+ } else {
+ processUpdate = new ProcessStateUpdate(state, pid, duccProcessId, message, null);
+ }
+ // System.out.println("................. >>>
+ // ProcessStateUpdate==NULL?"+(processUpdate==null)+" JmxUrl="+processJmxUrl);
+ if (endpoint != null) {
+ processUpdate.setSocketEndpoint(endpoint);
+ }
+ this.notifyAgentWithStatus(processUpdate);
+ }
+ }
+
+ /**
+ * Called on UIMA AS status change. Sends a {@link ProcessStateUpdateDuccEvent} message via
+ * configured dispatcher to a configured endpoint.
+ *
+ */
+ public void notifyAgentWithStatus(ProcessStateUpdate state) {
+ try {
+ ProcessStateUpdateDuccEvent duccEvent = new ProcessStateUpdateDuccEvent(state);
+ logger.info("notifyAgentWithStatus", null, " >>>>>>> UIMA AS Service Deployed - PID:" + pid);
+
+ if (endpoint != null) {
state.setSocketEndpoint(endpoint);
}
- // send the process update to the remote
- dispatcher.dispatch(duccEvent, System.getenv(IDuccUser.EnvironmentVariable.DUCC_IP.value()));
- String jmx = state.getProcessJmxUrl() == null ? "N/A" : state.getProcessJmxUrl();
- logger.info("notifyAgentWithStatus",null,"... UIMA AS Service Deployed - PID:"+pid+". Service State: "+state+". JMX Url:"+jmx+" Dispatched State Update Event to Agent with IP:"+System.getenv(IDuccUser.EnvironmentVariable.DUCC_IP.value()));
- } catch( Exception e) {
- e.printStackTrace();
- }
- }
- public void notifyAgentWithStatus(List<IUimaPipelineAEComponent> pipeline) {
- synchronized( stateLock ) {
- // Only send update if the AE is initializing
- if ( state.equals(ProcessState.Initializing)) {
- try {
- ProcessStateUpdate processUpdate =
- new ProcessStateUpdate(state, pid, duccProcessId, null, pipeline);
- notifyAgentWithStatus(processUpdate);
- } catch( Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
- public void stop() throws Exception {
- dispatcher.stop();
- }
+ // send the process update to the remote
+ dispatcher.dispatch(duccEvent, System.getenv(IDuccUser.EnvironmentVariable.DUCC_IP.value()));
+ String jmx = state.getProcessJmxUrl() == null ? "N/A" : state.getProcessJmxUrl();
+ logger.info("notifyAgentWithStatus", null,
+ "... UIMA AS Service Deployed - PID:" + pid + ". Service State: " + state
+ + ". JMX Url:" + jmx + " Dispatched State Update Event to Agent with IP:"
+ + System.getenv(IDuccUser.EnvironmentVariable.DUCC_IP.value()));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+  * Sends a state update carrying the given pipeline to the agent, but only while the state
+  * last recorded by this adapter is Initializing. Does nothing if no state has been recorded
+  * yet.
+  *
+  * @param pipeline
+  *          - pipeline components to include in the state update
+  */
+ public void notifyAgentWithStatus(List<IUimaPipelineAEComponent> pipeline) {
+   synchronized (stateLock) {
+     // Only send update if the AE is initializing. Comparing from the constant side is
+     // null-safe: the state field stays null until the first state notification.
+     if (ProcessState.Initializing.equals(state)) {
+       try {
+         ProcessStateUpdate processUpdate = new ProcessStateUpdate(state, pid, duccProcessId, null,
+                 pipeline);
+         notifyAgentWithStatus(processUpdate);
+       } catch (Exception e) {
+         e.printStackTrace();
+       }
+     }
+   }
+ }
+
+ public void stop() throws Exception {
+ dispatcher.stop();
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java?rev=1864247&r1=1864246&r2=1864247&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java Fri Aug 2 17:27:48 2019
@@ -24,8 +24,11 @@ import org.apache.uima.ducc.transport.ag
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
public interface ServiceStateNotificationAdapter {
- public void notifyAgentWithStatus(ProcessState state);
- public void notifyAgentWithStatus(ProcessState state, String message);
- public void notifyAgentWithStatus(List<IUimaPipelineAEComponent> pipeline);
- public void stop() throws Exception;
+ public void notifyAgentWithStatus(ProcessState state);
+
+ public void notifyAgentWithStatus(ProcessState state, String message);
+
+ public void notifyAgentWithStatus(List<IUimaPipelineAEComponent> pipeline);
+
+ public void stop() throws Exception;
}