You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:02:19 UTC
svn commit: r1427906 [2/5] - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-agent: main/ main/java/
main/java/org/ main/java/org/apache/ main/java/org/apache/uima/
main/java/org/apache/uima/ducc/ main/java/org/apache/uima/ducc/agent/
main/java/org/apache/u...
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/RogueProcessReaper.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/RogueProcessReaper.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/RogueProcessReaper.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/RogueProcessReaper.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.uima.ducc.common.node.metrics.NodeUsersInfo;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
+
+
+/**
+ * Manages rogue processes on a node.
+ *
+ *
+ */
+public class RogueProcessReaper {
+
+ // Tracked processes keyed by PID (despite the field name, the key is not the user);
+ // the owning user is stored inside each entry.
+ private Map<String, RogueProcessEntry> userRogueProcessMap = new TreeMap<String, RogueProcessEntry>();
+
+ // Initial value of the per-entry kill counter: number of counted submissions of a
+ // rogue PID before it is killed.
+ private int counterValue = 5;
+
+ // Initial value of the per-entry cleanup counter, which is only counted down once the
+ // process has been killed; the entry is removed from the map when it reaches zero.
+ private int cleanupCounterValue = 5;
+
+ int maxSecondsBeforeEntryExpires = 120; // number of seconds a process entry is kept in
+ // the rogue process map before it is removed.
+ // Default: 2 minutes
+ // NOTE(review): this value is passed to RogueProcessEntry, whose constructor
+ // does not currently use it.
+
+ // May be null, in which case messages are written to System.out instead.
+ private DuccLogger logger;
+
+ // True when rogue processes should actually be killed; set in the constructor from
+ // the ducc.agent.rogue.process.kill system property.
+ boolean doKillRogueProcess = false;
+
+ /**
+  * Creates a reaper that tracks suspected rogue processes and, if configured, kills them.
+  * <p>
+  * Two system properties are consulted:
+  * {@code ducc.agent.rogue.process.purge.delay} (integer seconds, defaults to 120 when
+  * missing or not a valid integer) and {@code ducc.agent.rogue.process.kill}
+  * (killing is enabled only when the value is "true", ignoring case).
+  *
+  * @param logger - logger to use; may be null, in which case messages go to stdout/stderr
+  * @param counterValue - initial value of the per-process kill counter
+  * @param cleanupCounterValue - initial value of the per-process cleanup counter; when not
+  *          positive, {@code counterValue + 5} is used instead
+  */
+ public RogueProcessReaper(DuccLogger logger, int counterValue, int cleanupCounterValue) {
+   this.logger = logger;
+   this.counterValue = counterValue;
+   if (cleanupCounterValue > 0) {
+     this.cleanupCounterValue = cleanupCounterValue;
+   } else {
+     this.cleanupCounterValue = counterValue + 5;
+   }
+   // check if purge delay is defined in ducc.properties.
+   final String purgeDelay = System.getProperty("ducc.agent.rogue.process.purge.delay");
+   if (purgeDelay != null) {
+     try {
+       maxSecondsBeforeEntryExpires = Integer.parseInt(purgeDelay);
+     } catch (NumberFormatException e) {
+       if ( logger == null ) {
+         e.printStackTrace();
+       } else {
+         logger.error("RogueProcessReaper.ctor", null, e);
+       }
+       maxSecondsBeforeEntryExpires = 120; // defaulting to 2 minutes
+     }
+   }
+   // Parse the property VALUE. Boolean.getBoolean(kill) would instead look up a system
+   // property whose NAME is the value (e.g. a property called "true") and therefore
+   // practically never enables killing. parseBoolean(null) is false.
+   doKillRogueProcess = Boolean.parseBoolean(System.getProperty("ducc.agent.rogue.process.kill"));
+
+   if ( logger == null ) {
+     System.out.println(
+             "ducc.agent.rogue.process.kill=" + doKillRogueProcess);
+   } else {
+     logger.info("RogueProcessReaper.ctor", null,
+             "ducc.agent.rogue.process.kill=" + doKillRogueProcess);
+   }
+ }
+
+ public void submitRogueProcessForKill(String user, String pid, boolean isJava) {
+ final String methodName = "RogueProcessReaper.submitRogueProcessForKill";
+ RogueProcessEntry entry = null;
+ if (userRogueProcessMap.containsKey(pid)) {
+ entry = userRogueProcessMap.get(pid);
+ } else {
+ if (cleanupCounterValue <= counterValue) {
+ cleanupCounterValue += counterValue;
+ }
+ entry = new RogueProcessEntry(counterValue, cleanupCounterValue, user,
+ maxSecondsBeforeEntryExpires, isJava);
+ userRogueProcessMap.put(pid, entry);
+ }
+ entry.markAsRogue(3);
+ if ( !entry.isRogue() ) {
+ if ( logger == null ) {
+ System.out.println(
+ "PID:" + pid+" Not Rogue Yet - It takes 3 iterations to make it Rogue");
+
+ } else {
+ logger.info("submitRogueProcessForKill", null,
+ "PID:" + pid+" Not Rogue Yet - It takes 3 iterations to make it Rogue");
+
+ }
+ return;
+ }
+ if ( doKillRogueProcess ) {
+ try {
+ // Dont kill the process immediately. Kill if this method is called "counterValue"
+ // number of times.
+ long counter;
+ if ((counter = entry.countDown()) == 0 && !entry.isKilled()) {
+ if ( logger == null ) {
+ System.out.println(
+ "Process Scheduled for Kill PID:" + pid + " Owner:" + user
+ + " ");
+
+ } else {
+ logger.info(methodName, null, "Process Scheduled for Kill PID:" + pid + " Owner:" + user
+ + " ");
+
+ }
+ kill(user, pid);
+ entry.killed();
+ } else {
+ if ( logger == null ) {
+ System.out.println(
+ "Process ***NOT*** Scheduled for Kill PID:" + pid + " Owner:"
+ + user + " Call:" + (counterValue - counter) + " of " + counterValue);
+
+ } else {
+ logger.info(methodName, null, "Process ***NOT*** Scheduled for Kill PID:" + pid + " Owner:"
+ + user + " Call:" + (counterValue - counter) + " of " + counterValue);
+
+ }
+ }
+
+ if (entry.isKilled() && entry.countDownCleanupCounter() == 0) {
+ if ( logger == null ) {
+ System.out.println(
+ "Removing Entry From RougeProcessMap for PID:" + pid
+ + " Owner:" + user);
+
+ } else {
+ logger.info(methodName, null, "Removing Entry From RougeProcessMap for PID:" + pid
+ + " Owner:" + user);
+
+ }
+ userRogueProcessMap.remove(pid);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ } else {
+ if ( logger == null ) {
+ System.out.println(
+ "Ducc Not Configured to Kill Rogue Proces (PID:)" + pid + " Owner:" + user +
+ ". Change (or define) ducc.agent.rogue.process.kill property in ducc.properties if you want rogue processes to be cleaned up.");
+
+ } else {
+ logger.info(methodName, null, "Ducc Not Configured to Kill Rogue Proces (PID:)" + pid + " Owner:" + user +
+ ". Change (or define) ducc.agent.rogue.process.kill property in ducc.properties if you want rogue processes to be cleaned up.");
+
+ }
+ }
+ if ( logger == null ) {
+ System.out.println(
+ "UserRougeProcessMap size:" + userRogueProcessMap.size());
+
+ } else {
+ logger.info(methodName, null, "UserRougeProcessMap size:" + userRogueProcessMap.size());
+
+ }
+ }
+
+ public List<String> getUserRogueProcesses(String user) {
+ List<String> rogues = new ArrayList<String>();
+ for (Map.Entry<String, RogueProcessEntry> entry : userRogueProcessMap.entrySet()) {
+ if (entry.getValue().getUser().equals(user) && entry.getValue().isRogue() ) {
+ rogues.add(entry.getKey());
+ }
+ }
+ return rogues;
+ }
+ /**
+  * Stops tracking the given PID.
+  *
+  * @param pid - process id to remove from the rogue process map
+  * @return - true if an entry existed and was removed, false otherwise
+  */
+ public boolean removeRogueProcess(String pid) {
+   // Entries are never stored with a null value, so a non-null result means "was present".
+   return userRogueProcessMap.remove(pid) != null;
+ }
+ public void removeDeadRogueProcesses(List<String> currentPids) {
+ List<String> deadPIDs = new ArrayList<String>();
+
+ for (Map.Entry<String, RogueProcessEntry> entry : userRogueProcessMap.entrySet()) {
+ if ( !currentPids.contains(entry.getKey())) {
+ deadPIDs.add(entry.getKey());
+ }
+ }
+ for( String deadPID : deadPIDs) {
+ userRogueProcessMap.remove(deadPID);
+ }
+ }
+ /**
+  * Adds every process currently marked as rogue to the per-user info in the given map,
+  * creating a {@code NodeUsersInfo} for a user that has no entry yet. Entries that are
+  * tracked but not yet rogue are skipped.
+  *
+  * @param map - user name to {@code NodeUsersInfo}; updated in place
+  */
+ public void copyAllUserRogueProcesses(TreeMap<String, NodeUsersInfo> map) {
+   // The former "expired entry" cleanup list was never populated, so the loop that
+   // removed entries from it could never run; that dead code has been dropped.
+   for (Map.Entry<String, RogueProcessEntry> entry : userRogueProcessMap.entrySet()) {
+     RogueProcessEntry process = entry.getValue();
+     if ( !process.isRogue() ) {
+       continue;
+     }
+     String owner = process.getUser();
+     NodeUsersInfo nui = map.get(owner);
+     if (nui == null) {
+       nui = new NodeUsersInfo(owner);
+       map.put(owner, nui);
+     }
+     nui.addRogueProcess(entry.getKey(), process.isJava());
+   }
+ }
+ /**
+ * This method checks if ducc is configured to kill rogue processes and if so, proceeds to
+ * kill via -9.
+ *
+ * @param user - process owner
+ * @param pid - process id
+ * @return - true if the process has been killed, false otherwise
+ * @throws Exception
+ */
+ public void kill(final String user, final String pid) throws Exception {
+ final String methodName = "RogueProcessReaper.kill.run()";
+
+ new Thread(new Runnable() {
+ public void run() {
+ try {
+ String c_launcher_path = Utils.resolvePlaceholderIfExists(
+ System.getProperty("ducc.agent.launcher.ducc_spawn_path"), System.getProperties());
+ String cmdLine;
+ String arg;
+ boolean useDuccling = false;
+ if (Utils.isWindows()) {
+ cmdLine = "taskkill";
+ arg = "/PID";
+ } else {
+ String useSpawn = System.getProperty("ducc.agent.launcher.use.ducc_spawn");
+ if (useSpawn != null && useSpawn.toLowerCase().equals("true")) {
+ useDuccling = true;
+ }
+ cmdLine = "/bin/kill";
+ arg = "-9";
+ }
+ String[] duccling_nolog;
+ if (useDuccling) {
+ duccling_nolog = new String[] { c_launcher_path, "-u", user, "--", cmdLine, arg, pid };
+ } else {
+ duccling_nolog = new String[] { cmdLine, arg, pid };
+ }
+
+// if (kill != null && Boolean.parseBoolean(kill) == true) {
+ ProcessBuilder pb = new ProcessBuilder(duccling_nolog);
+ pb.redirectErrorStream(true);
+ Process killedProcess = pb.start();
+ InputStream is = killedProcess.getInputStream();
+ BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+ String line = null;
+ // read the next line from kill command
+ while ((line = reader.readLine()) != null) {
+ // dont care about the output, just drain the buffers
+ }
+ is.close();
+ StringBuffer sb = new StringBuffer();
+ for (String part : duccling_nolog) {
+ sb.append(part).append(" ");
+ }
+ if ( logger == null ) {
+ System.out.println(
+ "--------- Killed Process:" + pid + " Owned by:" + user
+ + " Command:" + sb.toString());
+
+ } else {
+ logger.info(methodName, null, "--------- Killed Process:" + pid + " Owned by:" + user
+ + " Command:" + sb.toString());
+
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }).start();
+ }
+
+ /**
+  * Book-keeping for a single tracked process: its owner, whether it is a java process,
+  * whether it has been promoted to rogue and killed, and the two countdowns that delay
+  * the kill and the later removal of the entry.
+  */
+ private static class RogueProcessEntry {
+   // Latches are used purely as saturating down-counters; nothing ever awaits them.
+   private final CountDownLatch counter;
+   private final CountDownLatch cleanupCounter;
+
+   private final String user;
+   private final boolean java;
+
+   // Number of sightings so far, starting at 1; capped at the ceiling passed to markAsRogue.
+   private final AtomicInteger pendingCounter = new AtomicInteger(1);
+
+   private boolean rogue;
+   private boolean killed;
+
+   /**
+    * @param counterValue - initial value of the kill countdown
+    * @param cleanupCounterValue - initial value of the cleanup countdown
+    * @param user - process owner
+    * @param maxSecondsBeforeEntryExpires - accepted but not used by this class
+    * @param isJava - whether the process is a java process
+    */
+   public RogueProcessEntry(int counterValue, int cleanupCounterValue, String user,
+           int maxSecondsBeforeEntryExpires, boolean isJava) {
+     this.counter = new CountDownLatch(counterValue);
+     this.cleanupCounter = new CountDownLatch(cleanupCounterValue);
+     this.user = user;
+     this.java = isJava;
+   }
+
+   /**
+    * Registers one more sighting. While the sighting count is below {@code ceiling} it is
+    * incremented; once it has reached the ceiling the entry is flagged as rogue.
+    */
+   public void markAsRogue(int ceiling) {
+     if ( pendingCounter.get() >= ceiling ) {
+       rogue = true;
+       return;
+     }
+     pendingCounter.incrementAndGet();
+   }
+
+   public boolean isRogue() {
+     return rogue;
+   }
+
+   /** Decrements the kill countdown (never below zero) and returns the remaining count. */
+   public long countDown() {
+     counter.countDown();
+     return counter.getCount();
+   }
+
+   /** Decrements the cleanup countdown (never below zero) and returns the remaining count. */
+   public long countDownCleanupCounter() {
+     cleanupCounter.countDown();
+     return cleanupCounter.getCount();
+   }
+
+   /** Flags this entry as killed. */
+   public void killed() {
+     killed = true;
+   }
+
+   public boolean isKilled() {
+     return killed;
+   }
+
+   public String getUser() {
+     return user;
+   }
+
+   public boolean isJava() {
+     return java;
+   }
+
+ }
+
+
+ /** Empty entry point; does nothing. */
+ public static void main(String[] args) {
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/RogueProcessReaper.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.config;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.dataformat.xstream.XStreamDataFormat;
+import org.apache.camel.impl.DefaultClassResolver;
+import org.apache.uima.ducc.agent.NodeAgent;
+import org.apache.uima.ducc.agent.event.AgentEventListener;
+import org.apache.uima.ducc.agent.launcher.Launcher;
+import org.apache.uima.ducc.agent.launcher.ManagedProcess;
+import org.apache.uima.ducc.agent.processors.DefaultNodeInventoryProcessor;
+import org.apache.uima.ducc.agent.processors.DefaultNodeMetricsProcessor;
+import org.apache.uima.ducc.agent.processors.DefaultProcessMetricsProcessor;
+import org.apache.uima.ducc.agent.processors.LinuxNodeMetricsProcessor;
+import org.apache.uima.ducc.agent.processors.LinuxProcessMetricsProcessor;
+import org.apache.uima.ducc.agent.processors.NodeInventoryProcessor;
+import org.apache.uima.ducc.agent.processors.NodeMetricsProcessor;
+import org.apache.uima.ducc.agent.processors.ProcessMetricsProcessor;
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.config.DuccBlastGuardPredicate;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.transport.DuccExchange;
+import org.apache.uima.ducc.transport.DuccTransportConfiguration;
+import org.apache.uima.ducc.transport.agent.NodeMetricsConfiguration;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+//import org.apache.uima.ducc.agent.event.AgentPingEvent;
+import com.thoughtworks.xstream.XStream;
+
+@Configuration
+
+@Import({ DuccTransportConfiguration.class,
+ CommonConfiguration.class, NodeMetricsConfiguration.class })
+public class AgentConfiguration {
+ DuccLogger logger = new DuccLogger(this.getClass(), "Agent");
+ // Optional identity overrides read from the environment. When IP is set,
+ // nodeIdentity() builds the NodeIdentity from these two values.
+// @Value("#{ systemProperties['IP'] }")
+ public String ip = System.getenv("IP");
+ public String nodeName = System.getenv("NodeName");
+
+// public static String agentPingEnpoint = "activemq:topic:agent.ping.endpoint";
+// public static String agentPingSelectorName="agent_ip";
+// public static String agentPingSelector;
+
+ //private DuccEventDispatcher agentPingDispatcher;
+
+ // The values below are injected from system properties and kept as raw strings.
+ @Value("#{ systemProperties['ducc.agent.launcher.thread.pool.size'] }")
+ String launcherThreadPoolSize;
+
+ @Value("#{ systemProperties['ducc.agent.launcher.process.stop.timeout'] }")
+ public String processStopTimeout;
+
+ // Passed to the node inventory processor created by nodeInventoryProcessor().
+ @Value("#{ systemProperties['ducc.agent.node.inventory.publish.rate.skip'] }")
+ public String inventoryPublishRateSkipCount;
+
+ // Get comma separated list of processes to ignore while detecting rogue processes
+ @Value("#{ systemProperties['ducc.agent.rogue.process.exclusion.filter'] }")
+ public String processExclusionList;
+
+ // Get comma separated list of users to ignore while detecting rogue processes
+ @Value("#{ systemProperties['ducc.agent.rogue.process.user.exclusion.filter'] }")
+ public String userExclusionList;
+ // Imported configurations (see the @Import on this class), injected by Spring.
+ @Autowired
+ DuccTransportConfiguration agentTransport;
+ @Autowired
+ NodeMetricsConfiguration nodeMetrics;
+ @Autowired
+ CommonConfiguration common;
+ /**
+  * Builds the {@code AgentEventListener} that incoming messages are delegated to.
+  *
+  * @param agent - {@code NodeAgent} the listener is created for
+  *
+  * @return a new {@code AgentEventListener} wrapping the given agent
+  */
+ public AgentEventListener agentDelegateListener(NodeAgent agent) {
+   final AgentEventListener listener = new AgentEventListener(agent);
+   return listener;
+ }
+ /**
+ * Creates Camel Router to generate Node Metrics at regular intervals.
+ * A timer fires the route; a blast guard filter drops bursts, the node metrics
+ * processor runs, the message is sent to the target endpoint and finally passed
+ * to a {@code ConfirmProcessor}.
+ *
+ * @param agent - {@code NodeAgent} handed to the metrics processor; its logger is used by the blast guard
+ * @param targetEndpointToReceiveNodeMetricsUpdate - where to post NodeMetrics
+ * @param nodeMetricsPublishRate - how often to publish NodeMetrics; used as the timer period
+ *          (presumably milliseconds - confirm against the Camel timer component)
+ * @return - {@code RouteBuilder} instance
+ *
+ * @throws Exception
+ */
+ private RouteBuilder routeBuilderForNodeMetricsPost(final NodeAgent agent, final String targetEndpointToReceiveNodeMetricsUpdate, final int nodeMetricsPublishRate) throws Exception {
+ final Processor nmp = nodeMetricsProcessor(agent);
+ final Predicate blastFilter = new DuccBlastGuardPredicate(agent.logger);
+ final Processor cp = new ConfirmProcessor();
+ return new RouteBuilder() {
+ public void configure() {
+ // Exceptions are marked handled and passed to ErrorProcessor, which logs them.
+ onException(Exception.class).handled(true).process(new ErrorProcessor());
+ from("timer:nodeMetricsTimer?fixedRate=true&period=" + nodeMetricsPublishRate)
+ .routeId("NodeMetricsPostRoute")
+
+ // This route uses a filter to prevent sudden bursts of messages which
+ // may flood DUCC daemons causing chaos. The filter disposes any message
+ // that appears in a window of 1 sec or less.
+ .filter(blastFilter)
+ .process(nmp)
+ .to(targetEndpointToReceiveNodeMetricsUpdate)
+ .process(cp);
+ }
+ };
+ }
+
+ /**
+ * Creates Camel Router to publish Node Inventory at regular intervals.
+ * Messages whose body is null after the inventory processor has run are dropped
+ * instead of being sent to the target endpoint.
+ *
+ * @param agent - {@code NodeAgent} handed to the inventory processor; its logger is used by the blast guard
+ * @param targetEndpointToReceiveNodeInventoryUpdate - where to post Node Inventory
+ * @param nodeInventoryPublishRate - how often to run the route; used as the timer period
+ *          (presumably milliseconds - confirm against the Camel timer component)
+ * @return - {@code RouteBuilder} instance
+ *
+ * @throws Exception
+ */
+ private RouteBuilder routeBuilderForNodeInventoryPost(final NodeAgent agent, final String targetEndpointToReceiveNodeInventoryUpdate, final int nodeInventoryPublishRate) throws Exception {
+ // despite the name, this is the node INVENTORY processor
+ final Processor nmp = nodeInventoryProcessor(agent);
+ return new RouteBuilder() {
+ public void configure() {
+ final Predicate bodyNotNull = body().isNotNull();
+
+ final Predicate blastGuard = new DuccBlastGuardPredicate(agent.logger);
+ // No redelivery; exceptions are marked handled and passed to ErrorProcessor.
+ onException(Exception.class).maximumRedeliveries(0).handled(true).process(new ErrorProcessor());
+
+ from("timer:nodeInventoryTimer?fixedRate=true&period=" + nodeInventoryPublishRate)
+ .routeId("NodeInventoryPostRoute")
+ // This route uses a filter to prevent sudden bursts of messages which
+ // may flood DUCC daemons causing chaos. The filter disposes any message
+ // that appears in a window of 1 sec or less.
+ .filter(blastGuard)
+ // add inventory to the body of the message
+ .process(nmp)
+ // filter out messages with no body. Since this route is on a timer
+ // it keeps generating flow of messages. However, the agent only
+ // publishes inventory if there is a change or configured number of
+ // epochs has passed. Otherwise, the agent puts null in the body of
+ // the message and this route should just throw it away.
+ .filter(bodyNotNull)
+ .to(targetEndpointToReceiveNodeInventoryUpdate);
+ }
+ };
+ }
+ /**
+ * Creates Camel Router to handle incoming messages arriving on
+ * {@code common.agentRequestEndpoint}; every message is handed to the delegate.
+ *
+ * @param agent - {@code NodeAgent}; not used by this method
+ * @param delegate - {@code AgentEventListener} to delegate messages to
+ *
+ * @return {@code RouteBuilder} instance
+ */
+ public synchronized RouteBuilder routeBuilderForIncomingRequests(final NodeAgent agent, final AgentEventListener delegate) {
+ return new RouteBuilder() {
+ public void configure() {
+ // No redelivery. Unlike the other routes here the exception is NOT marked as
+ // handled; ErrorProcessor logs it.
+ onException(Throwable.class).maximumRedeliveries(0).handled(false).process(new ErrorProcessor());
+ from(common.agentRequestEndpoint)
+ .routeId("IncomingRequestsRoute")
+ //.process(new DebugProcessor())
+ .bean(delegate);
+ }
+ };
+ }
+ /**
+ * Creates Camel Router to handle state update messages arriving on
+ * {@code common.managedProcessStateUpdateEndpoint}. Only messages accepted by
+ * {@code DuccNodeFilter} for the given agent are handed to the delegate.
+ *
+ * @param agent - {@code NodeAgent} used by the filter to decide whether a message targets this agent
+ * @param delegate - {@code AgentEventListener} to delegate messages to
+ *
+ * @return {@code RouteBuilder} instance
+ */
+ public synchronized RouteBuilder routeBuilderForManagedProcessStateUpdate(final NodeAgent agent, final AgentEventListener delegate) {
+ return new RouteBuilder() {
+
+ // Custom filter to select messages that are targeted for this agent
+ // Checks the node list in a message to determine if this agent is
+ // the target.
+ Predicate filter = new DuccNodeFilter(agent);
+ public void configure() {
+ // No redelivery; the exception is marked handled, logged by ErrorProcessor,
+ // and processing of the message stops.
+ onException(Throwable.class).
+ maximumRedeliveries(0).
+ handled(true).
+ process(new ErrorProcessor())
+ .stop();
+
+ from(common.managedProcessStateUpdateEndpoint)
+ .routeId("ManageProcessStateUpdateRoute")
+ //.process(new StateUpdateDebugProcessor(logger))
+ .choice().when(filter)
+ .bean(delegate)
+ .end();
+ }
+ };
+ }
+
+// private RouteBuilder routeBuilderForNodePing(final NodeAgent agent, final String targetEndpoint) throws Exception {
+// return new RouteBuilder() {
+// PingProcessor pingProcessor = new PingProcessor(agent);
+// public void configure() {
+// System.out.println("Agent Listening on Ping Endpoint:"+targetEndpoint);
+// onException(Exception.class).handled(true).process(new ErrorProcessor());
+// from(targetEndpoint)
+// .routeId("NodePingRoute")
+// .process( pingProcessor);
+// }
+// };
+// }
+
+
+ /**
+  * Debugging aid: logs the body of each exchange, serialized to XML with XStream,
+  * at info level.
+  */
+ public class DebugProcessor implements Processor {
+
+   public void process(Exchange exchange) throws Exception {
+     final XStream serializer = new XStreamDataFormat().getXStream(new DefaultClassResolver());
+     final String bodyAsXml = serializer.toXML(exchange.getIn().getBody());
+     logger.info("process", null, bodyAsXml);
+   }
+
+ }
+
+ /**
+ * Processor placed at the end of the node metrics route. Its body is entirely
+ * commented out, so it currently does nothing with the exchange.
+ */
+ public static class ConfirmProcessor implements Processor {
+ // only referenced by the commented-out code below
+ boolean first = true;
+ public void process(Exchange exchange) throws Exception {
+// if ( first ) {
+// synchronized(this) {
+// this.wait(20000);
+// }
+// first = false;
+// }
+
+// XStreamDataFormat xStreamDataFormat = new XStreamDataFormat();
+// XStream xStream = xStreamDataFormat.getXStream(new DefaultClassResolver());
+// String marshalledEvent = xStream.toXML(exchange.getIn().getBody());
+//
+// System.out.println("Agent Published Metrics:\n"+
+// marshalledEvent);
+
+ }
+ }
+ /**
+  * Debugging aid: logs all headers of the incoming message, one {@code key=value}
+  * pair per line, at info level.
+  */
+ public static class StateUpdateDebugProcessor implements Processor {
+   DuccLogger logger;
+
+   StateUpdateDebugProcessor(DuccLogger logger ) {
+     this.logger = logger;
+   }
+
+   public void process(Exchange exchange) throws Exception {
+     StringBuilder headerDump = new StringBuilder();
+     for( Entry<String,Object> header : exchange.getIn().getHeaders().entrySet()) {
+       headerDump.append(header.getKey())
+                 .append("=")
+                 .append(header.getValue())
+                 .append("\n");
+     }
+     logger.info("StateUpdateDebugProcessor.process", null, "Headers:\n\t"+headerDump);
+   }
+ }
+
+ /**
+  * Failure handler used by the routes' {@code onException} clauses: logs the exception
+  * that Camel stored on the exchange and does nothing else with the message.
+  */
+ public class ErrorProcessor implements Processor {
+
+   public void process(Exchange exchange) throws Exception {
+     // Camel keeps the triggering exception in the EXCEPTION_CAUGHT exchange property.
+     logger.error("ErrorProcessor.process", null,
+             exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class));
+   }
+ }
+ /**
+  * Debugging aid: prints the class name of each received message body to stdout.
+  * Any exception raised while doing so is printed and otherwise ignored.
+  */
+ public static class TransportProcessor implements Processor {
+
+   public void process(Exchange exchange) throws Exception {
+     try {
+       final String messageType = exchange.getIn().getBody().getClass().getName();
+       System.out.println(">>> Agent Received Message of type:"+messageType);
+     } catch( Exception e ) {
+       e.printStackTrace();
+     }
+   }
+
+ }
+// public static class PingProcessor implements Processor {
+// private NodeAgent agent;
+//
+// PingProcessor(NodeAgent agent ) {
+// this.agent = agent;
+// }
+// public void process(Exchange exchange) throws Exception {
+// try {
+// agent.ping((AgentPingEvent)exchange.getIn().getBody());
+// } catch( Exception e ) {
+// e.printStackTrace();
+// }
+// }
+//
+// }
+ /**
+  * Builds the identity of this agent's node. The default identity is used unless the
+  * IP environment variable was set, in which case the injected IP and node name are used.
+  */
+ private NodeIdentity nodeIdentity() throws Exception {
+   if ( ip == null ) {
+     return new NodeIdentity();
+   }
+   // Inject IP to enable deployment of multiple Agents on the same node with
+   // different identity
+   return new NodeIdentity(ip,nodeName); // this should only be used for testing
+ }
+
+ private Launcher launcher() {
+ return new Launcher();
+ }
+ /**
+  * Creates an event dispatcher bound to {@code common.managedServiceEndpoint}.
+  *
+  * @param camelContext - Camel context the dispatcher is created for
+  * @return the dispatcher produced by the transport configuration
+  * @throws Exception - whatever the transport configuration throws
+  */
+ public DuccEventDispatcher getCommonProcessDispatcher(CamelContext camelContext) throws Exception {
+   final String targetEndpoint = common.managedServiceEndpoint;
+   return agentTransport.duccEventDispatcher(targetEndpoint, camelContext);
+ }
+ /**
+  * Creates and wires the {@code NodeAgent} bean: disables JMX on the Camel context,
+  * configures the JMS transport for the agent request endpoint, optionally switches the
+  * managed process state update endpoint to a local socket on a free port, and registers
+  * the state update, incoming request, node inventory and node metrics routes.
+  *
+  * @return the initialized {@code NodeAgent}
+  * @throws Exception - if any step of the initialization fails. The failure is logged and
+  *           rethrown rather than swallowed, so the context does not come up with a
+  *           null agent bean.
+  */
+ @Bean
+ public NodeAgent nodeAgent() throws Exception {
+   try {
+     CamelContext camelContext = common.camelContext();
+     camelContext.disableJMX();
+
+     NodeAgent agent = new NodeAgent(nodeIdentity(), launcher(), camelContext, this);
+//   agentPingSelector = agentPingSelectorName+"='"+agent.getIdentity().getIp()+"'";
+     // optionally configures Camel Context for JMS. Checks the 'agentRequestEndpoint' to
+     // to determine type of transport. If the the endpoint starts with "activemq:", a
+     // special ActiveMQ component will be activated to enable JMS transport
+     agentTransport.configureJMSTransport(common.agentRequestEndpoint,camelContext);
+     AgentEventListener delegateListener = agentDelegateListener(agent);
+//   agentPingDispatcher =
+//     agentTransport.duccEventDispatcher(agentPingEnpoint, camelContext);
+
+     if ( common.managedProcessStateUpdateEndpointType != null && common.managedProcessStateUpdateEndpointType.equalsIgnoreCase("socket") ) {
+       String agentSocketParams = "";
+       if ( common.managedProcessStateUpdateEndpointParams != null ) {
+         agentSocketParams = "?"+common.managedProcessStateUpdateEndpointParams;
+       }
+       int agentPort = Utils.findFreePort();
+       common.managedProcessStateUpdateEndpoint = "mina:tcp://localhost:"+agentPort+agentSocketParams;
+       // Remember the agent port since we need to tell JPs where to send their state updates
+       System.setProperty(NodeAgent.ProcessStateUpdatePort, String.valueOf(agentPort));
+     }
+     camelContext.addRoutes(this.routeBuilderForManagedProcessStateUpdate(agent,delegateListener));
+     camelContext.addRoutes(this.routeBuilderForIncomingRequests(agent,delegateListener));
+     camelContext.addRoutes(this.routeBuilderForNodeInventoryPost(agent, common.nodeInventoryEndpoint, Integer.parseInt(common.nodeInventoryPublishRate)));
+     camelContext.addRoutes(this.routeBuilderForNodeMetricsPost(agent, common.nodeMetricsEndpoint, Integer.parseInt(common.nodeMetricsPublishRate)));
+//   camelContext.addRoutes(this.routeBuilderForNodePing(agent,agentPingEnpoint+"?selector="+agentPingSelector) );
+
+     logger.info("nodeAgent", null,"------- Agent Initialized - Identity Name:"+agent.getIdentity().getName()+" IP:"+agent.getIdentity().getIp()+" JP State Update Endpoint:"+common.managedProcessStateUpdateEndpoint);
+     return agent;
+
+   } catch( Exception e) {
+     // Fail fast: record the cause in the agent log and let the caller see the failure.
+     logger.error("nodeAgent", null, e);
+     throw e;
+   }
+ }
+
+ /**
+  * Selects the node metrics processor for the current platform: the Linux implementation
+  * reading {@code /proc/meminfo} and {@code /proc/loadavg} on Linux, the default
+  * implementation everywhere else.
+  *
+  * @param agent - {@code NodeAgent} the processor is created for
+  * @return platform specific {@code NodeMetricsProcessor}
+  * @throws Exception - whatever the processor constructors throw
+  */
+ @Bean
+ public NodeMetricsProcessor nodeMetricsProcessor(NodeAgent agent) throws Exception {
+   if (!Utils.isLinux()) {
+     return new DefaultNodeMetricsProcessor(agent);
+   }
+   return new LinuxNodeMetricsProcessor(agent, "/proc/meminfo", "/proc/loadavg");
+ }
+ /**
+  * Selects the process metrics processor for the current platform. On Linux the
+  * implementation is pointed at the process's {@code /proc/PID/statm} and
+  * {@code /proc/PID/stat} files plus {@code /proc/stat}; elsewhere the default
+  * implementation is used.
+  *
+  * @param agent - {@code NodeAgent} the processor is created for
+  * @param process - process whose PID selects the /proc files
+  * @param managedProcess - passed through to the Linux implementation only
+  * @return platform specific {@code ProcessMetricsProcessor}
+  * @throws Exception - whatever the processor constructors throw
+  */
+ public ProcessMetricsProcessor processMetricsProcessor(NodeAgent agent, IDuccProcess process, ManagedProcess managedProcess) throws Exception {
+   if (!Utils.isLinux()) {
+     return new DefaultProcessMetricsProcessor(process, agent);
+   }
+   final String statmPath = "/proc/"+process.getPID()+"/statm";
+   final String processStatPath = "/proc/"+process.getPID()+"/stat";
+   return new LinuxProcessMetricsProcessor(logger, process, agent, statmPath, "/proc/stat", processStatPath, managedProcess);
+ }
+ /**
+  * Creates the node inventory processor, configured with the injected
+  * {@code ducc.agent.node.inventory.publish.rate.skip} value.
+  *
+  * @param agent - {@code NodeAgent} the processor is created for
+  * @return a new {@code DefaultNodeInventoryProcessor}
+  */
+ public NodeInventoryProcessor nodeInventoryProcessor(NodeAgent agent) {
+   final String skipCount = inventoryPublishRateSkipCount;
+   return new DefaultNodeInventoryProcessor(agent, skipCount);
+ }
+ /**
+  * Camel predicate that decides whether a state update message is meant for this agent.
+  * For "mina" (socket) endpoints every message is accepted; otherwise the target node
+  * list carried in the message header is compared with this agent's node identities.
+  */
+ private class DuccNodeFilter implements Predicate {
+   private NodeAgent agent = null;
+   public DuccNodeFilter(NodeAgent agent) {
+     this.agent = agent;
+   }
+   /**
+    * @param exchange - message to test
+    * @return true if this agent should process the message; false if it is not a target
+    *         or if the check itself failed (the failure is logged)
+    */
+   public synchronized boolean matches(Exchange exchange) {
+     String methodName="DuccNodeFilter.matches";
+     if ( common.managedProcessStateUpdateEndpoint.startsWith("mina")) {
+       // mina is a socket component with point-to-point semantics thus
+       // the client always sends a message to the correct agent. No reason
+       // to determine if this is a target agent.
+       return true;
+     }
+     boolean result = false;
+     try {
+       String nodes = (String)exchange.getIn().getHeader(DuccExchange.TARGET_NODES_HEADER_NAME);
+       result = Utils.isTargetNodeForMessage(nodes, agent.getIdentity().getNodeIdentities());
+       // Trace AFTER the decision has been made so the logged value is the real outcome
+       // (it used to be logged before the check and therefore always said false).
+       logger.trace(methodName, null, ">>>>>>>>> Agent: ["+agent.getIdentity().getIp()+"] Received a Message. Is Agent target for message:"+result+". Target Agents:"+nodes);
+     } catch( Throwable e) {
+       logger.error(methodName, null, e, new Object[] {});
+     }
+     return result;
+   }
+ }
+// public DuccEventDispatcher getAgentPingDispatcher() {
+// return agentPingDispatcher;
+// }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/AbstractManagedService.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/AbstractManagedService.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/AbstractManagedService.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/AbstractManagedService.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.deploy;
+
+import java.util.List;
+
+import org.apache.camel.CamelContext;
+import org.apache.uima.ducc.common.component.AbstractDuccComponent;
+import org.apache.uima.ducc.common.main.DuccService;
+import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+/**
+ * Base class for a managed service process. It forwards process state changes to
+ * an agent through a {@link ServiceStateNotificationAdapter}, registers a JVM
+ * shutdown hook that quiesces the service, and reports
+ * {@code ProcessState.FailedInitialization} when startup fails.
+ * <p>
+ * The adapter may be {@code null} (for example when a service is launched
+ * standalone); in that case state notifications are skipped.
+ */
+public abstract class AbstractManagedService extends AbstractDuccComponent
+    implements ManagedService {
+  public static final String ManagedServiceNotificationInterval = "uima.process.notify.interval";
+  // NOTE(review): unit is not visible here; presumably milliseconds - confirm
+  private long notificationInterval = 5000;
+  protected ProcessState currentState = ProcessState.Undefined;
+  protected ProcessState previousState = ProcessState.Undefined;
+  public boolean useJmx = false;
+  public ServiceStateNotificationAdapter serviceAdapter = null;
+
+  /** Stops the service after letting it quiesce; invoked from the shutdown hook. */
+  public abstract void quiesceAndStop();
+
+  /**
+   * Deploys the concrete service.
+   *
+   * @param args deployment arguments, passed through from {@link #start}
+   * @throws Exception if the service cannot be deployed
+   */
+  public abstract void deploy(String[] args) throws Exception;
+
+  /**
+   * @param serviceAdapter adapter used to report state to the agent; may be {@code null}
+   * @param context Camel context handed to the parent component
+   */
+  protected AbstractManagedService(ServiceStateNotificationAdapter serviceAdapter, CamelContext context) {
+    super("UimaProcess", context);
+    this.serviceAdapter = serviceAdapter;
+  }
+
+  /**
+   * @return the notificationInterval
+   */
+  public long getNotificationInterval() {
+    return notificationInterval;
+  }
+
+  /**
+   * @param notificationInterval
+   *          the notificationInterval to set
+   */
+  public void setNotificationInterval(long notificationInterval) {
+    this.notificationInterval = notificationInterval;
+  }
+
+  /**
+   * Registers a JVM shutdown hook which quiesces and stops this service.
+   *
+   * @throws Exception declared for subclasses; not thrown by this implementation
+   */
+  public void initialize() throws Exception {
+    ServiceShutdownHook shutdownHook = new ServiceShutdownHook(this);
+    Runtime.getRuntime().addShutdownHook(shutdownHook);
+    System.out.println("Managed Service Wrapper Registered Shutdown Hook");
+  }
+
+  /** Reports the given state to the agent; a no-op when no adapter is configured. */
+  public void notifyAgentWithStatus(ProcessState state) {
+    if (serviceAdapter != null) {
+      serviceAdapter.notifyAgentWithStatus(state);
+    }
+  }
+
+  /** Reports the given state and JMX URL to the agent; a no-op when no adapter is configured. */
+  public void notifyAgentWithStatus(ProcessState state, String processJmxUrl) {
+    if (serviceAdapter != null) {
+      serviceAdapter.notifyAgentWithStatus(state, processJmxUrl);
+    }
+  }
+
+  /** Reports pipeline component states to the agent; a no-op when no adapter is configured. */
+  public void notifyAgentWithStatus(List<IUimaPipelineAEComponent> pipeline) {
+    if (serviceAdapter != null) {
+      serviceAdapter.notifyAgentWithStatus(pipeline);
+    }
+  }
+
+  /**
+   * Intentionally does nothing: the call that stopped the adapter here was
+   * disabled, and the adapter is stopped by {@link #stop()} instead.
+   */
+  protected void stopIt() {
+    // no-op, see method comment
+  }
+
+  /**
+   * Returns state of this process( INITIALIZING, RUNNING, FAILED, STOPPED )
+   */
+  public ProcessState getServiceState() {
+    return currentState;
+  }
+
+  /**
+   * Starts the parent component and deploys the service. On any failure the state
+   * becomes {@code FailedInitialization}, the agent is notified and the original
+   * exception is rethrown.
+   */
+  @Override
+  public void start(DuccService service, String[] args) throws Exception {
+    try {
+      super.start(service, args);
+      deploy(args);
+    } catch (Exception e) {
+      currentState = ProcessState.FailedInitialization;
+      notifyAgentWithStatus(ProcessState.FailedInitialization);
+      throw e;
+    }
+  }
+
+  /**
+   * Stops the notification adapter (if any) and then the parent component.
+   * The two steps are isolated so that a missing or failing adapter cannot
+   * prevent the parent component from being stopped.
+   */
+  public void stop() {
+    try {
+      if (serviceAdapter != null) {
+        System.out.println("... AbstractManagedService - Stopping Service Adapter");
+        serviceAdapter.stop();
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    try {
+      System.out.println("... AbstractManagedService - Calling super.stop() ");
+      super.stop();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /** JVM shutdown hook that quiesces and stops the wrapped service. */
+  static class ServiceShutdownHook extends Thread {
+    private final AbstractManagedService managedService;
+
+    public ServiceShutdownHook(AbstractManagedService service) {
+      this.managedService = service;
+    }
+
+    public void run() {
+      try {
+        System.out
+            .println("Uima AS Service Wrapper Caught Kill Signal - Initiating Quiesce and Stop");
+        managedService.quiesceAndStop();
+        managedService.stopIt();
+      } catch (Exception e) {
+        // The JVM is going down; report the failure instead of hiding it
+        e.printStackTrace();
+      }
+    }
+  }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/AbstractManagedService.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ManagedService.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ManagedService.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ManagedService.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ManagedService.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.deploy;
+
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+/**
+ * Lifecycle callbacks of a service process that is controlled by an agent.
+ */
+public interface ManagedService {
+  /**
+   * Stops the service. {@code ManagedUimaService} implements this as a quiesce:
+   * it waits for the top level controller to finish before stopping.
+   */
+  void stopService();
+
+  /**
+   * Stops the service immediately. {@code ManagedUimaService} stops the top level
+   * controller and then exits the JVM.
+   */
+  void killService();
+
+  /**
+   * Called when the state of the service changes.
+   *
+   * @param serviceState the new state of the service
+   */
+  void onServiceStateChange(ProcessState serviceState);
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ManagedService.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceAdapter.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceAdapter.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceAdapter.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceAdapter.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.deploy;
+
+import java.util.List;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.agent.ProcessStateUpdate;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.ProcessStateUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+/**
+ * Responsible for delegating state changes received from UIMA AS to a JMS endpoint.
+ *
+ */
+public class ServiceAdapter implements ServiceStateNotificationAdapter {
+  DuccLogger logger = DuccLogger.getLogger(this.getClass(), "UIMA AS Service");
+
+  // Dispatcher is responsible for sending state update event to jms endpoint
+  private final DuccEventDispatcher dispatcher;
+  // Caches process PID
+  private String pid = null;
+  // Unique ID assigned to the process. This is different from OS PID
+  private final String duccProcessId;
+  // Last state reported through notifyAgentWithStatus(ProcessState, String);
+  // null until the first state has been reported
+  private ProcessState state;
+  // Optional socket endpoint attached to every outgoing update; may be null
+  private final String endpoint;
+  // Guards 'state' and 'pid'
+  private final Object stateLock = new Object();
+
+  /**
+   * JMS based adapter C'tor
+   *
+   * @param dispatcher - initialized instance of {@link DuccEventDispatcher}
+   * @param duccProcessId - unique ID assigned by Ducc infrastructure
+   * @param endpoint - socket endpoint to attach to each update, or {@code null} for none
+   */
+  public ServiceAdapter(DuccEventDispatcher dispatcher, String duccProcessId, String endpoint) {
+    this.dispatcher = dispatcher;
+    this.duccProcessId = duccProcessId;
+    this.endpoint = endpoint;
+  }
+
+  /** Reports the given state without a JMX URL. */
+  public void notifyAgentWithStatus(ProcessState state) {
+    notifyAgentWithStatus(state, null);
+  }
+
+  /**
+   * Remembers the given state, resolves and caches the PID on first use, and
+   * dispatches a state update.
+   *
+   * @param state new process state
+   * @param processJmxUrl JMX URL of the process, or {@code null} if not available
+   */
+  public void notifyAgentWithStatus(ProcessState state, String processJmxUrl) {
+    synchronized (stateLock) {
+      this.state = state;
+      if (pid == null) {
+        // Get the PID once and cache for future reference
+        pid = Utils.getPID();
+      }
+      ProcessStateUpdate processUpdate;
+      if (processJmxUrl == null) {
+        processUpdate = new ProcessStateUpdate(state, pid, duccProcessId, null);
+      } else {
+        processUpdate = new ProcessStateUpdate(state, pid, duccProcessId, processJmxUrl, null);
+      }
+      // The socket endpoint is applied by the dispatching overload below
+      this.notifyAgentWithStatus(processUpdate);
+    }
+  }
+
+  /**
+   * Called on UIMA AS status change. Sends a {@link ProcessStateUpdateDuccEvent} message
+   * via configured dispatcher to a configured endpoint. Failures are printed and
+   * not propagated to the caller.
+   *
+   * @param state the update to send
+   */
+  public void notifyAgentWithStatus(ProcessStateUpdate state) {
+    try {
+      // Attach the endpoint before the event is built so the event always
+      // carries a fully populated update
+      if (endpoint != null) {
+        state.setSocketEndpoint(endpoint);
+      }
+      ProcessStateUpdateDuccEvent duccEvent = new ProcessStateUpdateDuccEvent(state);
+      logger.info("notifyAgentWithStatus",null," >>>>>>> UIMA AS Service Deployed - PID:"+pid);
+
+      // send the process update to the remote
+      String agentIp = System.getenv("IP");
+      dispatcher.dispatch(duccEvent, agentIp);
+      String jmx = state.getProcessJmxUrl() == null ? "N/A" : state.getProcessJmxUrl();
+      logger.info("notifyAgentWithStatus",null,"... UIMA AS Service Deployed - PID:"+pid+". Service State: "+state+". JMX Url:"+jmx+" Dispatched State Update Event to Agent with IP:"+agentIp);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Publishes the state of pipeline components, but only while the last reported
+   * process state is {@code Initializing}. Nothing is sent if no state has been
+   * reported yet.
+   *
+   * @param pipeline component states to publish
+   */
+  public void notifyAgentWithStatus(List<IUimaPipelineAEComponent> pipeline) {
+    synchronized (stateLock) {
+      // Constant-first comparison: 'state' is null until a state was reported
+      if (ProcessState.Initializing.equals(state)) {
+        try {
+          ProcessStateUpdate processUpdate =
+              new ProcessStateUpdate(state, pid, duccProcessId, null, pipeline);
+          notifyAgentWithStatus(processUpdate);
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
+  /** Stops the underlying dispatcher. */
+  public void stop() throws Exception {
+    dispatcher.stop();
+  }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceAdapter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.deploy;
+
+import java.util.List;
+
+import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+/**
+ * Interface to a component that reports the state of a managed service process
+ * to an agent.
+ */
+public interface ServiceStateNotificationAdapter {
+  /**
+   * Reports a process state.
+   *
+   * @param state the state to report
+   */
+  void notifyAgentWithStatus(ProcessState state);
+
+  /**
+   * Reports a process state together with the JMX URL of the process.
+   *
+   * @param state the state to report
+   * @param processJmxUrl JMX connect URL of the reporting process
+   */
+  void notifyAgentWithStatus(ProcessState state, String processJmxUrl);
+
+  /**
+   * Reports the state of the pipeline components.
+   *
+   * @param pipeline component states to report
+   */
+  void notifyAgentWithStatus(List<IUimaPipelineAEComponent> pipeline);
+
+  /**
+   * Stops the adapter.
+   *
+   * @throws Exception if the adapter fails to stop
+   */
+  void stop() throws Exception;
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ManagedUimaService.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ManagedUimaService.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ManagedUimaService.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ManagedUimaService.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.deploy.uima;
+
+import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
+import org.apache.uima.adapter.jms.service.UIMA_Service;
+import org.apache.uima.analysis_engine.AnalysisEngineManagement;
+import org.apache.uima.ducc.agent.deploy.AbstractManagedService;
+import org.apache.uima.ducc.agent.deploy.ServiceStateNotificationAdapter;
+import org.apache.uima.ducc.common.main.DuccService;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.common.utils.XStreamUtils;
+import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.agent.UimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.event.common.DuccUimaDeploymentDescriptor;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+/**
+ * Service wrapper for UIMA AS service. Deploys UIMA AS using Spring deployer
+ * component. Reports UIMA AS state to an agent using
+ * {@code ServiceStateNotificationAdapter}.
+ *
+ */
+public class ManagedUimaService extends AbstractManagedService {
+
+  // Set once deploy() succeeds; null before that and when deployment failed
+  private SpringContainerDeployer serviceDeployer;
+  private final String saxonJarPath;
+  private final String dd2SpringXslPath;
+  private String processJmxUrl = null;
+  protected DuccLogger logger;
+  private String agentStateUpdateEndpoint = "";
+
+  /**
+   * Standalone launcher: deploys the deployment descriptor named by the first
+   * argument without an agent notification adapter.
+   */
+  public static void main(String[] args) {
+    try {
+      ManagedUimaService ms =
+          new ManagedUimaService("${DUCC_HOME}/lib/saxon8/saxon8.jar", "${DUCC_HOME}/bin/dd2spring.xsl",null, new DefaultCamelContext());
+      ms.deploy(new String[] {XStreamUtils.marshall(new DuccUimaDeploymentDescriptor(args[0]))});
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * @param saxonJarPath path to the saxon jar, may contain placeholders
+   * @param dd2SpringXslPath path to the dd2spring stylesheet, may contain placeholders
+   * @param serviceAdapter adapter used to report state to the agent
+   * @param context Camel context handed to the parent component
+   */
+  public ManagedUimaService(String saxonJarPath,String dd2SpringXslPath,
+      ServiceStateNotificationAdapter serviceAdapter, CamelContext context) {
+    super(serviceAdapter, context);
+    this.saxonJarPath = saxonJarPath;
+    this.dd2SpringXslPath = dd2SpringXslPath;
+    logger = new DuccLogger(DuccService.class);
+  }
+
+  /** Forwards a service state change to the agent. */
+  public void onServiceStateChange(ProcessState state) {
+    super.notifyAgentWithStatus(state);
+  }
+
+  public void setAgentStateUpdateEndpoint(String agentUpdateEndpoint) {
+    this.agentStateUpdateEndpoint = agentUpdateEndpoint;
+  }
+
+  /**
+   * Quiesces the top level controller, undeploys the container if the controller
+   * has not stopped, and finally stops this component. Safe to call when nothing
+   * has been deployed; {@code stop()} runs even if quiescing fails.
+   */
+  public void quiesceAndStop() {
+    try {
+      if (serviceDeployer != null) {
+        AnalysisEngineController topLevelController = serviceDeployer
+            .getTopLevelController();
+        if (topLevelController != null) {
+          topLevelController.quiesceAndStop();
+          if (!topLevelController.isStopped()) {
+            serviceDeployer
+                .undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
+          }
+        }
+      }
+    } catch (Exception e) {
+      // Report the failure rather than hiding it; shutdown continues below
+      e.printStackTrace();
+    } finally {
+      stop();
+    }
+  }
+
+  /**
+   * Marks the service as stopped, notifies the agent and stops the top level
+   * controller.
+   */
+  public void terminate() {
+    currentState = ProcessState.Stopped;
+    System.out.println("Service STOPPED");
+    try {
+      super.notifyAgentWithStatus(currentState);
+      if (serviceDeployer != null) {
+        // Use top level controller to stop all components
+        serviceDeployer.getTopLevelController().stop();
+      }
+      stopIt();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Stops UIMA AS without waiting for in-flight work and exits the JVM with
+   * status -1.
+   */
+  public void killService() {
+    logger.info("killService", null, "Ducc UIMA Service process received STOP event. Stopping UIMA AS ...");
+    if (serviceDeployer != null) {
+      // Use top level controller to stop all components. This method doesnt wait
+      // for inflight CASes to be processed
+      serviceDeployer.getTopLevelController().stop();
+    }
+    logger.info("killService", null, "Ducc UIMA Service process stopped UIMA AS and exiting via System.exit()");
+    System.exit(-1);
+  }
+
+  /**
+   * Stops UIMA AS in quiesce mode, marks the service as stopped and stops this
+   * component.
+   */
+  public void stopService() {
+    System.out.println("UIMA AS process received STOP event. Proceeding to STOP in quiesce mode");
+
+    if (serviceDeployer != null) {
+      // Use top level controller to stop all components. This method blocks until
+      // ALL in flight CASes are processed.
+      serviceDeployer.getTopLevelController().quiesceAndStop();
+    }
+    System.out.println("UIMA AS Service quiesceAndStop() Finished");
+    currentState = ProcessState.Stopped;
+    try {
+      super.stop();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Returns UIMA AS service arguments: saxonURL, xslt parser and deployment
+   * descriptor, with placeholders resolved against the system properties.
+   *
+   * @param args - commandline args; the first element is the deployment descriptor
+   * @return argument array in the form {@code -saxonURL <jar> -xslt <xsl> -dd <descriptor>}
+   * @throws IllegalArgumentException if no deployment descriptor argument is given
+   * @throws Exception if placeholder resolution fails
+   */
+  public String[] getServiceArgs(String[] args) throws Exception {
+    if (args == null || args.length == 0) {
+      throw new IllegalArgumentException(
+          "Missing deployment descriptor argument");
+    }
+    String ddPath = Utils.resolvePlaceholderIfExists(args[0],System.getProperties());
+    return new String[] {"-saxonURL",
+        Utils.resolvePlaceholderIfExists(saxonJarPath,System.getProperties()),
+        "-xslt",
+        Utils.resolvePlaceholderIfExists(dd2SpringXslPath,System.getProperties()),
+        "-dd",ddPath};
+  }
+
+  /**
+   * deploys UIMA AS service
+   * <p>
+   * Runs dd2spring, reports {@code Initializing} to the agent, polls AE
+   * initialization state while the container deploys, and finally reports either
+   * {@code Running} or {@code FailedInitialization}.
+   *
+   * @param args commandline args, see {@link #getServiceArgs(String[])}
+   * @throws Exception if the spring context files were not generated or the
+   *           arguments are invalid
+   */
+  public void deploy(String[] args) throws Exception {
+    // Instrument this process with JMX Agent. The Agent will
+    // find an open port and start JMX Connector allowing
+    // jmx clients to connect to this jvm using standard
+    // jmx connect url. This process does not require typical
+    // -D<jmx params> properties. Currently the JMX does not
+    // use security allowing all clients to connect.
+    processJmxUrl = super.getProcessJmxUrl();
+    System.out.println("Connect jConsole to this process using JMX URL:"+processJmxUrl);
+
+    UIMA_Service service = new UIMA_Service();
+
+    StringBuilder sb = new StringBuilder("Deploying UIMA AS with args:\n");
+    for (String arg : args) {
+      sb.append(arg).append("\n");
+    }
+    System.out.println(sb.toString());
+    String[] serviceArgs = getServiceArgs(args);
+
+    sb.setLength(0);
+    sb.append("Service Args:\n");
+    for (String arg : serviceArgs) {
+      sb.append(" ").append(arg);
+    }
+    System.out.println(sb.toString());
+
+    System.out.println("ManagedUimaService initializing...");
+
+    // parse command args and run dd2spring to generate spring context
+    // files from deployment descriptors
+    String[] contextFiles = service.initialize(serviceArgs);
+    if (contextFiles == null || contextFiles.length == 0) {
+      throw new Exception(
+          "Spring Context Files Not Generated. Unable to Launch Uima AS Service");
+    }
+    // Make sure that the dd2spring generated file exists
+    // NOTE(review): this polls without an upper bound; consider a timeout
+    File generatedFile = new File(contextFiles[0]);
+    while (!generatedFile.exists()) {
+      Thread.sleep(500);
+    }
+    System.out.println("ManagedUimaService initialized - ready to process. Agent State Update endpoint:"+agentStateUpdateEndpoint);
+    System.out.println(".... Verified dd2spring generated spring context file:"+contextFiles[0]);
+    // Let the Agent know that the service is entering Initialization
+    // state. This is an initial state of a service, covering
+    // process bootstrapping(startup) and initialization of UIMA
+    // components.
+    super.notifyAgentWithStatus(ProcessState.Initializing, processJmxUrl);
+
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+    UimaAEJmxMonitor monitor;
+    try {
+      executor.prestartAllCoreThreads();
+      // Instantiate a UIMA AS jmx monitor to poll for status of the AE.
+      // This monitor checks if the AE is initializing or ready.
+      monitor = new UimaAEJmxMonitor(this, serviceArgs);
+      /*
+       * This will execute the UimaAEJmxMonitor continuously every 30
+       * seconds with an initial delay of 20 seconds. This monitor polls
+       * initialization status of AE deployed in UIMA AS.
+       */
+      executor.scheduleAtFixedRate(monitor, 20, 30, TimeUnit.SECONDS);
+
+      // Deploy components defined in Spring context files.
+      // !!!! NOTE:This method blocks until the container is fully
+      // initialized and all UIMA-AS components are successfully deployed
+      // or there is a failure.
+      try {
+        serviceDeployer = service.deploy(contextFiles);
+      } catch (Throwable t) {
+        t.printStackTrace();
+      }
+    } finally {
+      // Stop executor. It was only needed to poll AE initialization status.
+      // Since deploy() completed the UIMA AS service either succeeded
+      // initializing or it failed. In either case we no longer need to poll
+      // for initialization status. Done in finally so the pool thread never
+      // outlives a failed deployment.
+      executor.shutdownNow();
+    }
+
+    if (serviceDeployer == null || serviceDeployer.initializationFailed()) {
+      currentState = ProcessState.FailedInitialization;
+      System.out
+          .println(">>> Failed to Deploy UIMA Service. Check UIMA Log for Details");
+      super.notifyAgentWithStatus(ProcessState.FailedInitialization);
+    } else {
+      currentState = ProcessState.Running;
+      // Update agent with the most up-to-date state of the pipeline
+      // NOTE(review): shutdownNow() does not wait for a poll that is still
+      // running, so this call may overlap with it - confirm this is acceptable
+      monitor.run();
+      super.notifyAgentWithStatus(currentState,processJmxUrl);
+    }
+
+  }
+
+  /** Publishes the state of pipeline components to the agent. */
+  public void updateAgent(List<IUimaPipelineAEComponent> pipeline) {
+    super.notifyAgentWithStatus(pipeline);
+  }
+
+  /**
+   * Polls the platform MBean server for UIMA AS analysis engine MBeans and
+   * publishes the state of engines that are initializing or have just become
+   * ready.
+   */
+  public static class UimaAEJmxMonitor implements Runnable {
+    MBeanServer server = null;
+    ManagedUimaService service;
+    // Counts consecutive unexpected proxy failures; shared by all instances
+    static int howManySeenSoFar = 1;
+    // Engines whose initialization is currently being tracked
+    public List<IUimaPipelineAEComponent> aeStateList = new ArrayList<IUimaPipelineAEComponent>();
+
+    /**
+     * @param service service that receives the published state
+     * @param serviceArgs currently unused
+     * @throws Exception declared for callers; not thrown by the visible code
+     */
+    public UimaAEJmxMonitor(ManagedUimaService service, String[] serviceArgs)
+        throws Exception {
+      server = ManagementFactory.getPlatformMBeanServer();
+      this.service = service;
+    }
+
+    /** Returns the tracked component with the given name, or null if not tracked. */
+    private IUimaPipelineAEComponent getUimaAeByName(String name) {
+      for (IUimaPipelineAEComponent aeState : aeStateList) {
+        if (aeState.getAeName().equals(name)) {
+          return aeState;
+        }
+      }
+      return null;
+    }
+
+    public void run() {
+      try {
+        // create an ObjectName with UIMA As JMS naming convention to enable
+        // finding deployed uima components.
+        ObjectName uimaServicePattern = new ObjectName(
+            "org.apache.uima:type=ee.jms.services,*");
+        // Fetch UIMA AS MBean names from JMX Server that match above
+        // name pattern
+        Set<ObjectInstance> mbeans = new HashSet<ObjectInstance>(
+            server.queryMBeans(uimaServicePattern, null));
+        List<IUimaPipelineAEComponent> componentsToDelete = new ArrayList<IUimaPipelineAEComponent>();
+        boolean updateAgent = false;
+        for (ObjectInstance instance : mbeans) {
+          String targetName = instance.getObjectName()
+              .getKeyProperty("name");
+          // getKeyProperty returns null when the key is absent; such an
+          // MBean must not abort the whole poll
+          if (targetName != null && targetName.endsWith("FlowController")) { // skip FC
+            continue;
+          }
+          // Only interested in AEs
+          if (instance
+              .getClassName()
+              .equals("org.apache.uima.analysis_engine.impl.AnalysisEngineManagementImpl")) {
+            String[] aeObjectNameParts = instance.getObjectName()
+                .toString().split(",");
+            if (aeObjectNameParts.length == 3) {
+              // this is uima aggregate MBean. Skip it. We only
+              // care about this aggregate's pipeline components.
+              continue;
+            }
+            StringBuilder sb = new StringBuilder();
+            // compose component name from jmx ObjectName
+            for (String part : aeObjectNameParts) {
+              if (part.startsWith("org.apache.uima:type")
+                  || part.startsWith("s=")) {
+                continue; // skip service name part of the name
+              }
+              sb.append("/");
+              if (part.endsWith("Components")) {
+                part = part.substring(0,
+                    part.indexOf("Components")).trim();
+              }
+              sb.append(part.substring(part.indexOf("=") + 1));
+            }
+            // Fetch a proxy to the AE Management object which holds AE stats
+            AnalysisEngineManagement proxy = JMX.newMBeanProxy(server, instance.getObjectName(),AnalysisEngineManagement.class);
+
+            IUimaPipelineAEComponent aeState = null;
+            if ((aeState = getUimaAeByName(sb.toString())) == null) {
+              // Not interested in AEs that are in a Ready State
+              if ( AnalysisEngineManagement.State.valueOf(proxy.getState()).equals(AnalysisEngineManagement.State.Ready)) {
+                continue;
+              }
+              aeState = new UimaPipelineAEComponent(sb.toString(), proxy.getThreadId(), AnalysisEngineManagement.State.valueOf(proxy.getState()));
+              aeStateList.add(aeState);
+              ((UimaPipelineAEComponent)aeState).startInitialization = System.currentTimeMillis();
+              aeState.setAeState(AnalysisEngineManagement.State.Initializing);
+              updateAgent = true;
+            } else {
+              // continue publishing AE state while the AE is initializing
+              if (AnalysisEngineManagement.State.valueOf(proxy.getState()).equals(AnalysisEngineManagement.State.Initializing)) {
+                updateAgent = true;
+                aeState.setInitializationTime(System.currentTimeMillis()-((UimaPipelineAEComponent)aeState).startInitialization);
+                // publish state if the AE just finished initializing and is now in Ready state
+              } else if (aeState.getAeState().equals(AnalysisEngineManagement.State.Initializing) &&
+                  AnalysisEngineManagement.State.valueOf(proxy.getState()).equals(AnalysisEngineManagement.State.Ready)) {
+                aeState.setAeState(AnalysisEngineManagement.State.Ready);
+                updateAgent = true;
+                // Short pause before reading the initialization time from the proxy
+                synchronized(this) {
+                  try {
+                    wait(5);
+                  } catch (InterruptedException ignored) {
+                    // interruption only shortens the pause; the state is
+                    // still published below
+                  }
+                }
+                aeState.setInitializationTime(proxy.getInitializationTime());
+                // AE reached ready state we no longer need to publish its state
+                componentsToDelete.add(aeState);
+              }
+            }
+            service.logger.info("UimaAEJmxMonitor.run()", null, "---- AE Name:"+proxy.getName()+" AE State:"+proxy.getState()+" AE init time="+aeState.getInitializationTime()+" Proxy Init time="+proxy.getInitializationTime()+" Proxy Thread ID:"+proxy.getThreadId());
+          }
+        }
+        howManySeenSoFar = 1; // reset error counter
+        if (updateAgent) {
+          service.logger.info("UimaAEJmxMonitor.run()", null, "---- Publishing UimaPipelineAEComponent List - size="+aeStateList.size());
+          try {
+            service.updateAgent(aeStateList);
+          } finally {
+            // remove components that reached Ready state
+            for (IUimaPipelineAEComponent aeState : componentsToDelete) {
+              aeStateList.remove(aeState);
+            }
+          }
+        }
+
+      } catch (UndeclaredThrowableException e) {
+        if ( !(e.getCause() instanceof InstanceNotFoundException) ) {
+          if ( howManySeenSoFar > 3 ) { // allow up three errors of this kind
+            service.logger.info("UimaAEJmxMonitor.run()", null, e);
+            howManySeenSoFar = 1;
+            throw e;
+          }
+          howManySeenSoFar++;
+        } else {
+          // AE not fully initialized yet, ignore the exception
+        }
+      } catch (Throwable e) {
+        howManySeenSoFar = 1;
+        service.logger.info("UimaAEJmxMonitor.run()", null, e);
+      }
+    }
+  }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ManagedUimaService.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ProcessEventListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ProcessEventListener.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ProcessEventListener.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ProcessEventListener.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.deploy.uima;
+
+import org.apache.camel.Body;
+import org.apache.uima.ducc.agent.deploy.ManagedService;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.ProcessStopDuccEvent;
+import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener;
+
+
+/**
+ * Listener for process-level DUCC events delivered to a managed service.
+ * <p>
+ * The only event handled here is {@link ProcessStopDuccEvent}; on receipt the
+ * wrapped {@link ManagedService} is asked to kill itself.
+ */
+public class ProcessEventListener implements DuccEventDelegateListener {
+
+    /** Service to be stopped when a stop event arrives. */
+    private final ManagedService service;
+
+    /** Dispatcher supplied via the setter; stored but not read by this class. */
+    private DuccEventDispatcher eventDispatcher;
+
+    /**
+     * Creates a listener bound to the given service.
+     *
+     * @param service - the service whose {@code killService()} is invoked on stop
+     */
+    public ProcessEventListener(ManagedService service) {
+        this.service = service;
+    }
+
+    /**
+     * Stores the dispatcher handed to this listener.
+     *
+     * @param eventDispatcher - dispatcher instance to remember
+     */
+    public void setDuccEventDispatcher(DuccEventDispatcher eventDispatcher) {
+        this.eventDispatcher = eventDispatcher;
+    }
+
+    /**
+     * Handles a process stop request by killing the managed service. The
+     * content of the event is not inspected.
+     *
+     * @param event - stop event taken from the Camel message body
+     */
+    public void onProcessStop(@Body ProcessStopDuccEvent event) {
+        service.killService();
+    }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ProcessEventListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/UimaAsServiceConfiguration.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/UimaAsServiceConfiguration.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/UimaAsServiceConfiguration.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/UimaAsServiceConfiguration.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.deploy.uima;
+
+import java.net.InetAddress;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.uima.ducc.agent.NodeAgent;
+import org.apache.uima.ducc.agent.deploy.ManagedService;
+import org.apache.uima.ducc.agent.deploy.ServiceAdapter;
+import org.apache.uima.ducc.agent.deploy.ServiceStateNotificationAdapter;
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.transport.DuccExchange;
+import org.apache.uima.ducc.transport.DuccTransportConfiguration;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+
+@Configuration
+@Import({ DuccTransportConfiguration.class,
+ CommonConfiguration.class })
+public class UimaAsServiceConfiguration {
+ @Autowired
+ DuccTransportConfiguration transport;
+ @Autowired
+ CommonConfiguration common;
+
+ /**
+ * Creates Camel Router to handle incoming messages
+ *
+ * @param delegate - {@code AgentEventListener} to delegate messages to
+ *
+ * @return {@code RouteBuilder} instance
+ */
+ public synchronized RouteBuilder routeBuilderForIncomingRequests(final String thisNodeIP,
+ final ProcessEventListener delegate) {
+ return new RouteBuilder() {
+ // Custom filter to select messages that are targeted for this process
+ // Checks the PID in a message to determine if this process is
+ // the target.
+ Predicate filter = new DuccProcessFilter(thisNodeIP);
+ public void configure() throws Exception {
+ System.out.println("Service Wrapper Starting Request Channel on Endpoint:"+common.managedServiceEndpoint);
+ onException(Exception.class).handled(true).process(new ErrorProcessor()).end();
+
+ from(common.managedServiceEndpoint)
+ .choice().when(filter)
+ .bean(delegate)
+ .end();
+
+ }
+ };
+ }
+ public class ErrorProcessor implements Processor {
+
+ public void process(Exchange exchange) throws Exception {
+ // the caused by exception is stored in a property on the exchange
+ Throwable caused = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
+ caused.printStackTrace();
+ //System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1");
+ //assertNotNull(caused);
+ // here you can do what you want, but Camel regard this exception as handled, and
+ // this processor as a failurehandler, so it wont do redeliveries. So this is the
+ // end of this route. But if we want to route it somewhere we can just get a
+ // producer template and send it.
+
+ // send it to our mock endpoint
+ //exchange.getContext().createProducerTemplate().send("mock:myerror", exchange);
+ }
+ }
+
+ private ProcessEventListener processDelegateListener(ManagedService service) {
+ return new ProcessEventListener(service);
+ }
+ private ServiceStateNotificationAdapter serviceAdapter(DuccEventDispatcher eventDispatcher, String stateUpdateEndpoint) {
+ return new ServiceAdapter(eventDispatcher, System.getenv("ProcessDuccId"), stateUpdateEndpoint);
+ }
+ @Bean
+ public ManagedService managedService() throws Exception {
+ try {
+ // Assume IP address provided from environment. In production this
+ // will be the actual node IP. In testing, the IP can be virtual
+ // when running multiple agents on the same node. The agent is
+ // responsible for providing the IP in this process environment.
+ String thisNodeIP =
+ (System.getenv("IP") == null) ? InetAddress.getLocalHost().getHostAddress() : System.getenv("IP");
+ CamelContext camelContext = common.camelContext();
+ int serviceSocketPort = 0;
+ String agentSocketParams="";
+ String jpSocketParams="";
+ if ( common.managedServiceEndpointParams != null ) {
+ jpSocketParams = "?"+common.managedServiceEndpointParams;
+ }
+
+ if ( common.managedProcessStateUpdateEndpointParams != null ) {
+ agentSocketParams = "?"+common.managedProcessStateUpdateEndpointParams;
+ }
+ // set up agent socket endpoint where this UIMA AS service will send state updates
+ if ( common.managedProcessStateUpdateEndpointType != null && common.managedProcessStateUpdateEndpointType.equalsIgnoreCase("socket") ) {
+ common.managedProcessStateUpdateEndpoint =
+ "mina:tcp://localhost:"+System.getProperty(NodeAgent.ProcessStateUpdatePort)+agentSocketParams;
+ }
+ // set up a socket endpoint where the UIMA AS service will receive events sent from its agent
+ if ( common.managedServiceEndpointType != null && common.managedServiceEndpointType.equalsIgnoreCase("socket")) {
+ serviceSocketPort = Utils.findFreePort();
+ // service is on the same node as the agent
+ common.managedServiceEndpoint =
+ "mina:tcp://localhost:"+serviceSocketPort+jpSocketParams;
+ }
+
+ // optionally configures Camel Context for JMS. Checks the 'agentRequestEndpoint' to
+ // to determine type of transport. If the the endpoint starts with "activemq:", a
+ // special ActiveMQ component will be activated to enable JMS transport
+
+ DuccEventDispatcher eventDispatcher =
+ transport.duccEventDispatcher(common.managedProcessStateUpdateEndpoint, camelContext);
+
+
+ ManagedUimaService service =
+ new ManagedUimaService(common.saxonJarPath,common.dd2SpringXslPath, serviceAdapter(eventDispatcher,common.managedServiceEndpoint), camelContext);
+
+ service.setAgentStateUpdateEndpoint(common.managedProcessStateUpdateEndpoint);
+
+ System.out.println("#######################################################");
+ System.out.println("## Agent Service State Update Endpoint:"+common.managedProcessStateUpdateEndpoint+" ##");
+ System.out.println("#######################################################");
+
+
+ ProcessEventListener delegateListener = processDelegateListener(service);
+ delegateListener.setDuccEventDispatcher(eventDispatcher);
+ camelContext.addRoutes(this.routeBuilderForIncomingRequests(thisNodeIP, delegateListener));
+
+ return service;
+
+ } catch( Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+ private class DuccProcessFilter implements Predicate {
+ String thisNodeIP;
+ public DuccProcessFilter(final String thisNodeIP) {
+ this.thisNodeIP = thisNodeIP;
+ }
+ public synchronized boolean matches(Exchange exchange) {
+ String methodName="DuccProcessFilter.matches";
+ boolean result = false;
+ try {
+ String pid = (String)exchange.getIn().getHeader(DuccExchange.ProcessPID);
+ String targetIP = (String)exchange.getIn().getHeader(DuccExchange.DUCCNODEIP);
+ // check if this message is targeting this process. Check if the process PID
+ // and the node match target process.
+ if ( Utils.getPID().equals(pid) && thisNodeIP.equals(targetIP) ) { // Get PID of this process
+ result = true;
+ System.out.println( ">>>>>>>>> Process Received a Message. Is Process target for message:"+result+". Target PID:"+pid);
+ }
+ } catch( Throwable e) {
+ e.printStackTrace();
+ }
+ return result;
+ }
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/UimaAsServiceConfiguration.java
------------------------------------------------------------------------------
svn:eol-style = native