You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:02:19 UTC

svn commit: r1427906 [2/5] - in /uima/sandbox/uima-ducc/trunk/uima-ducc-agent: main/ main/java/ main/java/org/ main/java/org/apache/ main/java/org/apache/uima/ main/java/org/apache/uima/ducc/ main/java/org/apache/uima/ducc/agent/ main/java/org/apache/u...

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/RogueProcessReaper.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/RogueProcessReaper.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/RogueProcessReaper.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/RogueProcessReaper.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.uima.ducc.common.node.metrics.NodeUsersInfo;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
+
+
+/**
+ * Manages rogue processes on a node.
+ * 
+ *
+ */
+public class RogueProcessReaper {
+
+  private Map<String, RogueProcessEntry> userRogueProcessMap = new TreeMap<String, RogueProcessEntry>();
+
+  private int counterValue = 5;
+
+  private int cleanupCounterValue = 5;
+
+  int maxSecondsBeforeEntryExpires = 120; // number of seconds a process entry is kept in
+                                        // the rogue process map before it is removed.
+                                        // Default: 2 minutes
+
+  private DuccLogger logger;
+  
+  boolean doKillRogueProcess = false;  
+  
+  /**
+   * Creates a reaper that tracks suspected rogue processes and, if so configured, kills them.
+   * <p>
+   * Two system properties are consulted:
+   * <ul>
+   * <li>{@code ducc.agent.rogue.process.purge.delay} - value stored in
+   * {@code maxSecondsBeforeEntryExpires}; 120 is used when it is absent or not a number</li>
+   * <li>{@code ducc.agent.rogue.process.kill} - rogue processes are killed only when this
+   * property is {@code true} (case-insensitive)</li>
+   * </ul>
+   *
+   * @param logger - logger to report to; when {@code null} messages go to stdout/stderr
+   * @param counterValue - initial value of the per-process kill countdown
+   * @param cleanupCounterValue - initial value of the per-process cleanup countdown;
+   *          a non-positive value selects {@code counterValue + 5}
+   */
+  public RogueProcessReaper(DuccLogger logger, int counterValue, int cleanupCounterValue) {
+    // Assign the logger first so every message below is routed consistently.
+    this.logger = logger;
+    this.counterValue = counterValue;
+    if (cleanupCounterValue > 0) {
+      this.cleanupCounterValue = cleanupCounterValue;
+    } else {
+      this.cleanupCounterValue = counterValue + 5;
+    }
+    // check if purge delay is defined in ducc.properties.
+    final String purgeDelay = System.getProperty("ducc.agent.rogue.process.purge.delay");
+    if (purgeDelay != null) {
+      try {
+        maxSecondsBeforeEntryExpires = Integer.parseInt(purgeDelay.trim());
+      } catch (NumberFormatException e) {
+        if ( logger == null ) {
+          e.printStackTrace();
+        } else {
+          logger.error("RogueProcessReaper.ctor", null, e);
+        }
+        maxSecondsBeforeEntryExpires = 120; // defaulting to 2 minutes
+      }
+    }
+    // Boolean.getBoolean(x) looks up a system property NAMED x, so it must not be applied to
+    // the property value; parseBoolean interprets the value itself and maps null to false.
+    doKillRogueProcess = Boolean.parseBoolean(System.getProperty("ducc.agent.rogue.process.kill"));
+    final String status = "ducc.agent.rogue.process.kill=" + doKillRogueProcess;
+    if ( logger == null ) {
+      System.out.println(status);
+    } else {
+      logger.info("RogueProcessReaper.ctor", null, status);
+    }
+  }
+
+  public void submitRogueProcessForKill(String user, String pid, boolean isJava) {
+    final String methodName = "RogueProcessReaper.submitRogueProcessForKill";
+    RogueProcessEntry entry = null;
+    if (userRogueProcessMap.containsKey(pid)) {
+      entry = userRogueProcessMap.get(pid);
+    } else {
+      if (cleanupCounterValue <= counterValue) {
+        cleanupCounterValue += counterValue;
+      }
+      entry = new RogueProcessEntry(counterValue, cleanupCounterValue, user,
+              maxSecondsBeforeEntryExpires, isJava);
+      userRogueProcessMap.put(pid, entry);
+    }
+    entry.markAsRogue(3);
+    if ( !entry.isRogue() ) {
+      if ( logger == null ) {
+        System.out.println(
+                "PID:" + pid+" Not Rogue Yet - It takes 3 iterations to make it Rogue");
+
+      } else {
+        logger.info("submitRogueProcessForKill", null,
+                "PID:" + pid+" Not Rogue Yet - It takes 3 iterations to make it Rogue");
+        
+      }
+      return;
+    }
+    if ( doKillRogueProcess ) {
+      try {
+        // Dont kill the process immediately. Kill if this method is called "counterValue"
+        // number of times.
+        long counter;
+        if ((counter = entry.countDown()) == 0 && !entry.isKilled()) {
+          if ( logger == null ) {
+            System.out.println(
+                    "Process Scheduled for Kill PID:" + pid + " Owner:" + user
+                            + " ");
+
+          } else {
+            logger.info(methodName, null, "Process Scheduled for Kill PID:" + pid + " Owner:" + user
+                    + " ");
+            
+          }
+          kill(user, pid);
+          entry.killed();
+        } else {
+          if ( logger == null ) {
+            System.out.println(
+                    "Process ***NOT*** Scheduled for Kill PID:" + pid + " Owner:"
+                            + user + " Call:" + (counterValue - counter) + " of " + counterValue);
+
+          } else {
+            logger.info(methodName, null, "Process ***NOT*** Scheduled for Kill PID:" + pid + " Owner:"
+                    + user + " Call:" + (counterValue - counter) + " of " + counterValue);
+            
+          }
+        }
+
+        if (entry.isKilled() && entry.countDownCleanupCounter() == 0) {
+          if ( logger == null ) {
+            System.out.println(
+                     "Removing Entry From RougeProcessMap for PID:" + pid
+                            + " Owner:" + user);
+
+          } else {
+            logger.info(methodName, null, "Removing Entry From RougeProcessMap for PID:" + pid
+                    + " Owner:" + user);
+            
+          }
+          userRogueProcessMap.remove(pid);
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    } else {
+      if ( logger == null ) {
+        System.out.println(
+                "Ducc Not Configured to Kill Rogue Proces (PID:)" + pid + " Owner:" + user +
+                        ". Change (or define) ducc.agent.rogue.process.kill property in ducc.properties if you want rogue processes to be cleaned up.");
+
+      } else {
+        logger.info(methodName, null, "Ducc Not Configured to Kill Rogue Proces (PID:)" + pid + " Owner:" + user +
+                ". Change (or define) ducc.agent.rogue.process.kill property in ducc.properties if you want rogue processes to be cleaned up.");
+        
+      }
+    }
+    if ( logger == null ) {
+      System.out.println(
+              "UserRougeProcessMap size:" + userRogueProcessMap.size());
+
+    } else {
+      logger.info(methodName, null, "UserRougeProcessMap size:" + userRogueProcessMap.size());
+      
+    }
+  }
+
+  public List<String> getUserRogueProcesses(String user) {
+    List<String> rogues = new ArrayList<String>();
+    for (Map.Entry<String, RogueProcessEntry> entry : userRogueProcessMap.entrySet()) {
+      if (entry.getValue().getUser().equals(user) && entry.getValue().isRogue() ) {
+        rogues.add(entry.getKey());
+      }
+    }
+    return rogues;
+  }
+  public boolean removeRogueProcess(String pid) {
+    if ( userRogueProcessMap.containsKey(pid)) {
+      userRogueProcessMap.remove(pid);
+      return true;
+    }
+    return false;
+  }
+  public void removeDeadRogueProcesses(List<String> currentPids) {
+    List<String> deadPIDs = new ArrayList<String>();
+    
+    for (Map.Entry<String, RogueProcessEntry> entry : userRogueProcessMap.entrySet()) {
+      if ( !currentPids.contains(entry.getKey())) {
+        deadPIDs.add(entry.getKey());
+      }
+    }
+    for( String deadPID : deadPIDs) {
+      userRogueProcessMap.remove(deadPID);
+    }
+  }
+  /**
+   * Adds every process currently flagged as rogue to the per-user info in the given map,
+   * creating a {@code NodeUsersInfo} for users that have no entry yet. Processes that are
+   * tracked but not yet flagged as rogue are skipped.
+   *
+   * @param map - user name to {@code NodeUsersInfo} map that receives the rogue processes
+   */
+  public void copyAllUserRogueProcesses(TreeMap<String, NodeUsersInfo> map) {
+    for (Map.Entry<String, RogueProcessEntry> entry : userRogueProcessMap.entrySet()) {
+      RogueProcessEntry rogueEntry = entry.getValue();
+      if ( !rogueEntry.isRogue() ) {
+        continue;
+      }
+      String owner = rogueEntry.getUser();
+      NodeUsersInfo nui = map.get(owner);
+      if (nui == null) {
+        nui = new NodeUsersInfo(owner);
+        map.put(owner, nui);
+      }
+      nui.addRogueProcess(entry.getKey(), rogueEntry.isJava());
+    }
+    // Nothing is removed here: the former "expired entry" cleanup list was never filled, so
+    // its loop could not run.
+  }
+  /**
+   * This method checks if ducc is configured to kill rogue processes and if so, proceeds to 
+   * kill via -9. 
+   * 
+   * @param user - process owner
+   * @param pid - process id
+   * @return - true if the process has been killed, false otherwise
+   * @throws Exception
+   */
+  public void kill(final String user, final String pid) throws Exception {
+    final String methodName = "RogueProcessReaper.kill.run()";
+  
+      new Thread(new Runnable() {
+        public void run() {
+          try {
+            String c_launcher_path = Utils.resolvePlaceholderIfExists(
+                    System.getProperty("ducc.agent.launcher.ducc_spawn_path"), System.getProperties());
+            String cmdLine;
+            String arg;
+            boolean useDuccling = false;
+            if (Utils.isWindows()) {
+              cmdLine = "taskkill";
+              arg = "/PID";
+            } else {
+              String useSpawn = System.getProperty("ducc.agent.launcher.use.ducc_spawn");
+              if (useSpawn != null && useSpawn.toLowerCase().equals("true")) {
+                useDuccling = true;
+              }
+              cmdLine = "/bin/kill";
+              arg = "-9";
+            }
+            String[] duccling_nolog;
+            if (useDuccling) {
+              duccling_nolog = new String[] { c_launcher_path, "-u", user, "--", cmdLine, arg, pid };
+            } else {
+              duccling_nolog = new String[] { cmdLine, arg, pid };
+            }
+
+//            if (kill != null && Boolean.parseBoolean(kill) == true) {
+              ProcessBuilder pb = new ProcessBuilder(duccling_nolog);
+              pb.redirectErrorStream(true);
+              Process killedProcess = pb.start();
+              InputStream is = killedProcess.getInputStream();
+              BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+              String line = null;
+              // read the next line from kill command
+              while ((line = reader.readLine()) != null) {
+                // dont care about the output, just drain the buffers
+              }
+              is.close();
+              StringBuffer sb = new StringBuffer();
+              for (String part : duccling_nolog) {
+                sb.append(part).append(" ");
+              }
+              if ( logger == null ) {
+                System.out.println(
+                        "--------- Killed Process:" + pid + " Owned by:" + user
+                                + " Command:" + sb.toString());
+
+              } else {
+                logger.info(methodName, null, "--------- Killed Process:" + pid + " Owned by:" + user
+                        + " Command:" + sb.toString());
+                
+              }
+            
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+      }).start();
+  }
+
+  private static class RogueProcessEntry {
+    CountDownLatch counter;
+
+    CountDownLatch cleanupCounter;
+
+    String user;
+
+    boolean killed;
+
+    boolean java;
+    
+    AtomicInteger pendingCounter = new AtomicInteger(1);
+    boolean rogue;
+    
+    public RogueProcessEntry(int counterValue, int cleanupCounterValue, String user,
+            int maxSecondsBeforeEntryExpires, boolean isJava) {
+      counter = new CountDownLatch(counterValue);
+      cleanupCounter = new CountDownLatch(cleanupCounterValue);
+      this.user = user;
+      this.java = isJava;
+    }
+
+    public boolean isRogue() {
+      return rogue;
+    }
+    public void killed() {
+      killed = true;
+    }
+
+    public boolean isKilled() {
+      return killed;
+    }
+
+    public String getUser() {
+      return user;
+    }
+
+    public long countDown() {
+      counter.countDown();
+      return counter.getCount();
+    }
+
+    public long countDownCleanupCounter() {
+      cleanupCounter.countDown();
+      return cleanupCounter.getCount();
+    }
+
+    public void markAsRogue(int ceiling) {
+      if ( pendingCounter.get() < ceiling ) {
+        pendingCounter.addAndGet(1);
+      } else {
+        rogue = true;
+      }
+    }
+
+    public boolean isJava() {
+      return java;
+    }
+
+  }
+
+  
+  /**
+   * Empty entry point; it performs no work.
+   *
+   * @param args - ignored
+   */
+  public static void main(String[] args) {
+  }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/RogueProcessReaper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.config;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.dataformat.xstream.XStreamDataFormat;
+import org.apache.camel.impl.DefaultClassResolver;
+import org.apache.uima.ducc.agent.NodeAgent;
+import org.apache.uima.ducc.agent.event.AgentEventListener;
+import org.apache.uima.ducc.agent.launcher.Launcher;
+import org.apache.uima.ducc.agent.launcher.ManagedProcess;
+import org.apache.uima.ducc.agent.processors.DefaultNodeInventoryProcessor;
+import org.apache.uima.ducc.agent.processors.DefaultNodeMetricsProcessor;
+import org.apache.uima.ducc.agent.processors.DefaultProcessMetricsProcessor;
+import org.apache.uima.ducc.agent.processors.LinuxNodeMetricsProcessor;
+import org.apache.uima.ducc.agent.processors.LinuxProcessMetricsProcessor;
+import org.apache.uima.ducc.agent.processors.NodeInventoryProcessor;
+import org.apache.uima.ducc.agent.processors.NodeMetricsProcessor;
+import org.apache.uima.ducc.agent.processors.ProcessMetricsProcessor;
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.config.DuccBlastGuardPredicate;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.transport.DuccExchange;
+import org.apache.uima.ducc.transport.DuccTransportConfiguration;
+import org.apache.uima.ducc.transport.agent.NodeMetricsConfiguration;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+//import org.apache.uima.ducc.agent.event.AgentPingEvent;
+import com.thoughtworks.xstream.XStream;
+
+@Configuration
+
+@Import({ DuccTransportConfiguration.class,
+		CommonConfiguration.class, NodeMetricsConfiguration.class })
+public class AgentConfiguration {
+	DuccLogger logger = new DuccLogger(this.getClass(), "Agent");
+	//	optional IP and node name read from the environment; when IP is set, nodeIdentity() uses both to build this agent's identity
+//	@Value("#{ systemProperties['IP'] }")
+	public String ip = System.getenv("IP");
+	public String nodeName = System.getenv("NodeName");
+	
+//	public static String agentPingEnpoint = "activemq:topic:agent.ping.endpoint";
+//	public static String agentPingSelectorName="agent_ip";
+//	public static String agentPingSelector;
+
+	//private DuccEventDispatcher agentPingDispatcher;
+	
+	@Value("#{ systemProperties['ducc.agent.launcher.thread.pool.size'] }")
+	String launcherThreadPoolSize;
+	
+	@Value("#{ systemProperties['ducc.agent.launcher.process.stop.timeout'] }")
+	public String processStopTimeout;
+
+	@Value("#{ systemProperties['ducc.agent.node.inventory.publish.rate.skip'] }")
+	public String inventoryPublishRateSkipCount;
+	
+	// Get comma separated list of processes to ignore while detecting rogue processes
+  @Value("#{ systemProperties['ducc.agent.rogue.process.exclusion.filter'] }")
+  public String processExclusionList; 
+	
+  // Get comma separated list of users to ignore while detecting rogue processes
+  @Value("#{ systemProperties['ducc.agent.rogue.process.user.exclusion.filter'] }")
+  public String userExclusionList; 
+	@Autowired
+	DuccTransportConfiguration agentTransport;
+	@Autowired
+	NodeMetricsConfiguration nodeMetrics;
+	@Autowired
+	CommonConfiguration common;
+	/**
+	 * Creates {@code AgentEventListener} that will handle incoming messages.
+	 * 
+	 * @param agent - {@code NodeAgent} instance to initialize the listener
+	 * 
+	 * @return {@code AgentEventListener} instance
+	 */
+	public AgentEventListener agentDelegateListener(NodeAgent agent) {
+		return new AgentEventListener(agent);
+	}
+	/**
+	 * Creates a Camel route that is fired by a timer and, for every tick that passes the
+	 * blast guard filter, runs the node metrics processor, sends the exchange to the target
+	 * endpoint and finally passes it to a {@code ConfirmProcessor}.
+	 * 
+	 * @param agent - agent used to create the metrics processor; its logger is handed to the blast guard
+	 * @param targetEndpointToReceiveNodeMetricsUpdate - where to post NodeMetrics
+	 * @param nodeMetricsPublishRate - value of the timer's {@code period} option, i.e. how often NodeMetrics are published
+	 * @return - {@code RouteBuilder} instance
+	 * 
+	 * @throws Exception
+	 */
+	private RouteBuilder routeBuilderForNodeMetricsPost(final NodeAgent agent, final String targetEndpointToReceiveNodeMetricsUpdate, final int nodeMetricsPublishRate) throws Exception {
+		final Processor nmp = nodeMetricsProcessor(agent);
+		final Predicate blastFilter = new DuccBlastGuardPredicate(agent.logger);
+		final Processor cp = new ConfirmProcessor();
+		return new RouteBuilder() {
+		      public void configure() {
+		        // any exception raised in the route is marked handled and passed to the ErrorProcessor
+		        onException(Exception.class).handled(true).process(new ErrorProcessor());
+			      from("timer:nodeMetricsTimer?fixedRate=true&period=" + nodeMetricsPublishRate)
+              .routeId("NodeMetricsPostRoute")
+
+			        // This route uses a filter to prevent sudden bursts of messages which
+			        // may flood DUCC daemons causing chaos. The filter disposes any message
+			        // that appears in a window of 1 sec or less.
+			        .filter(blastFilter)
+		    	         .process(nmp)
+		               .to(targetEndpointToReceiveNodeMetricsUpdate)
+		               .process(cp);
+		      }
+		    };
+	}
+	
+	/**
+	 * Creates a Camel route that is fired by a timer and, for every tick that passes the
+	 * blast guard filter, runs the node inventory processor and sends the exchange to the
+	 * target endpoint, unless the processor left the message body null.
+	 * 
+	 * @param agent - agent used to create the inventory processor; its logger is handed to the blast guard
+	 * @param targetEndpointToReceiveNodeInventoryUpdate - where to post the node inventory
+	 * @param nodeInventoryPublishRate - value of the timer's {@code period} option, i.e. how often the route fires
+	 * @return - {@code RouteBuilder} instance
+	 * 
+	 * @throws Exception
+	 */
+	private RouteBuilder routeBuilderForNodeInventoryPost(final NodeAgent agent, final String targetEndpointToReceiveNodeInventoryUpdate, final int nodeInventoryPublishRate) throws Exception {
+		// note: despite its name, nmp holds the node INVENTORY processor here
+		final Processor nmp = nodeInventoryProcessor(agent);
+		return new RouteBuilder() {
+		      public void configure() {
+		        final Predicate bodyNotNull = body().isNotNull();
+		        
+		        final Predicate blastGuard = new DuccBlastGuardPredicate(agent.logger);
+		    	   // no redelivery: a failed exchange is marked handled and passed to the ErrorProcessor
+		    	   onException(Exception.class).maximumRedeliveries(0).handled(true).process(new ErrorProcessor());
+		    	
+		    	   from("timer:nodeInventoryTimer?fixedRate=true&period=" + nodeInventoryPublishRate)
+              .routeId("NodeInventoryPostRoute")
+		    	    // This route uses a filter to prevent sudden bursts of messages which
+              // may flood DUCC daemons causing chaos. The filter disposes any message
+              // that appears in a window of 1 sec or less.
+              .filter(blastGuard)
+		                // add inventory to the body of the message
+                    .process(nmp)
+		                // filter out messages with no body. Since this route is on a timer
+		                // it keeps generating flow of messages. However, the agent only
+		                // publishes inventory if there is a change or configured number of
+		                // epochs has passed. Otherwise, the agent puts null in the body of
+		                // the message and this route should just throw it away.
+		                .filter(bodyNotNull)
+		                  .to(targetEndpointToReceiveNodeInventoryUpdate);
+		      }
+		    };
+	}
+	/**
+	 * Creates a Camel route that consumes from the agent request endpoint
+	 * ({@code common.agentRequestEndpoint}) and hands every message to the delegate listener.
+	 * 
+	 * @param agent - not used by this method
+	 * @param delegate - {@code AgentEventListener} to delegate messages to
+	 * 
+	 * @return {@code RouteBuilder} instance
+	 */
+	public synchronized RouteBuilder routeBuilderForIncomingRequests(final NodeAgent agent, final AgentEventListener delegate) {
+		return new RouteBuilder() {
+			public void configure() {
+				// no redelivery; the failure is passed to the ErrorProcessor but, unlike in the other
+				// routes of this class, it is NOT marked as handled
+				onException(Throwable.class).maximumRedeliveries(0).handled(false).process(new ErrorProcessor());
+				from(common.agentRequestEndpoint)
+				.routeId("IncomingRequestsRoute")
+				//.process(new DebugProcessor())
+				.bean(delegate);
+			}
+		};
+	}
+	/**
+	 * Creates a Camel route that consumes from the managed process state update endpoint
+	 * ({@code common.managedProcessStateUpdateEndpoint}) and hands a message to the delegate
+	 * listener only when the {@code DuccNodeFilter} accepts it for this agent.
+	 * 
+	 * @param agent - agent the node filter is created for
+	 * @param delegate - {@code AgentEventListener} to delegate messages to
+	 * 
+	 * @return {@code RouteBuilder} instance
+	 */
+	public synchronized RouteBuilder routeBuilderForManagedProcessStateUpdate(final NodeAgent agent, final AgentEventListener delegate) {
+		return new RouteBuilder() {
+		  
+			//	Custom filter to select messages that are targeted for this agent
+			//  Checks the node list in a message to determine if this agent is 
+			//  the target.
+			Predicate filter = new DuccNodeFilter(agent);
+			public void configure() {
+				// no redelivery: the failure is marked handled, passed to the ErrorProcessor and
+				// routing of the exchange is stopped
+				onException(Throwable.class).
+				  maximumRedeliveries(0).
+				    handled(true).
+				      process(new ErrorProcessor())
+				        .stop();
+
+				from(common.managedProcessStateUpdateEndpoint)
+          .routeId("ManageProcessStateUpdateRoute")
+				  //.process(new StateUpdateDebugProcessor(logger))
+	        .choice().when(filter)
+	            .bean(delegate)
+	        .end();
+			}
+		};
+	}
+
+//	 private RouteBuilder routeBuilderForNodePing(final NodeAgent agent, final String targetEndpoint) throws Exception {
+//	    return new RouteBuilder() {
+//	      PingProcessor pingProcessor = new PingProcessor(agent);
+//	          public void configure() {
+//	            System.out.println("Agent Listening on Ping Endpoint:"+targetEndpoint);
+//	            onException(Exception.class).handled(true).process(new ErrorProcessor());
+//	            from(targetEndpoint)
+//	            .routeId("NodePingRoute")
+//	            .process( pingProcessor);
+//	          }
+//	        };
+//	  }
+
+	
+	/**
+	 * Debugging aid: logs the body of every exchange it sees, serialized to XML with XStream.
+	 */
+	public class DebugProcessor implements Processor {
+
+		public void process(Exchange exchange) throws Exception {
+			XStream xStream = new XStreamDataFormat().getXStream(new DefaultClassResolver());
+			Object body = exchange.getIn().getBody();
+			String bodyAsXml = xStream.toXML(body);
+			logger.info("process", null, bodyAsXml);
+		}
+		
+	}
+
+	/**
+	 * Last step of the node metrics route. It currently does nothing: the body of
+	 * {@code process} consists solely of disabled debugging code.
+	 */
+	public static class ConfirmProcessor implements Processor {
+	  // only referenced by the disabled code below
+	  boolean first = true;
+		public void process(Exchange exchange) throws Exception {
+		  // disabled: delay the first exchange by 20 seconds
+//		  if ( first ) {
+//		    synchronized(this) {
+//		      this.wait(20000);
+//		    }
+//		    first = false;
+//		  }
+			
+		  // disabled: dump the published metrics as XML to stdout
+//			XStreamDataFormat xStreamDataFormat = new XStreamDataFormat();
+//	        XStream xStream = xStreamDataFormat.getXStream(new DefaultClassResolver());
+//			String marshalledEvent = xStream.toXML(exchange.getIn().getBody());
+//
+//				System.out.println("Agent Published Metrics:\n"+
+//						marshalledEvent);
+	        	
+		}
+	}	
+	/**
+	 * Debugging aid: logs all headers of the incoming message, one {@code key=value} pair
+	 * per line.
+	 */
+	public static class StateUpdateDebugProcessor implements Processor {
+	  DuccLogger logger;
+	  
+	  StateUpdateDebugProcessor(DuccLogger logger ) {
+	    this.logger = logger;
+	  }
+		public void process(Exchange exchange) throws Exception {
+		  StringBuilder headerDump = new StringBuilder("Headers:\n\t");
+		  for( Entry<String,Object> header : exchange.getIn().getHeaders().entrySet()) {
+		    headerDump.append(header.getKey());
+		    headerDump.append('=');
+		    headerDump.append(header.getValue());
+		    headerDump.append('\n');
+		  }
+		  logger.info("StateUpdateDebugProcessor.process", null, headerDump.toString());
+		}
+	}	
+
+	public class ErrorProcessor implements Processor {
+
+		public void process(Exchange exchange) throws Exception {
+			// the caused by exception is stored in a property on the exchange
+	    Throwable caused = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
+			logger.error("ErrorProcessor.process", null, caused);
+	        //assertNotNull(caused);
+	        // here you can do what you want, but Camel regard this exception as handled, and
+	        // this processor as a failurehandler, so it wont do redeliveries. So this is the
+	        // end of this route. But if we want to route it somewhere we can just get a
+	        // producer template and send it.
+
+	        // send it to our mock endpoint
+	        //exchange.getContext().createProducerTemplate().send("mock:myerror", exchange);
+		}
+	}
+	public static class TransportProcessor implements Processor {
+
+		public void process(Exchange exchange) throws Exception {
+			try {
+				System.out.println(">>> Agent Received Message of type:"+exchange.getIn().getBody().getClass().getName());
+			} catch( Exception e ) {
+				e.printStackTrace();
+			}
+// 			Destination replyTo = exchange.getIn().getHeader("JMSReplyTo",
+//					Destination.class);
+			// System.out.println("... transport - value of replyTo:" +
+			// replyTo);
+		}
+
+	}
+//  public static class PingProcessor implements Processor {
+//    private NodeAgent agent;
+//    
+//    PingProcessor(NodeAgent agent ) {
+//      this.agent = agent;
+//    }
+//    public void process(Exchange exchange) throws Exception {
+//      try {
+//        agent.ping((AgentPingEvent)exchange.getIn().getBody());
+//      } catch( Exception e ) {
+//        e.printStackTrace();
+//      }
+//    }
+//
+//  }
+	/**
+	 * Builds the identity of this agent's node.
+	 * 
+	 * @return an identity built from the injected {@code ip} and {@code nodeName} when
+	 *         {@code ip} is set, a default {@code NodeIdentity} otherwise
+	 * @throws Exception
+	 */
+	private NodeIdentity nodeIdentity() throws Exception {
+		if ( ip == null ) {
+			return new NodeIdentity();
+		}
+		//	Inject IP to enable deployment of multiple Agents on the same node with 
+		//  different identity. This should only be used for testing.
+		return new NodeIdentity(ip,nodeName);
+	}
+
+	/**
+	 * @return a new {@code Launcher} instance
+	 */
+	private Launcher launcher() {
+		Launcher processLauncher = new Launcher();
+		return processLauncher;
+	}
+	public DuccEventDispatcher getCommonProcessDispatcher(CamelContext camelContext) throws Exception {
+		return agentTransport.duccEventDispatcher(common.managedServiceEndpoint, camelContext);
+	}
+	/**
+	 * Creates and wires the {@code NodeAgent} bean: builds the agent, configures the transport
+	 * for the agent request endpoint and registers the state update, incoming request, node
+	 * inventory and node metrics routes with the Camel context.
+	 * <p>
+	 * When the managed process state update endpoint type is {@code socket}, a free local port
+	 * is picked, the endpoint is rewritten to a {@code mina:tcp://localhost:<port>} URI and the
+	 * port is published in the {@code NodeAgent.ProcessStateUpdatePort} system property.
+	 * 
+	 * @return the initialized agent, or {@code null} if initialization failed (the failure is logged)
+	 * @throws Exception declared for callers; initialization failures are caught and logged here
+	 */
+	@Bean
+	public NodeAgent nodeAgent() throws Exception {
+		try {
+	        CamelContext camelContext = common.camelContext();
+	        camelContext.disableJMX();
+
+			NodeAgent agent = new NodeAgent(nodeIdentity(), launcher(), camelContext, this);
+//      agentPingSelector = agentPingSelectorName+"='"+agent.getIdentity().getIp()+"'"; 
+			//	optionally configures Camel Context for JMS. Checks the 'agentRequestEndpoint' to 
+			//  to determine type of transport. If the the endpoint starts with "activemq:", a 
+			//  special ActiveMQ component will be activated to enable JMS transport
+			agentTransport.configureJMSTransport(common.agentRequestEndpoint,camelContext);
+			AgentEventListener delegateListener = agentDelegateListener(agent);
+//		  agentPingDispatcher = 
+//		        agentTransport.duccEventDispatcher(agentPingEnpoint, camelContext);
+		  
+      if ( common.managedProcessStateUpdateEndpointType != null && common.managedProcessStateUpdateEndpointType.equalsIgnoreCase("socket") ) {
+        String agentSocketParams = "";
+        if ( common.managedProcessStateUpdateEndpointParams != null ) {
+          agentSocketParams = "?"+common.managedProcessStateUpdateEndpointParams;
+        }
+        int agentPort = Utils.findFreePort();
+        common.managedProcessStateUpdateEndpoint = "mina:tcp://localhost:"+agentPort+agentSocketParams;
+        //  Remember the agent port since we need to tell JPs where to send their state updates
+        System.setProperty(NodeAgent.ProcessStateUpdatePort, String.valueOf(agentPort));
+      }
+			camelContext.addRoutes(this.routeBuilderForManagedProcessStateUpdate(agent,delegateListener));
+			camelContext.addRoutes(this.routeBuilderForIncomingRequests(agent,delegateListener));
+			camelContext.addRoutes(this.routeBuilderForNodeInventoryPost(agent, common.nodeInventoryEndpoint, Integer.parseInt(common.nodeInventoryPublishRate)));
+			camelContext.addRoutes(this.routeBuilderForNodeMetricsPost(agent, common.nodeMetricsEndpoint, Integer.parseInt(common.nodeMetricsPublishRate)));
+//      camelContext.addRoutes(this.routeBuilderForNodePing(agent,agentPingEnpoint+"?selector="+agentPingSelector) );
+			
+			logger.info("nodeAgent", null,"------- Agent Initialized - Identity Name:"+agent.getIdentity().getName()+" IP:"+agent.getIdentity().getIp()+" JP State Update Endpoint:"+common.managedProcessStateUpdateEndpoint);
+			return agent;
+			
+		} catch( Exception e) {
+			// Record the failure in the agent log rather than on stderr only, so the reason
+			// for a missing agent can be found where the rest of the agent's messages are.
+			logger.error("nodeAgent", null, e);
+		}
+		return null;
+	}
+
+	/**
+	 * Selects the node metrics processor for the current platform.
+	 * 
+	 * @param agent - agent the processor is created for
+	 * @return a {@code LinuxNodeMetricsProcessor} reading {@code /proc/meminfo} and
+	 *         {@code /proc/loadavg} on Linux, a {@code DefaultNodeMetricsProcessor} otherwise
+	 * @throws Exception
+	 */
+	@Bean
+	public NodeMetricsProcessor nodeMetricsProcessor(NodeAgent agent) throws Exception {
+		if (!Utils.isLinux()) {
+			return new DefaultNodeMetricsProcessor(agent);
+		}
+		return new LinuxNodeMetricsProcessor(agent, "/proc/meminfo", "/proc/loadavg");
+	}
+	public ProcessMetricsProcessor processMetricsProcessor(NodeAgent agent, IDuccProcess process, ManagedProcess managedProcess) throws Exception {
+		if (Utils.isLinux()) {
+			return new LinuxProcessMetricsProcessor(logger, process, agent, "/proc/"+process.getPID()+"/statm", "/proc/stat", "/proc/"+process.getPID()+"/stat", managedProcess);
+		} else {
+			return new DefaultProcessMetricsProcessor(process, agent);
+		}
+		
+	}
+	public NodeInventoryProcessor nodeInventoryProcessor(NodeAgent agent) {
+		return new DefaultNodeInventoryProcessor(agent, inventoryPublishRateSkipCount);
+	}
+	/**
+	 * Camel predicate that decides whether a state update message is meant for this agent.
+	 * With a {@code mina} endpoint every message is accepted; otherwise the target nodes
+	 * header of the message is compared with this agent's node identities.
+	 */
+	private class DuccNodeFilter implements Predicate {
+		private NodeAgent agent = null;
+		public DuccNodeFilter(NodeAgent agent) { 
+			this.agent = agent;
+		}
+		/**
+		 * @param exchange - exchange carrying the message to check
+		 * @return true if this agent should process the message; false if it is not a target
+		 *         or the check itself failed (the failure is logged)
+		 */
+		public synchronized boolean matches(Exchange exchange) {
+			String methodName="DuccNodeFilter.matches";
+			if ( common.managedProcessStateUpdateEndpoint.startsWith("mina")) {
+			  // mina is a socket component with point-to-point semantics thus
+			  // the client always sends a message to the correct agent. No reason
+			  // to determine if this is a target agent.
+			  return true;
+			}
+			boolean result = false;
+			try {
+				String nodes = (String)exchange.getIn().getHeader(DuccExchange.TARGET_NODES_HEADER_NAME);
+				result = Utils.isTargetNodeForMessage(nodes, agent.getIdentity().getNodeIdentities());
+				// trace AFTER the decision has been made so the logged value is the real one
+				logger.trace(methodName, null, ">>>>>>>>> Agent: ["+agent.getIdentity().getIp()+"] Received a Message. Is Agent target for message:"+result+". Target Agents:"+nodes);
+			} catch( Throwable e) {
+				logger.error(methodName, null, e, new Object[] {});
+			}
+			return result;
+	   }
+	}
+//  public DuccEventDispatcher getAgentPingDispatcher() {
+//    return agentPingDispatcher;
+//  }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/AbstractManagedService.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/AbstractManagedService.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/AbstractManagedService.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/AbstractManagedService.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.deploy;
+
+import java.util.List;
+
+import org.apache.camel.CamelContext;
+import org.apache.uima.ducc.common.component.AbstractDuccComponent;
+import org.apache.uima.ducc.common.main.DuccService;
+import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+/**
+ * Base class for a service process whose state is reported to an agent.
+ * State notifications are delegated to a
+ * {@link ServiceStateNotificationAdapter}; subclasses provide the actual
+ * deployment ({@link #deploy(String[])}) and shutdown
+ * ({@link #quiesceAndStop()}) logic.
+ * <p>
+ * The adapter is optional: when it is null all notifications are silently
+ * skipped, which allows a service to be run without an agent.
+ */
+public abstract class AbstractManagedService extends AbstractDuccComponent
+implements ManagedService {
+	public static final String ManagedServiceNotificationInterval = "uima.process.notify.interval";
+	// NOTE(review): presumably milliseconds - confirm against the code that reads it
+	private long notificationInterval = 5000;
+	protected ProcessState currentState = ProcessState.Undefined;
+	protected ProcessState previousState = ProcessState.Undefined;
+    public boolean useJmx = false;
+    // May be null, in which case no notifications are sent
+    public ServiceStateNotificationAdapter serviceAdapter = null;
+    
+    /**
+     * Stops the service. Invoked from the JVM shutdown hook registered in
+     * {@link #initialize()}.
+     */
+    public abstract void quiesceAndStop();
+    /**
+     * Deploys the service.
+     * 
+     * @param args - deployment arguments, forwarded from {@link #start(DuccService, String[])}
+     * @throws Exception if the service cannot be deployed
+     */
+    public abstract void deploy(String[] args) throws Exception;
+    
+	/**
+	 * @param serviceAdapter - adapter used to notify the agent, may be null
+	 * @param context - camel context passed to the parent component
+	 */
+	protected AbstractManagedService(ServiceStateNotificationAdapter serviceAdapter, CamelContext context) {
+		super("UimaProcess", context);
+		this.serviceAdapter = serviceAdapter;
+	}
+	/**
+	 * @return the notificationInterval
+	 */
+	public long getNotificationInterval() {
+		return notificationInterval;
+	}
+
+	/**
+	 * @param notificationInterval
+	 *            the notificationInterval to set
+	 */
+	public void setNotificationInterval(long notificationInterval) {
+		this.notificationInterval = notificationInterval;
+	}
+
+
+	/**
+	 * Registers a JVM shutdown hook which calls {@link #quiesceAndStop()}
+	 * when the process is being terminated.
+	 */
+	public void initialize() throws Exception {
+		
+		ServiceShutdownHook shutdownHook = new ServiceShutdownHook(this);
+		Runtime.getRuntime().addShutdownHook(shutdownHook);
+		System.out.println("Managed Service Wrapper Registered Shutdown Hook");
+	}
+
+	/**
+	 * Reports the given state to the agent. No-op when there is no adapter.
+	 */
+	public void notifyAgentWithStatus(ProcessState state) {
+		if ( serviceAdapter != null ) {
+			serviceAdapter.notifyAgentWithStatus(state);
+		}
+	}
+	/**
+	 * Reports the given state and the JMX URL of this process to the agent.
+	 * No-op when there is no adapter.
+	 */
+	public void notifyAgentWithStatus(ProcessState state, String processJmxUrl) {
+		if ( serviceAdapter != null ) {
+			serviceAdapter.notifyAgentWithStatus(state, processJmxUrl);
+		}
+	}
+	/**
+	 * Reports the state of the pipeline components to the agent.
+	 * No-op when there is no adapter.
+	 */
+	public void notifyAgentWithStatus(List<IUimaPipelineAEComponent> pipeline) {
+		if ( serviceAdapter != null ) {
+			serviceAdapter.notifyAgentWithStatus(pipeline);
+		}
+	}
+	/**
+	 * Currently does nothing: the call that stopped the adapter is disabled.
+	 */
+	protected void stopIt() {
+		if ( serviceAdapter != null ) {
+			//serviceAdapter.stop();
+		}
+	}
+	/**
+	 * Returns state of this process( INITIALIZING, RUNNING, FAILED, STOPPED )
+	 */
+	public ProcessState getServiceState() {
+		return currentState;
+	}
+	/**
+	 * Starts the parent component and deploys the service. On failure the
+	 * state is set to FailedInitialization, the agent is notified and the
+	 * exception is rethrown.
+	 */
+	@Override
+	public void start(DuccService service, String[] args) throws Exception {
+		try {
+			super.start(service, args);
+			deploy(args);
+		} catch( Exception e) {
+			currentState = ProcessState.FailedInitialization;
+			notifyAgentWithStatus(ProcessState.FailedInitialization);
+			throw e;
+		}
+	}
+	/**
+	 * Stops the adapter (if any) and then the parent component. The parent
+	 * is stopped even when stopping the adapter fails.
+	 */
+	public void stop() {
+		try {
+			if ( serviceAdapter != null ) {
+				System.out.println("... AbstractManagedService - Stopping Service Adapter");
+				serviceAdapter.stop();
+			}
+		} catch( Exception e) {
+			e.printStackTrace();
+		} finally {
+			try {
+				System.out.println("... AbstractManagedService - Calling super.stop() ");
+				super.stop();
+			} catch( Exception e) {
+				e.printStackTrace();
+			}
+		}
+	}
+	/**
+	 * Shutdown hook which stops the managed service when the JVM is going
+	 * down.
+	 */
+	static class ServiceShutdownHook extends Thread {
+		private AbstractManagedService managedService;
+
+		public ServiceShutdownHook(AbstractManagedService service) {
+			this.managedService = service;
+		}
+
+		public void run() {
+			try {
+				System.out
+						.println("Uima AS Service Wrapper Caught Kill Signal - Initiating Quiesce and Stop");
+				managedService.quiesceAndStop();
+				managedService.stopIt();
+				
+			} catch (Exception e) {
+				// The JVM is going down so there is nothing to recover, but
+				// leave a trace of what went wrong instead of hiding it
+				e.printStackTrace();
+			}
+		}
+	}
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/AbstractManagedService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ManagedService.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ManagedService.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ManagedService.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ManagedService.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.deploy;
+
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+/**
+ * Control operations and state callback of a managed service process.
+ */
+public interface ManagedService {
+	/**
+	 * Asks the service to stop. ManagedUimaService implements this as a
+	 * quiesce which waits for work in progress.
+	 */
+	void stopService();
+
+	/**
+	 * Asks the service to stop right away. ManagedUimaService implements
+	 * this by stopping UIMA AS and exiting the JVM.
+	 */
+	void killService();
+
+	/**
+	 * Called when the state of the service changes.
+	 * 
+	 * @param serviceState - the new state
+	 */
+	void onServiceStateChange(ProcessState serviceState);
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ManagedService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceAdapter.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceAdapter.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceAdapter.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceAdapter.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.deploy;
+
+import java.util.List;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.agent.ProcessStateUpdate;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.ProcessStateUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+/**
+ * Responsible for delegating state changes received from UIMA AS to a JMS
+ * endpoint. Each state change is wrapped in a
+ * {@link ProcessStateUpdateDuccEvent} and handed to the configured
+ * {@link DuccEventDispatcher}.
+ */
+public class ServiceAdapter implements ServiceStateNotificationAdapter {
+	DuccLogger logger = DuccLogger.getLogger(this.getClass(), "UIMA AS Service");
+
+	//	Dispatcher is responsible for sending state update event to jms endpoint
+	private DuccEventDispatcher dispatcher;
+	//	Caches process PID
+	private String pid=null;
+	//	Unique ID assigned to the process. This is different from OS PID
+	private String duccProcessId;
+	//	Last state reported via notifyAgentWithStatus(). Null until the first report
+	private ProcessState state;
+	//	Optional socket endpoint attached to every update, may be null
+	private String endpoint;
+	//	Guards state and pid
+	private Object stateLock = new Object();
+	
+	/**
+	 * JMS based adapter C'tor
+	 * 
+	 * @param dispatcher - initialized instance of {@link DuccEventDispatcher}
+	 * @param duccProcessId - unique ID assigned by Ducc infrastructure 
+	 * @param endpoint - socket endpoint added to each update, may be null
+	 */
+	public ServiceAdapter(DuccEventDispatcher dispatcher, String duccProcessId, String endpoint) {
+		this.dispatcher = dispatcher;
+		this.duccProcessId = duccProcessId;
+		this.endpoint = endpoint;
+	}
+	/**
+	 * Reports a state change without a JMX URL.
+	 */
+	public void notifyAgentWithStatus(ProcessState state) {
+		notifyAgentWithStatus(state, null);
+	}
+	/**
+	 * Remembers the given state and reports it to the agent.
+	 * 
+	 * @param state - new state of the process
+	 * @param processJmxUrl - JMX URL of this process or null if not known
+	 */
+	public void notifyAgentWithStatus(ProcessState state, String processJmxUrl) {
+	  synchronized( stateLock ) {
+	    this.state = state;
+	    if ( pid == null ) {
+	      // Get the PID once and cache for future reference
+	      pid = Utils.getPID();
+	    }
+	    ProcessStateUpdate processUpdate = null;
+	    if ( processJmxUrl == null ) {
+	      processUpdate = new ProcessStateUpdate(state, pid, duccProcessId,null);
+	    } else {
+	      processUpdate = new ProcessStateUpdate(state, pid, duccProcessId,processJmxUrl, null);
+	    }
+	    if (endpoint != null ) {
+	      processUpdate.setSocketEndpoint(endpoint);
+	    }
+	    this.notifyAgentWithStatus(processUpdate);
+	  }
+	}
+	/**
+	 * Called on UIMA AS status change. Sends a {@link ProcessStateUpdateDuccEvent} message
+	 * via configured dispatcher to a configured endpoint. Failures are
+	 * printed and not propagated to the caller.
+	 * 
+	 */
+	public void notifyAgentWithStatus(ProcessStateUpdate state) {
+		try {
+			ProcessStateUpdateDuccEvent duccEvent = 
+				new ProcessStateUpdateDuccEvent(state);
+			logger.info("notifyAgentWithStatus",null," >>>>>>> UIMA AS Service Deployed - PID:"+pid);
+
+			if (endpoint != null ) {
+				state.setSocketEndpoint(endpoint);
+			}
+			//	send the process update to the remote
+			dispatcher.dispatch(duccEvent, System.getenv("IP"));
+			String jmx = state.getProcessJmxUrl() == null ? "N/A" : state.getProcessJmxUrl();
+			logger.info("notifyAgentWithStatus",null,"... UIMA AS Service Deployed - PID:"+pid+". Service State: "+state+". JMX Url:"+jmx+" Dispatched State Update Event to Agent with IP:"+System.getenv("IP"));
+		} catch( Exception e) {
+			e.printStackTrace();
+		}
+	}
+	/**
+	 * Reports the state of the pipeline components. The update is only sent
+	 * while the last reported process state is Initializing; it is ignored
+	 * in any other state, including when no state was reported yet.
+	 */
+	public void notifyAgentWithStatus(List<IUimaPipelineAEComponent> pipeline) {
+	   synchronized( stateLock ) {
+	     //  Only send update if the AE is initializing. Constant-first
+	     //  comparison because state is null until the first state report.
+	     if ( ProcessState.Initializing.equals(state)) {
+	       try {
+	         ProcessStateUpdate processUpdate = 
+	           new ProcessStateUpdate(state, pid, duccProcessId, null, pipeline);
+	         notifyAgentWithStatus(processUpdate);
+	       } catch( Exception e) {
+	         e.printStackTrace();
+	       }
+	     }
+	   }
+	}
+	/**
+	 * Stops the dispatcher.
+	 */
+	public void stop() throws Exception {
+		dispatcher.stop();
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceAdapter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.deploy;
+
+import java.util.List;
+
+import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+/**
+ * Interface to a component which reports the state of a service process
+ * to an agent.
+ */
+public interface ServiceStateNotificationAdapter {
+	/**
+	 * Reports a process state.
+	 */
+	void notifyAgentWithStatus(ProcessState state);
+
+	/**
+	 * Reports a process state together with the JMX URL of the process.
+	 */
+	void notifyAgentWithStatus(ProcessState state, String processJmxUrl);
+
+	/**
+	 * Reports the state of the pipeline components.
+	 */
+	void notifyAgentWithStatus(List<IUimaPipelineAEComponent> pipeline);
+
+	/**
+	 * Stops the adapter.
+	 * 
+	 * @throws Exception if the adapter fails to stop
+	 */
+	void stop() throws Exception;
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ManagedUimaService.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ManagedUimaService.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ManagedUimaService.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ManagedUimaService.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.deploy.uima;
+
+import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
+import org.apache.uima.adapter.jms.service.UIMA_Service;
+import org.apache.uima.analysis_engine.AnalysisEngineManagement;
+import org.apache.uima.ducc.agent.deploy.AbstractManagedService;
+import org.apache.uima.ducc.agent.deploy.ServiceStateNotificationAdapter;
+import org.apache.uima.ducc.common.main.DuccService;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.common.utils.XStreamUtils;
+import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.agent.UimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.event.common.DuccUimaDeploymentDescriptor;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+/**
+ * Service wrapper for UIMA AS service. Deploys UIMA AS using Spring deployer
+ * component. Reports UIMA AS state to an agent using
+ * {@code ServiceStateNotificationAdapter}.
+ * 
+ */
+public class ManagedUimaService extends AbstractManagedService {
+
+	private SpringContainerDeployer serviceDeployer;
+	private String saxonJarPath;
+	private String dd2SpringXslPath;
+  private String processJmxUrl=null;
+	protected DuccLogger logger;
+	private String agentStateUpdateEndpoint="";
+	
+	/**
+	 * Deploys a UIMA AS service without an agent (no notification adapter
+	 * is configured).
+	 * 
+	 * @param args - args[0] is handed to {@link DuccUimaDeploymentDescriptor}
+	 */
+	public static void main(String[] args) {
+		// Fail with a readable message rather than an
+		// ArrayIndexOutOfBoundsException stack trace
+		if ( args == null || args.length == 0 ) {
+			System.err.println("Usage: ManagedUimaService <deployment descriptor>");
+			return;
+		}
+		try {
+			ManagedUimaService ms = 
+					new ManagedUimaService("${DUCC_HOME}/lib/saxon8/saxon8.jar", "${DUCC_HOME}/bin/dd2spring.xsl",null, new DefaultCamelContext());
+			ms.deploy(new String[] {XStreamUtils.marshall(new DuccUimaDeploymentDescriptor(args[0]))});
+		} catch( Exception e) {
+			e.printStackTrace();
+		}
+	}
+	
+	/**
+	 * Creates the service wrapper.
+	 * 
+	 * @param saxonJarPath - location of the saxon jar, placeholders are resolved in getServiceArgs()
+	 * @param dd2SpringXslPath - location of the dd2spring stylesheet, placeholders are resolved in getServiceArgs()
+	 * @param serviceAdapter - adapter used to notify the agent
+	 * @param context - camel context passed to the parent component
+	 */
+	public ManagedUimaService(String saxonJarPath,String dd2SpringXslPath,
+			ServiceStateNotificationAdapter serviceAdapter, CamelContext context) {
+		super(serviceAdapter, context);
+		logger = new DuccLogger(DuccService.class);
+		this.dd2SpringXslPath = dd2SpringXslPath;
+		this.saxonJarPath = saxonJarPath;
+	}
+
+	/**
+	 * Forwards the new service state to the agent.
+	 * 
+	 * @param state - the new state
+	 */
+	@Override
+	public void onServiceStateChange(ProcessState state) {
+		super.notifyAgentWithStatus(state);
+	}
+	/**
+	 * Sets the agent state update endpoint. In this class the value is only
+	 * used in the message printed by deploy().
+	 * 
+	 * @param agentUpdateEndpoint - the endpoint
+	 */
+	public void setAgentStateUpdateEndpoint(String agentUpdateEndpoint) {
+		this.agentStateUpdateEndpoint = agentUpdateEndpoint;
+	}
+	/**
+	 * Quiesces and stops the deployed UIMA AS service (if there is one) and
+	 * then stops this component. The component is stopped even when nothing
+	 * was deployed or when stopping UIMA AS fails.
+	 */
+	public void quiesceAndStop() {
+		try {
+			// serviceDeployer is null until deploy() succeeds
+			if (serviceDeployer != null) {
+				AnalysisEngineController topLevelController = serviceDeployer
+						.getTopLevelController();
+				if (topLevelController != null) {
+					topLevelController.quiesceAndStop();
+					if (!topLevelController.isStopped()) {
+						serviceDeployer
+								.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
+					}
+				}
+			}
+		} catch (Exception e) {
+			// Shutting down anyway, but do not hide the reason of the failure
+			e.printStackTrace();
+		} finally {
+			stop();
+		}
+	}
+
+	public void terminate() {
+		currentState = ProcessState.Stopped;
+		System.out.println("Service STOPPED");
+		try {
+			super.notifyAgentWithStatus(currentState);
+			if (serviceDeployer != null) {
+				// Use top level controller to stop all components
+				serviceDeployer.getTopLevelController().stop();
+			}
+			stopIt();
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+	}
+
+	public void killService() {
+    logger.info("killService", null, "Ducc UIMA Service process received STOP event. Stopping UIMA AS ...");
+    if (serviceDeployer != null) {
+      // Use top level controller to stop all components. This method doesnt wait
+      // for inflight CASes to be processed
+      serviceDeployer.getTopLevelController().stop();
+    }
+    logger.info("killService", null, "Ducc UIMA Service process stopped UIMA AS and exiting via System.exit()");
+		System.exit(-1);
+	}
+
+	public void stopService() {
+		System.out.println("UIMA AS process received STOP event. Proceeding to STOP in quiesce mode");
+		
+		if (serviceDeployer != null) {
+			// Use top level controller to stop all components. This method blocks until
+			// ALL in flight CASes are processed.
+			serviceDeployer.getTopLevelController().quiesceAndStop();
+		}
+		System.out.println("UIMA AS Service quiesceAndStop() Finished");
+		currentState = ProcessState.Stopped;
+		try {
+			super.stop();
+		} catch( Exception e) {
+			e.printStackTrace();
+		}
+	}
+	/**
+	 * Builds the UIMA AS service arguments: -saxonURL, -xslt and -dd.
+	 * Placeholders in the three values are resolved against the system
+	 * properties.
+	 * 
+	 * @param args - commandline args, args[0] is the deployment descriptor path
+	 * @return the argument array for UIMA AS
+	 * @throws Exception
+	 */
+	public String[] getServiceArgs(String[] args) throws Exception {
+		String resolvedDd = 
+				Utils.resolvePlaceholderIfExists(args[0],System.getProperties());
+		String resolvedSaxon = 
+				Utils.resolvePlaceholderIfExists(saxonJarPath,System.getProperties());
+		String resolvedXsl = 
+				Utils.resolvePlaceholderIfExists(dd2SpringXslPath,System.getProperties());
+		return new String[] {
+				"-saxonURL", resolvedSaxon,
+				"-xslt", resolvedXsl,
+				"-dd", resolvedDd};
+	}
+/*
+	private void setupLogging() throws Exception {
+		Properties props = new Properties();
+	    	try {
+	    		InputStream configStream = 
+				getClass().getResourceAsStream
+				("Logger.properties");
+		        props.load(configStream);
+		        configStream.close();
+	    	} catch(IOException e) {
+	        	System.out.println("Error");
+	  	}
+	    	//props.setProperty("log4j.rootLogger","INFO, stdout");
+	    	Enumeration<Logger> en = LogManager.getCurrentLoggers();
+	    	while (en.hasMoreElements()) {
+	    		System.out.println("Logger Appender Class:"+en.nextElement().getName());
+	    	}
+	    	LogManager.resetConfiguration();
+	    	PropertyConfigurator.configure(props);		
+	}
+*/	
+	/**
+	 * Deploys UIMA AS service.
+	 * <p>
+	 * Generates the spring context files from the deployment descriptor,
+	 * notifies the agent that the process is Initializing, deploys the
+	 * UIMA AS components and finally notifies the agent with either Running
+	 * or FailedInitialization. While the components are being deployed a
+	 * {@link UimaAEJmxMonitor} periodically reports their initialization
+	 * state.
+	 * 
+	 * @param args - args[0] is the deployment descriptor, see getServiceArgs()
+	 * @throws Exception if the spring context files cannot be generated
+	 */
+	public void deploy(String[] args) throws Exception {
+		//	Instrument this process with JMX Agent. The Agent will
+		//  find an open port and start JMX Connector allowing
+		//  jmx clients to connect to this jvm using standard
+		//  jmx connect url. This process does not require typical
+		//  -D<jmx params> properties. Currently the JMX does not
+		//  use security allowing all clients to connect.
+		processJmxUrl = super.getProcessJmxUrl();
+		System.out.println("Connect jConsole to this process using JMX URL:"+processJmxUrl);
+
+		UIMA_Service service = new UIMA_Service();
+		
+		StringBuilder sb = new StringBuilder("Deploying UIMA AS with args:\n");
+		
+		for( String arg : args) {
+			sb.append(arg).append("\n");
+		}
+		System.out.println(sb.toString());
+		String[] serviceArgs = getServiceArgs(args);
+		
+		sb.setLength(0);
+		sb.append("Service Args:\n");
+		for( String arg : serviceArgs) {
+			sb.append(" ").append(arg);
+		}
+		System.out.println(sb.toString());
+
+		System.out.println("ManagedUimaService initializing...");
+
+		// parse command args and run dd2spring to generate spring context
+		// files from deployment descriptors
+		String[] contextFiles = service.initialize(serviceArgs);
+		if (contextFiles == null) {
+			throw new Exception(
+					"Spring Context Files Not Generated. Unable to Launch Uima AS Service");
+		}
+		//	 Make sure that the dd2spring generated file exists. Re-check
+		//	 every 500ms until it shows up.
+		File generatedFile = new File(contextFiles[0]);
+		while( !generatedFile.exists() ) {
+			synchronized(generatedFile) {
+				generatedFile.wait(500);
+			}
+		}
+		System.out.println("ManagedUimaService initialized - ready to process. Agent State Update endpoint:"+agentStateUpdateEndpoint);
+		System.out.println(".... Verified dd2spring generated spring context file:"+contextFiles[0]);
+		// Let the Agent know that the service is entering Initialization
+		// state. This is an initial state of a service, covering
+		// process bootstrapping(startup) and initialization of UIMA
+		// components.
+		super.notifyAgentWithStatus(ProcessState.Initializing, processJmxUrl);
+
+		//	Instantiate a UIMA AS jmx monitor to poll for status of the AE.
+		//  This monitor checks if the AE is initializing or ready. It is
+		//  created before the executor so that a failure here does not leave
+		//  a started thread pool behind.
+		UimaAEJmxMonitor monitor = new UimaAEJmxMonitor(this, serviceArgs);
+
+		ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
+				1);
+		try {
+			executor.prestartAllCoreThreads();
+			/*
+			 * This will execute the UimaAEJmxMonitor continuously for every 30
+			 * seconds with an initial delay of 20 seconds. This monitor polls
+			 * initialization status of AE deployed in UIMA AS.
+			 */
+			executor.scheduleAtFixedRate(monitor, 20, 30, TimeUnit.SECONDS);
+
+			// Deploy components defined in Spring context files.
+			// !!!! NOTE:This method blocks until the container is fully
+			// initialized and all UIMA-AS components are successfully deployed
+			// or there is a failure.
+			try {
+				serviceDeployer = service.deploy(contextFiles);
+			} catch( Throwable t) {
+				// serviceDeployer stays null which is reported below as
+				// FailedInitialization
+				t.printStackTrace();
+			}
+		} finally {
+			// Stop executor. It was only needed to poll AE initialization
+			// status. Since deploy() completed the UIMA AS service either
+			// succeeded initializing or it failed. In either case we no
+			// longer need to poll for initialization status
+			executor.shutdownNow();
+		}
+
+		if (serviceDeployer == null || serviceDeployer.initializationFailed() ) {
+			currentState = ProcessState.FailedInitialization;
+			System.out
+					.println(">>> Failed to Deploy UIMA Service. Check UIMA Log for Details");
+			super.notifyAgentWithStatus(ProcessState.FailedInitialization);
+		} else {
+			currentState = ProcessState.Running;
+			// Update agent with the most up-to-date state of the pipeline
+			monitor.run();
+			super.notifyAgentWithStatus(currentState,processJmxUrl);
+		}
+
+	}
+
+	/**
+	 * Sends the state of the pipeline components to the agent.
+	 * 
+	 * @param pipeline - components to report
+	 */
+	public void updateAgent(List<IUimaPipelineAEComponent> pipeline) {
+		super.notifyAgentWithStatus(pipeline);
+	}
+	public static class UimaAEJmxMonitor implements Runnable {
+		MBeanServer server = null;
+		ManagedUimaService service;
+    static int howManySeenSoFar = 1;
+    public List<IUimaPipelineAEComponent> aeStateList = new ArrayList<IUimaPipelineAEComponent>();
+
+		/**
+		 * Creates a monitor bound to the platform MBean server.
+		 * 
+		 * @param service - service which receives the pipeline updates
+		 * @param serviceArgs - not used
+		 */
+		public UimaAEJmxMonitor(ManagedUimaService service, String[] serviceArgs)
+				throws Exception {
+			this.service = service;
+			this.server = ManagementFactory.getPlatformMBeanServer();
+		}
+
+		/**
+		 * Looks up a tracked component by its name.
+		 * 
+		 * @param name - component name to look for
+		 * @return the first tracked component with that name or null if there is none
+		 */
+		private IUimaPipelineAEComponent getUimaAeByName(String name) {
+			IUimaPipelineAEComponent found = null;
+			for (IUimaPipelineAEComponent candidate : aeStateList) {
+				if (candidate.getAeName().equals(name)) {
+					found = candidate;
+					break;
+				}
+			}
+			return found;
+		}
+
+		/**
+		 * Takes one snapshot of the UIMA AS analysis engine MBeans and, when
+		 * something worth reporting was found, publishes the list of tracked
+		 * components to the agent via the owning service.
+		 * <p>
+		 * A component starts being tracked when it is first seen in a state
+		 * other than Ready. It is reported on every run while it is
+		 * Initializing, reported one more time when it becomes Ready, and is
+		 * then dropped from the tracked list.
+		 * <p>
+		 * NOTE(review): this method rethrows after repeated
+		 * UndeclaredThrowableExceptions; when run by a scheduled executor
+		 * that presumably ends the periodic execution - confirm that this is
+		 * intended.
+		 */
+		public void run() {
+			try {
+				//	create an ObjectName with UIMA As JMS naming convention to enable 
+				//  finding deployed uima components.
+				ObjectName uimaServicePattern = new ObjectName(
+						"org.apache.uima:type=ee.jms.services,*");
+				// Fetch UIMA AS MBean names from JMX Server that match above
+				// name pattern
+				Set<ObjectInstance> mbeans = new HashSet<ObjectInstance>(
+						server.queryMBeans(uimaServicePattern, null));
+				// Components which reached Ready in this run. They are removed
+				// from aeStateList only after the list has been published.
+				List<IUimaPipelineAEComponent> componentsToDelete = new ArrayList<IUimaPipelineAEComponent>();
+				boolean updateAgent = false;
+				for (ObjectInstance instance : mbeans) {
+					String targetName = instance.getObjectName()
+							.getKeyProperty("name");
+					if (targetName.endsWith("FlowController")) { // skip FC
+						continue;
+					}
+					//	Only interested in AEs
+					if (instance
+							.getClassName()
+							.equals("org.apache.uima.analysis_engine.impl.AnalysisEngineManagementImpl")) {
+						String[] aeObjectNameParts = instance.getObjectName()
+								.toString().split(",");
+						if (aeObjectNameParts.length == 3) {
+							// this is uima aggregate MBean. Skip it. We only
+							// care about this
+							// aggregate's pipeline components.
+							continue;
+						}
+						StringBuffer sb = new StringBuffer();
+						// counted but not used anywhere below
+						int partCount = 0;
+						//	compose component name from jmx ObjectName
+						for (String part : aeObjectNameParts) {
+							partCount++;
+							if (part.startsWith("org.apache.uima:type")
+									|| part.startsWith("s=")) {
+								continue; // skip service name part of the name
+							} else {
+								sb.append("/");
+								// strip the trailing "Components" from the part
+								if (part.endsWith("Components")) {
+									part = part.substring(0,
+											part.indexOf("Components")).trim();
+								}
+								// keep only the value of the key=value pair
+								sb.append(part.substring(part.indexOf("=") + 1));
+							}
+						}
+						// Fetch a proxy to the AE Management object which holds AE stats
+						AnalysisEngineManagement proxy = JMX.newMBeanProxy(server, instance.getObjectName(),AnalysisEngineManagement.class);
+
+						IUimaPipelineAEComponent aeState = null;
+//						if ((aeState = getUimaAeByName(aeStateList, sb.toString())) == null) {
+            if ((aeState = getUimaAeByName(sb.toString())) == null) {
+              // Not interested in AEs that are in a Ready State
+              if ( AnalysisEngineManagement.State.valueOf(proxy.getState()).equals(AnalysisEngineManagement.State.Ready)) {
+                continue;
+              }
+              // First time this AE is seen: start tracking it, remember when
+              // its initialization was first observed and record it as
+              // Initializing
+							aeState = new UimaPipelineAEComponent(sb.toString(), proxy.getThreadId(),	AnalysisEngineManagement.State.valueOf(proxy.getState()));
+							aeStateList.add(aeState);
+							((UimaPipelineAEComponent)aeState).startInitialization = System.currentTimeMillis();
+              aeState.setAeState(AnalysisEngineManagement.State.Initializing);
+							updateAgent = true;
+						} else  {
+						  // continue publishing AE state while the AE is initializing
+						  if (AnalysisEngineManagement.State.valueOf(proxy.getState()).equals(AnalysisEngineManagement.State.Initializing)) {
+                updateAgent = true;
+                // elapsed time since the AE was first seen by this monitor
+                aeState.setInitializationTime(System.currentTimeMillis()-((UimaPipelineAEComponent)aeState).startInitialization);
+                // publish state if the AE just finished initializing and is now in Ready state
+              } else if (aeState.getAeState().equals(AnalysisEngineManagement.State.Initializing) &&
+                      AnalysisEngineManagement.State.valueOf(proxy.getState()).equals(AnalysisEngineManagement.State.Ready)) {
+                 aeState.setAeState(AnalysisEngineManagement.State.Ready);
+                 updateAgent = true;
+                 // NOTE(review): short pause before reading the initialization
+                 // time from the proxy; presumably gives the AE time to
+                 // publish the final value - confirm
+                 synchronized(this) {
+                   try {
+                     wait(5);
+                   }catch(InterruptedException ex) {
+                   }
+                 }
+                 aeState.setInitializationTime(proxy.getInitializationTime());
+                 // AE reached ready state we no longer need to publish its state
+                 componentsToDelete.add(aeState);
+              } 
+						}
+            service.logger.info("UimaAEJmxMonitor.run()", null, "---- AE Name:"+proxy.getName()+" AE State:"+proxy.getState()+" AE init time="+aeState.getInitializationTime()+" Proxy Init time="+proxy.getInitializationTime()+" Proxy Thread ID:"+proxy.getThreadId());
+					}
+				}
+        howManySeenSoFar = 1;  // reset error counter
+				if (updateAgent) {
+          service.logger.info("UimaAEJmxMonitor.run()", null, "---- Publishing UimaPipelineAEComponent List - size="+aeStateList.size());
+          try {
+            service.updateAgent(aeStateList);
+          } catch( Exception ex) {
+            throw ex;
+          } finally {
+            //  remove components that reached Ready state
+            for (IUimaPipelineAEComponent aeState : componentsToDelete) {
+              aeStateList.remove(aeState);
+            }            
+          }
+				}
+
+			} catch( UndeclaredThrowableException e ) {
+			   // Thrown by the MBean proxy calls above. An
+			   // InstanceNotFoundException cause is ignored, anything else is
+			   // counted and rethrown once the counter exceeds 3.
+			   if ( !(e.getCause() instanceof InstanceNotFoundException) ) {
+			     if ( howManySeenSoFar > 3 ) { // allow up three errors of this kind
+	           service.logger.info("UimaAEJmxMonitor.run()", null, e);
+	           howManySeenSoFar = 1;
+	           throw e; 
+			     }
+           howManySeenSoFar++;
+			   } else {
+			     // AE not fully initialized yet, ignore the exception
+			   }
+			}	catch (Throwable e) {
+        // Any other failure is logged and this run is abandoned
+        howManySeenSoFar = 1;
+        service.logger.info("UimaAEJmxMonitor.run()", null, e);
+			}
+		}
+	}
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ManagedUimaService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ProcessEventListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ProcessEventListener.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ProcessEventListener.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ProcessEventListener.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.deploy.uima;
+
+import org.apache.camel.Body;
+import org.apache.uima.ducc.agent.deploy.ManagedService;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.ProcessStopDuccEvent;
+import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener;
+
+
+/**
+ * Delegate listener that reacts to {@link ProcessStopDuccEvent} messages by
+ * killing the {@link ManagedService} it was constructed with.
+ */
+public class ProcessEventListener implements DuccEventDelegateListener {
+
+	/** Service that is killed when a stop event is received. */
+	private ManagedService service;
+
+	/** Dispatcher supplied through the setter; stored but not read within this class. */
+	private DuccEventDispatcher eventDispatcher;
+
+	/**
+	 * Creates a listener bound to the given service.
+	 *
+	 * @param service - the service to kill when a stop event arrives
+	 */
+	public ProcessEventListener(ManagedService service) {
+		this.service = service;
+	}
+
+	/**
+	 * Stores the dispatcher handed to this listener.
+	 *
+	 * @param eventDispatcher - dispatcher instance to remember
+	 */
+	public void setDuccEventDispatcher(DuccEventDispatcher eventDispatcher) {
+		this.eventDispatcher = eventDispatcher;
+	}
+
+	/**
+	 * Handles a stop request by killing the managed service. The content
+	 * of the event itself is not inspected.
+	 *
+	 * @param event - message body bound by Camel
+	 */
+	public void onProcessStop(@Body ProcessStopDuccEvent event) {
+		service.killService();
+	}
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/ProcessEventListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/UimaAsServiceConfiguration.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/UimaAsServiceConfiguration.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/UimaAsServiceConfiguration.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/UimaAsServiceConfiguration.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.deploy.uima;
+
+import java.net.InetAddress;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.uima.ducc.agent.NodeAgent;
+import org.apache.uima.ducc.agent.deploy.ManagedService;
+import org.apache.uima.ducc.agent.deploy.ServiceAdapter;
+import org.apache.uima.ducc.agent.deploy.ServiceStateNotificationAdapter;
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.transport.DuccExchange;
+import org.apache.uima.ducc.transport.DuccTransportConfiguration;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+
+@Configuration
+@Import({ DuccTransportConfiguration.class,
+	CommonConfiguration.class })
+public class UimaAsServiceConfiguration {
+	@Autowired
+	DuccTransportConfiguration transport;
+	@Autowired
+	CommonConfiguration common;
+	
+	/**
+	 * Creates Camel Router to handle incoming messages 
+	 * 
+	 * @param delegate - {@code AgentEventListener} to delegate messages to
+	 * 
+	 * @return {@code RouteBuilder} instance
+	 */
+	public synchronized RouteBuilder routeBuilderForIncomingRequests(final String thisNodeIP, 
+			final ProcessEventListener delegate) {
+		return new RouteBuilder() {
+		  //	Custom filter to select messages that are targeted for this process
+			//  Checks the PID in a message to determine if this process is 
+			//  the target.
+			Predicate filter = new DuccProcessFilter(thisNodeIP);
+			public void configure() throws Exception {
+			   System.out.println("Service Wrapper Starting Request Channel on Endpoint:"+common.managedServiceEndpoint);
+         onException(Exception.class).handled(true).process(new ErrorProcessor()).end();
+
+			  from(common.managedServiceEndpoint)
+				.choice().when(filter)
+						.bean(delegate)
+				.end();
+	
+			}
+		};
+	}
+	 public class ErrorProcessor implements Processor {
+
+	    public void process(Exchange exchange) throws Exception {
+	      // the caused by exception is stored in a property on the exchange
+	      Throwable caused = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
+	      caused.printStackTrace();
+	      //System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1");
+	          //assertNotNull(caused);
+	          // here you can do what you want, but Camel regard this exception as handled, and
+	          // this processor as a failurehandler, so it wont do redeliveries. So this is the
+	          // end of this route. But if we want to route it somewhere we can just get a
+	          // producer template and send it.
+
+	          // send it to our mock endpoint
+	          //exchange.getContext().createProducerTemplate().send("mock:myerror", exchange);
+	    }
+	  }
+
+	private ProcessEventListener processDelegateListener(ManagedService service) {
+		return new ProcessEventListener(service);
+	}
+	private ServiceStateNotificationAdapter serviceAdapter(DuccEventDispatcher eventDispatcher, String stateUpdateEndpoint) {
+		return new ServiceAdapter(eventDispatcher, System.getenv("ProcessDuccId"), stateUpdateEndpoint);
+	}
+	@Bean
+	public ManagedService managedService() throws Exception {
+		try {
+			//	Assume IP address provided from environment. In production this
+			//  will be the actual node IP. In testing, the IP can be virtual
+			//  when running multiple agents on the same node. The agent is 
+			//  responsible for providing the IP in this process environment.
+			String thisNodeIP = 
+			(System.getenv("IP") == null) ? InetAddress.getLocalHost().getHostAddress() : System.getenv("IP");
+	    CamelContext camelContext = common.camelContext();
+	    int serviceSocketPort = 0;
+	    String agentSocketParams="";
+      String jpSocketParams="";
+	    if ( common.managedServiceEndpointParams != null ) {
+	      jpSocketParams = "?"+common.managedServiceEndpointParams;
+	    }
+	    
+	    if ( common.managedProcessStateUpdateEndpointParams != null ) {
+	      agentSocketParams = "?"+common.managedProcessStateUpdateEndpointParams;
+      }
+      // set up agent socket endpoint where this UIMA AS service will send state updates
+	    if ( common.managedProcessStateUpdateEndpointType != null && common.managedProcessStateUpdateEndpointType.equalsIgnoreCase("socket") ) {
+	      common.managedProcessStateUpdateEndpoint = 
+	              "mina:tcp://localhost:"+System.getProperty(NodeAgent.ProcessStateUpdatePort)+agentSocketParams;
+	    }
+	    // set up a socket endpoint where the UIMA AS service will receive events sent from its agent
+	    if ( common.managedServiceEndpointType != null && common.managedServiceEndpointType.equalsIgnoreCase("socket")) {
+        serviceSocketPort = Utils.findFreePort();
+        // service is on the same node as the agent
+        common.managedServiceEndpoint = 
+                "mina:tcp://localhost:"+serviceSocketPort+jpSocketParams;
+	    }
+	    
+	    //	optionally configures Camel Context for JMS. Checks the 'agentRequestEndpoint' to 
+			//  to determine type of transport. If the the endpoint starts with "activemq:", a 
+			//  special ActiveMQ component will be activated to enable JMS transport
+			
+	    DuccEventDispatcher eventDispatcher =
+				transport.duccEventDispatcher(common.managedProcessStateUpdateEndpoint, camelContext);
+			
+	    
+			ManagedUimaService service = 
+	        	new ManagedUimaService(common.saxonJarPath,common.dd2SpringXslPath, serviceAdapter(eventDispatcher,common.managedServiceEndpoint), camelContext);
+	    
+	    service.setAgentStateUpdateEndpoint(common.managedProcessStateUpdateEndpoint);
+
+	     System.out.println("#######################################################");
+	     System.out.println("## Agent Service State Update Endpoint:"+common.managedProcessStateUpdateEndpoint+" ##");
+	     System.out.println("#######################################################");
+
+	    
+			ProcessEventListener delegateListener = processDelegateListener(service);
+			delegateListener.setDuccEventDispatcher(eventDispatcher);
+			camelContext.addRoutes(this.routeBuilderForIncomingRequests(thisNodeIP, delegateListener));
+		
+			return service;
+			
+		} catch( Exception e) {
+			e.printStackTrace();
+			throw e;
+		}
+	}
+	private class DuccProcessFilter implements Predicate {
+		String thisNodeIP;
+		public DuccProcessFilter(final String thisNodeIP) { 
+			this.thisNodeIP = thisNodeIP;
+		}
+		public synchronized boolean matches(Exchange exchange) {
+			String methodName="DuccProcessFilter.matches";
+			boolean result = false;
+			try {
+				String pid = (String)exchange.getIn().getHeader(DuccExchange.ProcessPID);
+				String targetIP = (String)exchange.getIn().getHeader(DuccExchange.DUCCNODEIP);
+				//	check if this message is targeting this process. Check if the process PID
+				//  and the node match target process.
+				if ( Utils.getPID().equals(pid) && thisNodeIP.equals(targetIP) ) { // Get PID of this process
+					result = true;
+					System.out.println( ">>>>>>>>> Process Received a Message. Is Process target for message:"+result+". Target PID:"+pid);
+				}
+			} catch( Throwable e) {
+				e.printStackTrace();
+			}
+			return result;
+	   }
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/deploy/uima/UimaAsServiceConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native